# AsyncIO Study Note

A fancy way of writing concurrent code without threading.

## Parallelism - Python Threads and the GIL
(**Doing** multiple things at once)
*   [Understanding the Python GIL](https://www.youtube.com/watch?v=Obt-vMVdM8s&t=1746s&ab_channel=DavidBeazley)
*   [A Zoomable Interactive Python Thread Visualization](http://www.dabeaz.com/GIL/gilvis/index.html)



## Concurrency - Async I/O
(**Dealing with** multiple things at once)
*  [Coroutine Concurrency in Python 3 with asyncio](https://www.youtube.com/watch?v=c5wodlqGK-M&ab_channel=NDCConferences)

### Building Coroutines from scratch using Python *generators*
#### Example - A Fibonacci-style sequence (Lucas numbers: 2, 1, 3, 4, 7, ...)


In [None]:
import time
from math import sqrt

In [None]:
def fib():
  """Generate the sequence 2, 1, 3, 4, 7, 11, ... indefinitely.

  Starts from 2 and 1 and then adds the previous two terms, which gives the
  Lucas numbers rather than the classic Fibonacci numbers 0, 1, 1, 2, ...
  Calling fib() only builds a generator; nothing runs until next().
  """
  yield 2
  prev, curr = 2, 1
  while True:
    yield curr
    prev, curr = curr, prev + curr

Calling *fib()* does NOT execute its body; it returns a **generator** object.

In [None]:
# Create the generator; the body of fib() has not started running yet.
f = fib()
# Displaying it shows a generator object rather than any numbers.
f

<generator object fib at 0x7f64a73c8eb8>

Calling **`next()`** runs the body up to the next `yield` and returns the yielded value.

In [None]:
# Run fib() up to its first `yield`; the yielded value (2) is the result.
next(f)

2

In [None]:
from itertools import islice
# Take only the first 20 values: fib() is infinite, so list(fib()) would never end.
list(islice(fib(), 20))

[2,
 1,
 3,
 4,
 7,
 11,
 18,
 29,
 47,
 76,
 123,
 199,
 322,
 521,
 843,
 1364,
 2207,
 3571,
 5778,
 9349]

#### A linear search
Returns the first item satisfying a predicate

In [None]:
def search(iterable, predicate):
  """Return the first item of `iterable` for which `predicate` is truthy.

  Items are tested lazily and in order, and testing stops at the first
  match, so `iterable` may be infinite. Raises ValueError("Not found")
  if the iterable runs out without a match.
  """
  matches = (item for item in iterable if predicate(item))
  for first in matches:
    return first
  raise ValueError("Not found")

In [None]:
# First sequence value with at least three decimal digits.
search(fib(), lambda x: len(str(x)) >= 3)

123

#### Cooperative linear search
Periodically yields control back to the caller and, on completion, returns the result (carried as the payload of the `StopIteration` exception).

In [None]:
def async_search(iterable, predicate):
  """Cooperative version of `search`.

  Yields a bare None after every rejected item so the caller can interleave
  other work between steps. When an item matches, the generator returns it,
  which surfaces to the caller as `StopIteration` with the item in its
  `.value`. Raises ValueError("Not found") if the iterable is exhausted.
  Progress messages are printed on every step.
  """
  print('initializing generator')
  for call_counter, item in enumerate(iterable, start=1):
    print(f'async_search - called {call_counter} times')
    if predicate(item):
      # Returning from a generator ends it with StopIteration(item).
      print('async_search - predicate returned True')
      return item
    # Pause here and hand control back to whoever called next().
    yield
  raise ValueError("Not found")

In [None]:
# Creating the generator runs nothing yet; the first next(g) prints
# 'initializing generator'.
g = async_search(fib(), lambda x: len(str(x)) >= 3)

In [None]:
# Now we can perform the search and do more than one thing concurrently:
# each next(g) advances the search by exactly one item, and between calls
# the caller is free to do its own work (the prints below).
next(g)
next(g)
next(g)
print("Do other stuff 1")
next(g)
next(g)
next(g)
next(g)
print("Do other stuff 2")
next(g)
next(g)
print("Do other stuff 3")
next(g)


initializing generator
async_search - called 1 times
async_search - called 2 times
async_search - called 3 times
Do other stuff 1
async_search - called 4 times
async_search - called 5 times
async_search - called 6 times
async_search - called 7 times
Do other stuff 2
async_search - called 8 times
async_search - called 9 times
Do other stuff 3
async_search - called 10 times


In [None]:
# Eventually `return` in `async_search` makes the generator raise
# `StopIteration`; the returned item is stored in the exception's `.value`
# attribute. This cell is expected to end with that uncaught exception.
next(g)

async_search - called 11 times
async_search - predicate returned True


StopIteration: ignored

**Conclusion:**
* We can use generators to write functions that support cooperative concurrency in Python
* A **coroutine** is essentially an ***interruptible*** function: we can pause its execution and later resume it from the point where it left off

#### Building Task and Scheduler

In [None]:
class Task:
  """A schedulable unit of work wrapping a generator-based coroutine.

  Each Task takes its id from a class-level counter, so ids keep increasing
  across Scheduler instances for the life of the process.
  """
  next_id = 0

  def __init__(self, routine):
    self.id = Task.next_id
    self.routine = routine
    Task.next_id += 1

from collections import deque

class Scheduler:
  """Round-robin scheduler for generator-based coroutines.

  Each coroutine hands control back by yielding a bare None. Its `return`
  value is recorded as the task's result, and an exception it raises is
  recorded as the task's error.
  """

  def __init__(self):
    # Tasks waiting for their next turn: popped from the left and re-queued
    # on the right, which gives round-robin order.
    self.runnable_tasks = deque()
    # task_id -> value the coroutine returned (StopIteration.value)
    self.completed_task_result = {}
    # task_id -> exception instance that terminated the coroutine
    self.failed_task_errors = {}

  def add(self, routine):
    """Wrap `routine` (a generator) in a Task, queue it, and return its id."""
    task = Task(routine)
    self.runnable_tasks.append(task)
    return task.id

  def run_to_completion(self):
    """Step queued tasks round-robin until every task has finished.

    Each step calls next() on one task's routine:
    * if the routine yields, the task is re-queued at the back;
    * if it returns, its value is stored in `completed_task_result`;
    * if it raises, the exception is stored in `failed_task_errors`.
    """
    while self.runnable_tasks:
      task = self.runnable_tasks.popleft()
      print(f"Running task {task.id}")
      try:
        yielded = next(task.routine)
      except StopIteration as stopped:
        print(f"----------------------\nTask {task.id} completed with result: {stopped.value}")
        self.completed_task_result[task.id] = stopped.value
      except Exception as e:
        print(f"Failed with exception: {e}")
        # Keep the error so callers can inspect failed tasks after the run.
        self.failed_task_errors[task.id] = e
      else:
        # Routines in this scheduler only ever yield a bare None; any other
        # value means the routine follows a different protocol.
        assert yielded is None
        self.runnable_tasks.append(task)



##### Try the Scheduler with one task

In [None]:
scheduler = Scheduler()
# add() returns the new task's id. It is 0 only on a fresh kernel, because
# ids come from the class-level Task.next_id counter.
scheduler.add(async_search(fib(), lambda x: len(str(x)) >= 3))

0

In [None]:
# Step the single queued task until it returns; its result is stored in
# scheduler.completed_task_result.
scheduler.run_to_completion()

Running task 0
initializing generator
async_search - called 1 times
Running task 0
async_search - called 2 times
Running task 0
async_search - called 3 times
Running task 0
async_search - called 4 times
Running task 0
async_search - called 5 times
Running task 0
async_search - called 6 times
Running task 0
async_search - called 7 times
Running task 0
async_search - called 8 times
Running task 0
async_search - called 9 times
Running task 0
async_search - called 10 times
Running task 0
async_search - called 11 times
async_search - predicate returned True
----------------------
Task 0 completed with result: 123


In [None]:
# pop the result to get answer. Task ids come from a class-level counter
# (Task.next_id), so the finished task's id is 0 only on a fresh kernel;
# pop the single stored result instead of hard-coding id 0.
task_id, result = scheduler.completed_task_result.popitem()
result

123

In [None]:
#### Try Scheduler with 2 tasks
# The scheduler alternates between the two searches (round-robin). Task ids
# continue from earlier cells because Task.next_id is shared class state.
scheduler = Scheduler()
scheduler.add(async_search(fib(), lambda x: len(str(x)) >= 3))
scheduler.add(async_search(fib(), lambda x: len(str(x)) >= 4))
scheduler.run_to_completion()


Running task 1
initializing generator
async_search - called 1 times
Running task 2
initializing generator
async_search - called 1 times
Running task 1
async_search - called 2 times
Running task 2
async_search - called 2 times
Running task 1
async_search - called 3 times
Running task 2
async_search - called 3 times
Running task 1
async_search - called 4 times
Running task 2
async_search - called 4 times
Running task 1
async_search - called 5 times
Running task 2
async_search - called 5 times
Running task 1
async_search - called 6 times
Running task 2
async_search - called 6 times
Running task 1
async_search - called 7 times
Running task 2
async_search - called 7 times
Running task 1
async_search - called 8 times
Running task 2
async_search - called 8 times
Running task 1
async_search - called 9 times
Running task 2
async_search - called 9 times
Running task 1
async_search - called 10 times
Running task 2
async_search - called 10 times
Running task 1
async_search - called 11 times
async_

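#### Finding prime numbers in the Fib sequence asynchronously
Callees of coroutines must be non-blocking: a blocking call (such as `time.sleep`) would stall every task, because the scheduler only regains control when a coroutine yields.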

In [None]:
def async_sleep(interval_secs):
  """Cooperatively wait for at least `interval_secs` seconds.

  Generator-based counterpart of `time.sleep`. Instead of blocking, it
  yields a bare None back to the scheduler and re-checks the clock each
  time it is resumed. It always yields at least once, so `async_sleep(0)`
  acts as a pure "let another task run" point.
  """
  # time.monotonic() is unaffected by wall-clock adjustments (NTP, manual
  # changes), which can make time.time()-based deadlines expire early or late.
  expiry = time.monotonic() + interval_secs
  while True:
    yield
    if time.monotonic() >= expiry:
      return

In [None]:
def async_is_prime(x):
  """Cooperative primality test by trial division.

  A generator-based coroutine: drive it with `yield from` (or next()) and
  read the boolean verdict from its return value. After every candidate
  divisor that does not divide x, it yields control via `async_sleep(0)`,
  so long tests on large numbers do not starve other tasks.
  """
  if x < 2:
    return False

  # Trial division here is CPU work standing in for slow I/O in the demo.
  # The bound uses exact integer arithmetic: int(sqrt(x)) goes through a
  # float, which can be off by one for x above 2**53 and raises
  # OverflowError once x no longer fits in a float.
  divisor = 2
  while divisor * divisor <= x:
    if x % divisor == 0:
      return False
    # yield to avoid blocking on large numbers
    yield from async_sleep(0)
    divisor += 1

  return True


In [None]:
# Because the body contains `yield from`, calling async_is_prime only
# creates a generator object; no primality test runs until it is driven
# with next() or `yield from`.
async_is_prime(2)

In [None]:
def async_print_matches(iterable, async_predicate):
  """Print every item of `iterable` that satisfies `async_predicate`.

  `async_predicate(item)` must return a generator-based coroutine whose
  return value is the verdict. Anything it yields is forwarded unchanged to
  our caller through `yield from`, so the scheduler can run other tasks
  while each item is being tested. Keeps going after a match.
  """
  for candidate in iterable:
    verdict = yield from async_predicate(candidate)
    if not verdict:
      continue
    print(f"Found: {candidate}")

In [None]:
def async_repetitive_message(msg, interval_secs):
  """Print `msg` forever, cooperatively pausing `interval_secs` between prints.

  Never finishes on its own. The pause is delegated to `async_sleep`, whose
  bare yields pass straight through to the scheduler.
  """
  while True:
    print(msg)
    yield from async_sleep(interval_secs)

In [None]:
sched = Scheduler()
sched.add(async_repetitive_message('This is an async message', 2))
sched.add(async_print_matches(fib(), async_is_prime))
# Left commented out: neither task ever finishes (the message loop is
# `while True` and fib() is infinite), so this call would never return.
#sched.run_to_completion()

#### Replacing implementation with *asyncio*
* `import asyncio` 
* `def async_method_name` -> `async def method_name`
* `async_sleep` -> `asyncio.sleep`
* `yield from` -> `await`
* `Scheduler` -> asyncio event loop

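**Note:** Run the following code in your own IDE rather than in the notebook (Jupyter already runs an event loop, so starting another one inside a cell fails).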

In [None]:
from math import sqrt
import asyncio

def fib():
    """Yield 2, 1, 3, 4, 7, 11, ... forever (the Lucas numbers).

    Each term after the first two is the sum of the previous two.
    """
    older, newer = 2, 1
    yield older
    while True:
        yield newer
        older, newer = newer, older + newer
        
async def is_prime(x):
    """Return True if ``x`` is prime, using cooperative trial division.

    Awaits ``asyncio.sleep(0)`` after every candidate divisor that does not
    divide ``x``, so other tasks on the event loop can run during long tests.
    """
    if x < 2:
        return False

    # Exact integer bound: int(sqrt(x)) goes through a float, which can be
    # off by one for x above 2**53 and raises OverflowError for huge ints.
    divisor = 2
    while divisor * divisor <= x:
        if x % divisor == 0:
            return False
        # yield to avoid blocking on large numbers
        await asyncio.sleep(0)
        divisor += 1

    return True


async def search(iterable, predicate):
    """Return the first item of ``iterable`` for which ``await predicate(item)`` is truthy.

    After each non-matching item the coroutine yields to the event loop once,
    so other tasks get a turn. Raises ValueError("Not found") if the iterable
    is exhausted without a match.
    """
    print('search')
    for candidate in iterable:
        if not await predicate(candidate):
            # No match yet: give other tasks a turn before moving on.
            await asyncio.sleep(0)
            continue
        print('predicate returned True')
        return candidate
    raise ValueError("Not found")
  

async def print_matches(iterable, async_predicate):
    """Print each item of ``iterable`` for which ``await async_predicate(item)`` is truthy.

    While the predicate is suspended (e.g. inside ``asyncio.sleep``), the
    event loop is free to run other tasks. Keeps going after a match.
    """
    for candidate in iterable:
        # `await` hands control to the event loop whenever the predicate
        # suspends, and gives back the predicate's result when it finishes.
        if await async_predicate(candidate):
            print(f"Found: {candidate}")

async def repetitive_message(msg, interval_secs):
    """Print ``msg`` forever, sleeping ``interval_secs`` seconds between prints.

    Never returns on its own; stop it by cancelling the task that runs it.
    """
    while True:
        print(msg)
        # Suspends only this coroutine; the event loop keeps running others.
        await asyncio.sleep(interval_secs)
        

async def main():
    """Run the prime search and the periodic message concurrently.

    Both coroutines loop forever (fib() is infinite and repetitive_message
    never returns), so this only ends when interrupted, e.g. with Ctrl-C.
    """
    await asyncio.gather(
        print_matches(fib(), is_prime),
        repetitive_message('This is an async message!', 2),
    )


# asyncio.run() creates a fresh event loop, runs main() on it, and always
# closes the loop when it exits. Calling asyncio.get_event_loop() with no
# running loop is deprecated, and a loop.close() placed after a gather()
# that never finishes would never be reached.
asyncio.run(main())
