In [1]:
import csv
import functools
import itertools
import math
from inspect import getgeneratorstate

### producer-consumer patterns

In [2]:
from collections import deque

In [3]:
# simple interaction

def produce_elements(dq):
    """Push the integers 1, 2 and 3 onto the left end of ``dq``, in that order."""
    # extendleft performs one left-append per element, so 1 is inserted first
    # and ends up at the right end of the queue.
    dq.extendleft(range(1, 4))

        
def consume_elements(dq):
    """Pop items off the right end of ``dq`` and report each one until it is empty."""
    while dq:
        print('processing item', dq.pop())
        
        
def coordinator():
    """Fill a fresh queue with the producer, then drain it with the consumer."""
    queue = deque()
    produce_elements(queue)   # runs to completion before any consuming starts
    consume_elements(queue)
    
    
coordinator()

processing item 1
processing item 2
processing item 3


In [4]:
# more complicated interaction

def produce_elements(dq, n):
    """Generator that left-appends 1..n to ``dq``, pausing whenever it is full.

    After each append the queue length is compared with ``dq.maxlen``; when
    they are equal a message is printed and control is yielded to the caller.
    """
    for value in range(1, n + 1):
        dq.appendleft(value)
        if len(dq) != dq.maxlen:
            continue
        print('queue full - yielding control')
        yield
            

def consume_elements(dq):
    """Endless generator that drains ``dq`` from the right, then yields control.

    Each time it is resumed it pops and reports every queued item, prints a
    notice once the queue is empty, and suspends again.
    """
    while True:
        while dq:
            print('processing item', dq.pop())
        print('queue empty = yielding control')
        yield

        
def coordinator():
    """Alternate between the producer and consumer generators.

    A deque bounded to 5 items is shared by both generators. The producer is
    asked for 14 items and yields each time the deque is full; the consumer
    then drains the deque and yields back.
    """
    dq = deque(maxlen=5)
    producer = produce_elements(dq, 14)
    consumer = consume_elements(dq)
    while True:
        try:
            print('\n', 'producing...')
            next(producer)
        except StopIteration:
            # producer finished
            break
        finally:
            # Runs on every pass, including the one that breaks out of the
            # loop, so the items left by the final (partial) batch are still
            # consumed.
            print('\n', 'consuming...')
            next(consumer)

            
coordinator()


 producing...
queue full - yielding control

 consuming...
processing item 1
processing item 2
processing item 3
processing item 4
processing item 5
queue empty = yielding control

 producing...
queue full - yielding control

 consuming...
processing item 6
processing item 7
processing item 8
processing item 9
processing item 10
queue empty = yielding control

 producing...

 consuming...
processing item 11
processing item 12
processing item 13
processing item 14
queue empty = yielding control


<br>

### Generator states

In [5]:
def gen(s):
    """Yield the elements of ``s`` one at a time."""
    yield from s

In [6]:
g = gen('abc')  # Create an instance of the generator function.
                # The generator (object) is in the 'created' state

In [7]:
getgeneratorstate(g)

'GEN_CREATED'

In [8]:
next(g)         # While executing, the generator is in the 'running' state;
                # once it has yielded, it is in the 'suspended' state

'a'

In [9]:
getgeneratorstate(g)

'GEN_SUSPENDED'

In [10]:
list(g)         # Exhaust the generator.
                # After that the generator is in the 'closed' state

['b', 'c']

In [11]:
# next(g)       #> StopIteration exception

In [12]:
getgeneratorstate(g)

'GEN_CLOSED'

<br>

Exposing the 'running' state: the generator inspects its own state while it is executing.

In [13]:
def gen(s):
    """Yield the elements of ``s``, printing the state of ``global_gen`` first.

    ``global_gen`` is looked up as a module-level name each time the
    generator resumes, just before the next element is yielded.
    """
    for ch in s:
        state = getgeneratorstate(global_gen)
        print(state)
        yield ch

In [14]:
global_gen = gen('abc')

In [15]:
next(global_gen)

GEN_RUNNING


'a'

In [16]:
getgeneratorstate(global_gen)

'GEN_SUSPENDED'

<br>

### Sending to generators

In [17]:
def echo():
    """Endless generator that prints every value sent to it."""
    while True:
        message = yield   # suspends here until the caller sends a value
        print('You said:', message)

In [18]:
e = echo()

getgeneratorstate(e)

'GEN_CREATED'

In [19]:
next(e)   # or 'e.send(None)', but not other values

getgeneratorstate(e)

'GEN_SUSPENDED'

In [20]:
e.send('eternity')

You said: eternity


<br>

More complicated example: receiving and sending data in one statement

In [21]:
def squares(n):
    """Yield the squares of 0..n-1 and print whatever is sent back each time."""
    for base in range(n):
        # One statement, two actions: the square is handed to the caller, and
        # the value the caller sends on resumption is bound to `sent`.
        sent = yield base * base
        print(sent)

In [22]:
sq = squares(5)

next(sq)

0

In [23]:
next(sq)

None


1

In [24]:
sq.send('qwerty')

qwerty


4

<br>

### Closing generators

In [25]:
input_file = 'working_files/cars-2.csv'

In [26]:
def parse_file(f_name):
    """Yield the rows of the CSV file ``f_name``, sniffing its dialect first.

    Prints a message when the file is opened and another just before it is
    closed; the latter also runs when the generator is closed early.
    """
    print('opening file...')
    with open(f_name, 'r') as f:
        try:
            sample = f.read(2000)          # sample used to detect the dialect
            dialect = csv.Sniffer().sniff(sample)
            f.seek(0)                      # rewind so no row is skipped
            yield from csv.reader(f, dialect=dialect)
        finally:
            print('closing file...')

In [27]:
parser = parse_file(input_file)
for row in itertools.islice(parser, 5):
    print(row)

opening file...
['Car', 'MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight', 'Acceleration', 'Model', 'Origin']
['Chevrolet Chevelle Malibu', '18.0', '8', '307.0', '130.0', '3504.', '12.0', '70', 'US']
['Buick Skylark 320', '15.0', '8', '350.0', '165.0', '3693.', '11.5', '70', 'US']
['Plymouth Satellite', '18.0', '8', '318.0', '150.0', '3436.', '11.0', '70', 'US']
['AMC Rebel SST', '16.0', '8', '304.0', '150.0', '3433.', '12.0', '70', 'US']


In [28]:
parser.close()

closing file...


<br>

### Using decorators to prime coroutines

In [29]:
def coroutine(gen_fn):
    """Decorator that primes a zero-argument generator function.

    Calling the decorated function creates the generator, advances it to its
    first ``yield`` and returns it, so it is immediately ready for ``send``.
    ``functools.wraps`` keeps the decorated function's name and docstring.
    """
    @functools.wraps(gen_fn)
    def inner():
        gen = gen_fn()
        next(gen)   # prime: run up to the first yield
        return gen
    return inner

In [30]:
@coroutine
def echo():
    """Endless coroutine that prints every value sent to it."""
    while True:
        message = yield
        print(message)

In [31]:
e = echo()
getgeneratorstate(e)

'GEN_SUSPENDED'

In [32]:
e.send('hello')

hello


<br>

A little more complicated example

In [33]:
def coroutine(gen_fn):
    """Decorator that primes a generator function taking any arguments.

    The wrapper forwards its positional and keyword arguments to ``gen_fn``,
    advances the new generator to its first ``yield`` and returns it.
    ``functools.wraps`` keeps the decorated function's name and docstring.
    """
    @functools.wraps(gen_fn)
    def inner(*args, **kwargs):
        gen = gen_fn(*args, **kwargs)
        next(gen)   # prime: run up to the first yield
        return gen
    return inner

In [34]:
@coroutine
def power_up(p):
    """Coroutine that answers each sent value with that value raised to ``p``.

    A value for which ``math.pow`` raises ``TypeError`` is answered with
    ``None`` instead, and the coroutine keeps running.
    """
    outcome = None
    while True:
        base = yield outcome
        try:
            outcome = math.pow(base, p)
        except TypeError:
            # invalid input: report None rather than killing the coroutine
            outcome = None

In [35]:
squares = power_up(2)

In [36]:
squares.send(5)

25.0

In [37]:
squares.send('abc')  # the TypeError raised for the invalid input was caught
                     # inside the coroutine and suppressed

In [38]:
getgeneratorstate(squares)

'GEN_SUSPENDED'

In [39]:
squares.send(6)

36.0

In [40]:
squares.close()

In [41]:
getgeneratorstate(squares)

'GEN_CLOSED'

<br>

### Pipelines - Pulling data

`caller <-- filter <-- data`

In [42]:
input_file = 'working_files/cars-2.csv'

In [43]:
# generator that will produce data from the file

def parse_data(f_name):
    """Yield the data rows of the CSV file ``f_name``, without its first line.

    The dialect is sniffed from the first 2000 characters of the file.
    """
    with open(f_name) as fh:
        sample = fh.read(2000)
        dialect = csv.Sniffer().sniff(sample)
        fh.seek(0)
        next(fh)  # drop the header line before the csv reader sees the file
        for row in csv.reader(fh, dialect=dialect):
            yield row

In [44]:
# filter out rows based on the car make

def filter_data(rows, contains):
    """Lazily yield the rows whose first field contains ``contains``."""
    matching = (row for row in rows if contains in row[0])
    yield from matching

In [45]:
def output(f_name):
    """Yield rows of ``f_name`` whose first field contains 'Chevrolet' and 'Carlo'."""
    chevrolets = filter_data(parse_data(f_name), 'Chevrolet')
    for row in filter_data(chevrolets, 'Carlo'):
        yield row

In [46]:
results = output(input_file)

for row in results:
    print(row)

['Chevrolet Monte Carlo', '15.0', '8', '400.0', '150.0', '3761.', '9.5', '70', 'US']
['Chevrolet Monte Carlo S', '15.0', '8', '350.0', '145.0', '4082.', '13.0', '73', 'US']
['Chevrolet Monte Carlo Landau', '15.5', '8', '350.0', '170.0', '4165.', '11.4', '77', 'US']
['Chevrolet Monte Carlo Landau', '19.2', '8', '305.0', '145.0', '3425.', '13.2', '78', 'US']


<br>

More generic `output`:

In [47]:
def output(f_name, *filter_words):
    """Yield rows of ``f_name`` whose first field contains every filter word.

    One ``filter_data`` stage is stacked on top of the parsed data per word;
    with no words, every parsed row is yielded.
    """
    stage = parse_data(f_name)
    for word in filter_words:
        stage = filter_data(stage, word)   # each stage pulls from the previous one
    for row in stage:
        yield row

In [48]:
results = output(input_file, 'Chevrolet', 'Carlo', 'Landau')
for row in results:
    print(row)

['Chevrolet Monte Carlo Landau', '15.5', '8', '350.0', '170.0', '4165.', '11.4', '77', 'US']
['Chevrolet Monte Carlo Landau', '19.2', '8', '305.0', '145.0', '3425.', '13.2', '78', 'US']


<br>

### Pipelines - Pushing data

In [49]:
# simple decorator to auto-prime coroutines

def coroutine(coro):
    """Decorator that auto-primes the generator function ``coro``.

    The wrapper forwards its arguments to ``coro``, advances the new
    generator to its first ``yield`` and returns it ready for ``send``.
    ``functools.wraps`` keeps the decorated function's name and docstring.
    """
    @functools.wraps(coro)
    def inner(*args, **kwargs):
        gen = coro(*args, **kwargs)
        next(gen)   # prime: run up to the first yield
        return gen
    return inner

In [50]:
# data consumer generator that prints what it receives

@coroutine
def handle_data():
    """Terminal pipeline stage: print every value sent to it."""
    while True:
        item = yield
        print(item)

In [51]:
# coroutine that receives some data, transforms it, and sends it along to the next generator

@coroutine
def power_up(n, next_gen):
    """Pipeline stage: raise each received value to ``n`` and send it to ``next_gen``."""
    while True:
        value = yield
        next_gen.send(math.pow(value, n))

In [52]:
# generate some data, send it to 'power_up', and specify the next stage as being 'handle_data'

print_data = handle_data()
gen = power_up(2, print_data)
for i in range(1, 6):
    gen.send(i)

1.0
4.0
9.0
16.0
25.0


<br>

A more complicated pipeline: two chained `power_up` stages

In [53]:
print_data = handle_data()
gen2 = power_up(3, print_data)
gen1 = power_up(2, gen2)
for i in range(1, 6):
    gen1.send(i)

1.0
64.0
729.0
4096.0
15625.0


<br>

An even more complicated pipeline: adding a filter stage

In [54]:
# filter that retains only even values

@coroutine
def filter_even(next_gen):
    """Pipeline stage: forward received values to ``next_gen`` only when even."""
    while True:
        value = yield
        if value % 2 != 0:
            continue   # odd values are dropped
        next_gen.send(value)

In [55]:
print_data = handle_data()
filtered = filter_even(print_data)
gen2 = power_up(3, filtered)
gen1 = power_up(2, gen2)
for i in range(1, 6):
    gen1.send(i)

64.0
4096.0


<br>

### Pipelines - Broadcasting data

We are going to want to split the data into different files based on some criteria of our choosing.

In [56]:
# configuration
input_file = 'working_files/cars-3.csv'

idx_make = 0
idx_model = 1
idx_year = 2
idx_vin = 3
idx_color = 4

headers = ('make', 'model', 'year', 'vin', 'color')

converters = (str, str, int, str, str)

In [57]:
# generator that returns the data

def data_reader(f_name):
    """Yield every row (header included) of the CSV file ``f_name``.

    The dialect is sniffed from the first 2000 characters. The file is closed
    when the generator is exhausted or closed; the previous version referenced
    ``fh.close`` without calling it, so the file was never closed explicitly.
    """
    with open(f_name) as fh:
        dialect = csv.Sniffer().sniff(fh.read(2000))
        fh.seek(0)   # rewind so the sampled rows are read again
        yield from csv.reader(fh, dialect=dialect)

In [58]:
# test data_reader

for row in itertools.islice(data_reader(input_file), 3):
    print(row)

['car_make', 'car_model', 'model_year', 'vin', 'color']
['Mitsubishi', 'Outlander', '2011', 'WBAEV33453K542952', 'Indigo']
['Pontiac', 'Sunfire', '2001', 'SCFAD06D99G713780', 'Maroon']


<br>

In [59]:
# generator that returns the parsed data

def data_parser():
    """Yield the rows of ``input_file`` with each field converted.

    The first row is skipped; every following row is zipped with
    ``converters`` and each converter is applied to its field.
    """
    rows = data_reader(input_file)
    next(rows)   # discard the header row
    for raw_row in rows:
        yield [convert(field) for convert, field in zip(converters, raw_row)]

In [60]:
# test data_parser

data = data_parser()
for _ in range(3):
    print(next(data))

['Mitsubishi', 'Outlander', 2011, 'WBAEV33453K542952', 'Indigo']
['Pontiac', 'Sunfire', 2001, 'SCFAD06D99G713780', 'Maroon']
['Pontiac', 'Grand Am', 1994, 'WBA6B8C59ED852919', 'Red']


<br>

In [61]:
# simple decorator to auto-prime coroutines

def coroutine(fn):
    """Decorator that auto-primes the generator function ``fn``.

    The wrapper forwards its arguments to ``fn``, advances the new generator
    to its first ``yield`` and returns it ready for ``send``.
    ``functools.wraps`` keeps the decorated function's name and docstring.
    """
    @functools.wraps(fn)
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)   # prime: run up to the first yield
        return g
    return inner

In [62]:
# coroutine that creates and writes data to a file

@coroutine
def save_data(f_name, headers):
    """Coroutine that writes ``headers`` and then every sent row to ``f_name``.

    The file stays open inside the ``with`` block for as long as the
    coroutine is alive.
    """
    with open(f_name, 'w', newline='') as fh:
        csv_out = csv.writer(fh)
        csv_out.writerow(headers)
        while True:
            row = yield
            csv_out.writerow(row)

In [63]:
# Coroutine receives a data row and tests if the predicate applied to that data row is True.
# If it is, it will send the row to the next stage (target) of the pipeline,
# otherwise it just ignores the data row.

@coroutine
def filter_data(filter_predicate, target):
    """Pipeline stage: pass a received row on to ``target`` if the predicate holds.

    Rows for which ``filter_predicate(row)`` is falsy are ignored.
    """
    while True:
        row = yield
        if not filter_predicate(row):
            continue
        target.send(row)

In [64]:
# broadcaster that sends received data to all the generators specified in the targets argument

@coroutine
def broadcast(targets):
    """Pipeline stage: send every received row to each generator in ``targets``."""
    while True:
        row = yield
        for receiver in targets:
            receiver.send(row)

Now put all this together.

```
     data                      
      |                      |——> filter ——> save
      v                      |
process_data ——> broadcast ——|——> filter ——> save
                             |
                             |——> filter ——> save
```

In [65]:
# specify files where the results will be written

file_for_pink_cars = 'working_files/temp_pink_cars.csv'
file_for_ford_green = 'working_files/temp_ford_green.csv'
file_for_older = 'working_files/temp_older.csv'

In [66]:
def process_data():
    """Run every parsed row through the broadcast pipeline and write the files.

    Three ``save_data`` coroutines are created (pink cars, green Fords, cars
    from 2010 or earlier), each behind its own ``filter_data`` stage, and a
    ``broadcast`` stage fans every row from ``data_parser`` out to the filters.

    All stages are closed explicitly once the rows are processed (or if
    processing fails) instead of being left for the garbage collector, so the
    output files are closed before this function returns.
    """
    out_pink_cars = save_data(file_for_pink_cars, headers)
    out_ford_green = save_data(file_for_ford_green, headers)
    out_older = save_data(file_for_older, headers)
    savers = (out_pink_cars, out_ford_green, out_older)

    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink',
                                   out_pink_cars)

    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford'
                and d[idx_color].lower() == 'green')

    filter_ford_green = filter_data(pred_ford_green, out_ford_green)

    filter_old = filter_data(lambda d: d[idx_year] <= 2010, out_older)

    filters = (filter_pink_cars, filter_ford_green, filter_old)

    broadcaster = broadcast(filters)

    try:
        for row in data_parser():
            broadcaster.send(row)
    finally:
        # Close upstream stages first so nothing can send into a closed stage.
        for stage in (broadcaster, *filters, *savers):
            stage.close()

    print('Finished processing')

In [67]:
process_data()

Finished processing


In [68]:
# see what files contain

def print_file_data():
    """Print the rows of each of the three output files under a banner line."""
    file_names = (file_for_pink_cars, file_for_ford_green, file_for_older)
    for file_name in file_names:
        print(f'  ——— {file_name} ———')
        for line in data_reader(file_name):
            print(line)
        print('\n\n')

print_file_data()

  ——— working_files/temp_pink_cars.csv ———
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998',

<br>
<br>

Modification to keep functionality more separated.<br>
Let's rewrite the code to make `process_data` just another step in the pipeline.

In [69]:
@coroutine
def pipeline_coro():
    """Coroutine wrapping the whole broadcast pipeline: send rows, then close.

    Builds three ``save_data`` coroutines (pink cars, green Fords, cars from
    2010 or earlier), a ``filter_data`` stage in front of each, and a
    ``broadcast`` stage that receives every row sent to this coroutine.

    When this coroutine is closed, or a stage raises, every downstream stage
    is closed explicitly, so the output files are closed without relying on
    garbage collection.
    """
    out_pink_cars = save_data(file_for_pink_cars, headers)
    out_ford_green = save_data(file_for_ford_green, headers)
    out_older = save_data(file_for_older, headers)
    savers = (out_pink_cars, out_ford_green, out_older)

    filter_pink_cars = filter_data(lambda d: d[idx_color].lower() == 'pink',
                                   out_pink_cars)

    def pred_ford_green(d):
        return (d[idx_make].lower() == 'ford'
                and d[idx_color].lower() == 'green')

    filter_ford_green = filter_data(pred_ford_green, out_ford_green)

    filter_old = filter_data(lambda d: d[idx_year] <= 2010, out_older)

    filters = (filter_pink_cars, filter_ford_green, filter_old)

    broadcaster = broadcast(filters)

    try:
        while True:
            row = yield
            broadcaster.send(row)
    finally:
        # Reached on close() (GeneratorExit at the yield) or on an error.
        # Close upstream stages first so nothing can send into a closed stage.
        for stage in (broadcaster, *filters, *savers):
            stage.close()

In [70]:
pipe = pipeline_coro()
data = data_parser()
for row in data:
    pipe.send(row)
    
pipe.close()   # ! we need to close the pipeline

In [71]:
print_file_data()

  ——— working_files/temp_pink_cars.csv ———
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998',

<br>
<br>

Using a context manager so that the pipeline is closed automatically, with less code

In [72]:
from contextlib import contextmanager

In [73]:
@contextmanager
def pipeline():
    """Context manager that yields a primed ``pipeline_coro`` and closes it on exit."""
    coro = pipeline_coro()
    try:
        yield coro
    finally:
        # runs whether the with-body finished normally or raised
        coro.close()

In [74]:
with pipeline() as pipe:
    data = data_parser()
    for row in data:
        pipe.send(row)

In [75]:
print_file_data()

  ——— working_files/temp_pink_cars.csv ———
['make', 'model', 'year', 'vin', 'color']
['Land Rover', 'LR3', '2008', 'WBANE53507B229964', 'Pink']
['Aston Martin', 'DBS', '2008', 'WAUFMBFC7EN209268', 'Pink']
['Ford', 'F150', '1993', 'JN8AF5MR5BT143315', 'Pink']
['Nissan', 'Murano', '2012', '1D7RB1CP2AS005941', 'Pink']
['Chevrolet', 'HHR Panel', '2006', 'WA1VMAFE4BD281230', 'Pink']
['Chrysler', 'New Yorker', '1996', '19UUA8F31DA965112', 'Pink']
['Dodge', 'Viper', '2005', 'WA1VGBFPXEA735467', 'Pink']
['Maserati', 'Karif', '1990', '3LN6L2LU5FR389539', 'Pink']
['Chrysler', 'Voyager', '2001', 'SCFHDDAJ7BA859249', 'Pink']
['Ford', 'Expedition', '2002', 'JTJHY7AX8D4370558', 'Pink']
['Suzuki', 'Grand Vitara', '2010', '1GYUCHEF9AR508723', 'Pink']
['GMC', '2500', '1995', 'JH4DC53004S939059', 'Pink']
['GMC', 'Yukon XL 1500', '2009', '3D4PG4FB1BT559916', 'Pink']
['Dodge', 'Avenger', '2009', 'SCBCR7ZA0AC733739', 'Pink']
['GMC', 'Jimmy', '2001', '2C4RRGAGXDR589104', 'Pink']
['Subaru', 'Legacy', '1998',

In [76]:
# # remove temporary files from the disk (bash command)
! rm -f $file_for_pink_cars $file_for_older $file_for_ford_green