# асинхронно програмиране с Python 3

&nbsp;

&nbsp;

> [http://bit.ly/async-python](http://bit.ly/async-python)

![How to draw a horse?](resources/how-to-draw-a-horse.jpg)
> Van Oktop, много наблюдателен човек

## да започнем с пример

&nbsp;


> Какви webservers използват популярните портали в България?

&nbsp;

Крайна цел:
``` python
{'Apache': 34, 'nginx': 31, . . . .}
```

&nbsp;

[http://bit.ly/async-python](http://bit.ly/async-python)

### нещо такова 

&nbsp;

``` python
@pseudo_code
def main():
  for host in hosts:
    connection = connect_to_server(host)
    connection.send_request(...)
    response = connection.receive(...)
    server = detect_webserver(response)
    count_server(server)
  print(web_server_counts)
```

#### малко подготовка

In [124]:
PROJECT_PATH = '/Users/boris/Work/softuni/2015-09-AsyncPython/source/asyncio'
import sys; sys.path.insert(0, PROJECT_PATH)
import re
import os

HTTP_REQUEST_TEMPLATE = 'OPTIONS / HTTP/1.1\nHost: {}\nConnection: close\n\n'
RE_SERVER = re.compile(r'Server: (.+?)[/\r\n]', re.I)

def get_hosts():
    with open(PROJECT_PATH + '/bgsites.txt', 'r') as f:
        for host in f:
            host = host.strip()
            if not host.startswith('#'):
                yield host

def detect_webserver_from_response(http_response: bytes) -> str:
    http_response = http_response.decode(errors='ignore') if isinstance(http_response, bytes) else http_response
    found_server = RE_SERVER.findall(http_response)
    return found_server[0] if found_server else None


#### Последователно изпълнение

In [125]:
import socket
import time
from collections import Counter

def main_sequential():
    webservers = Counter()
    t = time.time()
    for host in get_hosts():
        connection = socket.create_connection((host, 80), timeout=5)
        http_request_string = HTTP_REQUEST_TEMPLATE.format(host)
        connection.send(http_request_string.encode())
        response = connection.recv(2048)
        server = detect_webserver_from_response(response)
        webservers[server] += 1
        print("Completed: {}".format(host))

    print("Done in {}sec".format(time.time() - t))
    print(str(webservers))

In [None]:
main_sequential()

![последователно изпълнение](resources/diagram-tasks-sequential.png)

### можем ли да се справим по-добре?

&nbsp;

threads?

![използване на threads](resources/diagram-tasks-threaded.png)

In [None]:
import socket
import time
import threading
from collections import Counter

def main_threads():
    webservers = Counter()
    t = time.time()
    threads = []
    for host in get_hosts():
        thread = threading.Thread(target=get_webserver_for_host, args=(host, webservers))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()
    print("Done in {}sec".format(time.time() - t))
    print(str(webservers))

def get_webserver_for_host(hostname: str, webservers: Counter):
    connection = socket.create_connection((hostname, 80), timeout=5)
    http_request_string = HTTP_REQUEST_TEMPLATE.format(hostname)
    connection.send(http_request_string.encode())
    result = connection.recv(2048)
    server = detect_webserver_from_response(result)
    webservers[server] += 1
    connection.close()
    print("Completed: {}".format(hostname))

In [None]:
main_threads()

какво не беше наред с картинката?

* пуснахме >80 thread-а, за да обработим заявките

* всички пишат едновременно в променливата `webservers`

* . . . и още малко други :о)

##### втори опит, с малко подобрения

In [126]:
import socket
import time
from threading import Thread, Lock, BoundedSemaphore
from collections import Counter
from bg_webservers_common import get_hosts, HTTP_REQUEST_TEMPLATE, detect_webserver_from_response

def main_threads_improved():
    webservers = Counter()
    t = time.time()
    lock = Lock()
    semaphore = BoundedSemaphore(10)
    threads = []
    for host in get_hosts():
        thread = Thread(target=get_webserver_for_host, args=(host, webservers, lock, semaphore))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()
    print("Done in {}sec".format(time.time() - t))
    print(str(webservers))


def get_webserver_for_host(hostname: str, webservers: Counter, lock: Lock, semaphore: BoundedSemaphore):
    with semaphore:
        connection = socket.create_connection((hostname, 80), timeout=5)
        http_request_string = HTTP_REQUEST_TEMPLATE.format(hostname)
        connection.send(http_request_string.encode())
        result = connection.recv(2048)
        server = detect_webserver_from_response(result)
        connection.close()
    with lock:
        webservers[server] += 1
    print("Completed: {}".format(hostname))


In [None]:
main_threads_improved()

![използване на threads](resources/diagram-tasks-threaded.png)

## асинхронно програмиране

&nbsp;

> Кодът се извиква при настъпване на събитие, вместо активно да проверява за наличие на такова

![асинхронен вариант](resources/diagram-tasks-callbacks.png)

### Event loop

![Event loop](resources/diagram-eventloop.png)

## Кога е подходящо?

&nbsp;

При приложения, които основно извършват входно/изходни операции - мрежа, управление на външни процеси, сървърни приложения и други.

## Възможни подходи за реализация

### "Callback hell"
![Callback hell](resources/callback-hell.png)

### ... и неговите разновидности

&nbsp;

http://stackabuse.com/avoiding-callback-hell-in-node-js/ 

&nbsp;

> "Design around it"  :o)

##### "Define your functions beforehand" 

``` javascript
var fs = require('fs');

function notifyUser(err) {  
    if(err) return console.log(err);
    console.log('Appended text!');
};

function appendText(err, txt) {  
    if (err) return console.log(err);

    txt = txt + '\nAppended something!';
    fs.writeFile(myFile, txt, notifyUser);
}

var dataFile = '/tmp/test';  
fs.readFile(dataFile, 'utf8', appendText);  
```

##### Async.js

``` javascript
var fs = require('fs');  
var async = require('async');

var myFile = '/tmp/test';

async.waterfall([  
    function(callback) {
        fs.readFile(myFile, 'utf8', callback);
    },
    function(txt, callback) {
        txt = txt + '\nAppended something!';
        fs.writeFile(myFile, txt, callback);
    }
], function (err, result) {
    if(err) return console.log(err);
    console.log('Appended text!');
});
```

##### Promises
``` javascript
var Promise = require('bluebird');  
var fs = require('fs');  
Promise.promisifyAll(fs);

var myFile = '/tmp/test';  
fs.readFileAsync(myFile, 'utf8').then(function(txt) {  
    txt = txt + '\nAppended something!';
    fs.writeFile(myFile, txt);
}).then(function() {
    console.log('Appended text!');
}).catch(function(err) {
    console.log(err);
});
```

а по-четимо не може ли ... ??

## варианти за асинхронно програмиране с Python

* asyncio - Python 3.3+
* Tornado - Python 2 & 3
* Twisted - Python 2 & 3
* gevent - Python 2 & 3
* plain sockets with setblocking(0) - Python 2 & 3
* libuv, eventlet, trollius (asyncio за Python 2)
* Autobahn|Python, pulsar, diesel, weightless
* . . . 

#### asyncio

In [None]:
import asyncio
import time
from collections import Counter

@asyncio.coroutine
def get_webserver_for_host(hostname: str, webservers: Counter):
    reader, writer = yield from asyncio.open_connection(host=hostname, port=80)
    http_request_string = HTTP_REQUEST_TEMPLATE.format(hostname)
    writer.write(http_request_string.encode())
    response = yield from reader.read(2048)
    server = detect_webserver_from_response(response)
    webservers[server] += 1
    writer.close()
    print("Completed: {}".format(hostname))

@asyncio.coroutine
def main_asyncio():
    webservers = Counter()
    running_coroutines = []
    t = time.time()
    for host in get_hosts():
        c = get_webserver_for_host(host, webservers)
        running_coroutines.append(c)

    yield from asyncio.gather(*running_coroutines)
    print("Done in {}sec".format(time.time() - t))
    print(str(webservers))

In [None]:
asyncio.get_event_loop().run_until_complete(main_asyncio())

### event loop, yield, какво ... ?

![асинхронен вариант](resources/diagram-tasks-callbacks.png)

## event loop

&nbsp;

![Event loop](resources/diagram-eventloop.png)

### екстра, но няма ли пак да е камара от callbacks?

&nbsp;

&nbsp;

> Да, ще е :о)


### генератори

![диаграма за генератор](resources/diagram-generator.png)

измислени през 1975 с езика CLU (Wikipedia)

> Ей на, видé ли - казáх ти, че след 70-те години нищо ново не са измислили!
>
> -- известен автор

### пример за генератор

&nbsp; 

От файла **bgsites-short.txt** трябва да извлечем всички некоментирани редове

    vbox7.com
    # tyxo.bg
    vesti.bg
    # unacs.bg
    zamunda.net

In [None]:
PROJECT_PATH = '/Users/boris/Work/softuni/2015-09-AsyncPython/source/asyncio'
# . . .
def get_all_hosts():
    result = []
    with open(PROJECT_PATH + '/bgsites-short.txt', 'r') as f:
        for host in f:
            host = host.strip()
            if not host.startswith('#'):
                result.append(host)
    return result

In [None]:
for h in get_all_hosts():
    print("Received: ", h)

In [None]:
get_all_hosts()

In [None]:
PROJECT_PATH = '/Users/boris/Work/softuni/2015-09-AsyncPython/source/asyncio'
# . . .
def get_hosts():
    print("Entering the generator ...")
    with open(PROJECT_PATH + '/bgsites-short.txt', 'r') as f:
        for host in f:
            host = host.strip()
            print("Read: ", host)
            if not host.startswith('#'):
                yield host

In [None]:
for h in get_hosts():
    print("Received: ", h)

In [None]:
g = get_hosts()
g

In [None]:
next(g)

#### генератор-функция и генератор-обект

In [None]:
get_hosts

In [None]:
get_hosts()

#### как могат да се използват, за да опростим асинхронното програмиране?

&nbsp;

| генератори | асинхронни задачи |
| ---------- | ----------------- |
| генераторът **не започва изпълнение** на функцията, докато не бъде извикан next() | задачата **не трябва да започва изпълнение**, докато event loop не я стартира |
| след yield на стойност, генераторът **"замразява" изпълнението** на функцията, докато не бъде извикан отново next() | когато задачата трябва да изпълни I/O операция, трябва да **"замрази" изпълнението си**, и да даде възможност на други задачи да работят, докато се чака резултата от I/O операцията |


In [None]:
@asyncio.coroutine
def get_webserver_for_host(hostname: str, webservers: Counter):
    reader, writer = yield from asyncio.open_connection(host=hostname, port=80)
    http_request_string = HTTP_REQUEST_TEMPLATE.format(hostname)
    writer.write(http_request_string.encode())
    response = yield from reader.read(2048)
    server = detect_webserver_from_response(response)
    webservers[server] += 1
    writer.close()
    print("Completed: {}".format(hostname))

през **2005** с Python 2.5 разширяват генераторите в езика, за да могат да се използват за и асинхронно програмиране (PEP-342)

* възможност за предаване на стойност от извикващия към генератора
``` python
response = yield from reader.read(2048)
```

* възможност за предаване на exceptions от извикващия към генератора
```
ако има проблем, read() или open_connection() ще хвърлят exception на мястото, където е извикан yield
```

веднага пример

In [None]:
def generate_multiples(n: int, multiply_by: int=2):
    while True:
        try:
            n = n * multiply_by
            new_multiply_by = yield n
            if new_multiply_by:
                multiply_by = new_multiply_by
        except Exception as e:
            print("Something went wrong: " + str(e))

In [None]:
g = generate_multiples(1)
g

In [None]:
next(g)

In [None]:
g.send(3)

In [None]:
next(g)

In [None]:
g.throw(ValueError("Stop it already!"))

####  ... а "coroutines" ?

&nbsp;

> cooperative routines

&nbsp;

* позволяват "паралелно" изпълнение на няколко операции
* програмистът решава кога е ОК да се "замрази" изпълнението на функцията (за разлика от threads)
* . . .
* заемат много малко ресурси
* в Python са реализирани като генератори

In [None]:
@asyncio.coroutine
def get_webserver_for_host(hostname: str, webservers: Counter):
    reader, writer = yield from asyncio.open_connection(host=hostname, port=80)
    http_request_string = HTTP_REQUEST_TEMPLATE.format(hostname)
    writer.write(http_request_string.encode())
    response = yield from reader.read(2048)
    server = detect_webserver_from_response(response)
    webservers[server] += 1
    writer.close()
    print("Completed: {}".format(hostname))

тъй като е генератор, не започва изпълнение, докато не е scheduled за изпълнение в event loop
``` python
loop = asyncio.get_event_loop()
loop.run_until_complete(
    get_webserver_for_host("softuni.bg", Counter())
)
```
или
``` python
loop.create_task(
    get_webserver_for_host("softuni.bg", Counter())
)
```

### Future, Task & coroutine

&nbsp;

* **Future** - резултат, който ще бъде получен в бъдеще (подобно на Promise)

&nbsp;

* **Task** - обект, отговорен да изпълни coroutine в текущия event loop

In [None]:
import asyncio

@asyncio.coroutine
def co_1():
    print("in co_1")

@asyncio.coroutine
def co_2():
    print("in co_2")

@asyncio.coroutine
def co_3():
    print("in co_3")

@asyncio.coroutine
def main():
    co_1()
    yield from co_2()
    asyncio.async(co_3())  # creates a Task instance & schedules in event loop
    print("in main")

asyncio.get_event_loop().run_until_complete(main())

In [None]:
import asyncio
import time
from collections import Counter

@asyncio.coroutine
def get_webserver_for_host(hostname: str, webservers: Counter):
    reader, writer = yield from asyncio.open_connection(host=hostname, port=80)
    http_request_string = HTTP_REQUEST_TEMPLATE.format(hostname)
    writer.write(http_request_string.encode())
    response = yield from reader.read(2048)
    server = detect_webserver_from_response(response)
    webservers[server] += 1
    writer.close()
    print("Completed: {}".format(hostname))

@asyncio.coroutine
def main_asyncio():
    webservers = Counter()
    running_coroutines = []
    t = time.time()
    for host in get_hosts():
        c = get_webserver_for_host(host, webservers)
        running_coroutines.append(c)

    yield from asyncio.gather(*running_coroutines)
    print("Done in {}sec".format(time.time() - t))
    print(str(webservers))

asyncio.get_event_loop().run_until_complete(main_asyncio())

## python 3.5

&nbsp;

нови ключови думи за улеснение на асинхронно програмиране

* **async** - указва, че функция, for-loop или with изпълняват с асинхронни операции

&nbsp;

* **await** - вместо `yield` и `yield from` ; не може да се използва извън `async` функция

``` python
async def get_webserver_for_host(hostname: str, webservers: Counter):
    reader, writer =   await   asyncio.open_connection(host=hostname, port=80)
    http_request_string = HTTP_REQUEST_TEMPLATE.format(hostname)
    writer.write(http_request_string.encode())
    response =   await   reader.read(2048)
    server = detect_webserver_from_response(response)
    webservers[server] += 1
    writer.close()
    print("Completed: {}".format(hostname))

async def main_asyncio():
    webservers = Counter()
    running_coroutines = []
    t = time.time()
    for host in get_hosts():
        c = get_webserver_for_host(host, webservers)
        running_coroutines.append(c)

    await  asyncio.gather(*running_coroutines)

asyncio.get_event_loop().run_until_complete(main_asyncio())
```

###  eкстра, а Python 2  ??

&nbsp;

* trollius - порт на  asyncio за Python 2.6+

&nbsp;

* Tornado, Twisted, gevent, . . .

&nbsp;

базирани на основната идея - event loop & coroutines

### използване на няколко async libraries/frameworks

&nbsp;

Event loop е интерфейс -> един lib може да работи върху event loop на друг 

* Twisted може да работи върху event loop на Qt, asyncio, . . .
* Tornado може да работи върху event loop на asyncio
* asyncio може да работи върху event loop на Qt, libuv, . . . 
* . . . 


### използване на код, който не е написан асинхронно



In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor

def do_some_computations(n):
    result = 0
    for x in range(n**2):
        for y in range(n**2):
            result += (x+1)**2 / (y+1)**3
    return result

@asyncio.coroutine
def do_something_async():
    print("Async operation started")
    yield from asyncio.sleep(2)
    print("Async operation completed")

loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
loop.run_until_complete(
    asyncio.gather(
        loop.run_in_executor(None, do_some_computations, 40),
        do_something_async()
    )
)

Tornado, Twisted и останалите библиотеки имат подобни механизми -  run_in_executor()

`ThreadPoolExecutor` & `ProcessPoolExecutor`

### a  web ... ?

пример за web приложение с Tornado

&nbsp;

## накратко, това е :о)

&nbsp;

> Email: boris.chervenkov@gmail.com

> Twitter: @b_chervenkov

> Github: [http://bit.ly/async-python](http://bit.ly/async-python)