In [1]:
import csv

def data_reader(f_name):
    f = open(f_name)
    try:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        reader = csv.reader(f,dialect = dialect)
        yield from reader
    finally:
        f.close()

In [2]:
for row in data_reader('cars.csv'):
    print(row)

['Car', 'MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight', 'Acceleration', 'Model', 'Origin']
['Chevrolet Chevelle Malibu', '18.0', '8', '307.0', '130.0', '3504.', '12.0', '70', 'US']
['Buick Skylark 320', '15.0', '8', '350.0', '165.0', '3693.', '11.5', '70', 'US']
['Plymouth Satellite', '18.0', '8', '318.0', '150.0', '3436.', '11.0', '70', 'US']
['AMC Rebel SST', '16.0', '8', '304.0', '150.0', '3433.', '12.0', '70', 'US']
['Ford Torino', '17.0', '8', '302.0', '140.0', '3449.', '10.5', '70', 'US']
['Ford Galaxie 500', '15.0', '8', '429.0', '198.0', '4341.', '10.0', '70', 'US']
['Chevrolet Impala', '14.0', '8', '454.0', '220.0', '4354.', '9.0', '70', 'US']
['Plymouth Fury iii', '14.0', '8', '440.0', '215.0', '4312.', '8.5', '70', 'US']
['Pontiac Catalina', '14.0', '8', '455.0', '225.0', '4425.', '10.0', '70', 'US']
['AMC Ambassador DPL', '15.0', '8', '390.0', '190.0', '3850.', '8.5', '70', 'US']
['Citroen DS-21 Pallas', '0', '4', '133.0', '115.0', '3090.', '17.5', '70', 'Europe

In [8]:
input_file = 'car_data.csv'

idx_make = 0
idx_model = 1
idx_year = 2
idx_vin = 3
idx_color = 4

headers = ('make','model','year','vin','color')

converters = (str,str,int,str,str)


In [9]:
def data_parser():
    data = data_reader(input_file)
    next(data)
    for row in data:
        parsed_row = [converter(item)
                      for converter,item in zip(converters,row)]
        yield parsed_row

In [10]:
data  = data_parser()
for _ in range(5):
    print(next(data))

['Mitsubishi', 'Outlander', 2011, 'WBAEV33453K542952', 'Indigo']
['Pontiac', 'Sunfire', 2001, 'SCFAD06D99G713780', 'Maroon']
['Pontiac', 'Grand Am', 1994, 'WBA6B8C59ED852919', 'Red']
['Chrysler', 'Town & Country', 2008, '1GD422CGXEF757763', 'Violet']
['Isuzu', 'Trooper', 1999, '3GTU2YEJ6CG150061', 'Red']


In [11]:
def coroutine(fn):
    def inner(*arg,**kwargs):
        g = fn(*arg,**kwargs)
        next(g)
        return g
    return inner

In [14]:
@coroutine

def save_data(f_name,headers):
    with open(f_name,'w',newline='') as f:
        write = csv.writer(f)
        write.writerow(headers)
        while True:
            data_row = yield
            write.writerow(data_row)

In [15]:
@coroutine
def filter_data(filter_predicate,target):
    while True:
        data_row = yield
        if filter_predicate(data_row):
            target.send(data_row)

In [16]:
@coroutine

def broadcast(targets):
    while True:
        data_row = yield
        for target in targets:
            target.send(data_row)

In [20]:
def process_data():
    out_pink_cars = save_data('pink_cars.csv',headers)
    out_ford_green = save_data('ford_green.csv',headers)
    out_older = save_data('older.csv',headers)
    
    filter_pink_cars = filter_data(lambda d :d [idx_color].lower()=='pink',out_pink_cars)
    
    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford' and d[idx_color].lower()=='green')
    filter_ford_green = filter_data(pred_ford_green,out_ford_green)
    
    filter_old = filter_data(lambda d :d[idx_year]<=2010, out_older)
    
    filters = (filter_pink_cars,filter_ford_green,filter_old)
    
    broadcaster = broadcast(filters)
    
    for row in data_parser():
        broadcaster.send(row)
    
    print('Finished processing')

In [21]:
process_data()

Finished processing


In [25]:
def print_file_data():
    for file_name in ('pink_cars.csv','ford_green.csv','older.csv'):
        print(f'*****{file_name}********')
        for row in data_reader(file_name):
            print(row)
        print('\n\n\n')

In [24]:
@coroutine

def pipeline_coro():
    out_pink_cars = save_data('pink_cars.csv',headers)
    out_ford_green = save_data('ford_green.csv',headers)
    out_older = save_data('older.csv',headers)
    
    filter_pink_cars = filter_data(lambda d :d [idx_color].lower()=='pink',out_pink_cars)
    
    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford' and d[idx_color].lower()=='green')
    
    filter_ford_green = filter_data(pred_ford_green,out_ford_green)
    filter_old = filter_data(lambda d :d[idx_year]<=2010, out_older)
    filters = (filter_pink_cars,filter_ford_green,filter_old)
    
    broadcaster = broadcast(filters)
    
    while True:
        row = yield
        broadcaster.send(row)
    
    

In [26]:
pipe = pipeline_coro()
data = data_parser()
for row in data:
    pipe.send(row)

In [27]:
print_file_data()

*****pink_cars.csv********


Error: Could not determine delimiter

In [28]:
with open('pink_cars.csv') as f:
    for row in f:
        print('row:',row)

In [29]:
pipe.close()

In [30]:
print_file_data()

*****pink_cars.csv********
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998', 'WA1CM74L39D188

In [31]:
pipe = pipeline_coro()
data = data_parser()
for row in data:
    pipe.send(row)
pipe.close()

In [32]:
from contextlib import contextmanager

In [33]:
@contextmanager
def pipeline():
    p = pipeline_coro()
    try:
        yield p
    finally:
        p.close()

In [34]:
with pipeline()as pipe:
    data = data_parser()
    for row in data:
        pipe.send(row)

In [35]:
print_file_data()

*****pink_cars.csv********
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998', 'WA1CM74L39D188