# Generators as Coroutines

We will be using some Abstract Data Structures:
- queue is a data structure that supports (FIFO) First-in first out.
    - We add elements to back of queue
    - We remove elements from front of queue
- stack is a data structure that supports (LIFO) Last-in First out.
    - We add elements to top of stack
    - We remove elements from top of stack

In [11]:
# deque is very efficient at adding/removing items from both front and end of a collection
from collections import deque # double-ended queue

print("---- DEQUE ----")
dq = deque([1, 2, 3, 4, 5])
print(f"deque object: {dq}")
dq.append(6)
print(f"append to the right of the deque -> {dq}")
dq.appendleft(0)
print(f"append to the left of the deque -> {dq}")
rv = dq.pop()
print(f"pop from the right of the deque -> {dq}, value {rv}")
lf = dq.popleft()
print(f"pop from the left of the deque -> {dq}, value {lf}")

print("---- DEQUE MAX LEN ----")
dq_ml = deque([1, 2, 3, 4, 5], maxlen=5)
print(f"deque object: {dq_ml}, with max length: {dq_ml.maxlen}")
dq_ml.append(6)
print(f"append to the right of the deque -> {dq_ml}")

# as we can see when a max len is set when we append elements they push the existing ones.

---- DEQUE ----
deque object: deque([1, 2, 3, 4, 5])
append to the right of the deque -> deque([1, 2, 3, 4, 5, 6])
append to the left of the deque -> deque([0, 1, 2, 3, 4, 5, 6])
pop from the right of the deque -> deque([0, 1, 2, 3, 4, 5]), value 6
pop from the left of the deque -> deque([1, 2, 3, 4, 5]), value 0
---- DEQUE MAX LEN ----
deque object: deque([1, 2, 3, 4, 5], maxlen=5), with max length: 5
append to the right of the deque -> deque([2, 3, 4, 5, 6], maxlen=5)


## What is a coroutine?

they are cooperative routines/functions, we have functions that yield control so other functions can run in time.
- They always yield voluntarily, Threading on the other hand does not require that the function finishes or yields.
- the are cooperative multitasking.


In [17]:
from collections import deque
from math import prod

print("---- COROUTINE BASE ----")

def produce_elements(dq):
    for i in range(1, 10):
        dq.appendleft(i)

def consume_elements(dq):
    while len(dq) > 0:
        item = dq.pop()
        print("procesing item", item)

def coordinator():
    dq = deque()
    produce_elements(dq)
    consume_elements(dq)

coordinator()

print("---- COROUTINE GENERATOR ----")

def produce_elements(dq, n):
    for i in range(1, n):
        dq.appendleft(i)
        if len(dq) == dq.maxlen:
            print('queue full - yielding control')
            yield

def consume_elements(dq):
    while True:
        while len(dq) > 0:
            print('processing', dq.pop())
        print("queue empty - yielding control")
        yield

def coordinator():
    dq = deque(maxlen=10)
    producer = produce_elements(dq, 24)
    consumer = consume_elements(dq)
    while True:
        try:
            print('producing')
            next(producer)
        except StopIteration:
            break
        finally:
            print("consuming")
            next(consumer)


coordinator()

---- COROUTINE BASE ----
procesing item 1
procesing item 2
procesing item 3
procesing item 4
procesing item 5
procesing item 6
procesing item 7
procesing item 8
procesing item 9
---- COROUTINE GENERATOR ----
producing
queue full - yielding control
consuming
processing 1
processing 2
processing 3
processing 4
processing 5
processing 6
processing 7
processing 8
processing 9
processing 10
queue empty - yielding control
producing
queue full - yielding control
consuming
processing 11
processing 12
processing 13
processing 14
processing 15
processing 16
processing 17
processing 18
processing 19
processing 20
queue empty - yielding control
producing
consuming
processing 21
processing 22
processing 23
queue empty - yielding control


## Generator States

A generetator can be in any of the following states:
- CREATED, when the generator is instantiated
- RUNNING, when the generator is called
- SUSPENDED, when the generator yields
- CLOSED, when the generator finishes execution.

we can use the `inspect.getgeneratorstate` to see the current state of a generator.

In [30]:
from inspect import getgeneratorstate

print(f"---- GENERATOR STATES ----")

def gen(s):
    print(getgeneratorstate(g))
    for c in s: 
        yield c

g = gen('Python')

print(f"the state of the generator g: {g} is -> {getgeneratorstate(g)}")
next(g)
print(f"the state of the generator g: {g} is -> {getgeneratorstate(g)}")
list(g)
print(f"the state of the generator g: {g} is -> {getgeneratorstate(g)}")

---- GENERATOR STATES ----
the state of the generator g: <generator object gen at 0x10a8540b0> is -> GEN_CREATED
GEN_RUNNING
the state of the generator g: <generator object gen at 0x10a8540b0> is -> GEN_SUSPENDED
the state of the generator g: <generator object gen at 0x10a8540b0> is -> GEN_CLOSED


## Sending Data to Generators

The `yield` statement is actually an expression, that means that it can.
- yield a value (like the normal usage we see)
- it can also receive values, it is used like an expression would
    - To receive we can assign yield as an expression -> `var = yield`
    - For us to send data to the generator it must be in a SUSPENDED state since that means it is waiting at a yield.
    - That means we must PRIME (call next()) on the generator and then we can send data.
    - to send we use `generator.send(data)`.
- We can also mix both but it gets, REALLY confusing

Note: It is important to note that when python encounters a yield statement, it pauses right at the left of the yield.   
That means, when python finds:
- `my_var = yield 'hello'`, it pauses(yields 'hello') and it resumes by assigning the state to my_var.

In [13]:
from inspect import getgeneratorstate

print("---- YIELD AS EXPRESSION ----")

def echo():
    while True:
        received = yield
        print(f'you said: {received}')

ec = echo()
print(f"first we instantiate the generator: {ec} -> {getgeneratorstate(ec)}")
print("Then we prime it by calling next(ec)")
next(ec)
print(f"The state should now be -> {getgeneratorstate(ec)}")
print(f"Now we can send the string hello to the generator: {ec.send('hello')}")

print("---- PRIMING GENERATOR WITH NONE ----")
ec = echo()
print(f"first we instantiate the generator: {ec} -> {getgeneratorstate(ec)}")
print("Then we prime it by calling ec.send(None)")
ec.send(None)
print(f"The state should now be -> {getgeneratorstate(ec)}")
print(f"Now we can send the string hello to the generator: {ec.send('hello')}")

print("---- RECEIVE/YIELD GENERATOR ----")

def running_averager():
    total = 0
    count = 0
    running_average = None
    while True:
        value = yield running_average
        total += value
        count += 1
        running_average = total / count

def running_averages(iterable):
    averager = running_averager()
    next(averager)
    for value in iterable:
        running_average = averager.send(value)
        print(running_average)

running_averages([1, 2, 3, 4])
        

---- YIELD AS EXPRESSION ----
first we instantiate the generator: <generator object echo at 0x1098cbc30> -> GEN_CREATED
Then we prime it by calling next(ec)
The state should now be -> GEN_SUSPENDED
you said: hello
Now we can send the string hello to the generator: None
---- PRIMING GENERATOR WITH NONE ----
first we instantiate the generator: <generator object echo at 0x1098cbca0> -> GEN_CREATED
Then we prime it by calling ec.send(None)
The state should now be -> GEN_SUSPENDED
you said: hello
Now we can send the string hello to the generator: None
---- RECEIVE/YIELD GENERATOR ----
1.0
1.5
2.0
2.5


## Closing a Generator

We as the caller can explicitly close a generator in suspended state by calling it's `close` method.
- The `close` method triggers an exception called `GeneratorExit`.
    - it is ok not to catch the exception, python handles it.
    - it is advised against to try to catch and ignore the exception.
    - `GeneratorExit` does not inherit from `Exception`.

In [19]:
import csv
import itertools

from inspect import getgeneratorstate

print("---- GENERATOR DOES NOT CLOSE FILE ----")

def parse_file(f_name):
    print("opening file....")
    f = open(f_name, 'r')
    try:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        reader = csv.reader(f, dialect=dialect)
        for row in reader:
            yield row
    finally:
        print("Closing file....")
        f.close()

parser = parse_file("cars.csv")
for row in itertools.islice(parser, 10):
    print(row)
    # The file never closes!!!
# We have to explicitly tell it to close
parser.close()

print("---- GENERATOR CLOSE EXCEPTION ----")

def parse_file(f_name):
    print("opening file....")
    f = open(f_name, 'r')
    try:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        reader = csv.reader(f, dialect=dialect)
        for row in reader:
            yield row
    except Exception as ex:
        print("some exception occurred", ex)
    except GeneratorExit:
        print("The Generator Exit exception got raised!")
    finally:
        print("Closing file....")
        f.close()

parser = parse_file("cars.csv")
for row in itertools.islice(parser, 10):
    print(row)
    # The file never closes!!!
# We have to explicitly tell it to close
parser.close()

print("---- GENERATOR RAISING CUSTOM EXCEPTION ----")

def parse_file(f_name):
    print("opening file....")
    f = open(f_name, 'r')
    try:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        reader = csv.reader(f, dialect=dialect)
        for row in reader:
            try:
                yield row
            except GeneratorExit:
                print("The Generator Exit exception got raised!")
                raise Exception("this is my own exception") from None
    finally:
        print("Closing file....")
        f.close()

parser = parse_file("cars.csv")
for row in itertools.islice(parser, 10):
    print(row)
    # The file never closes!!!
# We have to explicitly tell it to close
parser.close()

---- GENERATOR DOES NOT CLOSE FILE ----
opening file....
['Car', 'MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight', 'Acceleration', 'Model', 'Origin']
['Chevrolet Chevelle Malibu', '18.0', '8', '307.0', '130.0', '3504.', '12.0', '70', 'US']
['Buick Skylark 320', '15.0', '8', '350.0', '165.0', '3693.', '11.5', '70', 'US']
['Plymouth Satellite', '18.0', '8', '318.0', '150.0', '3436.', '11.0', '70', 'US']
['AMC Rebel SST', '16.0', '8', '304.0', '150.0', '3433.', '12.0', '70', 'US']
['Ford Torino', '17.0', '8', '302.0', '140.0', '3449.', '10.5', '70', 'US']
['Ford Galaxie 500', '15.0', '8', '429.0', '198.0', '4341.', '10.0', '70', 'US']
['Chevrolet Impala', '14.0', '8', '454.0', '220.0', '4354.', '9.0', '70', 'US']
['Plymouth Fury iii', '14.0', '8', '440.0', '215.0', '4312.', '8.5', '70', 'US']
['Pontiac Catalina', '14.0', '8', '455.0', '225.0', '4425.', '10.0', '70', 'US']
Closing file....
---- GENERATOR CLOSE EXCEPTION ----
opening file....
['Car', 'MPG', 'Cylinders', 'Displacem

Exception: this is my own exception

In [33]:
print("---- GENERATOR AS COROUTINE EXAMPLE ----")
class TransactionAborted(Exception):
    ...

def save_to_db():
    print("starting new transaction")
    is_abort = False # in case we abort we need to close the generator
    try:
        while True:
            data = yield
            print('sending data to database', eval(data))
    except Exception as ex:
        is_abort = True
        raise TransactionAborted(str(ex))
    finally:
        if is_abort:
            print('abort transaction')
        else:
            print('commit transaction')
trans = save_to_db()
next(trans)

# Add transactions
trans.send("1")
#trans.send("1 + 'a'")

trans.close()

---- GENERATOR AS COROUTINE EXAMPLE ----
starting new transaction
sending data to database 1
commit transaction


## Send Exceptions to Generators

We can also send any exception to a coroutine, which means that it will throw the exception.
- We can use the method `throw(exception)`.
- The exception is raised at the point where the coroutine is suspended.
- `throw` can be thought of as a `send` but for exceptions.

> Note: When we use `close()` to stop the generator the stopiteration is silenced by python since it expected it

In [42]:
print("---- CLOSE/THROW DIFFERENCE ----")

# Only way to stop the while is to get an exception
def gen():
    try:
        while True:
            received = yield
            print(received)
    except ValueError:
        print('received a value error....')
    except GeneratorExit:
        print('now i think i can leave')
    finally:
        print('exception must have happened')
        
print("---- CLOSE ----")
g = gen()
next(g)
g.send("hello")
g.close() # throws a GeneratorExit exception
print("no exception raised")

print("---- THROW ----")
g = gen()
next(g)
g.send("hello")
#g.throw(ValueError, "My custom Message")

print("---- THROW GENERATOREXIT ----")
g = gen()
next(g)
g.send("hello")
g.throw(GeneratorExit, "My custom Message")
# It is important to note that we got a stopIteration and tha's normal since we actually stoped the generator

---- CLOSE/THROW DIFFERENCE ----
---- CLOSE ----
hello
now i think i can leave
exception must have happened
no exception raised
---- THROW ----
hello
---- THROW GENERATOREXIT ----
now i think i can leave
exception must have happened
hello
now i think i can leave
exception must have happened


StopIteration: 

In [37]:
from inspect import getgeneratorstate

def gen():
    while True:
        try:
            received = yield
            print(received)
        except ValueError as ex:
            print('value error received:', str(ex))

g = gen()
next(g)
g.send("hello")
g.throw(ValueError, "My custom Message")

print(f"We did not get a StopIteration error since our generator is in state -> {getgeneratorstate(g)}") 

hello
value error received: My custom Message
We did not get a StopIteration error since our generator is in state -> GEN_SUSPENDED


In [38]:
class CommitException(Exception):
    pass

class RollBackException(Exception):
    pass

def write_to_db():
    print('opening database connection')
    print('start transaction')
    try:
        while True:
            try:
                data = yield
                print("writing data to database...", data)
            except CommitException:
                print("comitting transaction")
                print('opening next transaction....')
            except RollBackException:
                print('aborting transaction...')
                print('opening next transaction')
    finally:
        print('generator closing....')
        print('abort transaction...')
        print('closing database connection')

sql = write_to_db()
next(sql)
sql.send("pokemon.csv")
sql.throw(CommitException, "oh no")
sql.throw(RollBackException, "oh no no")
sql.send("pokemon2.csv")
sql.throw(ValueError, "oh crap")

opening database connection
start transaction
writing data to database... pokemon.csv
comitting transaction
opening next transaction....
aborting transaction...
opening next transaction
writing data to database... pokemon2.csv
generator closing....
abort transaction...
closing database connection


ValueError: oh crap

## Prime Coroutines

The pattern to prime a coroutine gets repeatitive so we can design a decorator to do it, we would need to:
- Create Coroutine instance
- Call next/send(None) -> Prime it
- Return it

In [48]:
import math
from typing import Type


def coroutine(gen_fn):
    def inner(*args, **kwargs):
        gen = gen_fn(*args, **kwargs)
        next(gen)
        return gen
    return inner

@coroutine
def echo():
    while True:
        received = yield
        print(received)

e = echo()
e.send("hello")

@coroutine
def power_up(p):
    result = None
    while True:
        received = yield result
        try:
            result = math.pow(received, p)
        except TypeError:
            result = None

g = power_up(2)
g.send(2)
g.send('3')
g.send(3)
g.close()

hello


## Async Await

In Python < 3.8, we used generator based coroutines, to achieve this we used the `yield from` operator. With the `yield from` we basically establish
a 2-way communication channel betweeen the base generator and a generator delegator(The one that has the yield from).

Remember that `yield from` works in a way that delegates to the base generator.
- when we call `next` it passes that next to the generator object.
- when we call `send`, `throw`, `close` it passes those to the generator object.
- When using `throw` the exception is sent to the base generator, and so the handling should be done there and not on the delegator
- when it receives the returned state from the generator it continues running, (moves from the `yield from` in the code)

`yield from` automatically primes the base generator once it is primed.


In [55]:
from inspect import getgeneratorlocals, getgeneratorstate

def echo():
    try:
        while True:
            value = yield "Thank you"
            print(value)
    finally:
        print("subgen: closing ....")
        return 'subgen: return value' # raises no stopiteration exception is a return is used

def delegator():
    s = echo()
    rv = yield from s
    print(rv)
    yield 'delegator: subgenclosed'
    print('delegator: closing ....')

g = delegator()
next(g)
s = getgeneratorlocals(g)['s']

print(f"subgen status: {getgeneratorstate(s)}, delegator status: {getgeneratorstate(g)}")

message = g.send("hello world!")
print(f"message returned from echo through the delegator -> {message}")


try:
    g.throw(ValueError)
except StopIteration as ex:
    print(ex.value)
print(next(g)) 

subgen status: GEN_SUSPENDED, delegator status: GEN_SUSPENDED
hello world!
message returned from echo through the delegator -> Thank you
subgen: closing ....
subgen: return value
delegator: closing ....


StopIteration: 

In [24]:
l = [1, 2, [3, 4, [5, 6]], [7, [8, 9, 10]]]
i = [1, 2, (3, 4, {5, 6}), [7, [8, 'abc']]]

def is_iterable(item, *, str_is_iterable=True):
    try:
        iter(item)
    except:
        return False
    else:
        if isinstance(item, str):
            if str_is_iterable and len(item) > 1:
                return True
            else:
                return False
        else:
            return True

def flatten_gen(curr_item):
    if is_iterable(curr_item):
        for item in curr_item:
            yield from flatten_gen(item)
    else:
        yield curr_item

flattened_list = flatten_gen(l)

print(flattened_list)
print(list(flattened_list))

flattened_iter = flatten_gen(l)
print(flattened_iter)
print(list(flattened_iter))

<generator object flatten_gen at 0x10d178510>
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
<generator object flatten_gen at 0x10d178660>
[1, 2, 3, 4, 5, 6, 7, 8, 'a', 'b', 'c']


In [9]:
def echo():
    try:
        while True:
            result = yield
            print(result)
    except ZeroDivisionError:
        print("sub gen got the exception")

def delegator():
    try:
        yield from echo()
    except ValueError: # exceptions first get sent to the delegator and if it does not handle it
        # They get propagated to the subgenerator
        print("delegator got the value error")

d = delegator()
next(d)
d.send("hello")
d.throw(ZeroDivisionError)

hello
sub gen got the exception


StopIteration: 

In [10]:
def echo():
    try:
        while True:
            result = yield
            print(result)
    except ZeroDivisionError:
        print("sub gen got the exception")

def delegator():
    try:
        yield from echo()
    except ValueError: # exceptions first get sent to the delegator and if it does not handle it
        # They get propagated to the subgenerator
        print("delegator got the value error")

d = delegator()
next(d)
d.send("hello")
d.throw(ValueError)

hello
delegator got the value error


StopIteration: 

In [6]:
class CloseCoroutine(Exception):
    pass

class IgnoreMe(Exception):
    pass

def echo():
    try:
        while True:
            try:
                received = yield
                print(received)
            except IgnoreMe:
                yield "I'm ignoring you!!!"
    except CloseCoroutine:
        return 'coroutine was closed'
    except GeneratorExit:
        # if we call close then no exception is thrown, however if we throw the GeneratorExit
        # then a StopIteration get's raised and not silenced by python
        print('closed method was called/or GeneratorExit thrown')

def delegator():
    result = yield from echo()
    yield 'subgen closed and returned', result
    print('delegator closing...')

# create the delegator
d = delegator()
# Prime the delegator
next(d)
# test it
d.send("hello world")
# if we throw the IgnoreMe exception
result = d.throw(IgnoreMe)
print(result)
next(d) # we need to send next because it is waiting at the yield in the IgnoreMe Exception
d.send("hello world again")
# if we throw the CloseCoroutine we get the message
d.throw(CloseCoroutine)


hello world
I'm ignoring you!!!
hello world again


('subgen closed and returned', 'coroutine was closed')

## Application Pipelines

In [18]:
import csv

def parse_data(f_name):
    with open(f_name) as f:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        next(f)
        yield from csv.reader(f, dialect=dialect)

def filter_data(rows, contains):
    for row in rows:
        if contains in row[0]:
            yield row

def output(f_name, *filter_words):
    data = parse_data(f_name)
    for filter_word in filter_words:
        data = filter_data(data, filter_word)
    yield from data

pipeline = output("cars.csv", "Chevrolet", "Impala")

for row in pipeline:
    print(row)

['Chevrolet Impala', '14.0', '8', '454.0', '220.0', '4354.', '9.0', '70', 'US']
['Chevrolet Impala', '14.0', '8', '350.0', '165.0', '4209.', '12.0', '71', 'US']
['Chevrolet Impala', '13.0', '8', '350.0', '165.0', '4274.', '12.0', '72', 'US']
['Chevrolet Impala', '11.0', '8', '400.0', '150.0', '4997.', '14.0', '73', 'US']


In [21]:
import math

def coroutine(coro):
    def inner(*args, **kwargs):
        gen = coro(*args, **kwargs)
        next(gen)
        return gen
    return inner

@coroutine
def handle_data():
    while True:
        received = yield
        print(received)

@coroutine
def power_up(n, next_gen):
    while True:
        received = yield
        output = math.pow(received, n)
        next_gen.send(output)

@coroutine
def filter_even(next_gen):
    while True:
        received = yield
        if received % 2 == 0:
            next_gen.send(received)

print_data = handle_data()
filtered_data = filter_even(print_data)
gen = power_up(2, filtered_data)

for i in range(10):
    gen.send(i)

0.0
4.0
16.0
36.0
64.0


In [31]:
import csv

def data_reader(f_name):
    with open(f_name) as f:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        yield from csv.reader(f, dialect=dialect)

def coroutine(coro):
    def inner(*args, **kwargs):
        gen = coro(*args, **kwargs)
        next(gen)
        return gen
    return inner

input_file = 'car_data.csv'
idx_make = 0
idx_model = 1
idx_year = 2
idx_vin = 3
idx_color = 4

headers = ('make', 'model', 'year', 'vin', 'color')
converters = (str, str, int, str, str)

def data_parser():
    data = data_reader(input_file)
    next(data)
    for row in data:
        parsed_row = [converter(item) for converter, item in zip(converters, row)]
        yield parsed_row

@coroutine
def save_data(f_name, headers):
    with open(f_name, newline="", mode="w") as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        while True:
            data_row = yield
            writer.writerow(data_row)

@coroutine
def filter_data(filter_predicate, target):
    while True:
        data_row = yield
        if filter_predicate(data_row):
            target.send(data_row)

@coroutine
def broadcast(targets):
    while True:
        data_row = yield
        for target in targets:
            target.send(data_row)

@coroutine
def pipeline_coro():
    out_pink = save_data("pink_cars.csv", headers)
    out_green_ford = save_data('ford_green.csv', headers)
    out_older = save_data('older.csv', headers)

    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink', out_pink)

    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford' and d[idx_color].lower() == 'green')
    filter_green_ford = filter_data(pred_ford_green, out_green_ford)

    filter_old = filter_data(lambda d: d[idx_year] <= 2010, out_older)

    filters = (filter_pink_cars, filter_green_ford, filter_old)

    broadcaster = broadcast(filters)

    while True:
        row = yield
        broadcaster.send(row)

from contextlib import contextmanager

@contextmanager
def pipeline():
    pipe = pipeline_coro()
    try:
        yield pipe
    finally:
        pipe.close()

with pipeline() as pipe:
    data = data_parser()
    for row in data:
        pipe.send(row)

# Exercise

## Goal 1

The goal of this project is to rewrite the pull pipeline we created in the **Application - Pipelines - Pulling** video in the **Generators as Coroutines** section.

You should look at the techniques we used in the **Application - Pipelines - Broadcasting** video and apply them here.

The goal is to write a pipeline that will push data from the source file, `cars.csv`, and push it through some filters and a save coroutine to ultimately save the results as a csv file.

Try to make your code as generic as possible, and don't worry about column headers in the output file (unless you really want to!).

When you are done with your solution you should be able to specify an arbitrary number of filters on the name field.

If you specify `Chevrolet`, `Carlo` and `Landau` for three filters, your output file should contain two lines of data only:

```
Chevrolet Monte Carlo Landau,15.5,8,350.0,170.0,4165.,11.4,77,US
Chevrolet Monte Carlo Landau,19.2,8,305.0,145.0,3425.,13.2,78,US
```

In [28]:
import csv

def parse_data(f_name):
    f = open(f_name)
    try:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        next(f)  # skip header row
        yield from csv.reader(f, dialect=dialect)
    finally:
        f.close()

def coroutine(fn):
    def inner(*args, **kwargs):
        coro = fn(*args, **kwargs)
        next(coro)
        return coro
    return inner

@coroutine
def save_csv(f_name):
    with open(f_name, 'w', newline='') as f:
        writer = csv.writer(f)
        while True:
            row = yield
            writer.writerow(row)

@coroutine
def filter_data(filter_pred, target):
    while True:
        row = yield
        if filter_pred(row):
            target.send(row)

@coroutine
def pipeline_coro(out_file, name_filters):
    save = save_csv(out_file)
    
    target = save
    for name_filter in name_filters:
        target = filter_data(lambda d, v=name_filter: v in d[0], target)
        # warning: we have to use the trick above because
        # lambdas are actually closures and the free variable name_filter
        # is a shared free variable - we have seen this problem before!
    while True:
        received = yield
        target.send(received)


from contextlib import contextmanager

@contextmanager
def pipeline(out_file, name_filters):
    p = pipeline_coro(out_file, name_filters)
    try:
        yield p
    finally:
        p.close()

with pipeline('out.csv', ('Chevrolet', 'Landau', 'Carlo')) as p:
    for row in parse_data('cars.csv'):
        p.send(row)