# Event IO

An event loop that puts the CPU into power-saving mode whenever no task is ready to run. Events (interrupts) make tasks runnable again.

## [A tale of event loops](https://github.com/bboser/a-tale-of-event-loops)

In [1]:
async def hello(name):
    print('Hello, {}'.format(name))

In [2]:
coro = hello('world')
print(type(coro))
print(dir(coro))

<class 'generator'>
['__next__', 'close', 'pend_throw', 'send', 'throw']


Send `None` to the coroutine to start it. Because `hello` never awaits anything, it runs to completion on that first `send` and raises `StopIteration` once the `print` statement has executed.

In [3]:
task = hello('world')
try:
    task.send(None)
except StopIteration:
    print("got StopIteration")

Hello, world
got StopIteration
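
The `StopIteration` raised at the end is also how a coroutine hands back its return value: the value travels in the exception's `value` attribute. A minimal sketch of reading it, where `add` and `run_once` are hypothetical names that do not appear in the notebook, and where `StopIteration.value` is assumed to behave as it does on CPython:

```python
async def add(a, b):
    # The return value is delivered inside the StopIteration exception
    return a + b

def run_once(coro):
    """Drive a coroutine that never suspends and return its result."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value
    # Getting here means the coroutine suspended instead of finishing
    coro.close()
    raise RuntimeError("coroutine suspended")

result = run_once(add(2, 3))
print(result)
```

A coroutine without an explicit `return` finishes the same way, with `value` set to `None`.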


In [4]:
class Hello(Exception):
    def __init__(self, name):
        self.name = name
    def __str__(self):
        return "Hello, {}!".format(self.name)
    
async def hello(name):
    raise Hello(name)
    
task = hello('world')
try:
    task.send(None)
except Exception as e:
    print("Got error, msg = '{}'".format(e))

Got error, msg = 'Hello, world!'


## A dialog with the event loop

The `coroutine` decorator is not implemented.

Let's first try without it ...

In [5]:
# Awaitable nice

class Nice():
    
    def __await__(self):
        print("await called")
        yield
        
nice = Nice().__await__

async def hello(name):
    await nice()
    print("Hello {}".format(name))
    
task = hello('California')
task.send(None)
try:
    task.send(None)
except StopIteration:
    print("StopIteration")

await called
Hello California
StopIteration


### Declare @coroutine decorator

In [6]:
# Coroutine decorator

def coroutine(func):
    """Decorator for marking coroutines."""
    return func

type_coro  = type((lambda: (yield))())  # Generator type
type_corof = type((lambda: (yield)))    # Generator function

# there ought to be a better way ...
def iscoroutine(obj):
    return isinstance(obj, type_coro)
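
Regarding the "better way": on CPython (an assumption; the cells in this notebook ran on a port where coroutines are plain generators), the standard library provides `types.coroutine` for marking generator-based awaitables and `inspect.iscoroutine` for the type check. A sketch under that assumption, which is not what the notebook itself uses:

```python
import inspect
import types

@types.coroutine
def nice():
    # Generator-based awaitable: suspends once, then finishes
    yield

async def hello(name):
    await nice()
    return "Hello, {}".format(name)

task = hello("California")
is_coro = inspect.iscoroutine(task)   # checks for objects created by async def
first = task.send(None)               # runs up to the bare yield inside nice()
try:
    task.send(None)                   # resumes after the yield and finishes
except StopIteration as stop:
    greeting = stop.value
print(is_coro, first, greeting)
```

On CPython the decorator is not optional: awaiting an undecorated generator raises `TypeError`, which is why the identity decorator above only works where `async def` functions are themselves generators.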

Now with the decorator ...

In [7]:
@coroutine
def nice():
    print("nice called")
    yield
    
async def hello(name):
    await nice()
    print("Hello again, {}".format(name))
    
task = hello('California')
task.send(None)
try:
    task.send(None)
except StopIteration:
    print("StopIteration")

nice called
Hello again, California
StopIteration


## Looping

In [8]:
@coroutine
def nice():
    yield
    
async def hello(name):
    for i in range(5):
        print("await", i)
        await nice()
    print("Hello {}, after 6x send (5 await, 1 StopIteration) ...".format(name))
    
def run_until_complete(task):
    print(task)
    try:
        while True:
            task.send(None)
    except StopIteration:
        pass
        
run_until_complete(hello('world'))

<generator object 'hello' at 20008530>
await 0
await 1
await 2
await 3
await 4
Hello world, after 6x send (5 await, 1 StopIteration) ...
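
The send count can also be checked by the driver itself. The sketch below is a variant of `run_until_complete` that returns the task's result together with the number of `send` calls it took. The `try`/`except` import is an assumed portability shim (it uses `types.coroutine` where available and otherwise falls back to the identity decorator), and the `rounds` parameter is hypothetical:

```python
try:
    from types import coroutine      # CPython: needed to await a generator
except ImportError:
    def coroutine(func):             # fallback: identity decorator
        return func

@coroutine
def nice():
    yield

async def hello(name, rounds):
    for _ in range(rounds):
        await nice()
    return "Hello {}".format(name)

def run_until_complete(task):
    """Resume task until it finishes; return (result, number of sends)."""
    sends = 0
    while True:
        sends += 1
        try:
            task.send(None)
        except StopIteration as stop:
            return stop.value, sends

result, sends = run_until_complete(hello("world", 5))
print(result, sends)
```

With five awaits, the first five sends each stop at a `yield` and the sixth raises `StopIteration`.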


## Spawning child tasks

In [9]:
@coroutine
def spawn(task):
    yield task

@coroutine
def nice():
    yield

async def hello(name):
    await nice()
    print("Hello,", name)
    
async def main():
    await spawn(hello('world'))
    
def run_until_complete(task):
    # tasks is a list of tuples (task, data to send)
    # for now there is only the main task (until main spawns hello)
    tasks = [(task, None)]
    while tasks:
        # Round-robin between a set of tasks
        # we may now have more than one and want to be fair
        # Schedule current tasks, start with empty queue for next while loop 
        queue, tasks = tasks, []
        for task, data in queue:
            # resume the task *once*
            try:
                data = task.send(data)
            except StopIteration as s:
                # the task finished; it is simply not rescheduled
                print("StopIteration", s)
            except Exception as error:
                # prevent errors in tasks from ending the loop
                print("Error", error)
            else:
                # schedule the child task
                if iscoroutine(data):
                    tasks.append((data, None))
                # reschedule parent task
                tasks.append((task, None))
                
run_until_complete(main())

StopIteration 
Hello, world
StopIteration 
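
To make the round-robin order visible, the sketch below runs two tasks side by side and records which one ran at each step. `worker`, `run_all` and `log` are hypothetical names, and the `coroutine` import is an assumed portability shim rather than part of the notebook:

```python
try:
    from types import coroutine      # CPython: needed to await a generator
except ImportError:
    def coroutine(func):             # fallback: identity decorator
        return func

@coroutine
def nice():
    yield

log = []

async def worker(name, steps):
    for i in range(steps):
        log.append((name, i))
        await nice()                 # give the other tasks a turn

def run_all(tasks):
    """Round-robin: resume every live task once per pass."""
    tasks = list(tasks)
    while tasks:
        queue, tasks = tasks, []
        for task in queue:
            try:
                task.send(None)
            except StopIteration:
                continue             # finished: do not reschedule
            tasks.append(task)       # still running: back of the queue

run_all([worker("a", 2), worker("b", 3)])
print(log)
```

The two workers alternate until `a` finishes, after which `b` runs alone.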


In [10]:
@coroutine
def spawn(task):
    child = yield ('spawn', task)
    return child

@coroutine
def nice():
    yield
    
@coroutine
def join(task):
    yield ('join', task)

async def hello(name):
    await nice()
    print("Hello,", name)
    
async def main():
    child = await spawn(hello('world'))
    # wait for child task to complete
    await join(child)
    print('(after join)')
    
def run_until_complete(task):
    # tasks is a list of tuples (task, data to send)
    # for now there is only the main task (until main spawns hello)
    tasks = [(task, None)]
    # keep track of tasks to resume when a task completes
    watch = {}
    while tasks:
        # Round-robin between a set of tasks
        # we may now have more than one and want to be fair
        # Schedule current tasks, start with empty queue for next while loop 
        queue, tasks = tasks, []
        for task, data in queue:
            # resume the task *once*
            try:
                data = task.send(data)
            except StopIteration:
                # wake up tasks waiting on this one
                tasks.extend((t, None) for t in watch.pop(task, []))
            except Exception as error:
                # prevent errors in tasks from ending the loop
                print("Error", error)
            else:
                if data and data[0] == 'spawn':
                    tasks.append((data[1], None))
                    tasks.append((task, data[1]))
                elif data and data[0] == 'join':
                    watch.setdefault(data[1], []).append(task)
                else:
                    tasks.append((task, None))
                
run_until_complete(main())

Hello, world
(after join)


For practical reasons, we'll probably want some kind of **`Task` wrapper for coroutine objects**. This comes in handy for exposing a cancellation API and for handling race conditions, such as the child task ending before the parent task attempts to `join()` it (can you spot the bug?).
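
One possible shape for such a wrapper is sketched below. The `Task` class with its `done` flag and `waiters` list is hypothetical, as is the `coroutine` import shim. The point is the `join` branch: if the child has already finished, the parent is rescheduled right away instead of being parked forever. Here `hello` deliberately finishes on its first `send`, which is the case that leaves `main` stuck in the loop above.

```python
try:
    from types import coroutine      # CPython: needed to await a generator
except ImportError:
    def coroutine(func):             # fallback: identity decorator
        return func

class Task:
    """Hypothetical wrapper: remembers completion and who is waiting."""
    def __init__(self, coro):
        self.coro = coro
        self.done = False
        self.waiters = []

@coroutine
def spawn(coro):
    child = yield ('spawn', coro)
    return child

@coroutine
def join(task):
    yield ('join', task)

log = []

async def hello(name):
    log.append("Hello, " + name)     # no await: finishes on its first send

async def main():
    child = await spawn(hello('world'))
    await join(child)                # child is already done by now
    log.append('(after join)')

def run_until_complete(coro):
    tasks = [(Task(coro), None)]
    while tasks:
        queue, tasks = tasks, []
        for task, data in queue:
            try:
                request = task.coro.send(data)
            except StopIteration:
                task.done = True
                tasks.extend((t, None) for t in task.waiters)
                continue
            if request and request[0] == 'spawn':
                child = Task(request[1])
                tasks.append((child, None))
                tasks.append((task, child))   # parent receives the Task
            elif request and request[0] == 'join' and not request[1].done:
                request[1].waiters.append(task)
            else:
                # plain yield, or join on a task that already finished
                tasks.append((task, None))

run_until_complete(main())
print(log)
```

Error handling is left out to keep the sketch short; an exception inside a task would end this loop.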

Passing the child task's return value back as the result of `await join()` and propagating the exception that crashed the child are left as exercises for the reader.
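
One way to approach both exercises, offered as a sketch rather than a reference solution: store the outcome on a hypothetical `Task` object and let the loop either `send` the result or `throw` the stored exception into whoever joined it. All names below (`Task`, `square`, `fail`, `results`) and the `coroutine` import shim are assumptions made for illustration.

```python
try:
    from types import coroutine      # CPython: needed to await a generator
except ImportError:
    def coroutine(func):             # fallback: identity decorator
        return func

class Task:
    def __init__(self, coro):
        self.coro = coro
        self.done = False
        self.result = None
        self.error = None
        self.waiters = []

@coroutine
def spawn(coro):
    return (yield ('spawn', coro))

@coroutine
def join(task):
    return (yield ('join', task))    # the loop sends the result back in

async def square(x):
    return x * x

async def fail():
    raise ValueError("boom")

results = []

async def main():
    good = await spawn(square(7))
    bad = await spawn(fail())
    results.append(await join(good))
    try:
        await join(bad)
    except ValueError as error:      # the child's exception surfaces here
        results.append(str(error))

def run_until_complete(coro):
    # each entry: (task, value to send, exception to throw or None)
    tasks = [(Task(coro), None, None)]
    while tasks:
        queue, tasks = tasks, []
        for task, value, error in queue:
            try:
                if error is not None:
                    request = task.coro.throw(error)
                else:
                    request = task.coro.send(value)
            except StopIteration as stop:
                task.done, task.result = True, stop.value
            except Exception as exc:
                task.done, task.error = True, exc
            else:
                if request and request[0] == 'spawn':
                    child = Task(request[1])
                    tasks.append((child, None, None))
                    tasks.append((task, child, None))
                elif request and request[0] == 'join' and not request[1].done:
                    request[1].waiters.append(task)
                elif request and request[0] == 'join':
                    target = request[1]
                    tasks.append((task, target.result, target.error))
                else:
                    tasks.append((task, None, None))
                continue
            # the task finished: wake everything that joined it
            tasks.extend((t, task.result, task.error) for t in task.waiters)

run_until_complete(main())
print(results)
```

In this sketch an exception in a task that nobody joins is stored and never reported, so a real loop would want to log it.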