In [22]:
def simple_coroutine():
    print("coroutine started")
    x = yield 
    print("coroutine received value x: ", x)

# USING THE COROUTINE    
gen = simple_coroutine()
print(type(gen))

# start the coroutine by calling next once
a = next(gen)
print(a)

# send a value to coroutine
try: 
    gen.send(42)
except StopIteration:
    print("\n>>> StopIteration error raised <<<\n")
    print("value is received, but there is no subsequent yield hence falls off")

You can explore the state of the coroutine. Possible states:
- GEN_CREATED: Waiting to start execution 
- GEN_RUNNING: Currently being executed by interpreter. You will only see this in multithreaded apps
- GEN_SUSPENDED: Currently suspended at a yield statement. Waiting for an input or something to be sent. 
- GEN_CLOSED: Execution has completed. 


In [29]:
# You can explore the state of the coroutine
from inspect import getgeneratorstate

def slightly_complex_coroutine(a):
    print("--> Coroutine started")
    b = yield a
    print("--> Received b:", b)
    c = yield a + b
    print("--> Received c:", c)
    
# using the coroutine
gen = slightly_complex_coroutine(2)
print(getgeneratorstate(gen))

value_recd_from_coroutine = next(gen)
print(value_recd_from_coroutine)
print(getgeneratorstate(gen))

value_recd_from_coroutine = gen.send(4)
print(value_recd_from_coroutine)
print(getgeneratorstate(gen))


try:
    value_recd_from_coroutine = gen.send(8)
    print(value_recd_from_coroutine)
except StopIteration:
    print(getgeneratorstate(gen))

GEN_CREATED
--> Coroutine started
2
GEN_SUSPENDED
--> Received b: 4
6
GEN_SUSPENDED
--> Received c: 8
GEN_CLOSED


In [31]:
def averager():
    total = 0.0
    average = None
    count = 0.0
    while True:
        term = yield average
        total+= term
        count+=1 
        average = total/count

In [35]:
gen = averager()
avg = next(gen)
print("average:", avg)

avg = gen.send(30)
print("average:", avg)

avg = gen.send(40)
print("average:", avg)

average: None
average: 30.0
average: 35.0


In [38]:
# instead of having to do a next everytime, use a decorator to prime the coroutine
from functools import wraps

def coroutine_primer(func):
    @wraps(func)
    def primer(*args, **kwargs):
        primed_coroutine = func(*args, **kwargs)
        next(primed_coroutine)
        return primed_coroutine
    return primer

In [40]:
@coroutine_primer
def averager():
    total = 0.0
    average = None
    count = 0.0
    while True:
        term = yield average
        total+= term
        count+=1 
        average = total/count

coro_avg = averager()
print(coro_avg.send(10))
print(coro_avg.send(20))
print(coro_avg.send(30))
print(coro_avg.send(90))


10.0
15.0
20.0
37.5


Methods available on Generators:
- send: send a value into the generator 
- throw: throw(exc[, exc_value[, traceback]]) -> throw an exception 
- close: close out the generator

In [56]:
# dealing with exceptions in generators

class DemoException(Exception):
    ...

@coroutine_primer
def averager():
    print("---> Coroutine starting")
    total = 0.0
    average = None
    count = 0.0
    try:
        while True:
            print(f"closure values {total=} {average=} {count=}")
            try: 
                term = yield average
                total+= term
            except DemoException: 
                print("\n>>> Demo exception handled, generator still running <<<\n")
            except TypeError: 
                print("\n>>> TypeError handled, generator still running <<<\n")
            else:
                count+=1 
                average = total/count
    finally:
        print("---> coroutine ending")

avg_coroutine = averager()
avg_coroutine.send(10)
avg_coroutine.send(34)
avg_coroutine.throw(DemoException)
avg_coroutine.send('hello')
avg_coroutine.send(94)
avg_coroutine.close()

---> Coroutine starting
closure values total=0.0 average=None count=0.0
closure values total=10.0 average=10.0 count=1.0
closure values total=44.0 average=22.0 count=2.0

>>> Demo exception handled, generator still running <<<

closure values total=44.0 average=22.0 count=2.0

>>> TypeError handled, generator still running <<<

closure values total=44.0 average=22.0 count=2.0
closure values total=138.0 average=46.0 count=3.0
---> coroutine ending


In [50]:
def chain(*iterables):
    for it in iterables:
        yield from it

print(list(chain(range(5), 'anudeep', ['a', 1, 3.1415926])))

[0, 1, 2, 3, 4, 'a', 'n', 'u', 'd', 'e', 'e', 'p', 'a', 1, 3.1415926]


In [3]:
data = {
'girls;kg':
    [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
    [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
    [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
    [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

## Coroutine yield from

Value is bound in the value of the StopIteration. 
`yield from` abstracts catching the Stopiteration and reading the value of the Exception



In [33]:
from collections import namedtuple
from pprint import pprint

Result = namedtuple('Result', 'count average')

def averager():
    term = 0.0
    total = 0.0
    average = None
    count = 0
    while True:
        try:
            term = yield total
            # important! otherwise return is never reached and the subgenerator will block forever
            if term is None:
                break
            total += term
        except TypeError as e:
            print("wrong type to average", e)
        else: 
            count+=1
            average = total/count
    return Result(count, average)

# the delegating generator
def grouper(results, key):
    while True:
        # here results['girls;kg'] will not exist, until the terminating condition of the coroutine is hit
        # at that moment, the value is added to results['girls;kg']
        results[key] = yield from averager()

# the client code aka the caller
def main(data):
    results = {}
    
    for key,values in data.items():
        print('\nrunning subgenerator for ', key)
        group = grouper(results, key)
        next(group)
        for v in values:
            print(f"running total {group.send(v):.2f}")
        group.send(None)
    
    pprint(results, indent=2)
    print('\n')
    reporter(results)

def reporter(results):
    for k,result in results.items():
        group, unit = k.split(';')
        print("{:2} {:5} averaging {:.2f} {}".format(result.count, group, result.average, unit))

main(data)
        


running subgenerator for  girls;kg
running total 40.90
running total 79.40
running total 123.70
running total 165.90
running total 211.10
running total 252.80
running total 297.30
running total 335.30
running total 375.90
running total 420.40

running subgenerator for  girls;m
running total 1.60
running total 3.11
running total 4.51
running total 5.81
running total 7.22
running total 8.61
running total 9.94
running total 11.40
running total 12.85
running total 14.28

running subgenerator for  boys;kg
running total 39.00
running total 79.80
running total 123.00
running total 163.80
running total 206.90
running total 245.50
running total 286.90
running total 327.50
running total 363.80

running subgenerator for  boys;m
running total 1.38
running total 2.88
running total 4.20
running total 5.45
running total 6.82
running total 8.30
running total 9.55
running total 11.04
running total 12.50
{ 'boys;kg': Result(count=9, average=40.422222222222224),
  'boys;m': Result(count=9, average=1.388

The yield from syntax

```python
RESULT = yield from EXPR
```

Glossary:
- `_i` iterator (subgenerator)
- `_y` yielded value from subgenerator
- `_r` eventual result when the subgenerator ends
- `_s` sent, value sent by the client to delegating generator, which is passed to the subgenerator
- `_e` exception, an exception (raised internally or thrown to it)
- `_m` iterator
- `_x` exception info, `sys.exc_info()` returns a tuple (exc_type, exc_message, traceback)
- 

is semantically equivalent to the following code:

```python
_i = iter(EXPR)  # get iterator
try:
    _y = next(_i)   # prime iterator
except StopIteration as _e:
    _r = _e.value  # iterator ends as soon as it gets started, result is bound to the StopIteration Exception
else:
    while 1:    # blocks the delegator 
        try:
            _s = yield _y    # ongoing values are yielded here
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r
```

About `_m` and `_x`:
- _m and _x are temporary variables used in the handling of exceptions and method calls on the iterator _i. Let's break down their roles:

1. _m: This variable is used as a temporary reference to a method of the iterator _i. It's used in two different contexts in the code:
    - When handling GeneratorExit, _m is assigned to _i.close if it exists. This is used to close the iterator if it supports the close() method, adhering to the cleanup protocol of generators.
    - When handling a general exception (captured as BaseException), _m is assigned to _i.throw if it exists. This is to propagate the exception into the iterator/generator _i, using its throw() method.
2. _x: This variable is used to hold the exception information captured by sys.exc_info(). This function returns a tuple of three values: the type of the exception, the exception instance, and a traceback object. This information is then used to propagate the exception into the iterator _i using _i.throw(*_x). The * operator unpacks the tuple, passing the three components of the exception information as separate arguments to the throw() method.

In summary, _m is a method reference variable, dynamically pointing to either the close or throw method of _i depending on the context, and _x is a variable holding the current exception information for re-raising or propagating the exception.

# Taxi Simulator 

Purpose to deeply understand coroutines. This is a discrete event simulation.

Brief:
A taxi can exist in any of `leave_garage, pickup_passenger, drop_passenger, return_to_garage` states. 

### Main components:
- Event: This is a named tuple that contains the attributes to run an event
- taxi_process: Callable --> This is the coroutuine that takes care of handling a taxi's pickup, dropoff and all the other states.

In [55]:
# coroutine simulation
from collections import namedtuple

Event = namedtuple('Event', 'time proc action')

def taxi_process(ident: int, start_time: int, trips: int):
    # at start, the taxi should leave the garage
    # wait until the next time is sent, to switch to the next event
    time = yield Event(start_time, ident, 'leaving_garage')
    for _ in range(trips):
        time = yield Event(time, ident, 'pickup_passenger')
        time = yield Event(time, ident, 'drop_passenger')
    
    yield Event(time, ident, 'return_to_garage')
    # at this point, any new event will fall off with stopiteration error
    

In [56]:
t = taxi_process(10,5,2)

# prime the taxi process
# this causes the taxi to leave the garage
print(next(t))

# pickup passenger, at the time point mentioned
print(t.send(10))
# drop passenger, at the time point mentioned
print(t.send(15))
# pickup passenger, at the time point mentioned
print(t.send(20))
# drop passenger, at the time point mentioned
print(t.send(40))

# since num of trips is two, next send should cause return to garage
print(t.send(50))

# anything more will cause stopiteration error
# print(t.send(50))

Event(time=5, proc=10, action='leaving_garage')
Event(time=10, proc=10, action='pickup_passenger')
Event(time=15, proc=10, action='drop_passenger')
Event(time=20, proc=10, action='pickup_passenger')
Event(time=40, proc=10, action='drop_passenger')
Event(time=50, proc=10, action='return_to_garage')


In [59]:
taxis = {i: taxi_process(i, i*5, (i+1)*2) for i in range(4)}
print(next(taxis[0]))

Event(time=0, proc=0, action='leaving_garage')


In [52]:
from queue import PriorityQueue

# coroutine simulation
from collections import namedtuple

DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERVAL = 5

Event = collections.namedtuple('Event', 'time proc action')


def taxi_process(ident, trips, start_time=0):  # <1>
    """Yield to simulator issuing event at each state change"""
    time = yield Event(start_time, ident, 'leave garage')  # <2>
    for i in range(trips):  # <3>
        time = yield Event(time, ident, 'pick up passenger')  # <4>
        time = yield Event(time, ident, 'drop off passenger')  # <5>

    yield Event(time, ident, 'going home')  # <6>
    # end of taxi process # <7>
# END TAXI_PROCESS
    

def compute_duration(previous_action):
    """Compute action duration using exponential distribution"""
    if previous_action in ['leave garage', 'drop off passenger']:
        # new state is prowling
        interval = SEARCH_DURATION
    elif previous_action == 'pick up passenger':
        # new state is trip
        interval = TRIP_DURATION
    elif previous_action == 'going home':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1/interval)) + 1

class Simulation:
    def __init__(self, process_map):
        self.proc = dict(process_map)    # creating a local copy to avoid mutating the original
        self.q = PriorityQueue()
    
    def run(self, end_time: int):
        # prime each one of the taxi coroutines
        # gather the start events, and  put it into the queue
        for _, taxi in sorted(self.proc.items()):
            start_event = next(taxi)
            self.q.put(start_event)
        
        # main event loop and its running
        sim_time = 0
        while sim_time < end_time:
            # if event queue is empty exit
            if self.q.empty():
                print("*** No events left in queue ***")
                break
            
            current_event = self.q.get()
            sim_time, proc, prev_action = current_event
            print(f"taxi: {proc}, {'  '*proc} {current_event}")
            active_taxi = self.proc[proc]
            next_time = sim_time + compute_duration(prev_action)
            try:
                next_event = active_taxi.send(next_time)
            except StopIteration:
                del self.proc[proc]
            else:
                self.q.put(next_event)
        else:
            print(f"*** End of simulation time. {self.q.qsize()} Events pending")
        
taxis = {
i: taxi_process(
    ident = i, 
    trips = (i+1)*2,
   start_time = i*DURATION_GAP, 
) for i in range(NUM_OF_TAXIS)
}

s = Simulation(taxis)
s.run(200)
        
        


taxi: 0,  Event(time=0, proc=0, action='leave garage')
taxi: 1,    Event(time=5, proc=1, action='leave garage')
taxi: 1,    Event(time=6, proc=1, action='pick up passenger')
taxi: 2,      Event(time=10, proc=2, action='leave garage')
taxi: 0,  Event(time=14, proc=0, action='pick up passenger')
taxi: 3,        Event(time=15, proc=3, action='leave garage')
taxi: 2,      Event(time=20, proc=2, action='pick up passenger')
taxi: 3,        Event(time=22, proc=3, action='pick up passenger')
taxi: 0,  Event(time=27, proc=0, action='drop off passenger')
taxi: 2,      Event(time=28, proc=2, action='drop off passenger')
taxi: 2,      Event(time=30, proc=2, action='pick up passenger')
taxi: 0,  Event(time=31, proc=0, action='pick up passenger')
taxi: 0,  Event(time=48, proc=0, action='drop off passenger')
taxi: 0,  Event(time=50, proc=0, action='going home')
taxi: 2,      Event(time=52, proc=2, action='drop off passenger')
taxi: 1,    Event(time=53, proc=1, action='drop off passenger')
taxi: 2,   