# Concurrency with asyncio

## Compare vs Threading

In [1]:
# spinner_thread.py

# credits: Adapted from Michele Simionato's
# multiprocessing example in the python-list:
# https://mail.python.org/pipermail/python-list/2009-February/538048.html

# BEGIN SPINNER_THREAD
import threading
import itertools
import time
import sys


def spin(msg, done):  # <2>
    """Animate a spinner followed by *msg* on stdout until *done* is set.

    Each frame is written and flushed, then followed by backspaces so the
    next frame overdraws it.  Between frames the thread waits up to 0.1 s on
    *done*; once the event is set, the last frame is blanked out.

    Args:
        msg: Text displayed to the right of the spinner glyph.
        done: ``threading.Event`` that signals the spinner to stop.
    """
    out = sys.stdout
    glyphs = itertools.cycle('|/-\\')  # <3>
    while True:
        status = next(glyphs) + ' ' + msg
        out.write(status)
        out.flush()
        out.write('\x08' * len(status))  # <4>
        if done.wait(.1):  # <5>
            break
    width = len(status)
    out.write(' ' * width + '\x08' * width)  # <6>


def slow_function(delay=3):  # <7>
    """Simulate a slow blocking call and return the answer 42.

    Args:
        delay: Seconds to block the calling thread (default 3), standing in
            for a long wait on I/O.  Exposed so demos and tests can shorten it.

    Returns:
        The constant 42.
    """
    # pretend waiting a long time for I/O
    time.sleep(delay)  # <8>
    return 42


def supervisor():  # <9>
    """Run slow_function() while a spinner thread animates the console.

    Returns:
        Whatever slow_function() returns.
    """
    done = threading.Event()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', done))
    print('spinner object:', spinner)  # <10>
    spinner.start()  # <11>
    try:
        result = slow_function()  # <12>
    finally:
        # Always stop the spinner, even if slow_function() raises: spin only
        # leaves its loop once `done` is set, and a non-daemon thread that
        # never finishes would keep the process alive.
        done.set()  # <13>
        spinner.join()  # <14>
    return result


def main():
    """Run the threaded spinner demo and print the answer it produces."""
    print('Answer:', supervisor())  # <15>


if __name__ == '__main__':
    main()
# END SPINNER_THREAD

spinner object: <Thread(Thread-4, initial)>
| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!           Answer: 42


## And now with an Asyncio coroutine

In [4]:
import asyncio
import itertools
import sys

async def spin(msg):
    """Animate a spinner followed by *msg* on stdout until cancelled.

    Native-coroutine form of the legacy ``@asyncio.coroutine`` /
    ``yield from`` version (``@asyncio.coroutine`` was removed in Python
    3.11).  Each frame is written and flushed, then backspaced over so the
    next frame overdraws it.

    The loop ends when the task running this coroutine is cancelled: the
    CancelledError raised at the ``await`` is caught, the last frame is
    blanked out, and the coroutine returns normally.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/=\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            # awaiting asyncio.sleep (not time.sleep) pauses this coroutine
            # without blocking the event loop
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))

async def slow_function(delay=3):
    """Simulate a slow I/O operation without blocking the event loop.

    Args:
        delay: Seconds to wait (default 3).

    Returns:
        The constant 42.
    """
    # pretend waiting a long time for I/O
    # `await asyncio.sleep(...)` hands control back to the event loop, which
    # resumes this coroutine after the delay
    await asyncio.sleep(delay)
    return 42

async def supervisor():
    """Run slow_function() while a spinner task animates the console.

    Returns:
        The value returned by slow_function().
    """
    # asyncio.ensure_future (formerly spelled asyncio.async, a syntax error
    # since `async` became a keyword in Python 3.7) wraps the spin coroutine
    # in a Task, schedules it, and returns the Task immediately
    spinner = asyncio.ensure_future(spin('thinking!'))
    print('spinner object: ', spinner)  # display the task object
    try:
        # Drive slow_function; while it awaits asyncio.sleep, the event loop
        # keeps running and advances the spinner task.
        result = await slow_function()
    finally:
        # Stop the spinner even if slow_function() raises.
        spinner.cancel()
    return result

def main():
    """Run the asyncio spinner demo and print its answer."""
    # asyncio.run creates a fresh event loop, drives supervisor() to
    # completion, returns its result, and closes the loop.  It replaces the
    # get_event_loop()/run_until_complete() pair, whose implicit loop
    # creation is deprecated in recent Python versions.
    result = asyncio.run(supervisor())
    print('Answer: ', result)

if __name__ == '__main__':
    main()

spinner object:  <Task pending coro=<spin() running at <ipython-input-4-f05f121b810c>:5>>
| thinking!/ thinking!= thinking!\ thinking!| thinking!/ thinking!= thinking!\ thinking!| thinking!/ thinking!= thinking!\ thinking!| thinking!/ thinking!= thinking!\ thinking!| thinking!/ thinking!= thinking!\ thinking!| thinking!/ thinking!= thinking!\ thinking!| thinking!/ thinking!= thinking!\ thinking!| thinking!           Answer:  42


Note: Never use `time.sleep(...)` in asyncio coroutines unless you want
to block the main thread, freezing the event loop
and probably the whole application.

If a coroutine must spend time doing nothing, use `yield from asyncio.sleep(DELAY)` (or `await asyncio.sleep(DELAY)` inside an `async def` coroutine).


In [None]:
# quick coro experiment code
import asyncio
def run_sync(coro_or_future):
    """Run *coro_or_future* to completion and return its result.

    Handy for poking at a coroutine from synchronous (non-async) code.

    Args:
        coro_or_future: A coroutine object, or a future/task that is already
            bound to an event loop.

    Returns:
        The result of the coroutine or future.
    """
    if asyncio.isfuture(coro_or_future):
        # A future belongs to the loop it was created on; drive that loop.
        return coro_or_future.get_loop().run_until_complete(coro_or_future)
    # asyncio.run creates, runs, and closes a fresh loop, avoiding the
    # deprecated implicit loop creation of asyncio.get_event_loop().
    return asyncio.run(coro_or_future)


async def some_coroutine():
    """Placeholder for the coroutine under experiment; replace as needed."""
    await asyncio.sleep(0)
    return 42


a = run_sync(some_coroutine())

## Downloading with asyncio and aiohttp
asyncio supports TCP and UDP directly.

For HTTP or any other protocol, we need 3rd party packages.

aiohttp is the library to use for asyncio HTTP clients right now.

Idea for the example below:
- A single-threaded program where a main loop activates queued coroutines one by one
- Each coroutine advances a few steps, then yields control back to the main loop, which activates the next coroutine in the queue.

In [4]:
import asyncio

import aiohttp
from flags import BASE_URL, save_flag, show, main

async def get_flag(cc):
    """Download and return the GIF image bytes of the flag for country *cc*.

    The URL is built from BASE_URL and the lower-cased country code.

    NOTE(review): the response status is not checked, so a non-200 body
    (e.g. an error page) would be returned and later saved as a .gif.
    """
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    # aiohttp.request is used as an async context manager so the response
    # (and its connection) is released once the body has been read.
    async with aiohttp.request('GET', url) as resp:
        # Network I/O happens inside awaited coroutines, so while this
        # coroutine waits the event loop can advance other downloads.
        # Reading the response body is a _separate_ awaited operation.
        image = await resp.read()
    return image

async def download_one(cc):
    """Fetch the flag for *cc*, report progress, and save it as ``<cc>.gif``.

    Returns:
        The country code, so callers can tell which download finished.
    """
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    """Download the flags for every country code in *cc_list* concurrently.

    Returns:
        The number of download tasks that finished (asyncio.wait's "done"
        set, which also contains tasks that ended with an exception).
        Returns 0 for an empty *cc_list*.
    """
    codes = sorted(cc_list)
    if not codes:
        # asyncio.wait raises ValueError when given nothing to wait for.
        return 0

    async def wait_all():
        # asyncio.wait no longer accepts bare coroutines (forbidden since
        # Python 3.11), so wrap each download_one(cc) coroutine in a Task.
        tasks = [asyncio.ensure_future(download_one(cc)) for cc in codes]
        # Despite its name, wait is not a blocking function: it is a
        # coroutine that completes when all the tasks passed to it are done
        # (the default return_when=ALL_COMPLETED) and returns a
        # (done, pending) pair of sets.  The pending set is ignored.
        done, _pending = await asyncio.wait(tasks)
        return done

    # asyncio.run drives the event loop until wait_all() finishes; this is
    # where the script blocks while the downloads run.
    res = asyncio.run(wait_all())
    return len(res)

# if name is main... run it.. but skipping for now

#### Notes
The asyncio.wait(...) coroutine accepts an iterable of futures or coroutines; wait wraps each coroutine in a Task. (Since Python 3.11, wait no longer accepts bare coroutines; wrap them in Tasks first.)

This means all objects managed by wait become instances of Future. And because wait is a coroutine function, calling wait(...) returns a coroutine/generator object; this is what the wait_coro variable holds.

To drive the coroutine, we pass it to loop.run_until_complete(...).

loop.run_until_complete(...) accepts a future or a coroutine. If it gets a coroutine, run_until_complete wraps it in a Task, similar to what wait does.

Coroutines, Tasks, and Futures are ALL driven by yield from....

Upon completion, run_until_complete returns the result of wait_coro: a 2-tuple. The first item is the set of completed futures; the second item is the set of uncompleted ones.

To use asyncio, we must replace every function that hits the network with an async version that is invoked via yield from; this ensures control is given back to the event loop while waiting.


##### Trick
When reasoning about coroutines, ignore the `yield from` and read the code as if it were regular sequential code. The `yield from` version does the same thing, except that instead of blocking, it suspends the coroutine and lets the event loop do other work until the result is ready. (It's a little more complicated than that, but the point is: don't get hung up on the `yield from`; think about what the code is doing.)


#### Key Points
- The outermost caller must NOT be a coroutine; it drives the chain using next(...) or .send(...).
- The innermost subgenerator must be a simple generator that uses yield, or an iterable object.
- In between, we can chain as many delegating generators (coroutines using yield from) as we want.

#### Asyncio specific
- For asyncio, an API call does the driving (loop.run_until_complete(...) in this case).
- The chain must end with an asyncio library coroutine (e.g. `yield from asyncio.sleep(...)`, or the aiohttp request that we used here).

## Using asyncio.as_completed

Get results AS they complete, NOT once ALL are complete.

This is the asyncio equivalent of the as_completed generator function used in the thread-pool example with the progress bar.

A Semaphore object holds an internal counter that is decremented whenever we call acquire() and incremented whenever we call release() on the semaphore. When the counter is zero, acquire() waits until another coroutine calls release().

Here, we use the Semaphore as a context manager in the code below.

This guarantees that no more than concur_req get_flag coroutines will be active at any time.

In [None]:
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm

# and reuse some functions from the flags download example

# Default number of concurrent requests, passed to main() below.
# It is set low to avoid errors from the remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
# Upper limit on concurrent requests, also passed to main(); presumably
# used there to cap a user-supplied value -- confirm.
MAX_CONCUR_REQ = 10000

class FetchError(Exception):
    """Wrap an HTTP or other network exception and carry the country code.

    Raised as ``raise FetchError(cc) from exc`` so that the original error is
    available as ``__cause__`` for reporting.
    """

    def __init__(self, country_code):
        # Pass the code to Exception so str(exc) and exc.args are meaningful.
        super().__init__(country_code)
        self.country_code = country_code


# NOTE(review): HTTPStatus (members ok / not_found / error), Result and
# save_flag used below are not defined in this cell; they must come from the
# shared flags helper module -- confirm and import them explicitly.


async def get_flag(base_url, cc):
    """Return the bytes of the GIF flag image for country *cc*.

    Raises:
        web.HTTPNotFound: if the server answers 404.
        aiohttp.ClientResponseError: for any other non-200 status.
    """
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    # Using aiohttp.request as an async context manager releases the
    # connection once the response has been handled.
    async with aiohttp.request('GET', url) as resp:
        if resp.status == 200:
            return await resp.read()
        if resp.status == 404:
            raise web.HTTPNotFound()
        raise aiohttp.ClientResponseError(
            resp.request_info, resp.history,
            status=resp.status, message=resp.reason,
            headers=resp.headers)


async def download_one(cc, base_url, semaphore, verbose):
    """Download one flag, save it to disk, and return Result(status, cc).

    A 404 is reported as HTTPStatus.not_found rather than raised.

    Raises:
        FetchError: wrapping any other error raised while fetching.
    """
    try:
        # `semaphore` is an asyncio.Semaphore shared by all downloads.
        # `async with` acquires it without blocking the event loop: only this
        # coroutine waits while the semaphore's counter is zero, i.e. while
        # the maximum number of get_flag calls is already running.
        async with semaphore:
            image = await get_flag(base_url, cc)
        # Leaving the `async with` releases the semaphore, incrementing its
        # counter and unblocking a coroutine waiting for it, if any.
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc  # exception chaining (PEP 3134)
    else:
        save_flag(image, cc.lower() + '.gif')  # save the image to disk
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)


async def downloader_coro(cc_list, base_url, verbose, concur_req):
    """Download all flags in *cc_list* with at most *concur_req* in flight.

    Takes the same arguments as download_many but, being a coroutine, must
    be driven by an event loop.

    Returns:
        A collections.Counter mapping each status to its number of downloads.
    """
    counter = collections.Counter()
    # Allows up to concur_req coroutines to hold this semaphore at once.
    semaphore = asyncio.Semaphore(concur_req)
    # One download_one coroutine per country code.
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]
    # as_completed yields awaitables in the order the downloads finish.
    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        # Wrap the iterator in tqdm to display a progress bar.
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(to_do))
    for future in to_do_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            cause = exc.__cause__
            # Prefer the original error's message; fall back to its class
            # name when the message is empty.
            error_msg = str(cause) or type(cause).__name__
            if verbose and error_msg:
                msg = '*** Error for {} {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status

        counter[status] += 1  # tally outcomes
    return counter


def download_many(cc_list, base_url, verbose, concur_req):
    """Download all flags and return a Counter of download outcomes.

    Synchronous entry point: asyncio.run creates an event loop, drives
    downloader_coro to completion, returns its result, and closes the loop.
    """
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    return asyncio.run(coro)

# NOTE(review): `main` is not defined in this cell.  It is called with
# (download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ), so it must come from a
# helper module whose main() accepts those three arguments -- confirm, and
# import it (together with HTTPStatus, Result and save_flag) explicitly.
if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

### Using an Executor to Avoid Blocking in the Event Loop

Only a few lines need to change: delegating save_flag to an executor (a thread pool) keeps the event loop from blocking on disk I/O. That can save millions of CPU cycles. (Still a fraction of a second, but it's a lot...)

In [None]:
async def download_one(cc, base_url, semaphore, verbose):
    """Download one flag, save it via an executor, and return Result(status, cc).

    Variant of download_one in which the blocking disk write done by
    save_flag runs in the event loop's default executor, so it does not stall
    the event loop.  A 404 is reported as HTTPStatus.not_found.

    Raises:
        FetchError: wrapping any other error raised while fetching.  An
            exception raised by save_flag propagates unchanged.
    """
    try:
        async with semaphore:
            image = await get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # get_running_loop() returns the loop that is running this coroutine.
        loop = asyncio.get_running_loop()
        # The first argument is an executor instance; None selects the event
        # loop's default thread-pool executor.  Awaiting the returned future
        # lets the loop keep running while the file is written, and ensures
        # the save has finished before it is reported as OK.
        await loop.run_in_executor(None, save_flag, image,
                                   cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)
    

## From Callbacks to Futures to Coroutines

Instead of callbacks, use coroutines: the coroutine yields, and when the result is ready, it is reactivated with a .send() call.

We even get to keep all the context within one function body.

In [None]:
# coroutines and yield enable asynchronous programming w/o callbacks
async def three_stages(request1):
    """Chain three asynchronous API calls, each stage feeding the next.

    Sketch only: api_call1..3 and step1..3 are not defined in this file.
    Each `await` suspends this coroutine until the awaited call completes, so
    the stages read like sequential code with no callbacks, and all
    intermediate state stays in local variables of this one function body.
    """
    response1 = await api_call1(request1)
    # stage 1
    request2 = step1(response1)
    response2 = await api_call2(request2)
    # stage 2
    request3 = step2(response2)
    response3 = await api_call3(request3)
    # stage 3
    step3(response3)

# Sketch: `loop` (an event loop) and `request1` are not defined in this file.
# create_task wraps the coroutine in a Task and schedules it on that loop;
# the coroutine only starts running once the loop runs.
loop.create_task(three_stages(request1))  # must explicitly schedule execution

IMPORTANT: You must explicitly schedule execution of the coroutine with the event loop, or activate it using yield from (or await) in another coroutine that is scheduled for execution.