# Implementing Concurrency

## Asynchronous Programming

### Concurrency

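Concurrency is a way of building systems that can handle multiple requests at once. The basic idea is that while one request is waiting on a resource, the system works on another request instead of sitting idle. Here is a synchronous version: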

```python
import time


def network_request(n):
    time.sleep(1.0)  # simulate a slow remote call
    return {'success': True, 'result': n ** 2}


def fetch_square(n):
    resp = network_request(n)
    if resp['success']:
        print(f'result is {resp["result"]}')


fetch_square(2)
fetch_square(3)
fetch_square(4)
```

```
result is 4
result is 9
result is 16
```


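`network_request` simulates a remote I/O call. The three `fetch_square` calls complete one after another, each waiting for the previous one to finish, which is what makes this code synchronous: the total running time is roughly the sum of the three one-second waits.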
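To make that cost concrete, the sketch below times three sequential calls. It shortens the simulated latency to a hypothetical `DELAY` of 0.2 seconds (an assumption for illustration, not the 1.0 s used above), so the total should come out at roughly three times that:

```python
import time

DELAY = 0.2  # hypothetical, shorter than the 1.0 s used above


def network_request(n):
    time.sleep(DELAY)  # simulated blocking I/O
    return {'success': True, 'result': n ** 2}


start = time.perf_counter()
results = [network_request(n)['result'] for n in (2, 3, 4)]
elapsed = time.perf_counter() - start

# Each call waits for the previous one, so the waits add up.
print(results, f'{elapsed:.2f}s')
```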

### Callbacks

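In synchronous code, execution is *blocked* until the request completes. If we want the code to keep running instead of blocking, one option is to use a `callback`.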

```python
import time
import threading


def wait_and_print_async(msg):
    def callback():
        print(msg, flush=True)

    # Timer runs callback in a new thread after 1 second.
    timer = threading.Timer(1.0, callback)
    timer.start()


start = time.time()
wait_and_print_async('First')
wait_and_print_async('Second')
wait_and_print_async('Third')

time.sleep(1.1)  # keep the main thread alive until the timers fire
print(time.time() - start)
```

```
ThirdFirstSecond
1.1014273166656494
```


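`Timer` is non-blocking because it starts a new thread: `timer.start()` only schedules the call instead of executing it, so all three calls return immediately and the whole cell takes about 1.1 seconds rather than 3. The messages are printed from separate timer threads, so their order is not guaranteed. One problem is that this `callback` cannot return a value to the caller. Instead, we can pass in a callback that receives the result: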

```python
import time
import threading


def network_request_async(n, on_done):

    def timer_done():
        on_done({'success': True, 'result': n ** 2})

    timer = threading.Timer(1.0, timer_done)
    timer.start()


def fetch_square(n):
    def on_done(result):
        if result['success']:
            print(f'result is {result["result"]}')

    network_request_async(n, on_done)


start = time.time()
fetch_square(2)
fetch_square(3)
fetch_square(4)

time.sleep(1.1)
print(time.time() - start)
```

```
result is 4result is 16result is 9
1.103153944015503
```
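The fixed `time.sleep(1.1)` above is only a guess at how long the callbacks need. Because `threading.Timer` is a `Thread` subclass, a caller could keep the timers and `join()` them instead. The sketch below assumes a modified `network_request_async` that returns its `Timer` (a hypothetical change, not in the original) and a shorter 0.2 s delay. It collects results in a lock-protected list because the callbacks run on different threads:

```python
import threading

results = []
lock = threading.Lock()


def network_request_async(n, on_done):
    # Hypothetical variant: return the Timer so callers can join() it.
    timer = threading.Timer(0.2, lambda: on_done({'success': True, 'result': n ** 2}))
    timer.start()
    return timer


def collect(resp):
    # Callbacks run on timer threads, so guard the shared list.
    with lock:
        results.append(resp['result'])


timers = [network_request_async(n, collect) for n in (2, 3, 4)]
for t in timers:
    t.join()  # block until this timer has fired and its callback has returned

print(sorted(results))  # arrival order itself is not guaranteed
```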


### Futures

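The code above uses a callback to hand the value back to the caller. A Future makes it easier to keep track of the result of an asynchronous call. As the name suggests, a Future is an abstraction for a resource that has been requested and is being waited on: its value becomes available at some point in the future.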

```python
from concurrent.futures import Future

future = Future()
print(future)

future.set_result(1)
print(future)

future.result()
```

```
<Future at 0x10f6c9c50 state=pending>
<Future at 0x10f6c9c50 state=finished returned int>
1
```
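A `Future` can also carry an error instead of a value. On a `concurrent.futures.Future`, `set_exception()` marks the future as finished with an error, `exception()` returns that error without raising it, and `result()` re-raises it. The standard library documents `set_result`/`set_exception` as intended mainly for executor implementations and tests, which is also how they are used in this short sketch:

```python
from concurrent.futures import Future

ok = Future()
failed = Future()
print(ok.done())  # False: still pending

ok.set_result({'success': True, 'result': 4})
failed.set_exception(ConnectionError('remote host unreachable'))

print(ok.done(), failed.done())  # True True
print(ok.result())

error = failed.exception()  # returns the stored exception without raising
try:
    failed.result()  # re-raises the stored exception
except ConnectionError as exc:
    print('request failed:', exc)
```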

Now rewrite the `network_request_async` example so that it returns a `Future`:

```python
import time
import threading
from concurrent.futures import Future


def network_request_async(n):
    future = Future()
    result = {'success': True, 'result': n ** 2}
    timer = threading.Timer(1.0, lambda: future.set_result(result))
    timer.start()
    return future


def fetch_square(n):
    fut = network_request_async(n)

    def on_done(future):
        result = future.result()
        if result['success']:
            print(f'result is {result["result"]}')

    fut.add_done_callback(on_done)


start = time.time()
fetch_square(2)
fetch_square(3)
fetch_square(4)

time.sleep(1.1)
print(time.time() - start)
```

```
result is 9result is 4
result is 16
1.1013798713684082
```
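As in the callback version, `time.sleep(1.1)` only guesses how long to wait. Because `network_request_async` now returns futures, the caller can wait on them directly with `concurrent.futures.wait` (or block on each future's `result()`). This sketch reuses the future-returning function from above and adds a hypothetical `delay` parameter set to 0.2 s:

```python
import threading
import time
from concurrent.futures import Future, wait


def network_request_async(n, delay=0.2):
    # Same idea as above; the delay parameter is a hypothetical addition.
    future = Future()
    result = {'success': True, 'result': n ** 2}
    threading.Timer(delay, lambda: future.set_result(result)).start()
    return future


start = time.perf_counter()
futures = [network_request_async(n) for n in (2, 3, 4)]

done, not_done = wait(futures, timeout=5)  # returns once all futures finish
elapsed = time.perf_counter() - start

squares = [f.result()['result'] for f in futures]
print(squares, f'{elapsed:.2f}s')  # the three waits overlap
```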


### Event Loop

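So far, concurrency has been achieved with OS threads. Some frameworks, however, coordinate concurrent tasks with an event loop.

The basic idea of an event loop is to keep monitoring the state of various resources and to trigger the corresponding callbacks when they become ready. Only one callback runs at a time, so the code of different execution units never runs simultaneously. This simplifies the management of shared variables, data structures and resources, which makes an event loop simpler to reason about than multithreading.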

```python
import time


class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start = time.time()
        self.callback = None

    def done(self):
        return time.time() - self.start > self.timeout

    def on_timer_done(self, callback):
        self.callback = callback


timer1 = Timer(0.2)
timer1.on_timer_done(lambda: print('Job1 is done...'))

timer2 = Timer(0.1)
timer2.on_timer_done(lambda: print('Job2 is done...'))
timers = [timer1, timer2]

# The event loop: poll every timer and fire the callbacks of finished ones.
while True:
    # Iterate over a copy, because finished timers are removed from the list.
    for timer in list(timers):
        if timer.done():
            timer.callback()
            timers.remove(timer)

    if not timers:
        break
```

```
Job2 is done...
Job1 is done...
```


Here, the code outside the loop only defines the timers and their callbacks; the loop is responsible for monitoring the timers and running the corresponding callbacks.
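The loop above spins continuously, calling `time.time()` over and over even when nothing is due. A common refinement is to keep timers ordered by deadline and sleep until the earliest one. The sketch below does this with the standard `heapq` module; `run_timers` is a hypothetical helper, not part of any library. Real event loops usually pass that delay as the timeout of a `select`-style call so they can still react to I/O in the meantime:

```python
import heapq
import time


def run_timers(jobs):
    """Run (timeout_seconds, callback) jobs in deadline order."""
    start = time.monotonic()
    # The index i breaks ties so callbacks themselves are never compared.
    heap = [(start + timeout, i, cb) for i, (timeout, cb) in enumerate(jobs)]
    heapq.heapify(heap)
    while heap:
        deadline, _, cb = heapq.heappop(heap)
        delay = deadline - time.monotonic()
        if delay > 0:
            time.sleep(delay)  # sleep until the earliest deadline instead of spinning
        cb()


order = []
run_timers([(0.2, lambda: order.append('Job1')),
            (0.1, lambda: order.append('Job2'))])
print(order)  # ['Job2', 'Job1']
```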

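An event loop avoids blocking calls. Instead, it learns about events through OS facilities, such as `select` on Unix. Python's standard library includes a convenient concurrency framework built on an event loop: `asyncio`.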
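The standard `selectors` module wraps these OS facilities (`select`, `epoll` or `kqueue`, depending on the platform). The sketch below runs a single iteration of a selector-based loop over a `socket.socketpair()`. The callback is stored as the registration's `data` and dispatched when the OS reports the socket readable. It is a simplified illustration of the idea, not a copy of how `asyncio` is written:

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # picks the best mechanism for this OS
reader, writer = socket.socketpair()
reader.setblocking(False)
received = []


def on_readable(sock):
    data = sock.recv(1024)  # will not block: the OS reported data is ready
    received.append(data)
    print(f'got {data!r}')


# Register interest in "readable" events and attach the callback as data.
sel.register(reader, selectors.EVENT_READ, data=on_readable)

writer.sendall(b'ping')

# One loop iteration: wait (up to 1 s) for ready sockets, then dispatch.
for key, mask in sel.select(timeout=1.0):
    callback = key.data
    callback(key.fileobj)

sel.unregister(reader)
sel.close()
reader.close()
writer.close()
```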

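## The `asyncio` Framework

`asyncio` coroutines grew out of Python generators. A generator can pause at a `yield` expression and later be resumed with a value passed in through `send()`:
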
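```python
def parrot():
    while True:
        msg = yield  # pause here until a value is sent in
        print(f'Parrot says: {msg}')


gen = parrot()
# Prime the generator by advancing it to the first `yield`. Sending a non-None
# value first raises TypeError: can't send non-None value to a just-started generator
gen.send(None)
gen.send('hello')
gen.send('world')
```

```
Parrot says: hello
Parrot says: world
```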
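Because `yield` is an expression, a generator can also hand a value back each time it receives one. The hypothetical `averager` below keeps a running mean. `next()` primes it, which is equivalent to `send(None)`:

```python
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # hand back the current mean, wait for the next value
        total += value
        count += 1
        average = total / count


avg = averager()
next(avg)             # advance to the first yield (returns None)
print(avg.send(10))   # 10.0
print(avg.send(20))   # 15.0
print(avg.send(60))   # 30.0
```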


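```python
async def hello():
    print('hello, async')


coro = hello()
coro
```

```
<coroutine object hello at 0x10f728ca8>
```

Calling `hello()` does not run its body. It only creates a coroutine object, which runs once an event loop drives it:
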
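```python
import asyncio

# asyncio.get_event_loop() was the usual idiom in older Python versions;
# newer versions deprecate it when no loop is set, so create one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(coro)
```

```
hello, async
```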
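In current Python, the usual entry point is `asyncio.run()`, and `await asyncio.sleep()` is the non-blocking counterpart of `time.sleep()`. The sketch below ports the `fetch_square` example to coroutines and runs the three requests concurrently with `asyncio.gather`. It is meant to be run as a script, because `asyncio.run()` cannot be called while another event loop is already running in the same thread, as happens in some notebook environments:

```python
import asyncio
import time


async def network_request(n):
    await asyncio.sleep(1.0)  # non-blocking: yields control to the event loop
    return {'success': True, 'result': n ** 2}


async def fetch_square(n):
    resp = await network_request(n)
    if resp['success']:
        print(f'result is {resp["result"]}')
    return resp['result']


async def main():
    # gather runs the coroutines concurrently and keeps results in argument order
    return await asyncio.gather(fetch_square(2), fetch_square(3), fetch_square(4))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f'{elapsed:.1f}s')  # the three one-second waits overlap
```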


### Blocking vs. Non-blocking

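Blocking code, such as `time.sleep` or a synchronous network call, would stall the event loop and every task on it. A common strategy is to run such code in a separate thread, for example with `ThreadPoolExecutor`: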

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_and_return(msg):
    time.sleep(1)  # a blocking call
    return msg


executor = ThreadPoolExecutor(max_workers=3)
result = executor.submit(wait_and_return, "hello")  # returns a Future immediately
result
```

```
<Future at 0x10f77ffd0 state=running>
```

```python
result.result()
```

```
'hello'
```

```python
# Hand the blocking function to the executor and get an asyncio Future back
future = loop.run_in_executor(executor, wait_and_return, "hello")
future  # still pending: the asyncio future is resolved only while the loop runs
```

```
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Applications/anaconda/anaconda3/lib/python3.6/asyncio/futures.py:408]>
```

```python
# Run the event loop until the future completes
loop.run_until_complete(future)
```

```
'hello'
```
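Inside a coroutine, the same idea lets several blocking calls run side by side without stalling the loop. This sketch gets the loop with `asyncio.get_running_loop()`, hands three calls to a `ThreadPoolExecutor` via `run_in_executor`, and awaits all results with `asyncio.gather`. Python 3.9+ also offers `asyncio.to_thread` as a shortcut that uses the loop's default executor:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def wait_and_return(msg):
    time.sleep(1)  # blocking; calling it directly in a coroutine would freeze the loop
    return msg


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            loop.run_in_executor(executor, wait_and_return, msg)
            for msg in ('a', 'b', 'c')
        ]
        # The loop stays free while the worker threads sleep.
        return await asyncio.gather(*futures)


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f'{elapsed:.1f}s')
```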