# Generators and Coroutines

## Processing Pipelines

In [None]:
def gen_lines(filename):
    """Pipeline source: yield every line of ``filename``, newline included.

    Unlike the other stages it takes a path, not an iterator, so it is where
    a pipeline starts. The file is closed once iteration finishes or the
    generator is closed.
    """
    handle = open(filename)
    try:
        for raw_line in handle:
            yield raw_line
    finally:
        handle.close()
            

In [None]:
def gstrip(gen):
    """Pipeline stage: yield each incoming line with surrounding whitespace removed."""
    yield from (text.strip() for text in gen)

In [None]:
def gdecomment(gen):
    """Pipeline stage: drop whole-line '#' comments and empty lines.

    Only lines whose very first character is '#' count as comments, so feed it
    already-stripped lines (e.g. the output of ``gstrip``). Trailing comments
    after real content are kept as-is.
    """
    for text in gen:
        is_comment = text.startswith('#')
        if is_comment or not text:
            continue
        yield text

In [None]:
def gsplit(gen):
    """Pipeline stage: yield each incoming line as a list of whitespace-separated words."""
    for text in gen:
        words = text.split()
        yield words

In [None]:
def remove_bom(gen):
    """Pipeline stage: strip a leading byte-order mark (U+FEFF) from lines.

    - A line that is nothing but a BOM is dropped entirely.
    - A BOM stuck to the front of a real line is removed and the rest of the
      line is kept, so later stages (e.g. ``gsplit``) don't see it as part of
      the first word.
    - Lines without a BOM, and non-string items, pass through unchanged.
    """
    for line in gen:
        # str.strip() does not treat U+FEFF as whitespace, so it survives gstrip.
        if isinstance(line, str) and line.startswith('\ufeff'):
            line = line[1:]
            if not line:
                continue
        yield line

In [None]:
# Compose the stages inside-out: read -> strip -> decomment -> remove BOM
# (for wsl2...) -> split. Nothing is read until the for-loop pulls items.
x = gsplit(remove_bom(gdecomment(gstrip(gen_lines('/etc/hosts')))))
for fields in x:
    print(fields)

In [None]:
# Show the raw file (IPython shell escape) to compare with the pipeline output above.
!cat /etc/hosts

In [None]:
# The same pipeline built from generator expressions instead of generator
# functions. The `with` block closes /etc/hosts as soon as the loop finishes
# or raises; a bare open() would leave the handle open until it happens to be
# garbage-collected.
with open('/etc/hosts') as fp:
    x = (line.strip() for line in fp)   # strip
    x = (line for line in x if not line.startswith('#'))  # decomment
    x = (line for line in x if line)  # remove empty lines
    x = (line for line in x if line != '\ufeff')  # remove BOM
    x = (line.split() for line in x)  # split into words
    for words in x:
        print(words)

## CSV example

In [None]:
# Peek at the start of the CSV (IPython shell escape) to see its header and layout.
!head data/closing-prices.csv

In [None]:
import csv
from datetime import datetime

In [None]:
import itertools

def month(tup):
    """Return the ``(year, month)`` of a ``(date, price)`` pair.

    ``date`` must be an ISO-style string such as '2020-09-23'; the price is
    ignored. Used as the grouping key for ``itertools.groupby``.
    """
    date, _price = tup
    parsed = datetime.strptime(date, '%Y-%m-%d')
    return parsed.year, parsed.month

def get_prices_for(it, ticker, date_key=''):
    """Yield ``(date_str, closing_price)`` pairs for a single ticker.

    it       -- iterable of dicts, e.g. rows from ``csv.DictReader``
    ticker   -- name of the column holding the closing price (e.g. 'TSLA');
                its value is converted with ``float``
    date_key -- name of the column holding the trade date; defaults to ''
                to match the header of closing-prices.csv

    Raises KeyError if a row lacks either column, and ValueError if the price
    text cannot be parsed by ``float`` (e.g. an empty cell).
    """
    for d in it:
        yield (d[date_key], float(d[ticker]))

In [None]:
# Lazily read the CSV text, then parse each row into a dict keyed by the header.
lines = gen_lines(filename='./data/closing-prices.csv')
dicts = csv.DictReader(lines)

In [None]:
# One-shot generator of (date, price) pairs for the TSLA column.
tesla_prices = get_prices_for(dicts, ticker='TSLA')

In [None]:
# Print open/high/low/close for each calendar month.
# The loop target is `year_month`, not `month`: rebinding `month` would
# overwrite the key function, so re-running this cell would fail.
# NOTE(review): groupby only merges *adjacent* items, so this assumes the CSV
# rows are already in date order — confirm for the data file.
# `tesla_prices` is a one-shot generator; re-run the cells that build it
# before running this one again.
for year_month, sub_iter in itertools.groupby(tesla_prices, month):
    prices = [price for _date, price in sub_iter]  # every group is non-empty
    open_, high, low, close = prices[0], max(prices), min(prices), prices[-1]
    print(year_month, (open_, high, low, close))

## Coroutines

In [None]:
def simple_coro(name):
    """Coroutine that echoes every value sent into it, tagged with ``name``.

    Prime it with ``next()`` first: that prints the entry message and parks
    the coroutine at its first ``yield``. It never finishes on its own.
    """
    print('Entering coro', name)
    template = 'coro {} got {}'
    while True:
        received = yield
        print(template.format(name, received))

In [None]:
# Creating the generators runs none of their body yet.
sc0, sc1 = simple_coro('sc0'), simple_coro('sc1')

In [None]:
# Prime both coroutines: each prints its 'Entering coro' line and stops at its
# first bare `yield`, ready to accept send().
next(sc0)
next(sc1)

In [None]:
# Resume sc0 with 'foo'; it prints the value and pauses at its next `yield`.
sc0.send('foo')

In [None]:
# sc1 is independent of sc0: it receives only what is sent to it.
sc1.send('bar')

In [None]:
# Each send resumes sc1 from where it last paused.
sc1.send('bat')

# Event streams & discrete event simulation

In [None]:
def every_xs(x):
    """Yield ``x`` (the wait, in ticks) five times, then finish.

    Whatever is sent back on each resume is printed as the caller's name
    together with ``x`` and the 0-based iteration count.
    """
    i = 0
    while i < 5:
        name = yield x   # "wait for 'x' ticks"
        print('{}: every_xs({}, {})'.format(name, x, i))
        i += 1


In [None]:
# One generator per period (1, 2 and 3 ticks).
every_1s, every_2s, every_3s = (every_xs(period) for period in (1, 2, 3))

In [None]:
import heapq

def ev_loop(coros):
    """Run a toy discrete-event loop over named generator coroutines.

    ``coros`` maps a name to a generator. Each generator yields how many ticks
    to wait before it is resumed; on resume it is sent its own name. Events
    run in time order, with ties broken by name. A ``=== t ===`` header is
    printed whenever the clock advances, "<name> is done!" when a generator
    finishes, and "No more events!" once nothing is left.
    """
    now = 0
    events = []

    # Prime each coroutine to get its first delay. A generator that finishes
    # without yielding at all is reported as done instead of letting
    # StopIteration escape from the loop.
    for name, coro in coros.items():
        try:
            offset = next(coro)
        except StopIteration:
            print(name, 'is done!')
            continue
        # (time, name, coro): the heap orders by time, then by name.
        heapq.heappush(events, (offset + now, name, coro))

    while events:
        new_now, name, coro = heapq.heappop(events)
        if new_now != now:
            print('=== {} ==='.format(new_now))
            now = new_now
        try:
            offset = coro.send(name)
        except StopIteration:
            print(name, 'is done!')
        else:
            heapq.heappush(events, (offset + now, name, coro))

    print('No more events!')
        

In [None]:
ev_loop(coros={
    '1s': every_1s,
    '2s': every_2s,
    '3s': every_3s,
})

In [None]:
def every_xs(sim, x):
    """Forever: announce itself, then wait ``x`` time units via ``sim.delay``.

    Each yielded value is whatever ``sim.delay(x)`` returns (an Event for the
    Simulator). The value sent back on resume is ignored.
    """
    while True:
        message = 'Yielding from every_xs({})'.format(x)
        print(message)
        pending = sim.delay(x)
        yield pending


In [None]:
class Event:
    """A scheduled wake-up: resume ``coro`` at time ``when``, sending it ``value``.

    Instances compare by ``when`` only, so a heap of Events pops the earliest
    one first.
    """

    def __init__(self, when, coro, value=None):
        self.when, self.coro, self.value = when, coro, value

    def __lt__(self, other):
        "Implements self < other"
        return self.when < other.when

class Simulator:
    """Minimal discrete-event simulator driving generator-based coroutines.

    Coroutines yield event objects (normally built with ``delay``). The
    simulator keeps them in a heap and, when an event's time comes, resumes
    the event's coroutine, sending it the event's ``value``.
    """

    def __init__(self):
        self.now = 0          # current simulated time
        self.events = []      # heap of pending events, earliest `when` first
        self.current = None   # coroutine being run/resumed; read by delay()

    def run(self, coro):
        """Start ``coro`` and schedule the first event it yields.

        A coroutine that finishes without yielding is silently ignored.
        """
        try:
            self.current = coro
            ev = next(coro)
            heapq.heappush(self.events, ev)
        except StopIteration:
            pass

    def simulate(self, max_time=10):
        """Process events in time order while the next one is due by ``max_time``.

        Events scheduled after ``max_time`` are left in ``self.events``, so a
        later call with a larger ``max_time`` can continue. ``self.now`` ends
        at the time of the last event processed.
        """
        # Peek at the earliest event before popping it, so an event scheduled
        # after max_time is never run.
        while self.events and self.events[0].when <= max_time:
            ev = heapq.heappop(self.events)
            if ev.when != self.now:
                print(f'=== {ev.when} ===')
            self.now, self.current = ev.when, ev.coro
            try:
                ev = self.current.send(ev.value)
            except StopIteration:
                continue  # this coroutine has finished; nothing to reschedule
            heapq.heappush(self.events, ev)
        print('Simulation terminated at', self.now)

    def delay(self, seconds):
        """Return an Event that resumes the current coroutine ``seconds`` from now.

        The event carries 'delay expired!', which is sent into the coroutine
        when it is resumed.
        """
        return Event(self.now + seconds, self.current, 'delay expired!')
        

In [None]:
sim = Simulator()
# Register one ticking coroutine per period; run() primes each one.
for period in (1, 2, 3):
    sim.run(every_xs(sim, period))

In [None]:
# Run the simulation with the default time limit spelled out.
sim.simulate(max_time=10)

### Event streams use case: Asynchronous I/O

Asynchronous I/O can use event streams and coroutines to provide 'thread-like' syntax.

1. A top-level event loop is created to handle events
1. Each coroutine runs until it yields an object to 'wait' on -- call it an 'awaitable'. (This could be a socket that we are receiving data from.) The 'yield' goes to the event loop.
1. The event loop generally calls `select` or `poll` (or a similar OS facility) to check which of its awaitable objects have data available.
1. When the 'awaitable' has data, that data is "sent" into the coroutine, which resumes *as though it had blocked waiting on data*.


```python
# your code does this
data = yield from socket.async_recv(100)

# socket.async_recv does this
value_to_be_result_of_yield_from = yield wait_for_socket_to_have_data

# the event loop eventually does this
coro.send(data_from_socket)

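# socket.async_recv eventually finishes by returning the data,
# which becomes the value of the `yield from` expression:
return data_from_socket
# (Pre-3.7 code sometimes wrote `raise StopIteration(data_from_socket)`;
# since PEP 479 that becomes a RuntimeError inside a generator.)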
```

Open [Advanced Generators Lab](./advanced-generators-lab.ipynb)