---

# Coroutines and Futures
### [Emil Sekerinski](http://www.cas.mcmaster.ca/~emil/), McMaster University, Fall 2019

---

<div style="float:right" class="code-img-pair-container">
  <div>
    <img src="attachment:Procedures.svg"/>
  </div>
</div>

Procedures, or _subroutines,_ always start the execution at the beginning and at some point return. Procedures can call other procedures, leading to a nested calling structure. In the diagram to the right, vertical lines show the lifetime of a procedure invocation: bars mean that the procedure is executing.

<div style="float:right" class="code-img-pair-container">
  <div>
    <img src="attachment:Coroutines.svg"/>
  </div>
</div>

_Coroutines_ generalize procedures by allowing control to be _transferred_ out of a coroutine at some point and then resumed again at that point. Coroutines, like procedures, can have local variables. These are preserved when control is transferred out of a coroutine. When a coroutine is created, it starts execution at the beginning. 

The transfer from one coroutine to another can take different forms:
- _Symmetric:_ In coroutine `A`, statement `transfer B` will transfer control to coroutine `B`; all coroutines are equal, `B` can transfer back to `A` or transfer to `C`, etc. The transfer structure is arbitrary.
- _Asymmetric:_ Coroutine `A` calls coroutine `B` initially; in `B`, statement `suspend` will return control to `A`; in `A`, statement `resume B` resumes execution in `B` at the point where it suspended; coroutine `A` is like a caller and `B` like a callee, except that `B` will resume where it left off and its state is preserved when suspending.

Coroutines run concurrently but are scheduled _cooperatively,_ meaning that there are explicit points when control is transferred. Only one coroutine is executed at a time. By contrast, threads (and processes) are scheduled _preemptively,_ meaning that after a certain time, control is transferred. Threads (and processes) can run in parallel.
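
Cooperative scheduling can be sketched with Python generators, which are introduced in the next section. The following is a minimal illustration, not a standard library facility: the names `worker` and `run_round_robin` are hypothetical, `yield` plays the role of `suspend`, and `next` plays the role of `resume`.

```python
from collections import deque

def worker(name, steps, log):
    """A coroutine that does `steps` units of work, suspending after each."""
    for i in range(steps):
        log.append(f"{name}{i}")  # one unit of "work"
        yield                     # explicit transfer point: suspend

def run_round_robin(coroutines):
    """Resume each coroutine in turn until all have terminated."""
    ready = deque(coroutines)
    while ready:
        c = ready.popleft()
        try:
            next(c)          # resume c until its next suspension point
            ready.append(c)  # c suspended: schedule it again
        except StopIteration:
            pass             # c terminated: drop it

log = []
run_round_robin([worker("A", 3, log), worker("B", 2, log)])
print(log)  # ['A0', 'B0', 'A1', 'B1', 'A2']
```

Only one coroutine runs at any time, and control changes hands only at the `yield` statements.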

### Python Generators

In Python, asymmetric coroutines are used as _generators,_ which are functions that, when suspending, additionally yield a value. The keyword `yield` is used for suspending and the function `next` is used for resuming and obtaining the next yielded value:

In [None]:
def fib():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

In [None]:
f = fib(); f

The generator `f` has its own state and runs concurrently with the main program:

In [None]:
next(f), next(f), next(f), next(f), next(f), next(f)

Generators can be used in `for` statements for iterating over all generated elements. As `fib()` generates arbitrarily many elements, this allows in principle iteration over an infinite sequence:

In [None]:
for x in fib(): print(x) # caution
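
To experiment with an infinite generator safely, the number of elements consumed can be bounded from the outside. A minimal sketch using `itertools.islice` from the standard library; `fib` is repeated so that the example is self-contained, and the name `first_ten` is only illustrative.

```python
from itertools import islice

def fib():  # same generator as above
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

# islice resumes the generator only ten times, then stops requesting values
first_ten = list(islice(fib(), 10))
print(first_ten)  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```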

_Exercise:_ Modify `fib` to take an additional integer parameter: `fib(n)` yields only numbers that are less than `n` and then terminates. This allows `fib(n)` to be used like `range(n)` in loops.

Generators can be used to avoid constructing intermediate data structures. Consider functions
- `unique(iterable)`, which takes an `iterable` (list, tuple) and returns the elements in the same order but without duplicates,
- `filter(fn, iterable)`, which takes argument `fn`, a predicate, and returns the elements of the second argument, `iterable`, that satisfy `fn`.

In [None]:
def unique(iterable):
    seen = set()
    for e in iterable:
        if e not in seen:
            yield e
            seen.add(e)

In [None]:
list(unique([1, 3, 4, 2, 1, 3]))

In [None]:
def filter(fn, iterable):
    for e in iterable:
        if fn(e):
            yield e

In [None]:
def even(x): return x % 2 == 0
list(filter(even, [1, 2, 3, 4, 5, 6]))

In [None]:
list(unique(filter(even, [0, 1, 0, 1, 2, 2, 0, 1])))

While we can think of `filter` as returning a list of even numbers, no such list is constructed in memory. Generators `filter` and `unique` are coroutines that run concurrently with the main program. (Note that Python has a built-in function `filter` with similar functionality.)
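
The interleaving can be made visible by recording when each stage yields. The following sketch uses hypothetical traced variants, `filter_traced` and `unique_traced`, that behave like the generators above but additionally append to a list `trace`.

```python
trace = []  # records the order in which the stages run

def filter_traced(fn, iterable):
    for e in iterable:
        if fn(e):
            trace.append(f"filter yields {e}")
            yield e

def unique_traced(iterable):
    seen = set()
    for e in iterable:
        if e not in seen:
            trace.append(f"unique yields {e}")
            yield e
            seen.add(e)

def even(x): return x % 2 == 0

result = []
for x in unique_traced(filter_traced(even, [0, 1, 0, 2])):
    trace.append(f"main receives {x}")
    result.append(x)

print(result)  # [0, 2]
for line in trace:
    print(line)
```

The trace shows that the first `0` reaches the main program before `filter` has looked at the remaining elements: each element passes through the whole pipeline before the next one is produced.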

_Exercise:_ Implement `unique` and `filter` in Go as goroutines and compose them to achieve the same functionality as above. Note that while all even numbers are being transmitted over a channel, a sequence of all even numbers is not constructed in memory.

### Goroutines

Goroutines are similar to symmetric coroutines in that control is transferred explicitly when sending and receiving. However, goroutines can be scheduled preemptively; if the goroutines do not access global variables, this is not observable. Up to Go 1.13, the Go implementation avoids time slices when scheduling preemptively and instead relies on checks that the compiler inserts at function calls, where a long-running goroutine can give up control. Thus, even if this appears to be preemptive scheduling, the Go implementation uses cooperative scheduling for efficiency. Since Go 1.14, the runtime can additionally preempt goroutines asynchronously, for example in loops that contain no function calls.

### Python async / await

Threads (and processes) are suitable for _CPU-bound_ programs, as the computation can be spread among processors (cores). Coroutines are suitable for _I/O-bound_ programs: as the program mainly waits for different I/O actions, it can quickly switch among those. In Python:

- a coroutine is declared with `async def`,
- `await c` transfers control to coroutine `c` and resumes when control is transferred back,
- `asyncio.run(c)` initiates coroutine `c`,
- `asyncio.gather(c0, c1, ...)` starts coroutines `c0, c1, ...` and waits for them to terminate.

_Note:_ `asyncio.run` and `asyncio.create_task`, used below, require Python 3.7 or later. Check the installed version with `python3 -V`.

In [None]:
%%writefile asyncrequest.py
import asyncio
import time

async def request(i):
    print("Request " + str(i))
    await asyncio.sleep(1) # this could be an I/O operation
    print("Done " + str(i))

async def main():
    await asyncio.gather(request(0), request(1), request(2))

start = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - start
print(f"executed in {elapsed:0.2f} seconds.")

In [None]:
!python3 asyncrequest.py
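
Coroutines can also return values: `await` evaluates to the result of the awaited coroutine, and `asyncio.gather` collects the results in a list. A minimal sketch, meant to be run as a script like `asyncrequest.py`; the coroutine `square` and its delays are made up for illustration.

```python
import asyncio

async def square(i):
    await asyncio.sleep(0.01 * (3 - i))  # later arguments finish earlier
    return i * i

async def main():
    # gather returns the results in the order of its arguments,
    # not in the order in which the coroutines finish
    return await asyncio.gather(square(0), square(1), square(2))

results = asyncio.run(main())
print(results)  # [0, 1, 4]
```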

The `asyncio` library has a class `Queue` for communication between coroutines. This can be used as a buffer between producers and consumers:

In [None]:
%%writefile asyncpc.py
import asyncio
import random
import time

async def makeitem(p: int) -> str:
    return str(random.randint(0, 10)) + ' by ' + str(p)

async def produce(p: int, q: asyncio.Queue):
    for _ in range(random.randint(0, 10)):
        print("Producer " + str(p) + " sleeping")
        await asyncio.sleep(random.randint(0, 3))
        i = await makeitem(p)
        await q.put(i)
        print(i + " added")

async def consume(c: int, q: asyncio.Queue):
    while True:
        print("Consumer " + str(c) + " sleeping")
        await asyncio.sleep(random.randint(0, 3))
        i = await q.get()
        print(i + " removed by " + str(c))
        q.task_done()

async def main(np: int, nc: int): # number of producers, consumers
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(np)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(nc)]
    await asyncio.gather(*producers)
    await q.join()  # blocks until q is empty
    for c in consumers:
        c.cancel()

asyncio.run(main(3, 5))

In [None]:
!python3 asyncpc.py
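
Because of the random delays, the output of the program above differs from run to run. A smaller, deterministic sketch of the same idea follows, under two assumptions that are not part of the program above: the queue is bounded with `maxsize=2`, so the producer suspends when the buffer is full, and termination is signalled by a sentinel value `None` instead of cancelling the consumer.

```python
import asyncio

async def produce(q, n):
    for i in range(n):
        await q.put(i)  # suspends while the queue is full
    await q.put(None)   # sentinel (assumption of this sketch): no more items

async def consume(q, received):
    while True:
        item = await q.get()  # suspends while the queue is empty
        if item is None:
            break
        received.append(item)

async def main():
    q = asyncio.Queue(maxsize=2)  # buffer holds at most two items
    received = []
    await asyncio.gather(produce(q, 5), consume(q, received))
    return received

received = asyncio.run(main())
print(received)  # [0, 1, 2, 3, 4]
```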

### Futures

A _future_ is the result of a function call that is not yet available when the call returns, but only later, when it is needed. Thus execution of the function can be delayed; this is useful if the function is I/O bound and the result is not immediately needed.

While less common, futures can be expressed in Python with the `concurrent.futures` library. Python allows checking whether the function has terminated and the result is available:

In [None]:
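from concurrent.futures import ThreadPoolExecutor
from time import sleep

def return_after_5_secs(message):
    sleep(5)
    return message

pool = ThreadPoolExecutor(3)  # pool with at most 3 worker threads

future = pool.submit(return_after_5_secs, "hello")  # returns immediately
print(future.done())    # False: the function is still sleeping
sleep(6)                # wait a little longer than the function takes
print(future.done())    # True: the result is now available
print(future.result())  # hello
pool.shutdown()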
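
Instead of polling with `done()`, the main program can simply call `result()` at the point where the value is needed; the call blocks until the result is available. A minimal sketch with several futures; the function `slow_square` and its short delay are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_square(x):
    sleep(0.1)  # stands in for an I/O operation
    return x * x

with ThreadPoolExecutor(max_workers=3) as pool:
    # submit returns a future immediately; the calls run in worker threads
    futures = [pool.submit(slow_square, x) for x in range(3)]
    # ... the main program could do other work here ...
    results = [f.result() for f in futures]  # blocks until each result is available

print(results)  # [0, 1, 4]
```

Using the executor in a `with` statement shuts the pool down once the block is left.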