# Concurrency

- If we run these tasks concurrently we can essentially hide the wait time by running another task in the meantime. It is important to note that this is all still happening on a single thread and still only uses one CPU at a time!

- While concurrency isn’t limited to I/O, this is where we see the greatest benefits. In a concurrent program, instead of having your code run serially—that is, from one line to the next—your code is written to handle events, and different parts of your code run when different events happen.

- By modeling a program in this way, we are able to deal with the particular event that we are concerned with: I/O wait.

In [None]:
# This is the server, save this content to file server.py and run: python server.py
from tornado import httpserver
from tornado import options
from tornado import ioloop
from tornado import web
from tornado import gen

import ujson as json
import time
from collections import defaultdict

options.define("port", default=8080, help="Port to serve on")


class AddMetric(web.RequestHandler):
    """Request handler that answers after an artificial delay and records timings.

    Query arguments read by ``get``:

    * ``flush`` -- when present with a non-empty value, dump everything
      recorded so far to ``metric_data.json`` and do nothing else.
    * ``name``  -- required otherwise; key under which the timing is stored.
    * ``delay`` -- delay in milliseconds before responding (default 1024).
    """

    # Class attribute, so it is shared by every request:
    # metric name -> list of {"start", "end", "dt"} dicts.
    metric_data = defaultdict(list)

    @gen.coroutine
    def get(self):
        """Handle a GET request: either flush the metrics or serve a delayed '.'.

        Raises:
            web.HTTPError: status 400 when ``delay`` is not an integer.
        """
        if self.get_argument("flush", False):
            # Use a context manager so the file is flushed and closed right
            # away instead of leaking the handle until garbage collection.
            with open("metric_data.json", "w+") as metric_file:
                json.dump(self.metric_data, metric_file)
        else:
            name = self.get_argument("name")
            try:
                delay = int(self.get_argument("delay", 1024))
            except ValueError:
                raise web.HTTPError(400, reason="Invalid value for delay")

            start = time.time()
            # Suspend this coroutine until the timeout fires; the delay is
            # given in milliseconds, the deadline is in seconds.
            yield gen.Task(ioloop.IOLoop.instance().add_timeout, start + delay / 1000.)
            self.write('.')
            self.finish()
            end = time.time()
            self.metric_data[name].append({
                "start": start,
                "end": end,
                "dt": end - start,
            })


if __name__ == "__main__":
    # Pick up --port (default defined above) from the command line.
    options.parse_command_line()
    listen_port = options.options.port

    # A single route: every /add request is handled by AddMetric.
    routes = [(r"/add", AddMetric)]
    server = httpserver.HTTPServer(web.Application(routes))
    server.listen(listen_port)
    print("Listening on port: {}".format(listen_port))

    # Hand control to the event loop; requests are served from here on.
    ioloop.IOLoop.instance().start()

## Serial Request, one by one

In [17]:
import requests
import string
import random


def generate_urls(base_url, num_urls):
    """
    Yield ``num_urls`` URLs, each ``base_url`` plus 10 random lowercase letters.

    We add random characters to the end of the URL to break any caching
    mechanisms in the requests library or the server. The letters come from
    ``random.sample``, so no letter repeats within one suffix.
    """
    # ``range`` rather than the Python 2-only ``xrange`` so the generator
    # runs on both Python 2 and Python 3.
    for _ in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def run_experiment(base_url, num_iter=500):
    """Fetch ``num_iter`` generated URLs one after another.

    Each request is issued only after the previous response has arrived.
    Returns the summed length of all response texts (0 when there are no
    URLs).
    """
    return sum(
        len(requests.get(url).text)
        for url in generate_urls(base_url, num_iter)
    )

if __name__ == "__main__":
    import time

    # Benchmark parameters: per-request server delay (ms) and request count.
    delay = 100
    num_iter = 500
    base_url = "http://localhost:8080/add?name=serial&delay={}&".format(delay)

    # Time only the experiment itself.
    started = time.time()
    result = run_experiment(base_url, num_iter)
    elapsed = time.time() - started
    print("Result: {}, Time: {}".format(result, elapsed))

Result: 500, Time: 55.24573493


## Use grequests instead of requests to issue requests asynchronously: send the next request while waiting for a response

In [16]:
import grequests
import string
import random


def generate_urls(base_url, num_urls):
    """Yield ``num_urls`` URLs, each ``base_url`` plus 10 random lowercase letters.

    The random suffix makes every URL distinct from request to request; the
    letters come from ``random.sample``, so none repeats within a suffix.
    """
    # ``range`` rather than the Python 2-only ``xrange`` so the generator
    # runs on both Python 2 and Python 3.
    for _ in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def run_experiment(base_url, num_iter=500):
    """Fetch ``num_iter`` generated URLs through ``grequests.imap``.

    The unsent requests are built lazily and handed to ``imap`` with
    ``size=100``. Returns the summed length of all response texts.
    """
    pending = (
        grequests.get(url) for url in generate_urls(base_url, num_iter)
    )
    total = 0
    for response in grequests.imap(pending, size=100):
        total += len(response.text)
    return total

if __name__ == "__main__":
    import time

    # Benchmark parameters: per-request server delay (ms) and request count.
    delay = 100
    num_iter = 500

    start = time.time()
    result = run_experiment(
        "http://127.0.0.1:8080/add?name=grequests&delay={}&".format(delay),
        num_iter)
    end = time.time()
    # A formatted print() call instead of the Python 2 print statement: it
    # emits the same two space-separated values and also parses on Python 3.
    print("{} {}".format(result, end - start))

500 4.11812591553


## Using tornado to boost the speed even more

In [None]:
from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient

from functools import partial
import string
import random

# Select the implementation class used by AsyncHTTPClient instances created
# after this call (here the one from tornado.curl_httpclient) and pass it
# max_clients=100.
# NOTE(review): per the Tornado docs max_clients caps the number of
# simultaneous fetches -- confirm for the installed version.
AsyncHTTPClient.configure(
    "tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=100)


def generate_urls(base_url, num_urls):
    """Yield ``num_urls`` URLs, each ``base_url`` plus 10 random lowercase letters.

    The random suffix makes every URL distinct from request to request; the
    letters come from ``random.sample``, so none repeats within a suffix.
    """
    # ``range`` rather than the Python 2-only ``xrange`` so the generator
    # runs on both Python 2 and Python 3.
    for _ in range(num_urls):
        yield base_url + "".join(random.sample(string.ascii_lowercase, 10))


def fetch_urls(urls, callback):
    """Start a fetch for every URL and pass all results to ``callback`` at once.

    Args:
        urls: iterable of URL strings; it is materialised into a list so the
            total count is known up front.
        callback: called exactly once with the list of fetch results, in the
            order in which the individual fetches completed.
    """
    urls = list(urls)
    if not urls:
        # With nothing to fetch no per-request callback would ever run, so
        # ``callback`` would never fire and a caller waiting on it (e.g. to
        # stop the IOLoop) would hang forever. Report the empty result now.
        callback([])
        return

    http_client = AsyncHTTPClient()
    responses = []

    def _finish_fetch_urls(result):
        # Collect each result; the last one to arrive triggers the callback.
        responses.append(result)
        if len(responses) == len(urls):
            callback(responses)

    for url in urls:
        http_client.fetch(url, callback=_finish_fetch_urls)


def run_experiment(base_url, num_iter=500, callback=None):
    """Kick off fetching ``num_iter`` generated URLs.

    Returns immediately with ``None``; once every fetch has finished,
    ``_finish_run_experiment`` is called with the responses and with
    ``callback`` bound as its keyword argument.
    """
    on_all_fetched = partial(_finish_run_experiment, callback=callback)
    fetch_urls(generate_urls(base_url, num_iter), on_all_fetched)


def _finish_run_experiment(responses, callback):
    """Print the combined body length of ``responses`` and signal completion.

    Args:
        responses: iterable of objects exposing a ``body`` attribute that
            supports ``len()``.
        callback: zero-argument callable invoked after printing, or ``None``
            (the default that ``run_experiment`` passes through) to skip it.
    """
    response_sum = sum(len(r.body) for r in responses)
    # print() call rather than the Python 2 print statement: same output on
    # Python 2, and valid syntax on Python 3.
    print(response_sum)
    # ``run_experiment`` defaults ``callback`` to None; calling None would
    # raise TypeError, so only invoke a real callback.
    if callback is not None:
        callback()

if __name__ == "__main__":
    import time

    # Benchmark parameters: per-request server delay (ms) and request count.
    delay = 100
    num_iter = 500
    base_url = "http://127.0.0.1:8080/add?name=tornado_callback&delay={}&".format(delay)

    loop = ioloop.IOLoop.instance()
    # Schedule the experiment on the loop and give it the loop's own stop()
    # as the completion callback, so start() below returns when it is done.
    loop.add_callback(run_experiment, base_url, num_iter, loop.stop)

    started = time.time()
    loop.start()
    finished = time.time()
    print(finished - started)

In [None]:
500 1.07092499733

## Use asyncio, a standard library module since Python 3.4 that unifies the approaches of tornado and grequests


In [None]:
import asyncio
import aiohttp
import random
import string


def generate_urls(base_url, num_urls):
    """Lazily produce ``num_urls`` URLs.

    Each URL is ``base_url`` followed by 10 lowercase letters drawn with
    ``random.sample``, so no letter repeats within one suffix.
    """
    alphabet = string.ascii_lowercase
    for _ in range(num_urls):
        suffix_chars = random.sample(alphabet, 10)
        yield base_url + "".join(suffix_chars)


def chunked_http_client(num_chunks):
    """Build an ``http_get(url)`` coroutine gated by one shared semaphore.

    Every call of the returned coroutine acquires the same
    ``asyncio.Semaphore(num_chunks)`` before issuing its request and holds
    it until the body has been read, so the semaphore bounds how many
    requests are in flight at once.
    """
    semaphore = asyncio.Semaphore(num_chunks)

    @asyncio.coroutine
    def http_get(url):
        """Issue a GET for ``url`` and return the data read from the response."""
        # The semaphore is only read here, never rebound.
        nonlocal semaphore
        # Generator-coroutine form of acquiring the semaphore; it is released
        # when the ``with`` block exits.
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            # NOTE(review): presumably waits until the connection is closed
            # before the semaphore slot is freed -- confirm against the
            # aiohttp version this was written for.
            yield from response.wait_for_close()
        return body
    return http_get


def run_experiment(base_url, num_iter=500):
    """Generator-based coroutine that fetches ``num_iter`` generated URLs.

    All fetches are created up front through a client from
    ``chunked_http_client(100)`` and consumed in completion order via
    ``asyncio.as_completed``. The coroutine's result is the summed length
    of all bodies.
    """
    fetch = chunked_http_client(100)
    pending = [fetch(url) for url in generate_urls(base_url, num_iter)]
    total_bytes = 0
    for next_done in asyncio.as_completed(pending):
        body = yield from next_done
        total_bytes += len(body)
    return total_bytes

if __name__ == "__main__":
    import time

    # Benchmark parameters: per-request server delay (ms) and request count.
    delay = 100
    num_iter = 500
    loop = asyncio.get_event_loop()

    started = time.time()
    base_url = "http://127.0.0.1:8080/add?name=asyncio&delay={}&".format(delay)
    # Drive the experiment coroutine to completion on the event loop.
    result = loop.run_until_complete(run_experiment(base_url, num_iter))
    finished = time.time()
    print("{} {}".format(result, finished - started))

In [None]:
500 1.7761478424072266