## Concurrency with asyncio

### Thread vs. coroutine


In [3]:
# spinner_thread.py
import threading 
import itertools
import time
import sys

class Signal:
    """Shared flag telling the spinner thread whether to keep running."""
    # Class-level default; `supervisor` sets `go = False` on its instance and
    # `spin` checks it after every frame.
    go = True

def spin(msg, signal):
    """Animate a text spinner on stdout until ``signal.go`` becomes false.

    Each frame is one of ``| / - \\`` followed by a space and *msg*.  After
    a frame is written and flushed, the cursor is moved back with
    backspaces so the next frame overwrites it.  ``signal.go`` is checked
    after each 0.1 s pause; once it is false the last frame is blanked out.

    Args:
        msg: Text displayed after the spinner character.
        signal: Object whose ``go`` attribute is polled to decide when to stop.
    """
    out = sys.stdout
    backspace = '\x08'
    frames = itertools.cycle('|/-\\')
    while True:
        status = ' '.join((next(frames), msg))
        out.write(status)
        out.flush()
        # Rewind the cursor to the start of the frame just written.
        out.write(backspace * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    # Overwrite the final frame with spaces, then rewind once more.
    width = len(status)
    out.write(' ' * width + backspace * width)

def slow_function():
    """Block the calling thread for three seconds, then return 42."""
    # time.sleep blocks only the thread that calls it.
    time.sleep(3)
    return 42

def supervisor():
    """Run `slow_function` while a spinner thread animates on stdout.

    The spinner thread is always told to stop and then joined, even when
    `slow_function` raises, so no stray thread is left spinning.

    Returns:
        Whatever `slow_function` returns.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    try:
        result = slow_function()
    finally:
        # Threads cannot be killed from outside; the spinner exits on its
        # own once it sees the flag cleared.
        signal.go = False
        spinner.join()
    return result

def main():
    """Run the supervisor and print the value it returns."""
    print('Answer:', supervisor())
    
# Run the threaded spinner demo only when executed as the main module.
if __name__ == '__main__':
    main()

spinner object: <Thread(Thread-6, initial)>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          Answer: 42


In [10]:
# spinner_asyncio.py
import asyncio
import itertools
import sys

@asyncio.coroutine
def spin(msg):
    """Animate a text spinner on stdout until this coroutine is cancelled.

    Each frame is written, flushed, and followed by backspaces so the next
    frame overwrites it.  A ``CancelledError`` raised at the
    ``asyncio.sleep`` ends the loop; the last frame is then blanked out.

    Args:
        msg: Text displayed after the spinner character.
    """
    out = sys.stdout
    backspace = '\x08'
    frames = itertools.cycle('|/-\\')
    while True:
        status = ' '.join((next(frames), msg))
        out.write(status)
        out.flush()
        out.write(backspace * len(status))
        try:
            # Suspends here, letting the event loop run other tasks.
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    # Overwrite the final frame with spaces, then rewind once more.
    width = len(status)
    out.write(' ' * width + backspace * width)
    
@asyncio.coroutine
def slow_function():
    """Suspend for three seconds via ``asyncio.sleep``, then return 42."""
    # Yielding from asyncio.sleep suspends this coroutine without blocking
    # the event loop.
    yield from asyncio.sleep(3)
    return 42

@asyncio.coroutine
def supervisor():
    """Run `slow_function` while the spinner animates as a separate task.

    The spinner task is cancelled whether `slow_function` returns or
    raises, so it is never left pending on the event loop.

    Returns:
        The value produced by `slow_function`.
    """
    # ensure_future schedules the coroutine object for execution and
    # returns the Task that wraps it.
    spinner = asyncio.ensure_future(spin('thinking!'))
    print('spinner object:', spinner)
    try:
        result = yield from slow_function()
    finally:
        spinner.cancel()
    return result

def main():
    """Drive `supervisor` to completion on the event loop and print its result."""
    event_loop = asyncio.get_event_loop()
    answer = event_loop.run_until_complete(supervisor())
    event_loop.close()
    print('Answer:', answer)
    
# Run the asyncio spinner demo only when executed as the main module.
if __name__ == '__main__':
    main()

RuntimeError: This event loop is already running

spinner object: <Task pending coro=<spin() running at <ipython-input-10-cc1b280ec93f>:6>>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          

In [11]:
# flags_asyncio.py 
import asyncio

import aiohttp

from flags import BASE_URL, save_flag, show, main

@asyncio.coroutine
def get_flag(cc):
    """Request the GIF for country code *cc* and return what ``read()`` yields.

    The URL is ``<BASE_URL>/<cc>/<cc>.gif`` with *cc* lower-cased.
    """
    code = cc.lower()
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=code)
    response = yield from aiohttp.request('GET', url)
    return (yield from response.read())

@asyncio.coroutine
def download_one(cc):
    """Fetch the flag for *cc*, report it via `show`, save it, and return *cc*.

    The file is saved under the lower-cased country code plus ``.gif``.
    """
    flag_bytes = yield from get_flag(cc)
    show(cc)
    filename = cc.lower() + '.gif'
    save_flag(flag_bytes, filename)
    return cc

def download_many(cc_list):
    """Download the flags for every code in *cc_list* on the event loop.

    One `download_one` coroutine is created per country code (in sorted
    order), all are awaited together with ``asyncio.wait``, and the loop is
    closed afterwards.

    Returns:
        The number of entries in the "done" set returned by ``asyncio.wait``.
    """
    event_loop = asyncio.get_event_loop()
    downloads = list(map(download_one, sorted(cc_list)))
    done, _ = event_loop.run_until_complete(asyncio.wait(downloads))
    event_loop.close()
    return len(done)

# Hand `download_many` to the `main` imported from the flags module.
if __name__ == '__main__':
    main(download_many)

RuntimeError: This event loop is already running

  # This is added back by InteractiveShellApp.init_path()
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460bf28>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460b748>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460ba58>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460b278>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460ba58>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460b8d0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460bc18>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460b160>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460b8d0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10460bc18>
Unclosed cli

In [None]:
# flags2_asyncio.py
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm 

from flags2_common import HTTPStatus, save_flag, Result, main

# Both values are passed to `main` at the bottom of this script; presumably
# the default and the upper bound for the number of concurrent requests
# (TODO confirm against flags2_common.main).
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

class FetchError(Exception):
    """Download failure that records the country code being fetched."""
    def __init__(self, country_code):
        # Read back by the code that reports the failed download.
        self.country_code = country_code

@asyncio.coroutine
def get_flag(base_url, cc):
    """Fetch the flag GIF for *cc* from the server at *base_url*.

    Args:
        base_url: Prefix of the flag URL.
        cc: Country code; lower-cased when building the URL.

    Returns:
        What ``resp.read()`` yields when the server answers 200.

    Raises:
        web.HTTPNotFound: The server answered 404.
        aiohttp.HttpProcessingError: The server answered any other status.
    """
    # Use the ``base_url`` argument rather than a module-level BASE_URL so
    # the server chosen by the caller is actually the one queried.
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    # NOTE(review): this ClientSession is never closed -- confirm the right
    # close idiom for the installed aiohttp version and apply it.
    resp = yield from aiohttp.ClientSession().get(url)
    if resp.status == 200:
        image = yield from resp.read()
        return image
    elif resp.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason, headers=resp.headers)

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Fetch and save one flag, holding *semaphore* during the fetch.

    Args:
        cc: Country code of the flag.
        base_url: Passed through to `get_flag`.
        semaphore: Acquired around the call to `get_flag`.
        verbose: When true, print *cc* and the outcome message.

    Returns:
        ``Result(status, cc)`` with status ``HTTPStatus.ok`` or
        ``HTTPStatus.not_found``.

    Raises:
        FetchError: Any exception other than ``web.HTTPNotFound``; the
            original exception is chained as ``__cause__``.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status, msg = HTTPStatus.not_found, 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        filename = cc.lower() + '.gif'
        save_flag(image, filename)
        status, msg = HTTPStatus.ok, 'OK'

    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
    """Download all flags concurrently and tally the outcome statuses.

    One `download_one` coroutine is created per country code (sorted),
    sharing a semaphore initialised with *concur_req*.  Results are
    consumed as they complete; when *verbose* is false the iterator is
    wrapped in a ``tqdm`` progress display.

    Returns:
        A ``collections.Counter`` keyed by status; a `FetchError` is counted
        under ``HTTPStatus.error``.
    """
    tally = collections.Counter()
    limiter = asyncio.Semaphore(concur_req)
    downloads = [download_one(code, base_url, limiter, verbose)
                 for code in sorted(cc_list)]
    completed = asyncio.as_completed(downloads)
    if not verbose:
        completed = tqdm.tqdm(completed, total=len(cc_list))
    for next_done in completed:
        try:
            outcome = yield from next_done
        except FetchError as exc:
            failed_cc = exc.country_code
            cause = exc.__cause__
            try:
                error_msg = cause.args[0]
            except IndexError:
                # The cause carried no arguments; fall back to its class name.
                error_msg = cause.__class__.__name__
            if verbose and error_msg:
                print('*** Error for {}: {}'.format(failed_cc, error_msg))
            status = HTTPStatus.error
        else:
            status = outcome.status
        tally[status] += 1
    return tally

def download_many(cc_list, base_url, verbose, concur_req):
    """Run `downloader_coro` to completion on the event loop.

    Args:
        cc_list: Country codes to download.
        base_url: Passed through to `downloader_coro`.
        verbose: Passed through to `downloader_coro`.
        concur_req: Passed through to `downloader_coro`.

    Returns:
        The counter returned by `downloader_coro`.
    """
    loop = asyncio.get_event_loop()
    # The coroutine function is `downloader_coro`, and the object it returns
    # is what must be handed to the loop.
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()

    return counts

# Hand `download_many` and the two concurrency constants to the `main`
# imported from flags2_common.
if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

In [None]:
# run_in_executor
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Fetch one flag and hand the save to the loop's default executor.

    Args:
        cc: Country code of the flag.
        base_url: Passed through to `get_flag`.
        semaphore: Acquired around the call to `get_flag`.
        verbose: When true, print *cc* and the outcome message.

    Returns:
        ``Result(status, cc)`` with status ``HTTPStatus.ok`` or
        ``HTTPStatus.not_found``.

    Raises:
        FetchError: Any exception other than ``web.HTTPNotFound``; the
            original exception is chained as ``__cause__``.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status, msg = HTTPStatus.not_found, 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # save_flag is a blocking operation too, so it is submitted through
        # run_in_executor instead of being called inline.
        # NOTE(review): the returned future is not awaited, so the save may
        # still be running when 'OK' is reported -- confirm this is intended.
        filename = cc.lower() + '.gif'
        asyncio.get_event_loop().run_in_executor(
            None, save_flag, image, filename)
        status, msg = HTTPStatus.ok, 'OK'

    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

In [None]:
## Doing multiple requests for each download
# flags3_asyncio.py
@asyncio.coroutine
def http_get(url):
    """GET *url* and return its body, parsed as JSON when appropriate.

    Returns:
        ``res.json()`` when the ``Content-type`` header contains ``json`` or
        *url* ends with ``json``; otherwise ``res.read()``.

    Raises:
        web.HTTPNotFound: The server answered 404.
        aiohttp.errors.HttpProcessingError: The server answered any status
            other than 200 or 404.
    """
    res = yield from aiohttp.request('GET', url)
    if res.status == 200:
        ctype = res.headers.get('Content-type', '').lower()
        if 'json' in ctype or url.endswith('json'):
            data = yield from res.json()
        else:
            data = yield from res.read()
        # Callers index into / save the result, so it must be returned.
        return data
    elif res.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.errors.HttpProcessingError(
            code=res.status, message=res.reason,
            headers=res.headers)
            
@asyncio.coroutine
def get_country(base_url, cc):
    """Return the ``'country'`` entry of the metadata document for *cc*.

    The document is fetched from ``<base_url>/<cc>/metadata.json`` with *cc*
    lower-cased.
    """
    code = cc.lower()
    metadata_url = '{}/{cc}/metadata.json'.format(base_url, cc=code)
    return (yield from http_get(metadata_url))['country']

@asyncio.coroutine
def get_flag(base_url, cc):
    """Return what `http_get` yields for ``<base_url>/<cc>/<cc>.gif``.

    *cc* is lower-cased when building the URL.
    """
    code = cc.lower()
    flag_url = '{}/{cc}/{cc}.gif'.format(base_url, cc=code)
    image = yield from http_get(flag_url)
    return image

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    """Fetch a flag and its country name, then save it as ``<country>-<cc>.gif``.

    *semaphore* is acquired separately for each of the two requests.
    Spaces in the country name are replaced by underscores in the file
    name, and the save is submitted to the loop's default executor.

    Args:
        cc: Country code of the flag.
        base_url: Passed through to `get_flag` and `get_country`.
        semaphore: Acquired around each request.
        verbose: When true, print *cc* and the outcome message.

    Returns:
        ``Result(status, cc)`` with status ``HTTPStatus.ok`` or
        ``HTTPStatus.not_found``.

    Raises:
        FetchError: Any exception other than ``web.HTTPNotFound``; the
            original exception is chained as ``__cause__``.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
        with (yield from semaphore):
            country = yield from get_country(base_url, cc)
    except web.HTTPNotFound:
        status, msg = HTTPStatus.not_found, 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        filename = '{}-{}.gif'.format(country.replace(' ', '_'), cc)
        asyncio.get_event_loop().run_in_executor(
            None, save_flag, image, filename)
        status, msg = HTTPStatus.ok, 'OK'

    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

### Writing asyncio servers

In [None]:
# tcp_charfinder.py
import sys
import asyncio

from charfinder import UnicodeNameIndex

# Terminator appended to every line sent back to the client.
CRLF = b'\r\n'
# Written to the client before each query is read.
PROMPT = b'?>'

# Built once at module level and used by every call to handle_queries.
index = UnicodeNameIndex()

@asyncio.coroutine
def handle_queries(reader, writer):
    """Serve queries from one client until it disconnects or sends a control char.

    For each line read, the matching descriptions from `index` are sent
    back (one per line) followed by a status line.  The session ends when
    the stream reaches EOF, when the query starts with a character whose
    code point is below 32, or when the input cannot be decoded.

    Args:
        reader: Stream the queries are read from, one line at a time.
        writer: Stream the prompt and the results are written to; closed
            when the session ends.
    """
    while True:
        writer.write(PROMPT)
        yield from writer.drain()
        data = yield from reader.readline()
        if not data:
            # readline() returns b'' only at EOF, i.e. the client is gone.
            # Without this check the loop would keep prompting a dead
            # connection, because an empty query never reaches `break`.
            break
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:
            # Treat undecodable input as a control character: end the session.
            query = '\x00'
        client = writer.get_extra_info('peername')
        print('Received from {}: {!r}'.format(client, query))
        if query:
            if ord(query[:1]) < 32:
                break
            lines = list(index.find_description_strs(query))
            if lines:
                writer.writelines(line.encode() + CRLF for line in lines)
            writer.write(index.status(query, len(lines)).encode() + CRLF)

            yield from writer.drain()
            print('Sent {} results'.format(len(lines)))
    print('Close the client socket')
    writer.close()

def main(address='127.0.0.1', port=2323):
    """Start the TCP server and serve until interrupted with CTRL-C.

    Args:
        address: Address to listen on.
        port: Port to listen on; converted with ``int``.
    """
    port_number = int(port)
    event_loop = asyncio.get_event_loop()
    server = event_loop.run_until_complete(
        asyncio.start_server(handle_queries, address, port_number,
                             loop=event_loop))

    bound_to = server.sockets[0].getsockname()
    print('Serving on {}. Hit CTRL-C to stop.'.format(bound_to))
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:
        # CTRL-C falls through to the shutdown sequence below.
        pass

    print('Server shutting down.')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    event_loop.close()
    
# Start the TCP server with main's default address and port.
if __name__ == '__main__':
    main()

In [None]:
# http_charfinder.py
@asyncio.coroutine
def init(loop, address, port):
    """Create the web application and start serving it on *address*:*port*.

    `home` is registered for ``GET /``.

    Returns:
        The socket name of the server's first listening socket.
    """
    application = web.Application(loop=loop)
    application.router.add_route('GET', '/', home)
    server = yield from loop.create_server(
        application.make_handler(), address, port)
    first_socket = server.sockets[0]
    return first_socket.getsockname()

def home(request):
    """Render the page for the ``query`` parameter of *request*.

    With a non-empty query, one ``ROW_TPL`` row is rendered per description
    found by `index` and the status message comes from ``index.status``.
    With no query, the result is empty and a hint message is shown.

    Returns:
        A ``web.Response`` carrying the formatted ``template``.
    """
    query = request.GET.get('query', '').strip()
    print('Query: {!r}'.format(query))
    if not query:
        descriptions = []
        res = ''
        msg = 'Enter words describing characters.'
    else:
        descriptions = list(index.find_descriptions(query))
        rows = [ROW_TPL.format(**vars(descr)) for descr in descriptions]
        res = '\n'.join(rows)
        msg = index.status(query, len(descriptions))

    # NOTE(review): `query` comes straight from the request and is
    # interpolated as-is -- confirm `template` does not need it escaped.
    page = template.format(query=query, result=res, message=msg)
    print('Sending {} results'.format(len(descriptions)))
    return web.Response(content_type=CONTENT_TYPE, text=page)
    
def main(address='127.0.0.1', port=8888):
    """Start the HTTP server and serve until interrupted with CTRL-C.

    Args:
        address: Address to listen on.
        port: Port to listen on; converted with ``int``.
    """
    port_number = int(port)
    event_loop = asyncio.get_event_loop()
    bound_to = event_loop.run_until_complete(
        init(event_loop, address, port_number))
    print('Serving on {}. Hit CTRL-C to stop.'.format(bound_to))
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:  # CTRL+C pressed
        pass
    print('Server shutting down.')
    event_loop.close()
    
# Forward any command-line arguments to main positionally (address, then port).
if __name__ == '__main__':
    main(*sys.argv[1:])