# Coroutines

In [1]:
from collections import deque
def produce_elements(dq, n):
    """
    Generator that pushes the integers 0..n-1 onto the left end of ``dq``.

    Each time the queue reaches its ``maxlen`` after an insert, a message is
    printed and the generator yields (with no value) so that the caller can
    let something else drain the queue before production resumes.
    """
    for value in range(n):
        dq.appendleft(value)
        has_room = len(dq) != dq.maxlen
        if has_room:
            continue
        # Queue is at capacity: suspend here until the caller resumes us.
        print('Queue full - yielding control')
        yield

def consume_elements(dq):
    """
    Generator that drains ``dq`` from its right end, forever.

    On every resumption it pops and prints items until the queue is empty,
    then prints a message and yields (with no value) to hand control back
    to the caller. It never finishes on its own.
    """
    while True:
        while dq:
            print('Processing ', dq.pop())
        # Nothing left to process: suspend until the caller resumes us.
        print('Queue empty - yieling control')
        yield

def coordinator():
    """
    Alternate between the producer and the consumer generators.

    A bounded deque (maxlen 10) is shared by a producer of 36 items and a
    consumer. Each round resumes the producer once and then, always, the
    consumer once; the loop ends after the round in which the producer is
    exhausted.
    """
    dq = deque(maxlen=10)
    producer = produce_elements(dq, 36)
    consumer = consume_elements(dq)

    producer_exhausted = False
    while not producer_exhausted:
        try:
            print('producing...')
            next(producer)
        except StopIteration:
            # Producer is done; the `finally` below still drains what is left.
            producer_exhausted = True
        finally:
            print('Consuming...')
            next(consumer)
            

In [2]:
# Run the demo: the producer adds 36 items through a 10-slot queue while the
# consumer drains it each time control is handed over.
coordinator()

producing...
Queue full - yielding control
Consuming...
Processing  0
Processing  1
Processing  2
Processing  3
Processing  4
Processing  5
Processing  6
Processing  7
Processing  8
Processing  9
Queue empty - yieling control
producing...
Queue full - yielding control
Consuming...
Processing  10
Processing  11
Processing  12
Processing  13
Processing  14
Processing  15
Processing  16
Processing  17
Processing  18
Processing  19
Queue empty - yieling control
producing...
Queue full - yielding control
Consuming...
Processing  20
Processing  21
Processing  22
Processing  23
Processing  24
Processing  25
Processing  26
Processing  27
Processing  28
Processing  29
Queue empty - yieling control
producing...
Consuming...
Processing  30
Processing  31
Processing  32
Processing  33
Processing  34
Processing  35
Queue empty - yieling control


# Sending Data to Generators

In [3]:
def echo():
    """
    Generator that prints whatever is sent to it, forever.

    Each ``yield`` produces no value; the expression evaluates to the object
    passed to ``send()`` (or ``None`` when resumed with ``next()``).
    """
    while True:
        print('You said:', (yield))

In [4]:
# Calling echo() only creates the generator object; none of its body has
# run yet.
g = echo()

# Prime the generator: next() runs it up to the first `yield`, where it
# suspends. Nothing is printed at this point.
next(g)

In [5]:
# Resuming with next() makes the pending `yield` expression evaluate to None,
# so the generator prints 'You said: None' and suspends at the next `yield`.
next(g)

You said: None


In [6]:
# send() resumes the generator and makes the pending `yield` expression
# evaluate to the value passed in, so `received` holds 'You are right!'.
# The generator then runs until it reaches `yield` again and suspends.
g.send('You are right!')
g.send('Next one!')

You said: You are right!
You said: Next one!


In [7]:
# Example: a running-average coroutine.
def averager():
    """
    Coroutine that keeps a running average of the values sent to it.

    The first ``next()`` yields ``None``; every later ``send(value)`` returns
    the average of all values received so far.
    """
    running_sum, n_seen = 0, 0
    current_mean = None
    while True:
        sample = yield current_mean
        running_sum += sample
        n_seen += 1
        current_mean = running_sum / n_seen

def running_average(iterable):
    """
    Print the running average after each element of ``iterable``.

    Creates and primes an ``averager`` coroutine, then sends it the elements
    one at a time, printing the value it sends back each time.
    """
    avg = averager()
    next(avg)  # prime: advance to the first `yield` so send() is accepted
    for current_mean in map(avg.send, iterable):
        print(current_mean)

In [8]:
# Prints one line per input value: the average of the values seen so far.
running_average((1, 3, 5, 7, 9))

1.0
2.0
3.0
4.0
5.0


# Sending Exceptions to Generators

Suppose we have a coroutine that handles writing data to a database.

In [9]:
class CommitException(Exception):
    """Thrown into ``write_to_db`` to request a commit of the transaction."""

class RollbackException(Exception):
    """Thrown into ``write_to_db`` to request a rollback of the transaction."""

def write_to_db():
    """
    Coroutine that simulates transactional writes to a database.

    All database actions are simulated with ``print``. Values passed to
    ``send()`` are "written"; throwing ``CommitException`` or
    ``RollbackException`` into the coroutine commits or aborts the current
    transaction and opens the next one. When the generator is closed (or
    any other exception ends it), the ``finally`` clause aborts the open
    transaction and closes the connection.
    """
    print('opening database connection...')
    print('start transaction...')
    try:
        while True:
            try:
                payload = yield
            except CommitException:
                print('committing transaction...')
            except RollbackException:
                print('aborting transaction...')
            else:
                # Normal resumption via send()/next(): just write the data.
                print('writing data to database...', payload)
                continue
            # Reached only after a commit or a rollback.
            print('opening next transaction...')
    finally:
        print('generator closing...')
        print('aborting transaction...')
        print('closing database connection...')

In [10]:
sql = write_to_db()
# Prime the coroutine: runs up to the first `yield` (prints the two opening lines).
next(sql)
sql.send(100)
# throw() raises the exception at the suspended `yield` inside the coroutine,
# where the matching `except` clause handles it and the loop continues.
sql.throw(CommitException)
sql.throw(RollbackException)
sql.send(200)
sql.throw(CommitException)
# close() ends the generator, which runs its `finally` clause.
sql.close()

opening database connection...
start transaction...
writing data to database... 100
committing transaction...
opening next transaction...
aborting transaction...
opening next transaction...
writing data to database... 200
committing transaction...
opening next transaction...
generator closing...
aborting transaction...
closing database connection...


# Using Decorators to Prime Coroutines

We saw how we always have to 'prime' a coroutine (i.e. get the generator into a suspended state) before we can start sending values to it.

This is something that **must** always be done - and this is an excellent use case for decorators.

We're going to create a decorator that will create and prime the coroutine for us.

Essentially we want to be able to:
1. create the coroutine (`g = gen()`)
2. prime the coroutine (`next(g)`)

in one step - so that's what the decorator is going to do - it will wrap our original coroutine and return a new function that will perform those steps for us, and return the newly created and primed coroutine:

In [11]:
def coroutine(fn):
    """
    Decorator that creates and primes a generator-based coroutine.

    Calling the decorated function calls ``fn`` with the same arguments,
    advances the resulting generator to its first ``yield`` with ``next()``,
    and returns it, so callers can use ``send()`` immediately. The value
    produced by that first ``yield`` is discarded.

    The wrapper keeps ``fn``'s name and docstring via ``functools.wraps``.
    """
    from functools import wraps  # local import keeps this cell self-contained

    @wraps(fn)
    def inner(*args, **kwargs):
        g = fn(*args, **kwargs)
        next(g)
        return g
    return inner

In [12]:
@coroutine
def echo():
    """
    Coroutine that prints every value sent to it, forever.

    Decorated with ``@coroutine``, so calling ``echo()`` returns a generator
    that is already primed.
    """
    while True:
        print((yield))

In [13]:
# No explicit next() is needed here: the @coroutine decorator has already
# advanced the generator to its first `yield`.
g = echo()
g.send('Hello')

Hello


In [14]:
# The same generator is still suspended at `yield`, so it keeps accepting values.
g.send('World')

World


In [15]:
# Example: coroutine power up
import math

@coroutine
def power_up(p):
    """
    Coroutine that raises each value sent to it to the power ``p``.

    Each ``send(value)`` returns ``math.pow(value, p)``. If ``math.pow``
    raises ``TypeError`` for the value, ``None`` is returned instead and the
    coroutine keeps running.
    """
    powered = None
    while True:
        base = yield powered
        # Start from None so a TypeError below leaves "no result" in place.
        powered = None
        try:
            powered = math.pow(base, p)
        except TypeError:
            pass

In [16]:
# Coroutine that squares the values sent to it; already primed by @coroutine.
p = power_up(2)

In [17]:
# send() returns the value the coroutine yields back: math.pow(5, 2).
p.send(5)

25.0

In [18]:
# Same coroutine, next value: math.pow(10, 2).
p.send(10)

100.0

# Pipelines

In [19]:
import csv

In [20]:
# Location of the cars data set read by the pipeline below.
file = '/mnt/data-ubuntu/Projects/Learning_PY_hardway/data/deep_dive/cars.csv'

# Column names, in the order the columns appear in each row.
headers = (
    'car', 'mpg', 'cylinders', 'displacement', 'horsepower',
    'weight', 'acceleration', 'model', 'origin',
)

# Positional index of each column (0 through 8), in the same order as `headers`.
(idx_car, idx_mpg, idx_cylinders, idx_displacement, idx_horsepower,
 idx_weight, idx_acceleration, idx_model, idx_origin) = range(9)

# One conversion callable per column, applied to the raw string fields.
converters = (
    str, float, int, float, float,
    float, float, int, str,
)

In [35]:
def data_reader(f_name):
    """
    Lazily yield the rows of the CSV file ``f_name`` as lists of strings.

    The CSV dialect is sniffed from the first 2000 characters of the file,
    after which the file is rewound and read from the start, so the header
    row (if any) is yielded like any other row. The file stays open until
    the generator is exhausted or closed.

    Raises ``csv.Error`` if the dialect cannot be determined from the sample.
    """
    # newline='' disables newline translation so the csv module sees the raw
    # line endings; without it, line breaks embedded in quoted fields are
    # altered before the parser reads them (see the csv module docs).
    with open(f_name, newline='') as f:
        dialect = csv.Sniffer().sniff(f.read(2000))
        f.seek(0)
        yield from csv.reader(f, dialect=dialect)

def data_parser():
    """
    Yield the rows of the module-level ``file`` with typed fields.

    The first row from ``data_reader`` is skipped (treated as the header).
    Every following row is converted field by field using the matching
    callable in ``converters`` and yielded as a list.
    """
    rows = data_reader(file)
    next(rows)  # discard the first row
    for raw_row in rows:
        yield [convert(field) for convert, field in zip(converters, raw_row)]
        
def coroutine(fn):
    """
    Coroutine decorator.

    Calling the decorated function calls ``fn`` with the same arguments,
    advances the resulting generator to its first ``yield`` with ``next()``,
    and returns it, so callers can use ``send()`` immediately. The value
    produced by that first ``yield`` is discarded.

    The wrapper keeps ``fn``'s name and docstring via ``functools.wraps``.
    """
    from functools import wraps  # local import keeps this cell self-contained

    @wraps(fn)
    def inner(*args, **kwargs):
        gen = fn(*args, **kwargs)
        next(gen)
        return gen
    return inner

@coroutine
def filter_data(fn_filter, next_coroutine):
    """
    Coroutine stage that forwards only the items accepted by ``fn_filter``.

    Every item sent in is tested with ``fn_filter``; items for which it
    returns a truthy value are sent on to ``next_coroutine``, the rest are
    dropped.
    """
    while True:
        item = yield
        if not fn_filter(item):
            continue
        next_coroutine.send(item)

@coroutine
def print_data(headers):
    """
    Coroutine sink that prints ``headers`` once, then every item sent to it.

    The headers are printed when the coroutine is primed, i.e. as soon as
    the decorated function is called.
    """
    print(headers)
    while True:
        print((yield))


@coroutine
def pipline():
    """
    Pipeline entry point: keep rows whose origin is 'Japan' and print them.

    Builds a printing sink and a filter stage in front of it, then forwards
    every row sent to this coroutine into the filter.
    """
    def is_from_japan(row):
        return row[idx_origin] == 'Japan'

    sink = print_data(headers)
    japan_only = filter_data(is_from_japan, sink)

    while True:
        row = yield
        japan_only.send(row)

In [36]:
# Lazy source of typed rows (nothing is read until iteration starts).
data = data_parser()
# Creating the pipeline primes every stage, which prints the header tuple.
pip = pipline()
# Push the rows through the pipeline one at a time.
for row in data:
    pip.send(row)

('car', 'mpg', 'cylinders', 'displacement', 'horsepower', 'weight', 'acceleration', 'model', 'origin')
['Toyota Corolla Mark ii', 24.0, 4, 113.0, 95.0, 2372.0, 15.0, 70, 'Japan']
['Datsun PL510', 27.0, 4, 97.0, 88.0, 2130.0, 14.5, 70, 'Japan']
['Datsun PL510', 27.0, 4, 97.0, 88.0, 2130.0, 14.5, 71, 'Japan']
['Toyota Corolla', 25.0, 4, 113.0, 95.0, 2228.0, 14.0, 71, 'Japan']
['Toyota Corolla 1200', 31.0, 4, 71.0, 65.0, 1773.0, 19.0, 71, 'Japan']
['Datsun 1200', 35.0, 4, 72.0, 69.0, 1613.0, 18.0, 71, 'Japan']
['Toyota Corolla Hardtop', 24.0, 4, 113.0, 95.0, 2278.0, 15.5, 72, 'Japan']
['Mazda RX2 Coupe', 19.0, 3, 70.0, 97.0, 2330.0, 13.5, 72, 'Japan']
['Datsun 510 (sw)', 28.0, 4, 97.0, 92.0, 2288.0, 17.0, 72, 'Japan']
['Toyota Corolla Mark II (sw)', 23.0, 4, 120.0, 97.0, 2506.0, 14.5, 72, 'Japan']
['Toyota Corolla 1600 (sw)', 27.0, 4, 97.0, 88.0, 2100.0, 16.5, 72, 'Japan']
['Toyota Camry', 20.0, 4, 97.0, 88.0, 2279.0, 19.0, 73, 'Japan']
['Datsun 610', 22.0, 4, 108.0, 94.0, 2379.0, 16.5, 7