In [None]:
import csv
from contextlib import contextmanager

In [None]:
def data_reader(filename):
    """Yield the rows of the CSV file `filename` as lists of strings.

    The dialect (delimiter, quoting, ...) is detected with csv.Sniffer from the
    first 2000 characters of the file. The file stays open while the generator
    is suspended; it is closed when the generator is exhausted or closed.
    """
    # newline='' hands line-ending handling to the csv module, as the csv docs
    # require; without it, newlines embedded in quoted fields get translated.
    with open(filename, 'r', newline='') as file:
        dialect = csv.Sniffer().sniff(file.read(2000))
        file.seek(0)  # rewind after sniffing so the header row is not lost
        reader = csv.reader(file, dialect=dialect)
        yield from reader
        
def get_header(filename):
    """Return the first row (the header) of the CSV file `filename`.

    The reader generator is closed explicitly, so the underlying file is
    released right away instead of whenever the suspended generator is
    garbage-collected.
    """
    rows = data_reader(filename)
    try:
        return next(rows)
    finally:
        rows.close()

def data_parser(filename):
    """Yield the data rows of `filename` with each field converted to its type.

    The first row of the file is taken as the header. Each field is converted
    with the callable that `mapping_dict` assigns to its column name.

    A column name missing from `mapping_dict` raises KeyError while iterating.
    Conversion failures (e.g. ValueError from int/float) propagate. Fields beyond
    the header's length, or missing trailing fields, are silently dropped by zip.
    """
    mapping_dict = {
        'Car' : str,
        'MPG' : float,
        'Cylinders' : int,
        'Displacement' : float,
        'Horsepower' : float,
        'Weight' : float,
        'Acceleration' : float,
        'Model' : int,
        'Origin' : str    
    }
    
    data = data_reader(filename)
    # Take the header from this same reader rather than opening and sniffing
    # the file a second time through get_header().
    headers = next(data)
    for row in data:
        yield [mapping_dict[header](datum) for header, datum in zip(headers, row)]
    
def coroutine(gen_fn):
    """Decorator that primes a generator-based coroutine.

    Calling the decorated function creates the generator and advances it to its
    first ``yield``, so callers can ``send()`` values immediately. The wrapper
    keeps the wrapped function's name and docstring.
    """
    from functools import wraps  # local import keeps this cell self-contained

    @wraps(gen_fn)
    def inner(*args, **kwargs):
        gen = gen_fn(*args, **kwargs)
        next(gen)  # run up to the first `yield`
        return gen
    return inner
    
@coroutine
def save_data(filename, header):
    """Primed coroutine that writes CSV rows to `filename`.

    `header` is written first. After that, every value sent in is written as
    one row. The file is opened when the coroutine is created (the decorator
    primes it) and closed when the coroutine is closed.
    """
    with open(filename, 'w', newline='') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(header)
        while True:
            csv_writer.writerow((yield))
            
@coroutine
def filter_data(filter_predicate, target):
    """Primed coroutine that forwards a sent value to `target` only when
    `filter_predicate(value)` is truthy; all other values are dropped."""
    while True:
        candidate = yield
        if not filter_predicate(candidate):
            continue
        target.send(candidate)
            
@coroutine
def broadcaster(*targets):
    """Primed coroutine that re-sends every value it receives to each of
    `targets`, in the order the targets were given."""
    while True:
        item = yield
        for downstream in targets:
            downstream.send(item)
            

In [None]:
# Input CSV used by the rest of the notebook, and its header row.
filename = 'cars.csv'
header = get_header(filename)

In [None]:
# Display the header row read from the input CSV.
header

In [None]:
# Print every parsed (type-converted) row of the input file.
for row in data_parser(filename):
    print(row)

In [None]:
@coroutine
def pipeline_coroutine(file_to_process, output_filename, *filter_predicates):
    """Primed coroutine that routes sent rows through filters into a CSV file.

    The output file starts with the header row of `file_to_process`. Each row
    sent in is written only if every predicate in `filter_predicates` returns
    truthy for it.

    When this coroutine is closed (or a stage raises), every stage it built is
    closed explicitly: outermost filter first, CSV writer last. This flushes
    and closes the output file deterministically instead of relying on
    garbage collection.
    """
    headers = get_header(file_to_process)
    # stages[0] is the CSV sink; each filter wraps the stage before it.
    stages = [save_data(output_filename, headers)]
    for filter_predicate in filter_predicates:
        stages.append(filter_data(filter_predicate, stages[-1]))
    head = stages[-1]

    try:
        while True:
            row = yield
            head.send(row)
    finally:
        for stage in reversed(stages):
            stage.close()
    

In [None]:
# Build a pipeline that keeps rows whose car name contains 'Landau' and whose
# third column (Cylinders, an int after parsing) equals 8; matching rows are
# written to output.csv.
pipe = pipeline_coroutine(filename, 'output.csv', lambda row: 'Landau' in row[0], lambda row: row[2]==8 )

In [None]:
# Lazily parse the input file; rows are produced as the next cell iterates.
data = data_parser(filename)

In [None]:
# Push each parsed row into the pipeline.
for row in data:
    pipe.send(row)

In [None]:
# Close the pipeline coroutine now that every row has been sent.
pipe.close()

In [None]:
# Read back the filtered output as raw string rows, header row included.
for row in data_reader('output.csv'):
    print(row)

In [None]:
@contextmanager
def pipeline(file_to_process, output_filename, *filter_predicates):
    """Context manager that yields a primed pipeline coroutine.

    Rows sent to the yielded coroutine are filtered by `filter_predicates` and
    written to `output_filename`. The coroutine is closed when the `with`
    block exits, whether normally or via an exception.
    """
    head = pipeline_coroutine(file_to_process, output_filename, *filter_predicates)
    try:
        yield head
    finally:
        head.close()

In [None]:
# Run the pipeline through the `pipeline` context manager, which closes it when
# the block exits (even on error). Keeps rows whose car name contains both
# 'Chevrolet Monte' and 'Landau'.
with pipeline(filename, 'output.csv', 
              lambda row: 'Chevrolet Monte' in row[0] , 
              lambda row: 'Landau' in row[0]) as pipe:
    data = data_parser(filename)
    for datum in data:
        pipe.send(datum)

In [None]:
# Read the output back with the same per-column type conversion as the input.
for row in data_parser('output.csv'):
    print(row)

In [None]:
def send_to_pipeline(filename, output_filename, *filter_predicates):
    """Parse `filename` and stream every row through a filtering pipeline.

    Rows for which every predicate in `filter_predicates` is truthy are written
    to `output_filename`, after the header row of `filename`.
    """
    # Honour the caller's output_filename rather than a fixed path.
    with pipeline(filename, output_filename, *filter_predicates) as pipe:
        for datum in data_parser(filename):
            pipe.send(datum)

# Solution: the pipeline code consolidated into a single, runnable cell

In [4]:
import csv
from contextlib import contextmanager

def data_reader(filename):
    """Yield the rows of the CSV file `filename` as lists of strings.

    The dialect (delimiter, quoting, ...) is detected with csv.Sniffer from the
    first 2000 characters of the file. The file stays open while the generator
    is suspended; it is closed when the generator is exhausted or closed.
    """
    # newline='' hands line-ending handling to the csv module, as the csv docs
    # require; without it, newlines embedded in quoted fields get translated.
    with open(filename, 'r', newline='') as file:
        dialect = csv.Sniffer().sniff(file.read(2000))
        file.seek(0)  # rewind after sniffing so the header row is not lost
        reader = csv.reader(file, dialect=dialect)
        yield from reader
        
def get_header(filename):
    """Return the first row (the header) of the CSV file `filename`.

    The reader generator is closed explicitly, so the underlying file is
    released right away instead of whenever the suspended generator is
    garbage-collected.
    """
    rows = data_reader(filename)
    try:
        return next(rows)
    finally:
        rows.close()

def data_parser(filename):
    """Yield the data rows of `filename` with each field converted to its type.

    The first row of the file is taken as the header. Each field is converted
    with the callable that `mapping_dict` assigns to its column name.

    A column name missing from `mapping_dict` raises KeyError while iterating.
    Conversion failures (e.g. ValueError from int/float) propagate. Fields beyond
    the header's length, or missing trailing fields, are silently dropped by zip.
    """
    mapping_dict = {
        'Car' : str,
        'MPG' : float,
        'Cylinders' : int,
        'Displacement' : float,
        'Horsepower' : float,
        'Weight' : float,
        'Acceleration' : float,
        'Model' : int,
        'Origin' : str    
    }
    
    data = data_reader(filename)
    # Take the header from this same reader rather than opening and sniffing
    # the file a second time through get_header().
    headers = next(data)
    for row in data:
        yield [mapping_dict[header](datum) for header, datum in zip(headers, row)]
    
def coroutine(gen_fn):
    """Decorator that primes a generator-based coroutine.

    Calling the decorated function creates the generator and advances it to its
    first ``yield``, so callers can ``send()`` values immediately. The wrapper
    keeps the wrapped function's name and docstring.
    """
    from functools import wraps  # local import keeps this cell self-contained

    @wraps(gen_fn)
    def inner(*args, **kwargs):
        gen = gen_fn(*args, **kwargs)
        next(gen)  # run up to the first `yield`
        return gen
    return inner
    
@coroutine
def save_data(filename, header):
    """Primed coroutine that writes CSV rows to `filename`.

    `header` is written first. After that, every value sent in is written as
    one row. The file is opened when the coroutine is created (the decorator
    primes it) and closed when the coroutine is closed.
    """
    with open(filename, 'w', newline='') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(header)
        while True:
            csv_writer.writerow((yield))
            
@coroutine
def filter_data(filter_predicate, target):
    """Primed coroutine that forwards a sent value to `target` only when
    `filter_predicate(value)` is truthy; all other values are dropped."""
    while True:
        candidate = yield
        if not filter_predicate(candidate):
            continue
        target.send(candidate)
            
@coroutine
def pipeline_coroutine(file_to_process, output_filename, *filter_predicates):
    """Primed coroutine that routes sent rows through filters into a CSV file.

    The output file starts with the header row of `file_to_process`. Each row
    sent in is written only if every predicate in `filter_predicates` returns
    truthy for it.

    When this coroutine is closed (or a stage raises), every stage it built is
    closed explicitly: outermost filter first, CSV writer last. This flushes
    and closes the output file deterministically instead of relying on
    garbage collection.
    """
    headers = get_header(file_to_process)
    # stages[0] is the CSV sink; each filter wraps the stage before it.
    stages = [save_data(output_filename, headers)]
    for filter_predicate in filter_predicates:
        stages.append(filter_data(filter_predicate, stages[-1]))
    head = stages[-1]

    try:
        while True:
            row = yield
            head.send(row)
    finally:
        for stage in reversed(stages):
            stage.close()
            
@contextmanager
def pipeline(file_to_process, output_filename, *filter_predicates):
    """Context manager that yields a primed pipeline coroutine.

    Rows sent to the yielded coroutine are filtered by `filter_predicates` and
    written to `output_filename`. The coroutine is closed when the `with`
    block exits, whether normally or via an exception.
    """
    head = pipeline_coroutine(file_to_process, output_filename, *filter_predicates)
    try:
        yield head
    finally:
        head.close()
        
def send_to_pipeline(filename, output_filename, *filter_predicates):
    """Parse `filename` and stream every row through a filtering pipeline.

    Rows for which every predicate in `filter_predicates` is truthy are written
    to `output_filename`, after the header row of `filename`.
    """
    # Honour the caller's output_filename rather than a fixed path.
    with pipeline(filename, output_filename, *filter_predicates) as pipe:
        for datum in data_parser(filename):
            pipe.send(datum)

In [5]:
# Pipeline configuration: a row is kept only if its car name (first column)
# contains all three substrings below.
filename = 'cars.csv'
output_filename = 'output.csv'
filter_predicates = (lambda row: 'Chevrolet' in row[0] , 
                     lambda row: 'Landau' in row[0],
                     lambda row: 'Monte' in row[0],
                    )

In [6]:
# Run the whole pipeline: parse the input, filter the rows, write the matches.
send_to_pipeline(filename, output_filename, *filter_predicates)

In [7]:
# Read back the filtered output as raw string rows, header row first.
# Uses the configured output_filename instead of repeating the path literal.
for row in data_reader(output_filename):
    print(row)

['Car', 'MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight', 'Acceleration', 'Model', 'Origin']
['Chevrolet Monte Carlo Landau', '15.5', '8', '350.0', '170.0', '4165.0', '11.4', '77', 'US']
['Chevrolet Monte Carlo Landau', '19.2', '8', '305.0', '145.0', '3425.0', '13.2', '78', 'US']


In [8]:
# NOTE(review): this repeats the run above; save_data opens its file with 'w',
# so the same output file is simply rewritten. Candidate for removal.
send_to_pipeline(filename, output_filename, *filter_predicates)