### Pipelines - Broadcasting

To work along with this notebook you'll need the included data file, `car_data.csv`.

We want to split the data into different files based on criteria of our choosing.

For example, we may want to create one file that contains all pink cars, another that contains all Mercedes cars, another that contains only blue cars of a specific model year, and so on.

Let's first write a generator to parse the data for us:

In [1]:
import csv

def data_reader(f_name):
    f = open(f_name)
    try:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        reader = csv.reader(f, dialect=dialect)
        yield from reader
    finally:
        f.close()
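`csv.Sniffer` is what lets `data_reader` work out the file's delimiter by itself instead of assuming commas. As a quick sketch, here is the same sniff-then-rewind sequence applied to a semicolon-delimited sample held in an `io.StringIO`. The sample text is invented for illustration and does not come from `car_data.csv`:

```python
import csv
import io

# Invented semicolon-delimited sample (not from car_data.csv)
sample = "make;model;year\nFord;Fusion;2010\nSaab;900;1998\n"

f = io.StringIO(sample)
dialect = csv.Sniffer().sniff(f.read(2000))  # inspect (up to) the first 2000 characters
f.seek(0)                                    # rewind before parsing, as data_reader does
rows = list(csv.reader(f, dialect=dialect))

print(dialect.delimiter)
print(rows)
```

If the sample gives the sniffer nothing consistent to work with, `sniff` raises `csv.Error` with the message "Could not determine delimiter".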

In [2]:
for row in data_reader('car_data.csv'):
    print(row)

['car_make', 'car_model', 'model_year', 'vin', 'color']
['Mitsubishi', 'Outlander', '2011', 'WBAEV33453K542952', 'Indigo']
['Pontiac', 'Sunfire', '2001', 'SCFAD06D99G713780', 'Maroon']
['Pontiac', 'Grand Am', '1994', 'WBA6B8C59ED852919', 'Red']
['Chrysler', 'Town & Country', '2008', '1GD422CGXEF757763', 'Violet']
['Isuzu', 'Trooper', '1999', '3GTU2YEJ6CG150061', 'Red']
['Acura', 'NSX', '2002', 'WA1DGBFP6DA002021', 'Orange']
['Oldsmobile', 'Cutlass Supreme', '1997', '5TFCW5F1XBX807662', 'Red']
['Ford', 'F-Series', '1995', 'WAUDF98E55A083878', 'Red']
['Saab', '900', '1998', '1C3ADEAZXDV389424', 'Indigo']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Audi', 'V8', '1994', 'WVWAN7ANXDE961674', 'Mauv']
['Isuzu', 'Stylus', '1993', '1VWAH7A39EC443135', 'Purple']
['Dodge', 'Ramcharger', '1993', 'WAUSH78E07A410079', 'Yellow']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'Fusion', '2010', '2HNYD2H83DH124143', 'Violet']
['GMC', 'Yukon XL 2500', '2000', 'WBAS

Let's create our indices, output headers and data converters for this file - basically these are our configuration parameters for this data file.

In [3]:
input_file = 'car_data.csv'

idx_make = 0
idx_model = 1
idx_year = 2
idx_vin = 3
idx_color = 4

headers = ('make', 'model', 'year', 'vin', 'color')

converters = (str, str, int, str, str)

Now let's create a generator that will return the parsed data:

In [4]:
def data_parser():
    data = data_reader(input_file)
    next(data)  # skip header row
    for row in data:
        parsed_row = [converter(item)
                      for converter, item in zip(converters, row)]
        yield parsed_row
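The `zip(converters, row)` step pairs each raw string with the conversion function for its column. Applied by hand to the first data row printed above, it looks like this. This is a standalone sketch, so it repeats the `converters` tuple:

```python
converters = (str, str, int, str, str)

raw_row = ['Mitsubishi', 'Outlander', '2011', 'WBAEV33453K542952', 'Indigo']

# Each converter is applied to the value in the same column position
parsed_row = [converter(item) for converter, item in zip(converters, raw_row)]
print(parsed_row)
```

Note that `zip` stops at the shorter of its inputs, so a row that is missing trailing fields would silently come back shorter rather than raising an error.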

Let's just make sure this is working properly:

In [5]:
data = data_parser()
for _ in range(5):
    print(next(data))

['Mitsubishi', 'Outlander', 2011, 'WBAEV33453K542952', 'Indigo']
['Pontiac', 'Sunfire', 2001, 'SCFAD06D99G713780', 'Maroon']
['Pontiac', 'Grand Am', 1994, 'WBA6B8C59ED852919', 'Red']
['Chrysler', 'Town & Country', 2008, '1GD422CGXEF757763', 'Violet']
['Isuzu', 'Trooper', 1999, '3GTU2YEJ6CG150061', 'Red']


Let's also write our `coroutine` decorator, which automatically primes coroutines:

In [6]:
def coroutine(fn):
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)
        return g
    return inner
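Priming matters because a freshly created generator cannot receive a value until it has advanced to its first `yield`. The small sketch below shows both cases. The `echo` coroutine is a hypothetical example and not part of the pipeline:

```python
def coroutine(fn):
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)  # run up to the first yield so the generator can accept send()
        return g
    return inner


def echo():
    # Hypothetical coroutine: keeps every value it receives
    received = []
    while True:
        item = yield received  # hand back the running list after each send
        received.append(item)


# Unprimed: the generator has not reached a yield yet, so send() is rejected
unprimed = echo()
try:
    unprimed.send('hello')
except TypeError as ex:
    error_message = str(ex)
print(error_message)

# Primed via the decorator: the generator is already paused at its first yield
primed = coroutine(echo)()
result = primed.send('hello')
print(result)
```

A common refinement is to decorate `inner` with `functools.wraps(fn)`, which copies the wrapped function's `__name__` and docstring onto the decorated version.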

Next we are going to write a coroutine that creates a file and writes data to it. We'll pass it the output file name, and the coroutine will assume that each piece of data it receives is a list (basically whatever comes back from `data_parser`). To make things easier, we'll also pass it the column headers so it can write them as the first row of the output file.

In [7]:
@coroutine
def save_data(f_name, headers):
    with open(f_name, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        while True:
            data_row = yield
            writer.writerow(data_row)

Now we're going to create a filter coroutine that will have the following parameters:
* filter function (predicate)
* next coroutine to send to in the pipeline

The filter coroutine receives a data row and tests whether the predicate is true for that row. If it is, it sends the row to the next stage (target) of the pipeline; otherwise it simply ignores the row.

In [8]:
@coroutine
def filter_data(filter_predicate, target):
    while True:
        data_row = yield
        if filter_predicate(data_row):
            target.send(data_row)
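To see `filter_data` in isolation, we can point it at an in-memory sink instead of a file. In this sketch `collect` is a hypothetical stand-in for `save_data` that appends rows to a list, and the rows are a few parsed records copied from the data above:

```python
def coroutine(fn):
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)  # prime the generator
        return g
    return inner


@coroutine
def filter_data(filter_predicate, target):
    while True:
        data_row = yield
        if filter_predicate(data_row):
            target.send(data_row)


@coroutine
def collect(results):
    # Hypothetical sink used in place of save_data: appends rows to a list
    while True:
        results.append((yield))


rows = [
    ['Land Rover', 'LR3', 2008, 'WBANE53507B229964', 'Pink'],
    ['Pontiac', 'Sunfire', 2001, 'SCFAD06D99G713780', 'Maroon'],
    ['Aston Martin', 'DBS', 2008, 'WAUFMBFC7EN209268', 'Pink'],
]

pink_rows = []
pink_filter = filter_data(lambda d: d[4].lower() == 'pink', collect(pink_rows))
for row in rows:
    pink_filter.send(row)

print(pink_rows)
```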

Next, let's write our broadcaster. It simply sends every row it receives to each of the coroutines in the `targets` argument:

In [9]:
@coroutine
def broadcast(targets):
    while True:
        data_row = yield
        for target in targets:
            target.send(data_row)
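Here is a small in-memory sketch of the fan-out: one stream of rows is broadcast to two filters, and a row can land in more than one output. The `collect` sink and the 2005 cutoff are illustrative assumptions, not part of the notebook's pipeline:

```python
def coroutine(fn):
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)  # prime the generator
        return g
    return inner


@coroutine
def filter_data(filter_predicate, target):
    while True:
        data_row = yield
        if filter_predicate(data_row):
            target.send(data_row)


@coroutine
def broadcast(targets):
    while True:
        data_row = yield
        for target in targets:
            target.send(data_row)  # every target sees every row


@coroutine
def collect(results):
    # Hypothetical sink standing in for save_data
    while True:
        results.append((yield))


rows = [
    ['Land Rover', 'LR3', 2008, 'WBANE53507B229964', 'Pink'],
    ['Pontiac', 'Sunfire', 2001, 'SCFAD06D99G713780', 'Maroon'],
    ['Ford', 'F150', 1993, 'JN8AF5MR5BT143315', 'Pink'],
]

pink, older = [], []
fan_out = broadcast((
    filter_data(lambda d: d[4].lower() == 'pink', collect(pink)),
    filter_data(lambda d: d[2] <= 2005, collect(older)),  # illustrative cutoff
))
for row in rows:
    fan_out.send(row)

print([r[0] for r in pink])
print([r[0] for r in older])
```

The Ford row ends up in both lists, and it is the very same list object in each, because `broadcast` passes the row along without copying it.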

OK, we're now ready to put all this together.

```
     data                      
      |                      |--> filter --> save
      v                      |
process_data --> broadcast --|--> filter --> save
                             |
                             |--> filter --> save
```

In [10]:
def process_data():
    data = data_parser()
    
    out_pink_cars = save_data('pink_cars.csv', headers)
    out_ford_green = save_data('ford_green.csv', headers)
    out_older = save_data('older.csv', headers)
    
    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink',
                                   out_pink_cars)
    
    def pred_ford_green(data_row):
        return (data_row[idx_make].lower() == 'ford' 
                and data_row[idx_color].lower() == 'green')
    
    filter_ford_green = filter_data(pred_ford_green, out_ford_green)
    filter_older = filter_data(lambda d: d[idx_year] <= 2010, out_older)
    filters = (filter_pink_cars, filter_ford_green, filter_older)
    broadcaster = broadcast(filters)
    
    for row in data:
        broadcaster.send(row)
    
    print('Finished processing.')

And now let's call it and see what happens!

In [11]:
process_data()

Finished processing.


Let's see what those files contain:

In [12]:
def print_file_data():
    for file_name in ('pink_cars.csv', 'ford_green.csv', 'older.csv'):
        print(f'***** {file_name} *****')
        for row in data_reader(file_name):
            print(row)
        print('\n\n\n')

print_file_data()

***** pink_cars.csv *****
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998', 'WA1CM74L39D1881

There's one more bit of cleanup I want to do, though.

I would prefer that the function defining my pipeline not also be the consumer of the data, so that those two responsibilities stay separate.

So let's rewrite `process_data` to be just another step in the pipeline.

In [13]:
@coroutine
def pipeline_coro():
    out_pink_cars = save_data('pink_cars.csv', headers)
    out_ford_green = save_data('ford_green.csv', headers)
    out_older = save_data('older.csv', headers)
    
    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink',
                                   out_pink_cars)

    def pred_ford_green(data_row):
        return (data_row[idx_make].lower() == 'ford'
                and data_row[idx_color].lower() == 'green')

    filter_ford_green = filter_data(pred_ford_green, out_ford_green)
    filter_older = filter_data(lambda d: d[idx_year] <= 2010, out_older)

    filters = (filter_pink_cars, filter_ford_green, filter_older)

    broadcaster = broadcast(filters)

    while True:
        data_row = yield
        broadcaster.send(data_row)

And now we can use the pipeline this way:

In [14]:
pipe = pipeline_coro()
data = data_parser()
for row in data:
    pipe.send(row)

OK, so now let's make sure the correct data is in those output files:

In [15]:
print_file_data()

***** pink_cars.csv *****


Error: Could not determine delimiter

Uh-oh, we get an exception. Why did the parser fail to figure out the dialect of the file?

Let's see what's in the file:

In [16]:
with open('pink_cars.csv') as f:
    for row in f:
        print('row', row)

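The file is empty!

The issue is that our files have not been closed yet.

The pipeline coroutine is still active, so nothing has been released or closed, including the `save_data` coroutines at the endpoints of our pipeline. Each of them is still suspended inside its `with open(...)` block, so whatever has been written so far may still be sitting in the file object's write buffer rather than on disk. In the earlier `process_data` version this happened implicitly: once the function returned, its local coroutines were no longer referenced, and CPython closes a generator when it is reclaimed.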
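Closing a generator raises `GeneratorExit` at the paused `yield`, which runs any `finally` clauses and exits any `with` blocks; for `save_data`, that is the point where the file gets closed and flushed. A sketch with a hypothetical `tracked_sink` that records its own cleanup instead of touching the file system:

```python
import inspect


def coroutine(fn):
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)  # prime the generator
        return g
    return inner


@coroutine
def tracked_sink(log):
    # Hypothetical stand-in for save_data: logs rows instead of writing a file
    try:
        while True:
            log.append((yield))
    finally:
        # Runs when close() raises GeneratorExit at the paused yield,
        # just as save_data's `with` block would close its file here
        log.append('cleanup ran')


log = []
sink = tracked_sink(log)
sink.send('row 1')
before_close = list(log)  # the sink is still suspended, no cleanup yet
sink.close()
print(before_close, log, inspect.getgeneratorstate(sink))
```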

Fortunately this is easy to do - we just need to close the pipeline.

In [17]:
pipe.close()
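Strictly speaking, `pipe.close()` only closes `pipeline_coro` itself. The filters and `save_data` coroutines it created are closed afterwards, when the pipeline's frame is torn down and nothing references them any more. CPython does this immediately through reference counting, but other Python implementations may defer it. If you would rather not rely on that, one option (a sketch of an alternative, not the approach used in this notebook) is to have each stage close its downstream targets explicitly. `broadcast_closing`, `filter_closing` and `tracked_sink` are hypothetical names:

```python
def coroutine(fn):
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)  # prime the generator
        return g
    return inner


@coroutine
def broadcast_closing(targets):
    # Variant of broadcast that closes its targets when it is closed itself
    try:
        while True:
            data_row = yield
            for target in targets:
                target.send(data_row)
    finally:
        for target in targets:
            target.close()


@coroutine
def filter_closing(filter_predicate, target):
    # Variant of filter_data that propagates close() downstream
    try:
        while True:
            data_row = yield
            if filter_predicate(data_row):
                target.send(data_row)
    finally:
        target.close()


@coroutine
def tracked_sink(name, log):
    # Hypothetical sink that only records when it is shut down
    try:
        while True:
            yield
    finally:
        log.append(f'{name} closed')


log = []
pipe = broadcast_closing((
    filter_closing(lambda d: True, tracked_sink('a', log)),
    tracked_sink('b', log),
))
pipe.send(['some', 'row'])
pipe.close()
print(log)
```

Calling `close()` on a generator that is already closed does nothing, so this stays safe even if two stages happen to share a target.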

And now we should be able to read those files:

In [18]:
print_file_data()

***** pink_cars.csv *****
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998', 'WA1CM74L39D1881

Perfect, so just to recap, here's how we would use our pipeline:

In [19]:
pipe = pipeline_coro()
data = data_parser()
for row in data:
    pipe.send(row)
pipe.close()

Hmm... Notice how we open the pipeline, and then close it?
Does this remind you of a context manager?

Let's write a context manager for our pipeline - that way we'll never forget to close it!

In [20]:
from contextlib import contextmanager

In [21]:
@contextmanager
def pipeline():
    p = pipeline_coro()
    try:
        yield p
    finally:
        p.close()

And now we can use it this way:

In [22]:
with pipeline() as pipe:
    data = data_parser()
    for row in data:
        pipe.send(row)
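Because `p.close()` sits in a `finally` clause, the pipeline is closed even if something inside the `with` block raises. A sketch of that behavior, using a hypothetical `tracked_sink` in place of the real pipeline so nothing is written to disk:

```python
from contextlib import contextmanager


def coroutine(fn):
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)  # prime the generator
        return g
    return inner


@coroutine
def tracked_sink(log):
    # Hypothetical sink that records rows and its own shutdown
    try:
        while True:
            log.append((yield))
    finally:
        log.append('closed')


@contextmanager
def pipeline(log):
    # Same shape as the notebook's pipeline(), built around the hypothetical sink
    p = tracked_sink(log)
    try:
        yield p
    finally:
        p.close()


log = []
try:
    with pipeline(log) as pipe:
        pipe.send('row 1')
        raise ValueError('bad row')  # simulate a failure part-way through
except ValueError:
    pass

print(log)
```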

And again, let's just make sure the files are OK:

In [23]:
print_file_data()

***** pink_cars.csv *****
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998', 'WA1CM74L39D1881

Perfect!