In [24]:
import time

In [25]:
def lucas(): # similar to fib function 
    yield 2
    a = 2
    b = 1
    while True:
        yield b
        a,b = b, a + b

In [26]:
from itertools import islice 

In [27]:
list(islice(lucas(), 10))

[2, 1, 3, 4, 7, 11, 18, 29, 47, 76]

**No Concurrency Example**
- Simple search method 
    - we take in an iterable (like a sentence) and search for the item that satifies our ```predicate()``` function 
- This runs synchronously

In [28]:
def search(iterable, predicate):
    for item in iterable: 
        if predicate(item):
            return item 
    raise ValueError("Not found")

In [29]:
search(lucas(), lambda x: len(str(x)) >= 6)

103682

**How do we convert a program to be asynchronous?**
- One major player in making programs asynchronous in Python are ```generators```
    - Generators allow us to break up the computation of a loop
    - This makes it possible to introduce some managable control flow 

In [30]:
def async_search(iterable, predicate):
    for item in iterable: 
        if predicate(item):
            return item
        yield
    raise ValueError("Not Found")

In [31]:
g = async_search(lucas(), lambda x: x >= 10)
print(g)
print(next(g))
print(next(g))
print(next(g))
print("Hello Group!") # we can do other things before the gen. is exhausted!
print(next(g))
print(next(g))
print(next(g)) # we are returning a value in a generator (not traditional)

<generator object async_search at 0x10cdb51a8>
None
None
None
Hello Group!
None
None


StopIteration: 11

**What Happened?**
- We found the item and ran ```return item```
- Generators are supposed to ```yield``` a value and return nothing
    - But we are returning a value and yielding nothing
    - So when we return a value, we stop iteration of the generator
- We can see that the return value is still sorta returned in the payload of the ```StopIteration: 11``` error 
    - Weird!

In [51]:
from collections import deque

class Task:
    next_id = 0
    def __init__(self, routine):
        self.id = Task.next_id
        Task.next_id += 1
        self.routine = routine 
        
class Scheduler:
    def __init__(self):
        """
        Scheduler stores queue of tasks, completed results, 
        and raised exceptions 
        """
        self.runnable_tasks = deque()
        self.completed_task_results = {}
        self.failed_task_errors = {}
    
    def add(self, routine):
        """
        Wrap a routine in a task and add that task to queue
        """
        task = Task(routine)
        self.runnable_tasks.append(task)
        return task.id
    
    def run_to_completion(self):
        # run while the queue has stuff in it 
        while len(self.runnable_tasks) != 0:
            task = self.runnable_tasks.popleft() # pop off next task
            #print("Running task {} ...".format(task.id), end='')
            try:
                yielded = next(task.routine)
            except StopIteration as stopped: # if stopIteration, we done 
                print("completed with result: {!r}".format(stopped.value))
                # return the payload of the error (our return result)
                self.completed_task_results[task.id] = stopped.value
            except Exception as e: # if other exception, something is wrong
                print("failed with exception: {}".format(e))
                self.failed_task_errors[task.id] = e
            else: # executes if try is successful
                assert yielded is None
                #print("now yielded")
                self.runnable_tasks.append(task)

In [33]:
scheduler = Scheduler()
scheduler.add(async_search(lucas(), lambda x: len(str(x)) >= 6))

0

In [34]:
scheduler.run_to_completion()

Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...now yielded
Running task 0 ...completed with result: 103682


In [35]:
# we can have more than one task!
scheduler.add(async_search(lucas(),lambda x: len(str(x)) >= 7))
scheduler.add(async_search(lucas(),lambda x: len(str(x)) >= 9))
scheduler.run_to_completion()

Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running task 2 ...now yielded
Running task 1 ...now yielded
Running ta

**We need a slightly heavier example to really see what's happening**

In [36]:
from math import sqrt
def is_prime(x):
    if x < 2: 
        return False
    for i in range(2,int(sqrt(x)) + 1):
        if x % i == 0:
            return False
    return True

print(is_prime(12))
print(is_prime(13))
print(is_prime(2**31 - 1))
#print(is_prime(2**41 - 1))

False
True
True


In [37]:
def async_print_matches(iterable, predicate):
    for item in iterable:
        if predicate(item):
            print("Found :", item, end=", ")
        yield

In [16]:
# this will run infinitely 
scheduler = Scheduler()
scheduler.add(async_print_matches(lucas(), is_prime))
scheduler.run_to_completion()

Running task 3 ...Found : 2, now yielded
Running task 3 ...now yielded
Running task 3 ...Found : 3, now yielded
Running task 3 ...now yielded
Running task 3 ...Found : 7, now yielded
Running task 3 ...Found : 11, now yielded
Running task 3 ...now yielded
Running task 3 ...Found : 29, now yielded
Running task 3 ...Found : 47, now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...Found : 199, now yielded
Running task 3 ...now yielded
Running task 3 ...Found : 521, now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...Found : 2207, now yielded
Running task 3 ...Found : 3571, now yielded
Running task 3 ...now yielded
Running task 3 ...Found : 9349, now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...now yielded
Running task 3 ...now y

KeyboardInterrupt: 

### Print a message at intervals     

In [38]:
def async_repetitive_message(message, interval_seconds):
    while True:
        print(message)
        start = time.time()
        expiry = start + interval_seconds
        while True:
            yield # all coroutines must be able to yield no matter what
            now = time.time()
            if now >= expiry: # this could pass infinitely
                break
            # yield (if int sec is small, we will never yeild here)

In [39]:
scheduler = Scheduler()
scheduler.add(async_repetitive_message("Come get dinner", 2.5))

3

In [40]:
scheduler.add(async_print_matches(lucas(), is_prime))
scheduler.run_to_completion()

Running task 3 ...Come get dinner
now yielded
Running task 4 ...Found : 2, now yielded
Running task 3 ...now yielded
Running task 4 ...now yielded
Running task 3 ...now yielded
Running task 4 ...Found : 3, now yielded
Running task 3 ...now yielded
Running task 4 ...now yielded
Running task 3 ...now yielded
Running task 4 ...Found : 7, now yielded
Running task 3 ...now yielded
Running task 4 ...Found : 11, now yielded
Running task 3 ...now yielded
Running task 4 ...now yielded
Running task 3 ...now yielded
Running task 4 ...Found : 29, now yielded
Running task 3 ...now yielded
Running task 4 ...Found : 47, now yielded
Running task 3 ...now yielded
Running task 4 ...now yielded
Running task 3 ...now yielded
Running task 4 ...now yielded
Running task 3 ...now yielded
Running task 4 ...Found : 199, now yielded
Running task 3 ...now yielded
Running task 4 ...now yielded
Running task 3 ...now yielded
Running task 4 ...Found : 521, now yielded
Running task 3 ...now yielded
Running task 4 ...n

KeyboardInterrupt: 

### What's Wrong Here?
- We need to be very careful to make sure that a coroutine returns results right away or yeilds control
    - Everything you call from a coroutine should be non-blocking 
        - If we don't yield control, we will hang the system 
- Here, ```is_prime``` blocks the program from yielding control to the ```async_repetetive_message``` function

In [44]:
# rewrite is_prime to be asynchronous, ie. generator    
def async_is_prime(x):
    if x < 2: 
        return False
    for i in range(2, int(sqrt(x))+1):
        if x % i == 0:
            return False
        yield
    return True

- Since it is now a generator, we don't just call ```is_prime```, we have to iterate it 

In [45]:
# rewrite print matches to work with generator 
def async_print_matches(iterable, async_predicate):
    for item in iterable:
        # yield from allows us to call generator from within generator
        matches = yield from async_predicate(item) 
        if matches:
            print("Found :", item)
        # yield (bare yield)

- We allow the predicate function to make progress and yield control by invoking it with the ```yield from``` syntax
- We don't need the final yield since we yield control in ```print_matches``` and in the new ```is_prime``` function 

### It is critical that nothing blocks the call stack
- We have now demonstrated that:
    - Coroutines are contagious to callees
    - Coroutines are contagious to callers
- This means that asyncronous design requires a fairly robust asynchronous system
    - We can't just make one part asynchronous

### We can extract coroutines to refactor our code

In [49]:
def async_repetitive_message(message, interval_seconds):
    while True:
        print(message)
        yield from async_sleep(interval_seconds)

def async_sleep(interval_seconds):
    start = time.time()
    expiry = start + interval_seconds
    while True:
        yield
        now = time.time()
        if now >= expiry:
            break

### Okay, but why? 
- ```async_sleep``` has two critical properties for our system: 
    - ```async_sleep``` yields at least once 
    - ```async_sleep(0)``` yields exactly once
- Now all bare yields can be replaced with ```yield from async_sleep(0)``` 

Let's apply this change: 

In [47]:
def async_is_prime(x):
    if x < 2: 
        return False
    for i in range(2, int(sqrt(x))+1):
        if x % i == 0:
            return False
        yield from async_sleep(0)
    return True

def async_search(iterable, predicate):
    for item in iterable: 
        if predicate(item):
            return item
        yield from async_sleep(0)
    raise ValueError("Not Found")
    
def async_print_matches(iterable, async_predicate):
    for item in iterable:
        matches = yield from async_predicate(item) 
        if matches:
            print("Found :", item)

In [52]:
scheduler = Scheduler()
scheduler.add(async_repetitive_message("Come Get Dinner Fam!", 2.5))
scheduler.add(async_print_matches(lucas(), async_is_prime))
scheduler.run_to_completion()

Come Get Dinner Fam!
Found : 2
Found : 3
Found : 7
Found : 11
Found : 29
Found : 47
Found : 199
Found : 521
Found : 2207
Found : 3571
Found : 9349
Found : 3010349
Found : 54018521
Found : 370248451
Found : 6643838879
Found : 119218851371
Come Get Dinner Fam!
Found : 5600748293801
Come Get Dinner Fam!
Come Get Dinner Fam!
Come Get Dinner Fam!
Come Get Dinner Fam!
Come Get Dinner Fam!
Come Get Dinner Fam!


KeyboardInterrupt: 

### We've basically implemented a simple asyncio of our own
- Now we can look at our code rewritten in asyncIO (it's very similar)
    - All coroutines declared as ```async def foo()```
    - We use ```asyncio.sleep(time)``` instead of our own sleep function
    - Any instance of ```yield from foo``` is now ```await foo``` 

In [59]:
import asyncio

async def is_prime(x):
    if x < 2: 
        return False
    for i in range(2, int(sqrt(x)) + 1):
        if x % 1 == 0:
            return False
        await asyncio.sleep(0)
    return True

async def search(iterable, async_predicate):
    for item in iterable:
        if await async_predicate(item):
            return item
    raise ValueError("Not Found")

async def print_matches(iterable, async_predicate):
    for item in iterable:
        matches = await async_predicate(item)
        if matches:
            print("Found: ", item)

async def repetitive_message(message, interval_seconds):            
    while True:
        print(message)
        await asyncio.sleep(interval_seconds)

In [60]:
scheduler = asyncio.get_event_loop()
scheduler.create_task(repetitive_message("Come Get Dinner Fam!!", 2.5))
scheduler.create_task(print_matches(lucas(), is_prime))
scheduler.run_forever()

Task exception was never retrieved
future: <Task finished coro=<print_matches() done, defined at <ipython-input-59-e407444998b7>:18> exception=OverflowError('int too large to convert to float',)>
Traceback (most recent call last):
  File "<ipython-input-59-e407444998b7>", line 20, in print_matches
    matches = await async_predicate(item)
  File "<ipython-input-59-e407444998b7>", line 6, in is_prime
    for i in range(2, int(sqrt(x)) + 1):
OverflowError: int too large to convert to float


Come Get Dinner Fam!!
Found:  2
Found:  3
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!
Come Get Dinner Fam!!


KeyboardInterrupt: 

### So what makes asyncio special?
- Coroutines implement tasks 
- Coroutines await other coroutines 
- Event-loop schedules other coroutines 
- Tasks must not block
- Awaiting facilitates context switches 
- To yield control without needing a result: ```await asyncio.sleep(0)```