In [17]:
from collections import deque
from inspect import getgeneratorstate, getgeneratorlocals

In [2]:
def produce_elements(dq):
    """Push the integers 1 through 5 onto the left end of ``dq``, in that order."""
    # extendleft performs one appendleft per item, so 5 ends up leftmost.
    dq.extendleft(range(1, 6))
        
def consume_elements(dq):
    """Drain ``dq`` from the right end, printing each item as it is removed."""
    while dq:
        print('processing item: ', dq.pop())
        
def coordinator():
    """Fill a fresh deque with the producer, then drain it with the consumer."""
    queue = deque()
    produce_elements(queue)
    consume_elements(queue)
    
# Run the demo: the producer runs to completion before the consumer starts.
coordinator()

processing item:  1
processing item:  2
processing item:  3
processing item:  4
processing item:  5


In [3]:
def produce_elements(dq, n):
    """Generator that appends 1 .. n-1 to the left of ``dq``.

    Each time the deque reaches its ``maxlen`` the generator prints a notice
    and yields (``None``) so the caller can let a consumer drain the queue.
    """
    for value in range(1, n):
        dq.appendleft(value)
        if len(dq) != dq.maxlen:
            continue
        print('queue full - yielding control')
        yield
            
def consume_elements(dq):
    """Endless generator: drain ``dq`` from the right, then yield until resumed."""
    while True:
        while dq:
            print('processing ', dq.pop())
        print('queue empty - yielding control')
        yield
        

def coordinator():
    """Interleave the producer and consumer generators over a bounded deque.

    The producer is advanced until it yields (queue full) or finishes; after
    every such step the consumer is advanced once to drain the queue.
    """
    dq = deque(maxlen=10)
    producer = produce_elements(dq, 18)
    consumer = consume_elements(dq)
    while True:
        try:
            print('processing...')
            next(producer)
        except StopIteration:
            # producer finished
            break
        finally:
            # Runs on every pass, including the one that breaks out of the
            # loop, so items left by the finished producer are still consumed.
            print('consuming')
            next(consumer)

# Run the cooperative producer/consumer demo.
coordinator()        

processing...
queue full - yielding control
consuming
processing  1
processing  2
processing  3
processing  4
processing  5
processing  6
processing  7
processing  8
processing  9
processing  10
queue empty - yielding control
processing...
consuming
processing  11
processing  12
processing  13
processing  14
processing  15
processing  16
processing  17
queue empty - yielding control


## **<p style="color: #FF3232"> Generator States </p>**

In [4]:


def gen(s):
    """Generator that yields every item of the iterable ``s`` by delegating to it."""
    yield from s

g = gen('abc')

# State of a generator that has not been advanced yet.
print(getgeneratorstate(g))
print(next(g))
# State while paused at a yield.
print(getgeneratorstate(g))
# list() consumes the remaining items and exhausts the generator.
print(list(g))
# State after exhaustion.
print(getgeneratorstate(g))

GEN_CREATED
a
GEN_SUSPENDED
['b', 'c']
GEN_CLOSED


## **<p style="color: darkorange"> Sending to generators </p>**

In [5]:
def echo():
    """Coroutine that prints every value sent to it; each yield produces ``None``."""
    while True:
        message = yield
        print('you said: ', message)
        
e = echo()
print(getgeneratorstate(e))
# Prime the coroutine: advance it to its first yield.
next(e)
print(getgeneratorstate(e))
# A value can only be sent once the generator is suspended at a yield,
# which is why next(e) is called before send().
e.send('python')
print(getgeneratorstate(e))


GEN_CREATED
GEN_SUSPENDED
you said:  python
GEN_SUSPENDED


In [6]:
def averager():
    """Return a closure that accepts one value per call and returns the running mean."""
    running_sum = 0
    n_values = 0

    def inner(value):
        # Both accumulators live in the enclosing scope and persist between calls.
        nonlocal running_sum, n_values
        running_sum += value
        n_values += 1
        return running_sum / n_values

    return inner

def running_averages(iterable):
    """Print the running average after each value of ``iterable``."""
    update = averager()
    for item in iterable:
        print(update(item))
        

# Closure-based version: prints one running average per input value.
running_averages([1, 2, 3, 4])

1.0
1.5
2.0
2.5


### Doing the same thing with a co-routine generator

In [7]:
# Generator-based coroutine
def running_averager():
    """Coroutine that receives numbers via ``send`` and yields the running mean.

    The first ``next()`` yields ``None``; every later ``send(value)`` returns
    the mean of all values sent so far.
    """
    total, count = 0, 0
    running_average = None
    while True:
        value = yield running_average
        count += 1
        total += value
        running_average = total / count
        
def running_averages(iterable):
    """Print the running average after each value, using the coroutine version."""
    # Create the coroutine and advance it to its first yield so it can
    # accept values through send().
    coro = running_averager()
    next(coro)

    for item in iterable:
        print(coro.send(item))
    

# Coroutine-based version: same output as the closure-based one.
running_averages([1, 2, 3, 4])

1.0
1.5
2.0
2.5


## Closing generators

In [8]:
import csv
import itertools

def parse_file(f_name):
    """Generator yielding the rows of a delimited text file, skipping its first line.

    The CSV dialect is sniffed from the first 2000 characters of the file.
    The file is closed in the ``finally`` clause however the generator ends.

    NOTE: if the generator is closed while suspended at the ``yield``, the
    GeneratorExit is caught and replaced by a plain ``Exception``; this
    exception therefore propagates out of the generator when it is closed.
    """
    print('opening file')
    f = open(f_name, 'r')
    try:
        dialect = csv.Sniffer().sniff(f.read(2000))
        # Rewind after sniffing, then skip the first line of the file.
        f.seek(0)
        next(f)
        reader = csv.reader(f, dialect=dialect)
        for row in reader:
            try:
                yield row
            except GeneratorExit:
                print('got call to close generator')   
                # `from None` suppresses the chained GeneratorExit context.
                raise Exception('something happened') from None
    finally:
        print('closing file')
        f.close()
        
parser = parse_file('../../../01-datasets/python-deep-dive/cars.csv')
# Consume only the first 10 rows; the generator stays suspended afterwards.
for row in itertools.islice(parser, 10):
    print(row)
# Closing the suspended generator triggers its GeneratorExit handler,
# which raises an Exception (see the recorded output).
parser.close()

opening file
['STRING', 'DOUBLE', 'INT', 'DOUBLE', 'DOUBLE', 'DOUBLE', 'DOUBLE', 'INT', 'CAT']
['Chevrolet Chevelle Malibu', '18.0', '8', '307.0', '130.0', '3504.', '12.0', '70', 'US']
['Buick Skylark 320', '15.0', '8', '350.0', '165.0', '3693.', '11.5', '70', 'US']
['Plymouth Satellite', '18.0', '8', '318.0', '150.0', '3436.', '11.0', '70', 'US']
['AMC Rebel SST', '16.0', '8', '304.0', '150.0', '3433.', '12.0', '70', 'US']
['Ford Torino', '17.0', '8', '302.0', '140.0', '3449.', '10.5', '70', 'US']
['Ford Galaxie 500', '15.0', '8', '429.0', '198.0', '4341.', '10.0', '70', 'US']
['Chevrolet Impala', '14.0', '8', '454.0', '220.0', '4354.', '9.0', '70', 'US']
['Plymouth Fury iii', '14.0', '8', '440.0', '215.0', '4312.', '8.5', '70', 'US']
['Pontiac Catalina', '14.0', '8', '455.0', '225.0', '4425.', '10.0', '70', 'US']
got call to close generator
closing file


Exception: something happened

In [9]:
class TransactionAborted(Exception):
    """Raised by ``save_to_db`` when processing sent data fails."""

def save_to_db():
    """Coroutine simulating a database transaction driven by ``send``.

    Each string sent in is evaluated and the result printed. Any ``Exception``
    raised while doing so marks the transaction as aborted and is re-raised
    as ``TransactionAborted``. Closing the generator (GeneratorExit) prints a
    commit message. The ``finally`` clause prints either a rollback or a
    commit message depending on ``is_abort``.
    """
    print('starting new transaction')
    is_abort = False
    try:
        while True:
            data = yield
            # SECURITY NOTE(review): eval() executes arbitrary code; acceptable
            # for this demo only, never for data from an untrusted source.
            print('sending data to database: ', eval(data))
    except Exception as ex:
        is_abort = True
        raise TransactionAborted(str(ex))
    except GeneratorExit:
        # GeneratorExit derives from BaseException, so the handler above
        # does not catch it.
        print('committing transaction')
        raise
    finally:
        if is_abort:
            print('rollback transaction')
        else:
            print('commit transaction')
            

trans = save_to_db()
# Prime the coroutine so it is suspended at its first yield.
next(trans)
trans.send('1 + 1')
# Division by zero inside the coroutine surfaces as TransactionAborted,
# so the close() below is not reached in this run (see recorded output).
trans.send('1 / 0')
trans.close()
                

starting new transaction
sending data to database:  2
rollback transaction


TransactionAborted: division by zero

## Sending exceptions to generators

In [10]:
def gen():
    """Coroutine that prints sent values until a ValueError is thrown into it.

    The ValueError handler sits outside the loop, so a thrown ValueError ends
    the generator; the ``finally`` message is printed whenever it terminates.
    """
    try:
        while True:
            item = yield
            print(item)
    except ValueError:
        print('received a value error')
    finally:
        print('exception must have happened')
        


In [11]:
def gen():
    """Coroutine that prints sent values and survives thrown ValueErrors.

    The handler is inside the loop, so after reporting a ValueError the
    generator loops back to the yield and keeps running.
    """
    while True:
        try:
            item = yield
            print(item)
        except ValueError as error:
            # print() applies str() to its arguments, so the message is shown.
            print('value error received: ', error)
            
g = gen()
# Prime the coroutine so it is suspended at its first yield.
next(g)
g.send('hello')
# Throw an exception instance: the throw(type, value) call form is
# deprecated since Python 3.12. The handler output is unchanged.
g.throw(ValueError('custom message'))
g.send('python')
            

hello
value error received:  custom message
python


In [12]:
class CommitException(Exception):
    """Thrown into ``write_to_db`` to request a commit of the current transaction."""

class RollbackException(Exception):
    """Thrown into ``write_to_db`` to request a rollback of the current transaction."""

def write_to_db():
    """Coroutine simulating a database writer controlled by thrown exceptions.

    Values sent in are "written" (printed). Throwing ``CommitException`` or
    ``RollbackException`` prints the matching message and the loop continues
    with a new transaction. When the generator ends for any other reason the
    ``finally`` clause prints the shutdown messages.
    """
    print('opening database connection')
    print('start transaction')
    try:
        while True:
            try:
                payload = yield
                print('writing data to database...', payload)
            except (CommitException, RollbackException) as signal:
                # Commit is checked first, matching the original clause order.
                if isinstance(signal, CommitException):
                    print('comitting transaction')
                else:
                    print('aborting transaction')
                print('opening next transaction')
    finally:
        print('generator closing')
        print('abort transaction')
        print('closing database connection')
                

# Using decorators to prime Coroutines

In [13]:
import math

def coroutine(gen_fn):
    """Decorator that primes a generator function.

    Calling the decorated function creates the generator, advances it to its
    first yield and returns it, ready to receive ``send`` calls.
    """
    def inner(*args, **kwargs):
        primed = gen_fn(*args, **kwargs)
        next(primed)
        return primed
    return inner


@coroutine
def power_up(p):
    """Primed coroutine: each value sent in is returned raised to the power ``p``."""
    result = None
    while True:
        # Yield the previous result, then compute the next from the sent value.
        result = math.pow((yield result), p)

         
# The decorator has already primed the coroutine, so send() works immediately.
squares = power_up(2)
squares.send(9)

81.0

# Yield from - two-way communications

In [15]:
def squares(n):
    """Yield the squares of 0, 1, ..., n-1."""
    for base in range(n):
        yield base * base
        
def delegator(n):
    """Delegate to ``squares(n)``, passing its values straight through to the caller."""
    yield from squares(n)
    
gen = delegator(5)

# Each next() on the delegator is forwarded to the sub-generator.
for _ in range(5):
    print(next(gen), end=', ')

0, 1, 4, 9, 16, 

- caller next --> delegator --> subgen
- subgen yields --> delegator yields --> caller

In [22]:
def song():
    """Yield the two lines of the song, in order."""
    yield from (
        'I am a lumberjack and I am ok',
        'I sleep all night and I work all day',
    )
    
def play_song():
    """Delegate to ``song()``, then yield a final marker before exiting.

    The local names ``count`` and ``s`` are inspected from outside through
    ``getgeneratorlocals`` in the demo, so they are kept as-is.
    """
    count = 0
    s = song()
    yield from s
    yield 'song finished'
    # Only reached if the generator is advanced past its last yield.
    print('player is exiting')
    
player = play_song()
# Before the first next(): no code has run, so there are no locals yet.
print('getgeneratorstate(player) ', getgeneratorstate(player))
print('getgeneratorlocals(player)', getgeneratorlocals(player))
print(next(player))

# Now suspended inside `yield from s`; the locals are visible.
print('getgeneratorstate(player) ', getgeneratorstate(player))
print('getgeneratorlocals(player)', getgeneratorlocals(player))

# Grab the sub-generator the delegator is currently delegating to.
s = getgeneratorlocals(player)['s']
print('getgeneratorstate(s) ', getgeneratorstate(s))
print('getgeneratorlocals(s)', getgeneratorlocals(s))
print(next(player))
print(next(player))

getgeneratorstate(player)  GEN_CREATED
getgeneratorlocals(player) {}
I am a lumberjack and I am ok
getgeneratorstate(player)  GEN_SUSPENDED
getgeneratorlocals(player) {'count': 0, 's': <generator object song at 0x0000021173A62510>}
getgeneratorstate(s)  GEN_SUSPENDED
getgeneratorlocals(s) {}
I sleep all night and I work all day
song finished


In [27]:
def song():
    """Yield the two lines of the song, in order."""
    yield from (
        'I am a lumberjack and I am ok',
        'I sleep all night and I work all day',
    )
    
def player():
    """Play the song on an endless loop, printing the run number before each pass."""
    count = 0
    while True:
        count += 1
        print('run count: ', count)
        yield from song()
        
p = player()
# Two next() calls consume one full pass of the song.
next(p), next(p)

run count:  1


('I am a lumberjack and I am ok', 'I sleep all night and I work all day')

In [28]:
# The song is exhausted, so the player loops and starts a new sub-generator.
next(p), next(p)

run count:  2


('I am a lumberjack and I am ok', 'I sleep all night and I work all day')

# Yield from - sending data

In [33]:
def echo():
    """Coroutine that answers each sent sequence with its reverse.

    The first ``next()`` yields ``None``.
    """
    output = None
    while True:
        output = (yield output)[::-1]
        
def delegator():
    """Delegate to ``echo()``; ``yield from`` forwards sent values to it."""
    yield from echo()
    
d = delegator()
# Priming the delegator also primes the sub-generator it delegates to.
next(d)
d.send('python')

'nohtyp'

# Flatten a list
## First iteration

In [34]:
def flatten(curr_item, output):
    """Recursively append the non-list leaves of ``curr_item`` to ``output``."""
    if not isinstance(curr_item, list):
        output.append(curr_item)
        return
    for nested in curr_item:
        flatten(nested, output)
 
lst = [1, 2, [3, 4], [[5, 6], [7, 8]]]       
# The result is accumulated into a list supplied by the caller.
output = []
flatten(lst, output)
print(output)

[1, 2, 3, 4, 5, 6, 7, 8]


## Second iteration (as iterator)

In [35]:
def flatten_gen(curr_item):
    """Lazily yield the non-list leaves of an arbitrarily nested list."""
    if not isinstance(curr_item, list):
        yield curr_item
        return
    for nested in curr_item:
        yield from flatten_gen(nested)
        
lst = [1, 2, [3, 4], [[5, 6], [7, 8]]]       
gen = flatten_gen(lst)
# Nothing is flattened until the generator is consumed here.
list(gen)

[1, 2, 3, 4, 5, 6, 7, 8]

## Third iteration

In [37]:
def is_iterable(item, *, str_is_iterable=True):
    """Return True if ``item`` should be iterated into when flattening.

    Args:
        item: Any object.
        str_is_iterable: When true, strings longer than one character count
            as iterable. Strings of length 0 or 1 never do, because iterating
            a one-character string yields that same string again.

    Returns:
        bool: False for non-iterables and for strings excluded by the rule
        above, True for every other iterable.
    """
    try:
        iter(item)
    except TypeError:
        # iter() signals "not iterable" with TypeError. Catching only that
        # lets KeyboardInterrupt, SystemExit and real bugs propagate.
        return False
    if isinstance(item, str):
        return bool(str_is_iterable and len(item) > 1)
    return True
        
def flatten_gen(curr_item):
    """Lazily yield the leaves of any nested iterable, as judged by ``is_iterable``."""
    if not is_iterable(curr_item):
        yield curr_item
        return
    for nested in curr_item:
        yield from flatten_gen(nested)
        
# Mixed input: a string, a tuple, a set and nested lists.
lst = ['abc', 1, 2, (3, 4), [{5, 6}, [7, 8]]]       
gen = flatten_gen(lst)
list(gen)

['a', 'b', 'c', 1, 2, 3, 4, 5, 6, 7, 8]

## Fourth iteration
With a refactoring suggestion from Sourcery

In [38]:
def is_iterable(item, *, str_is_iterable=True):
    """Return True if ``item`` should be iterated into when flattening.

    Args:
        item: Any object.
        str_is_iterable: When true, strings longer than one character count
            as iterable. Strings of length 0 or 1 never do, because iterating
            a one-character string yields that same string again.

    Returns:
        bool: False for non-iterables and for strings excluded by the rule
        above, True for every other iterable.
    """
    try:
        iter(item)
    except TypeError:
        # iter() signals "not iterable" with TypeError. Catching only that
        # lets KeyboardInterrupt, SystemExit and real bugs propagate.
        return False
    # Non-strings are always iterable here; strings must pass both checks.
    return not isinstance(item, str) or bool(str_is_iterable and len(item) > 1)
        
def flatten_gen(curr_item):
    """Lazily yield the leaves of any nested iterable, as judged by ``is_iterable``."""
    if not is_iterable(curr_item):
        yield curr_item
        return
    for nested in curr_item:
        yield from flatten_gen(nested)
        
# Same input as the previous iteration, used to check the refactored predicate.
lst = ['abc', 1, 2, (3, 4), [{5, 6}, [7, 8]]]       
gen = flatten_gen(lst)
list(gen)

['a', 'b', 'c', 1, 2, 3, 4, 5, 6, 7, 8]

## Fifth iteration
Add 'str_is_iterable' argument to flatten_gen

In [39]:
def is_iterable(item, *, str_is_iterable=True):
    """Return True if ``item`` should be iterated into when flattening.

    Args:
        item: Any object.
        str_is_iterable: When true, strings longer than one character count
            as iterable. Strings of length 0 or 1 never do, because iterating
            a one-character string yields that same string again.

    Returns:
        bool: False for non-iterables and for strings excluded by the rule
        above, True for every other iterable.
    """
    try:
        iter(item)
    except TypeError:
        # iter() signals "not iterable" with TypeError. Catching only that
        # lets KeyboardInterrupt, SystemExit and real bugs propagate.
        return False
    return not isinstance(item, str) or bool(str_is_iterable and len(item) > 1)


def flatten_gen(curr_item, *, str_is_iterable=True):
    """Lazily yield the leaves of an arbitrarily nested iterable.

    Args:
        curr_item: The object to flatten; non-iterables are yielded as-is.
        str_is_iterable: When true, multi-character strings are split into
            characters; when false, strings are yielded whole at every
            nesting level.

    Yields:
        The leaf items in depth-first, left-to-right order.
    """
    if is_iterable(curr_item, str_is_iterable=str_is_iterable):
        for item in curr_item:
            # Pass the flag down; otherwise nested strings would always be
            # split using the default, whatever the caller asked for.
            yield from flatten_gen(item, str_is_iterable=str_is_iterable)
    else:
        yield curr_item
        
# Called with the default str_is_iterable=True, so 'abc' is split into characters.
lst = ['abc', 1, 2, (3, 4), [{5, 6}, [7, 8]]]       
gen = flatten_gen(lst)
list(gen)

['a', 'b', 'c', 1, 2, 3, 4, 5, 6, 7, 8]

# Yield from - closing and return

In [44]:
def subgen():
    """Coroutine that prints every sent value and reports when it is shut down."""
    try:
        while True:
            message = yield
            print(message)
    finally:
        # Runs when the generator is closed or otherwise terminated.
        print('subgen: closing')
        
def delegator():
    """Delegate to a ``subgen()`` instance, then yield a final marker.

    The sub-generator is kept in the local ``s`` because the demo reads it
    back through ``getgeneratorlocals``.
    """
    s = subgen()
    yield from s
    # Reached only if the sub-generator finishes on its own.
    yield 'delegator: subgen closing'
    print('delegator closing')
    
d = delegator()
next(d)
# Grab the sub-generator so its state can be inspected independently.
s = getgeneratorlocals(d)['s']
print('getgeneratorstate(d)', getgeneratorstate(d))
print('getgeneratorstate(s)', getgeneratorstate(s))
print('getgeneratorlocals(d) ', getgeneratorlocals(d))
print('getgeneratorlocals(s) ', getgeneratorlocals(s))  

d.send('hello')
# Closing the delegator while it is delegating also closes the sub-generator.
d.close()
print('getgeneratorstate(d)', getgeneratorstate(d))
print('getgeneratorstate(s)', getgeneratorstate(s))

subgen: closing
getgeneratorstate(d) GEN_SUSPENDED
getgeneratorstate(s) GEN_SUSPENDED
getgeneratorlocals(d)  {'s': <generator object subgen at 0x0000021173A8F2E0>}
getgeneratorlocals(s)  {}
hello
subgen: closing
getgeneratorstate(d) GEN_CLOSED
getgeneratorstate(s) GEN_CLOSED


In [49]:
def subgen():
    """Yield 1 and 2, print a closing message, then return 100.

    Returns:
        100, delivered to a delegating ``yield from`` as its result (or as
        ``StopIteration.value`` when driven directly).

    The ``return`` is placed after the ``try``/``finally`` rather than inside
    the ``finally`` clause. A ``return`` inside ``finally`` would silently
    discard any exception in flight, such as one thrown into the generator.
    """
    try:
        yield 1
        yield 2
    finally:
        print('subgen closing...')
    return 100
    

def delegator():
    """Delegate to ``subgen()``, print its return value, then yield once more."""
    # `yield from` evaluates to the sub-generator's return value.
    result = yield from subgen()
    print('subgen returned: ', result)
    yield 'delegator suspended'
    print('delegator closing...Now you will get a StopIteration exception')
    
d = delegator()
print(next(d))
print(next(d))
print(next(d))
# The delegator has no more yields, so this call raises StopIteration
# (intentional; see the recorded output).
print(next(d))

1
2
subgen closing...
subgen returned:  100
delegator suspended
delegator closing...Now you will get a StopIteration exception


StopIteration: 

# Yield From - Throwing Exceptions

In [59]:
class CloseCoroutine(Exception):
    """Thrown into a coroutine to ask it to finish and return a value."""

def echo():
    """Coroutine that prints sent values until it is told to stop.

    Throwing ``CloseCoroutine`` makes it return 'coro was closed'; a
    GeneratorExit makes it print a notice and return 'From a GeneratorExit'.
    """
    try:
        while True:
            message = yield
            print(message)
    except CloseCoroutine:
        return 'coro was closed'
    except GeneratorExit:
        print('closed method was called/or GeneratorExit thrown')
        return 'From a GeneratorExit'
        
        
e = echo()
next(e)
# The coroutine returns in response, so this surfaces as StopIteration
# carrying the return value (see the recorded output).
e.throw(CloseCoroutine)

StopIteration: coro was closed

In [60]:
e = echo()
next(e)
# close() raises nothing here; only the handler's message is printed.
e.close()

closed method was called/or GeneratorExit thrown


In [61]:
e = echo()
next(e)
# Throwing GeneratorExit by hand: unlike close(), the generator's return
# surfaces as StopIteration (see the recorded output).
e.throw(GeneratorExit)

closed method was called/or GeneratorExit thrown


StopIteration: From a GeneratorExit

In [62]:
class CloseCoroutine(Exception):
    """Thrown into a coroutine to ask it to finish and return a value."""

def echo():
    """Coroutine that prints sent values until it is told to stop.

    Throwing ``CloseCoroutine`` makes it return 'coro was closed'; a
    GeneratorExit makes it print a notice and return 'From a GeneratorExit'.
    """
    try:
        while True:
            message = yield
            print(message)
    except CloseCoroutine:
        return 'coro was closed'
    except GeneratorExit:
        print('closed method was called/or GeneratorExit thrown')
        return 'From a GeneratorExit'
        

def delegator():
    """Delegate to ``echo()`` and, once it returns, yield a tuple with its result."""
    outcome = yield from echo()
    yield ('subgen closed and returned', outcome)
    print('delegator closing')
     
d = delegator()
next(d)
d.send('hello')
# The exception is forwarded to the sub-generator, which returns; the
# delegator then runs on to its own yield, whose value is the result here.
d.throw(CloseCoroutine)

hello


('subgen closed and returned', 'coro was closed')

In [68]:
class CloseCoroutine(Exception):
    """Thrown into a coroutine to ask it to finish and return a value."""

class IgnoreMe(Exception):
    """Thrown into ``echo``, which handles it and keeps running."""

def echo():
    """Coroutine that prints sent values and tolerates ``IgnoreMe``.

    After an ``IgnoreMe`` is thrown in, the next yielded value is
    'I am ignoring you'; after a normal send it is ``None``. Throwing
    ``CloseCoroutine`` or a GeneratorExit ends the coroutine with a return
    value.
    """
    output = None
    try:
        while True:
            try:
                message = yield output
                print(message)
                # Normal send: reset the reply for the next yield.
                output = None
            except IgnoreMe:
                output = 'I am ignoring you'
    except CloseCoroutine:
        return 'coro was closed'
    except GeneratorExit:
        print('closed method was called/or GeneratorExit thrown')
        return 'From a GeneratorExit'
        

def delegator():
    """Delegate to ``echo()`` and, once it returns, yield a tuple with its result."""
    outcome = yield from echo()
    yield ('subgen closed and returned', outcome)
    print('delegator closing')
     
d = delegator()
next(d)
d.send('hello')
# Throw an exception instance: the throw(type, value) call form is
# deprecated since Python 3.12. The sub-generator handles IgnoreMe and
# yields its reply, which throw() returns.
result = d.throw(IgnoreMe(1000))
print(result)
d.send('hello again..')
d.close()

hello
I am ignoring you
hello again..
closed method was called/or GeneratorExit thrown


# Application - Pipelines

## Pulling data

In [72]:
import csv
import itertools

def parse_data(f_name):
    """Yield the rows of a delimited text file, skipping its first two lines.

    The CSV dialect is sniffed from the first 2000 characters of the file.
    """
    with open(f_name, 'r') as csv_file:
        sample = csv_file.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        csv_file.seek(0)
        # Skip the first two lines before handing the file to the reader.
        for _ in range(2):
            next(csv_file)
        rows = csv.reader(csv_file, dialect=dialect)
        yield from rows
        
        
parser = parse_data('../../../01-datasets/python-deep-dive/cars.csv')
# Pull only the first 10 rows from the lazy parser.
for row in itertools.islice(parser, 10):
    print(row)


['Chevrolet Chevelle Malibu', '18.0', '8', '307.0', '130.0', '3504.', '12.0', '70', 'US']
['Buick Skylark 320', '15.0', '8', '350.0', '165.0', '3693.', '11.5', '70', 'US']
['Plymouth Satellite', '18.0', '8', '318.0', '150.0', '3436.', '11.0', '70', 'US']
['AMC Rebel SST', '16.0', '8', '304.0', '150.0', '3433.', '12.0', '70', 'US']
['Ford Torino', '17.0', '8', '302.0', '140.0', '3449.', '10.5', '70', 'US']
['Ford Galaxie 500', '15.0', '8', '429.0', '198.0', '4341.', '10.0', '70', 'US']
['Chevrolet Impala', '14.0', '8', '454.0', '220.0', '4354.', '9.0', '70', 'US']
['Plymouth Fury iii', '14.0', '8', '440.0', '215.0', '4312.', '8.5', '70', 'US']
['Pontiac Catalina', '14.0', '8', '455.0', '225.0', '4425.', '10.0', '70', 'US']
['AMC Ambassador DPL', '15.0', '8', '390.0', '190.0', '3850.', '8.5', '70', 'US']


In [74]:
def parse_data(f_name):
    """Yield the rows of a delimited text file, skipping its first two lines.

    The CSV dialect is sniffed from the first 2000 characters of the file.
    """
    with open(f_name, 'r') as csv_file:
        sample = csv_file.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        csv_file.seek(0)
        # Skip the first two lines before handing the file to the reader.
        for _ in range(2):
            next(csv_file)
        rows = csv.reader(csv_file, dialect=dialect)
        yield from rows

def filter_data(rows, contains):
    """Lazily yield the rows whose first field contains the substring ``contains``."""
    yield from (row for row in rows if contains in row[0])
            
# Build the pull pipeline: parser -> 'Chevrolet' filter -> 'Carlo' filter.
data = parse_data('../../../01-datasets/python-deep-dive/cars.csv')
filtered_data_1 = filter_data(data, 'Chevrolet')
filtered_data_2 = filter_data(filtered_data_1, 'Carlo')

# Iterating the last stage pulls rows through every earlier stage.
for row in itertools.islice(filtered_data_2, 5):
    print(row)

['Chevrolet Monte Carlo', '15.0', '8', '400.0', '150.0', '3761.', '9.5', '70', 'US']
['Chevrolet Monte Carlo S', '15.0', '8', '350.0', '145.0', '4082.', '13.0', '73', 'US']
['Chevrolet Monte Carlo Landau', '15.5', '8', '350.0', '170.0', '4165.', '11.4', '77', 'US']
['Chevrolet Monte Carlo Landau', '19.2', '8', '305.0', '145.0', '3425.', '13.2', '78', 'US']


In [80]:
def parse_data(f_name):
    """Yield the rows of a delimited text file, skipping its first two lines.

    The CSV dialect is sniffed from the first 2000 characters of the file.
    """
    with open(f_name, 'r') as csv_file:
        sample = csv_file.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        csv_file.seek(0)
        # Skip the first two lines before handing the file to the reader.
        for _ in range(2):
            next(csv_file)
        rows = csv.reader(csv_file, dialect=dialect)
        yield from rows

def filter_data(rows, contains):
    """Lazily yield the rows whose first field contains the substring ``contains``."""
    yield from (row for row in rows if contains in row[0])
            
def output(f_name, *filter_words):
    """Yield the rows of ``f_name`` that pass every filter word.

    One ``filter_data`` stage is stacked on the parser per word, in the
    order the words are given.
    """
    rows = parse_data(f_name)
    for word in filter_words:
        rows = filter_data(rows, word)
    yield from rows

f_name_path = '../../../01-datasets/python-deep-dive/cars.csv'          
# Same pipeline as before, now assembled from the filter words.
results = output(f_name_path, 'Chevrolet', 'Carlo')
for row in itertools.islice(results, 5):
    print(row)

['Chevrolet Monte Carlo', '15.0', '8', '400.0', '150.0', '3761.', '9.5', '70', 'US']
['Chevrolet Monte Carlo S', '15.0', '8', '350.0', '145.0', '4082.', '13.0', '73', 'US']
['Chevrolet Monte Carlo Landau', '15.5', '8', '350.0', '170.0', '4165.', '11.4', '77', 'US']
['Chevrolet Monte Carlo Landau', '19.2', '8', '305.0', '145.0', '3425.', '13.2', '78', 'US']


## Pushing data

In [92]:
import math

def coroutine(coro):
    """Decorator that primes a generator function.

    Calling the decorated function creates the generator, advances it to its
    first yield and returns it, ready to receive ``send`` calls.
    """
    def inner(*args, **kwargs):
        primed = coro(*args, **kwargs)
        next(primed)
        return primed
    return inner

@coroutine
def handle_data():
    """Sink coroutine: print every value sent to it."""
    while True:
        item = yield
        print(item)
        
@coroutine
def power_up(n, next_gen):
    """Stage coroutine: raise each received value to the power ``n`` and forward it."""
    while True:
        base = yield
        next_gen.send(math.pow(base, n))
        
@coroutine
def filter_even(next_gen):
    """Stage coroutine: forward only the received values that are divisible by 2."""
    while True:
        candidate = yield
        if candidate % 2 != 0:
            continue
        next_gen.send(candidate)


# Push pipelines are wired sink-first. Data flows:
# gen1 (power 2) -> gen2 (power 3) -> filtered (even only) -> print_data.
print_data = handle_data()
filtered = filter_even(print_data)
gen2 = power_up(3, filtered)
gen1 = power_up(2, gen2)
for i in range(1, 6):
    gen1.send(i)              

1.0
4.0
9.0
16.0
25.0


## Broadcasting

In [97]:
import csv 

def data_reader(f_name):
    """Yield every row of a delimited text file.

    The CSV dialect is sniffed from the first 2000 characters; the file is
    closed when the generator finishes or is closed.
    """
    with open(f_name) as f:
        sample = f.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        f.seek(0)
        yield from csv.reader(f, dialect=dialect)

# Column positions within a parsed data row.
idx_make, idx_model, idx_year, idx_vin, idx_color = range(5)

# Column names, and the converter applied to the field at the same position.
headers = ('make', 'model', 'year', 'vin', 'color')
converters = (str, str, int, str, str)

def data_parser():
    """Yield the rows of the cars file with each field run through its converter.

    The first two rows of the file are skipped. ``zip`` pairs fields with
    ``converters``, so fields beyond the last converter are dropped.
    """
    rows = data_reader('../../../01-datasets/python-deep-dive/cars.csv')
    next(rows)
    next(rows)
    for row in rows:
        yield [convert(field) for convert, field in zip(converters, row)]
 
data = data_parser()
# Show the first five parsed rows.
for _ in range(5):
    print(next(data))

['Chevrolet Chevelle Malibu', '18.0', 8, '307.0', '130.0']
['Buick Skylark 320', '15.0', 8, '350.0', '165.0']
['Plymouth Satellite', '18.0', 8, '318.0', '150.0']
['AMC Rebel SST', '16.0', 8, '304.0', '150.0']
['Ford Torino', '17.0', 8, '302.0', '140.0']


In [101]:
import csv 

def data_reader(f_name):
    """Yield every row of a delimited text file.

    The CSV dialect is sniffed from the first 2000 characters; the file is
    closed when the generator finishes or is closed.
    """
    with open(f_name) as f:
        sample = f.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        f.seek(0)
        yield from csv.reader(f, dialect=dialect)

# Column positions within a parsed data row.
idx_make, idx_model, idx_year, idx_vin, idx_color = range(5)

# Column names, and the converter applied to the field at the same position.
headers = ('make', 'model', 'year', 'vin', 'color')
converters = (str, str, int, str, str)

def data_parser():
    """Yield the rows of the cars file with each field run through its converter.

    The first two rows of the file are skipped. ``zip`` pairs fields with
    ``converters``, so fields beyond the last converter are dropped.
    """
    rows = data_reader('../../../01-datasets/python-deep-dive/cars.csv')
    next(rows)
    next(rows)
    for row in rows:
        yield [convert(field) for convert, field in zip(converters, row)]

def coroutine(coro):
    """Decorator that primes a generator function.

    Calling the decorated function creates the generator, advances it to its
    first yield and returns it, ready to receive ``send`` calls.
    """
    def inner(*args, **kwargs):
        primed = coro(*args, **kwargs)
        next(primed)
        return primed
    return inner

@coroutine
def save_data(f_name, headers):
    """Sink coroutine: write the header row, then every row sent in, to a CSV file.

    The file stays open until the coroutine is closed.
    """
    with open(f_name, 'w', newline='') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(headers)
        while True:
            csv_writer.writerow((yield))
            
@coroutine
def filter_data(filter_predicate, target):
    """Stage coroutine: forward to ``target`` only rows accepted by the predicate."""
    while True:
        candidate = yield
        if not filter_predicate(candidate):
            continue
        target.send(candidate)
            
@coroutine
def broadcast(targets):
    """Stage coroutine: send every received row to each coroutine in ``targets``."""
    while True:
        row = yield
        for sink in targets:
            sink.send(row)

def process_data():
    """Broadcast every parsed row to three filtered CSV outputs.

    Rows from ``data_parser()`` are pushed through a broadcaster into three
    filters (pink cars, green Fords, year <= 2010), each feeding its own
    ``save_data`` sink. All coroutines are closed explicitly once the data
    is exhausted (or if an error occurs), so the output files are flushed
    and closed at a known point instead of whenever the generators happen
    to be garbage-collected.
    """
    out_pink_cars = save_data('../../../01-datasets/python-deep-dive/pink_cars.csv', headers)
    out_ford_green = save_data('../../../01-datasets/python-deep-dive/ford_green.csv', headers)
    out_older = save_data('../../../01-datasets/python-deep-dive/older.csv', headers)

    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink', out_pink_cars)

    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford' and d[idx_color].lower() == 'green')

    filter_ford_green = filter_data(pred_ford_green, out_ford_green)
    filter_old = filter_data(lambda d: d[idx_year] <= 2010, out_older)
    filters = (filter_pink_cars, filter_ford_green, filter_old)

    broadcaster = broadcast(filters)

    try:
        for row in data_parser():
            broadcaster.send(row)
    finally:
        # Close upstream stages first, the file-writing sinks last.
        for coro in (broadcaster, *filters, out_pink_cars, out_ford_green, out_older):
            coro.close()

    print('Finished processing')
    

# Run the whole broadcast pipeline; it writes the three output CSV files.
process_data()
    

Finished processing


## Second iteration

In [103]:
import csv 

def data_reader(f_name):
    """Yield every row of a delimited text file.

    The CSV dialect is sniffed from the first 2000 characters; the file is
    closed when the generator finishes or is closed.
    """
    with open(f_name) as f:
        sample = f.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        f.seek(0)
        yield from csv.reader(f, dialect=dialect)

# Column positions within a parsed data row.
idx_make, idx_model, idx_year, idx_vin, idx_color = range(5)

# Column names, and the converter applied to the field at the same position.
headers = ('make', 'model', 'year', 'vin', 'color')
converters = (str, str, int, str, str)

def data_parser():
    """Yield the rows of the cars file with each field run through its converter.

    The first two rows of the file are skipped. ``zip`` pairs fields with
    ``converters``, so fields beyond the last converter are dropped.
    """
    rows = data_reader('../../../01-datasets/python-deep-dive/cars.csv')
    next(rows)
    next(rows)
    for row in rows:
        yield [convert(field) for convert, field in zip(converters, row)]

def coroutine(coro):
    """Decorator that primes a generator function.

    Calling the decorated function creates the generator, advances it to its
    first yield and returns it, ready to receive ``send`` calls.
    """
    def inner(*args, **kwargs):
        primed = coro(*args, **kwargs)
        next(primed)
        return primed
    return inner

@coroutine
def save_data(f_name, headers):
    """Sink coroutine: write the header row, then every row sent in, to a CSV file.

    The file stays open until the coroutine is closed.
    """
    with open(f_name, 'w', newline='') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(headers)
        while True:
            csv_writer.writerow((yield))
            
@coroutine
def filter_data(filter_predicate, target):
    """Stage coroutine: forward to ``target`` only rows accepted by the predicate."""
    while True:
        candidate = yield
        if not filter_predicate(candidate):
            continue
        target.send(candidate)
            
@coroutine
def broadcast(targets):
    """Stage coroutine: send every received row to each coroutine in ``targets``."""
    while True:
        row = yield
        for sink in targets:
            sink.send(row)

@coroutine
def pipeline_coro():
    """Primed coroutine that broadcasts each row sent in to three filtered CSV outputs.

    The sinks, filters and broadcaster are created when the coroutine is
    primed. Each row received through ``send`` is forwarded to the
    broadcaster. When the coroutine is closed (or fails), every downstream
    coroutine is closed explicitly so the output files are flushed and
    closed, and 'Finished processing' is printed. That message was
    unreachable when it followed the endless loop directly.
    """
    out_pink_cars = save_data('../../../01-datasets/python-deep-dive/pink_cars.csv', headers)
    out_ford_green = save_data('../../../01-datasets/python-deep-dive/ford_green.csv', headers)
    out_older = save_data('../../../01-datasets/python-deep-dive/older.csv', headers)

    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink', out_pink_cars)

    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford' and d[idx_color].lower() == 'green')

    filter_ford_green = filter_data(pred_ford_green, out_ford_green)
    filter_old = filter_data(lambda d: d[idx_year] <= 2010, out_older)
    filters = (filter_pink_cars, filter_ford_green, filter_old)

    broadcaster = broadcast(filters)

    try:
        while True:
            row = yield
            broadcaster.send(row)
    finally:
        # Close upstream stages first, the file-writing sinks last.
        for coro in (broadcaster, *filters, out_pink_cars, out_ford_green, out_older):
            coro.close()
        print('Finished processing')
    

# The decorator has already primed the pipeline, so rows can be sent at once.
pipe = pipeline_coro()
data = data_parser()
for row in data:
    pipe.send(row)
# Shut the pipeline down once all rows have been pushed.
pipe.close()

## Third iteration

In [None]:
import csv 
from contextlib import contextmanager

def data_reader(f_name):
    """Yield every row of a delimited text file.

    The CSV dialect is sniffed from the first 2000 characters; the file is
    closed when the generator finishes or is closed.
    """
    with open(f_name) as f:
        sample = f.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        f.seek(0)
        yield from csv.reader(f, dialect=dialect)

# Column positions within a parsed data row.
idx_make, idx_model, idx_year, idx_vin, idx_color = range(5)

# Column names, and the converter applied to the field at the same position.
headers = ('make', 'model', 'year', 'vin', 'color')
converters = (str, str, int, str, str)

def data_parser():
    """Yield the rows of the cars file with each field run through its converter.

    The first two rows of the file are skipped. ``zip`` pairs fields with
    ``converters``, so fields beyond the last converter are dropped.
    """
    rows = data_reader('../../../01-datasets/python-deep-dive/cars.csv')
    next(rows)
    next(rows)
    for row in rows:
        yield [convert(field) for convert, field in zip(converters, row)]

def coroutine(coro):
    """Decorator that primes a generator function.

    Calling the decorated function creates the generator, advances it to its
    first yield and returns it, ready to receive ``send`` calls.
    """
    def inner(*args, **kwargs):
        primed = coro(*args, **kwargs)
        next(primed)
        return primed
    return inner

@coroutine
def save_data(f_name, headers):
    """Sink coroutine: write the header row, then every row sent in, to a CSV file.

    The file stays open until the coroutine is closed.
    """
    with open(f_name, 'w', newline='') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(headers)
        while True:
            csv_writer.writerow((yield))
            
@coroutine
def filter_data(filter_predicate, target):
    """Stage coroutine: forward to ``target`` only rows accepted by the predicate."""
    while True:
        candidate = yield
        if not filter_predicate(candidate):
            continue
        target.send(candidate)
            
@coroutine
def broadcast(targets):
    """Stage coroutine: send every received row to each coroutine in ``targets``."""
    while True:
        row = yield
        for sink in targets:
            sink.send(row)

@coroutine
def pipeline_coro():
    """Primed coroutine that broadcasts each row sent in to three filtered CSV outputs.

    The sinks, filters and broadcaster are created when the coroutine is
    primed. Each row received through ``send`` is forwarded to the
    broadcaster. When the coroutine is closed (or fails), every downstream
    coroutine is closed explicitly so the output files are flushed and
    closed, and 'Finished processing' is printed. That message was
    unreachable when it followed the endless loop directly.
    """
    out_pink_cars = save_data('../../../01-datasets/python-deep-dive/pink_cars.csv', headers)
    out_ford_green = save_data('../../../01-datasets/python-deep-dive/ford_green.csv', headers)
    out_older = save_data('../../../01-datasets/python-deep-dive/older.csv', headers)

    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink', out_pink_cars)

    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford' and d[idx_color].lower() == 'green')

    filter_ford_green = filter_data(pred_ford_green, out_ford_green)
    filter_old = filter_data(lambda d: d[idx_year] <= 2010, out_older)
    filters = (filter_pink_cars, filter_ford_green, filter_old)

    broadcaster = broadcast(filters)

    try:
        while True:
            row = yield
            broadcaster.send(row)
    finally:
        # Close upstream stages first, the file-writing sinks last.
        for coro in (broadcaster, *filters, out_pink_cars, out_ford_green, out_older):
            coro.close()
        print('Finished processing')
    

@contextmanager
def pipeline():
    """Context manager that yields a primed pipeline coroutine and closes it on exit."""
    coro = pipeline_coro()
    try:
        yield coro
    finally:
        # Runs on normal exit and when the with-body raises.
        coro.close()
        
        
# pipeline is a context-manager factory: it must be called to obtain the
# context manager. Using the bare function in a with statement fails at runtime.
with pipeline() as pipe:
    data = data_parser()
    for row in data:
        pipe.send(row)
