In [1]:
import aiohttp
import asyncio
import json
import logging

from IPython.html import widgets
from IPython.display import display as ipydisplay

from utils import colorify_log_handler



In [2]:
# IPython usually injects one handler on the root logger. Create a default
# one when it has not, so that `handlers[0]` below cannot raise IndexError
# on a fresh kernel.
root_logger = logging.getLogger()
if not root_logger.handlers:
    logging.basicConfig()

# Restyle the root handler (project helper from `utils`): line numbers off,
# timestamps formatted with '%H:%M:%S'.
colorify_log_handler(
    root_logger.handlers[0],
    log_lineno=False,
    time_fmt='%H:%M:%S'
)

# Logger used by the request coroutines in this notebook.
logger = logging.getLogger('bench_rest_api')
logger.setLevel(logging.DEBUG)

# Let asyncio's own DEBUG records through as well.
logging.getLogger('asyncio').setLevel(logging.DEBUG)

In [3]:
# Emit one record at each of four levels (including non-ASCII messages)
# to check how the configured handler renders them.
logger.info('This is info')
logger.debug('我會說中文喔')
logger.error('……人家不是喜歡才跟你講話的喔')
logger.warning('笨蛋')

[01;32m18:11:25.315 I[0m [36m[bench_rest_api <module>][0m This is info[0m
[37m18:11:25.316 D[0m [36m[bench_rest_api <module>][0m 我會說中文喔[0m
[01;31m18:11:25.316 E[0m [36m[bench_rest_api <module>][0m ……人家不是喜歡才跟你講話的喔[0m
[01;33m18:11:25.317 W[0m [36m[bench_rest_api <module>][0m 笨蛋[0m


In [4]:
# Sanity check: GET the server's root endpoint and pretty-print the JSON reply.
!curl -s -XGET "http://localhost:5566/" | python -m json.tool

{
    "env_details": {
        "num_process": 1,
        "quotes_pikle_pth": "parsed_1984.pkl"
    },
    "version": "2015.7"
}


In [5]:
# GET one quote from the /quote/uniform endpoint and pretty-print the JSON reply.
!curl -s -XGET "http://localhost:5566/quote/uniform" | python -m json.tool

{
    "quote": " there, now, that's as far as I can get. A farthing, that was a small copper coin, looked something like a cent.'"
}


In [6]:
%%bash

# ApacheBench: 10 requests in total (-n) at concurrency 10 (-c) against the
# quote endpoint with slow=true.
# NOTE(review): the "Failed requests ... Length" count in the output presumably
# comes from responses whose length differs from the first one (quotes vary in
# length), not from real failures — confirm against the ab documentation.
ab -c 10 -n 10 "http://localhost:5566/quote?slow=true"

This is ApacheBench, Version 2.3 <$Revision: 1604373 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient).....done


Server Software:        TornadoServer/4.2
Server Hostname:        localhost
Server Port:            5566

Document Path:          /quote?slow=true
Document Length:        2322 bytes

Concurrency Level:      10
Time taken for tests:   0.509 seconds
Complete requests:      10
Failed requests:        9
   (Connect: 0, Receive: 0, Length: 9, Exceptions: 0)
Total transferred:      9227 bytes
HTML transferred:       7218 bytes
Requests per second:    19.63 [#/sec] (mean)
Time per request:       509.488 [ms] (mean)
Time per request:       50.949 [ms] (mean, across all concurrent requests)
Transfer rate:          17.69 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.0      0       0
Proc

## Basic

In [7]:
@asyncio.coroutine
def quote_simple(url='http://localhost:5566/quote/uniform', slow=False):
    """Fetch a single quote from the REST API.

    Sends a GET request to ``url``; when ``slow`` is true the query
    parameter ``slow=True`` is attached, otherwise no parameters are sent.

    Returns the value stored under the ``'quote'`` key of the JSON reply.
    If the status is not 200, an error is logged, the response is
    force-closed and ``None`` is returned.
    """
    query = {'slow': True} if slow else {}
    response = yield from aiohttp.request('GET', url, params=query)

    if response.status == 200:
        payload = yield from response.json()
        return payload['quote']

    # Any other status: report it and drop the connection.
    logger.error('Unsuccessful response [Status: %s (%d)]'
                 % (response.reason, response.status))
    response.close(force=True)
    return None

In [8]:
# Event loop used by every `loop.run_until_complete(...)` call in this notebook.
loop = asyncio.get_event_loop()

[37m18:11:26.38 D[0m [36m[asyncio __init__][0m Using selector: EpollSelector[0m


Run a simple asyncio coroutine:

In [9]:
# Create the coroutine object, run it on the loop until it finishes, and
# show the returned quote as the cell output.
# NOTE(review): the name `quote` is reused further down for a coroutine
# function; re-running this cell after that definition would overwrite it.
coro = quote_simple()
quote = loop.run_until_complete(coro)
quote

"'Room 101,' said the officer."

Internally, asyncio wraps the coroutine in an [`asyncio.Task`],
so the following is equivalent.

[`asyncio.Task`]:  https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Task

In [10]:
# Same request as before, but the coroutine is wrapped in a Task explicitly
# before being handed to the loop.
task = asyncio.Task(quote_simple())
quote = loop.run_until_complete(task)
quote

'After the middle of the present century, the first danger had in reality disappeared. Each of the three powers which now divide the world is in fact unconquerable, and could only become conquerable through slow demographic changes which a government with wide powers can easily avert. The second danger, also, is only a theoretical one. The masses never revolt of their own accord, and they never revolt merely because they are oppressed. Indeed, so long as they are not permitted to have standards of comparison, they never even become aware that they are oppressed. The recurrent economic crises of past times were totally unnecessary and are not now permitted to happen, but other and equally large dislocations can and do happen without having political results, because there is no way in which discontent can become articulate. As for the problem of over-production, which has been latent in our society since the development of machine technique, it is solved by the device of continuous warf

However, `coro` is a coroutine (a plain generator object here), while `task` is a `Task` (a subclass of [`Future`]).

Use `asyncio.ensure_future` to guarantee that a Future object is returned.

[`Future`]: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Future

In [11]:
# Compare the concrete types of the two objects created in the cells above.
type(coro), type(task)

(generator, asyncio.tasks.Task)

Passing a wrong URL logs an error:

In [12]:
# Request a URL with an extra `part=100` query string. quote_simple logs any
# non-200 status and returns None, so `quote` ends up as None here.
quote = loop.run_until_complete(
    quote_simple(url='http://localhost:5566/quote/uniform?part=100')
)

[01;31m18:11:26.171 E[0m [36m[bench_rest_api quote_simple][0m Unsuccessful response [Status: Bad Request (400)][0m


## Multiple Concurrent Requests

In [13]:
@asyncio.coroutine
def quote_many_naive(num_quotes=1):
    """Fetch ``num_quotes`` quotes concurrently with no connection limit.

    Each request is a ``quote_simple(slow=True)`` coroutine; all of them
    are passed to ``asyncio.gather`` at once.

    Returns the list of results produced by ``gather``.
    """
    pending = []
    for _ in range(num_quotes):
        pending.append(quote_simple(slow=True))
    results = yield from asyncio.gather(*pending)
    return results

In [14]:
%%time
# Time 2000 requests that are all started at once.
quotes = loop.run_until_complete(quote_many_naive(2000))

CPU times: user 1.96 s, sys: 228 ms, total: 2.19 s
Wall time: 4.66 s


This does not help, since we open 2000 connections at once. It is slower than expected.

### Limiting connection pool size

Ref on [official site](http://aiohttp.readthedocs.org/en/latest/client.html#limiting-connection-pool-size).

In [15]:
@asyncio.coroutine
def quote(conn, url='http://localhost:5566/quote/uniform', slow=False):
    """Fetch a single quote through the given aiohttp connector.

    Sends a GET request to ``url`` using ``conn`` as the connector; when
    ``slow`` is true the query parameter ``slow=True`` is attached.

    Returns the value stored under the ``'quote'`` key of the JSON reply.
    If the status is not 200, an error is logged and ``None`` is returned.
    The response is force-closed on both paths.
    """
    query = {'slow': True} if slow else {}
    response = yield from aiohttp.request(
        'GET', url, params=query, connector=conn
    )

    if response.status == 200:
        payload = yield from response.json()
        response.close(force=True)
        return payload['quote']

    # Any other status: report it and drop the connection.
    logger.error('Unsuccessful response [Status: %s (%d)]'
                 % (response.reason, response.status))
    response.close(force=True)
    return None

@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
    """Fetch ``num_quotes`` quotes through one shared, size-limited connector.

    A single ``aiohttp.TCPConnector`` with ``limit=conn_limit`` is created
    and passed to every ``quote`` coroutine; all coroutines are run with
    ``asyncio.gather``.

    Returns the list of results produced by ``gather``.
    """
    conn = aiohttp.TCPConnector(keepalive_timeout=1, force_close=True, limit=conn_limit)
    try:
        coroutines = [quote(conn) for _ in range(num_quotes)]
        quotes = yield from asyncio.gather(*coroutines)
    finally:
        # The connector is created here, so it must also be closed here;
        # otherwise each call leaves its connector (and sockets) open.
        conn.close()
    return quotes

In [16]:
%%time
# Time 2000 requests through quote_many with conn_limit=100.
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100))

CPU times: user 1.87 s, sys: 244 ms, total: 2.11 s
Wall time: 3.94 s


I don't know why, but aiohttp's built-in connection limit is slow. We can implement one ourselves, though.

### Custom connection limit using semaphore

Use [`asyncio.Semaphore`] acting as a lock.

[`asyncio.Semaphore`]: https://docs.python.org/3.4/library/asyncio-sync.html#asyncio.Semaphore

In [17]:
def quote_with_lock(semaphore, url='http://localhost:5566/quote/uniform'):
    """Fetch a single quote while holding ``semaphore``.

    Generator-based coroutine: drive it with ``yield from`` or wrap it in
    a Task. The semaphore is held for the whole exchange (request, body
    read and close), so the number of responses in flight never exceeds
    the semaphore's initial value.

    Returns the value stored under the ``'quote'`` key of the JSON reply.
    If the status is not 200, an error is logged and ``None`` is returned.
    """
    with (yield from semaphore):
        r = yield from aiohttp.request('GET', url)
        try:
            if r.status != 200:
                logger.error('Unsuccessful response [Status: %s (%d)]'
                             % (r.reason, r.status))
                return None
            # Read the body while still holding the semaphore; releasing
            # it earlier lets new requests start while this one is open.
            quote_json = yield from r.json()
        finally:
            # Close on every path, including when r.json() raises.
            r.close(force=True)
    return quote_json['quote']

@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20):
    """Fetch ``num_quotes`` quotes, throttled by a shared semaphore.

    One ``asyncio.Semaphore(conn_limit)`` is created and handed to every
    ``quote_with_lock`` coroutine; all coroutines are run with
    ``asyncio.gather``.

    Returns the list of results produced by ``gather``.
    """
    limiter = asyncio.Semaphore(conn_limit)
    pending = []
    for _ in range(num_quotes):
        pending.append(quote_with_lock(limiter))
    results = yield from asyncio.gather(*pending)
    return results

In [18]:
%%time
# Time 2000 requests through quote_many with conn_limit=100.
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100))

CPU times: user 1.9 s, sys: 224 ms, total: 2.12 s
Wall time: 1.91 s


## Add Progressbar

If you don't care about the original order of the coroutines:

In [19]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
    """Fetch ``num_quotes`` quotes and show progress as they complete.

    Parameters:
        num_quotes: number of requests to issue.
        conn_limit: initial value of the semaphore shared by all requests.
        progress: widget whose ``value`` is incremented; when ``None`` an
            ``IntProgress`` with ``max = num_quotes // step`` is created
            and displayed.
        step: the bar advances by one for every ``step`` finished requests.

    Returns the quotes in completion order (the order in which
    ``asyncio.as_completed`` yields them), not in submission order.
    """
    if progress is None:
        progress = widgets.IntProgress()
        progress.max = num_quotes // step
        ipydisplay(progress)
    semaphore = asyncio.Semaphore(conn_limit)
    coroutines = [
        quote_with_lock(semaphore) for _ in range(num_quotes)
    ]
    quotes = []
    for ith, coro in enumerate(asyncio.as_completed(coroutines), 1):
        q = yield from coro
        quotes.append(q)
        # Advance the bar only after the ith result has arrived, so it
        # never runs ahead of the work actually done.
        if ith % step == 0:
            progress.value += 1
    return quotes

In [20]:
%%time
# step=1: the progress bar is updated after every single request.
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=1))

CPU times: user 2.68 s, sys: 312 ms, total: 3 s
Wall time: 2.69 s


For fast responses, updating the progress bar introduces considerable latency. Try increasing `step`.

In [21]:
%%time
# step=20: the progress bar is updated once per 20 requests.
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))

CPU times: user 1.99 s, sys: 308 ms, total: 2.3 s
Wall time: 2.02 s


### Original order matters

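`asyncio.as_completed` yields results in completion order. To keep the original order, wrap each coroutine in a Task, update the progress bar from a done-callback, and collect the results with `asyncio.gather`.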

In [22]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
    """Fetch ``num_quotes`` quotes and report progress via done-callbacks.

    Each request is wrapped in an ``asyncio.Task`` with a done-callback
    that counts finished tasks and bumps ``progress.value`` once per
    ``step`` completions. When ``progress`` is ``None`` an ``IntProgress``
    with ``max = num_quotes // step`` is created and displayed.

    Returns the list produced by ``asyncio.gather`` over the tasks.
    """
    if progress is None:
        progress = widgets.IntProgress()
        progress.max = num_quotes // step
        ipydisplay(progress)

    # Shared lock that throttles the requests.
    limiter = asyncio.Semaphore(conn_limit)

    done_count = 0

    def on_done(_finished):
        # Called by the loop each time one task completes.
        nonlocal done_count
        done_count += 1
        if done_count % step:
            return
        progress.value += 1

    # Turn every coroutine into a Task, then hook up the callback.
    tasks = [
        asyncio.Task(quote_with_lock(limiter)) for _ in range(num_quotes)
    ]
    for pending in tasks:
        pending.add_done_callback(on_done)

    results = yield from asyncio.gather(*tasks)
    return results

In [23]:
%%time
# step=1: the progress bar is updated after every single request.
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=1))

CPU times: user 2.76 s, sys: 264 ms, total: 3.03 s
Wall time: 2.72 s


In [24]:
%%time
# step=20: the progress bar is updated once per 20 requests.
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))

CPU times: user 1.93 s, sys: 232 ms, total: 2.16 s
Wall time: 1.97 s


### Alternative way

In [25]:
@asyncio.coroutine
def quote_many(num_quotes=1, conn_limit=20, progress=None, step=10):
    """Fetch ``num_quotes`` quotes, keeping submission order, with progress.

    Each coroutine is turned into a future up front. ``as_completed`` is
    used only to drive the progress bar; the results are then read back
    from the futures list.

    Parameters:
        num_quotes: number of requests to issue.
        conn_limit: initial value of the semaphore shared by all requests.
        progress: widget whose ``value`` is incremented; when ``None`` an
            ``IntProgress`` with ``max = num_quotes // step`` is created
            and displayed.
        step: the bar advances by one for every ``step`` finished requests.

    Returns the list of results in the order the futures were created.
    """
    if progress is None:
        progress = widgets.IntProgress()
        progress.max = num_quotes // step
        ipydisplay(progress)

    semaphore = asyncio.Semaphore(conn_limit)

    # Wrap the coroutines as futures. On Python 3.4.4+,
    # asyncio.ensure_future(...) wraps a coroutine in a Task and returns
    # its input unchanged if it is already a Future.
    futures = [
        asyncio.ensure_future(quote_with_lock(semaphore))
        for _ in range(num_quotes)
    ]

    for ith, coro in enumerate(asyncio.as_completed(futures), 1):
        yield from coro
        # Advance the bar only after the ith completion has been observed,
        # so it never runs ahead of the work actually done.
        if ith % step == 0:
            progress.value += 1

    quotes = [fut.result() for fut in futures]
    return quotes

In [26]:
%%time
# step=20: the progress bar is updated once per 20 requests.
quotes = loop.run_until_complete(quote_many(2000, conn_limit=100, step=20))

CPU times: user 1.86 s, sys: 272 ms, total: 2.13 s
Wall time: 1.95 s
