In [10]:
# Create a utility to parse the data from a given .csv file.
import csv

def data_reader(f_name):
    """Lazily yield the data rows of a CSV file, skipping its header record.

    The CSV dialect (delimiter, quoting, ...) is detected with
    ``csv.Sniffer`` from the first 2000 characters of the file.

    Args:
        f_name: Path of the CSV file to read.

    Yields:
        list[str]: The fields of each record that follows the header.

    Raises:
        csv.Error: If the sniffer cannot determine the dialect of the sample.
    """
    # newline='' hands line endings to the csv module untouched, so newlines
    # embedded in quoted fields are parsed correctly.
    with open(f_name, newline='') as f:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)  # rewind: sniffing consumed the beginning of the file
        reader = csv.reader(f, dialect=dialect)
        # Skip the header as a CSV *record* rather than a physical line, so a
        # quoted header spanning several lines is dropped in full. The default
        # avoids StopIteration escaping the generator if there is no record.
        next(reader, None)
        yield from reader

In [11]:
# Create our coroutine decorator to prime coroutines.
def coroutine(fn):
    """Decorator that returns generator-based coroutines already primed.

    Calling the decorated function creates the generator and advances it to
    its first ``yield``, so callers can use ``.send(value)`` immediately.
    The value produced by that first ``yield`` is discarded.

    Args:
        fn: A generator function implementing the coroutine.

    Returns:
        A wrapper with the same call signature as ``fn`` that returns the
        primed generator.
    """
    from functools import wraps  # local import keeps this cell self-contained

    @wraps(fn)  # keep fn's __name__ / __doc__ visible on the wrapper
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)  # run up to the first yield so the generator can receive
        return g
    return inner

In [12]:
# Create a coroutine to write received rows to a new file.
@coroutine
def save_data(f_name):
    """Coroutine sink that writes every row it receives to a CSV file.

    Opens ``f_name`` for writing (mode ``'w'``) and appends one CSV record
    per value passed to ``.send()``. The file stays open until the coroutine
    is closed, at which point the ``with`` block closes it.

    Args:
        f_name: Path of the CSV file to create.
    """
    with open(f_name, 'w', newline='') as out_file:
        write_row = csv.writer(out_file).writerow
        # No header row is emitted; only rows received via send() are written.
        while True:
            write_row((yield))

In [13]:
@coroutine
def filter_data(filter_pred, target):
    """Coroutine stage that forwards only the rows accepted by a predicate.

    Args:
        filter_pred: Callable taking a row; a truthy result lets it through.
        target: Object with a ``send`` method that receives accepted rows.
    """
    while True:
        candidate = yield
        if not filter_pred(candidate):
            continue  # rejected rows are dropped, nothing is forwarded
        target.send(candidate)

In [14]:
@coroutine
def pipeline_coro(out_file, name_filters):
    """Coroutine that pushes rows through chained name filters into a CSV file.

    One ``filter_data`` stage is created per entry of ``name_filters``; a row
    reaches the ``save_data`` sink only if the first field of the row contains
    every one of the given substrings. Each filter wraps the previously built
    stage, so the last name filter is the first one to see a row.

    Args:
        out_file: Path of the CSV file the accepted rows are written to.
        name_filters: Iterable of substrings that must all occur in ``row[0]``.
    """
    save = save_data(out_file)
    stages = [save]

    target = save
    for name_filter in name_filters:
        # v=name_filter binds the current value (avoids the late-binding
        # closure pitfall); bool(d) rejects empty rows, which csv.reader
        # produces for blank lines, instead of failing on d[0].
        target = filter_data(
            lambda d, v=name_filter: bool(d) and v in d[0], target
        )
        stages.append(target)

    try:
        while True:
            received = yield
            target.send(received)
    finally:
        # Close every stage explicitly, head of the chain first and the file
        # sink last, so the output file is flushed and closed right away
        # instead of whenever the generators happen to be garbage-collected.
        for stage in reversed(stages):
            stage.close()

In [15]:
from contextlib import contextmanager

@contextmanager
def pipeline(out_file, name_filters):
    """Context manager that yields a ready-to-use filtering pipeline.

    Builds the coroutine returned by ``pipeline_coro(out_file, name_filters)``,
    hands it to the ``with`` body, and closes it when the body exits, whether
    normally or through an exception.

    Args:
        out_file: Passed through to ``pipeline_coro``.
        name_filters: Passed through to ``pipeline_coro``.

    Yields:
        The pipeline coroutine; feed it rows with ``.send(row)``.
    """
    coro = pipeline_coro(out_file, name_filters)
    try:
        yield coro
    finally:
        coro.close()

In [16]:
# Run the pipeline: stream every row produced by data_reader('cars.csv')
# into the pipeline built for out.csv with the three name filters.
# Leaving the with-block closes the pipeline coroutine.
with pipeline('out.csv', ('Chevrolet', 'Carlo', 'Landau')) as p:
    for row in data_reader('cars.csv'):
        p.send(row)

In [18]:
# Read the generated file back and print each record to check the result.
# newline='' leaves line-ending handling to the csv module, as the csv
# documentation recommends for any file object passed to csv.reader.
with open('out.csv', newline='') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

['Chevrolet Monte Carlo Landau', '15.5', '8', '350.0', '170.0', '4165.', '11.4', '77', 'US']
['Chevrolet Monte Carlo Landau', '19.2', '8', '305.0', '145.0', '3425.', '13.2', '78', 'US']
