## 异步编程

### 同步/异步

发生在函数/方法调用的时候，是否得到**直接**的**最终结果**

- 得到直接的最终结果，同步调用
- 得到非直接的最终结果，异步调用

### 阻塞/非阻塞

发生在函数/方法调用的时候，是否立刻返回

- 立刻返回， 非阻塞调用
- 不立刻返回：阻塞调用

同步/异步 和 阻塞/非阻塞 的区别：

- 同步/异步：强调的是结果
- 阻塞/非阻塞：强调的是是否等待

### 同步IO/异步IO/IO多路复用

IO：
- 内核从输入设备读写数据
- 进程会从内核复制数据


- 同步IO: 进程从输入设备读写数据，会直接返回结果
- 异步IO: 进程从输入设备读写数据，并不得到直接的返回结果
- 同步IO:
    - 阻塞IO: 进程等待，直到读写完成 read/write
    - 非阻塞IO: 进程不等待，如果IO设备没有准备好，返回Error，内核和用户空间之间复制数据是阻塞的read/write
    - IO多路复用：阻塞的，直到其中一个IO设备准备好，select/poll/epoll/kqueue/iocp, 从epoll 开始就不一样了，linux 是epoll的，mac 是kqueue的，windows 是iocp的，
- 异步IO：整个过程，对进城发起IO请求，内核完成IO的两步，通知进程

- 使用性能较低的select/poll
- 判断操作系统类型和版本，根据操作系统使用不同的方法

- 第一个途径比较简单，性能低
- 第二个途径复杂，性能还比较ok

In [1]:
import selector

ModuleNotFoundError: No module named 'selector'

In [2]:
import selectors

In [3]:
import socket

In [29]:
selector = selectors.DefaultSelector()

In [21]:
def read(conn, mask):
    message = conn.recv(1024).strip()
    print('recv {}'.format(message.decode()))
    
def accept(sock, mask):
    conn, _ = sock.accept()
    conn.setblocking(False)
    selector.register(conn, selectors.EVENT_READ, read)

In [30]:
sock = socket.socket()

In [31]:
sock.bind(('127.0.0.1', 1237))

In [32]:
sock.listen()

In [33]:
key = selector.register(sock, selectors.EVENT_READ, accept)

In [34]:
key

SelectorKey(fileobj=<socket.socket fd=59, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1237)>, fd=59, events=1, data=<function accept at 0x10b5a9b70>)

In [35]:
import threading

In [36]:
event = threading.Event()

- fileobj, 就是register 注册的文件对象
- fd，文件对象的fd，文件描述符
- events， 注册的东西，有哪些events
- data 通常就是可选参数

In [38]:
try:
    while not event.is_set():
        events = selector.select(1)
        for key, _ in events:
            key.data(key.fileobj)
except KeyboardInterrupt:
    event.set()
    sock.close()
    selector.unregister(sock)

TypeError: accept() missing 1 required positional argument: 'mask'

In [39]:
sock.close()

- import selectors
- 初始化selector
- 创建read，accept函数
- 初始化、bind、listen
- 注册一个函数

- 读准备好。内核和外部的输入输出设备建立好了连接了
- 写准备好

In [40]:
from collections import namedtuple

In [41]:
Client = namedtuple('Client', ['conn', 'queue', 'handle'])

In [46]:
import queue

class ChatServer:
    def __init__(self, host='127.0.0.1', port=1234):
        self.host = host
        self.port = port
        self.sock = socket.socket()
        self.event = threading.Event()
        self.clients = {}
        
        self.selector = selectors.DefaultSelector()
        
    
    def disconnect(self, peer):
        client = self.clients[peer]
        self.selector.unregister(client.conn)
        client.conn.close()
        del self.clients[peer]
        
    def broadcast(self, peer, message):
        for _, queue, _ in self.clients.values():
            queue.put((peer, message))
            
    def handle(self, conn, mask):
        peer = conn.getpeername()
        client = self.clients[peer]
        if mask & selectors.EVENT_READ:
            message = conn.recv(1024).strip().decode()
            if message == '/exit':
                self.disconnect(peer)
                return
            self.broadcast(peer, message)
        if mask & selector.EVENT_WRITE:
            while not client.queue.empty():
                (host, port), message = client.queue.get()
                conn.send('{}:{}\n{}\r\n'.format(host, port, message).encode())
                
    def accept(self):
        conn, client_address = self.sock.accept()
        self.clients[client_address] = Client(conn, queue.Queue(), self.handle)
        self.selector.register(conn, selectors.EVENT_READ|selectors.EVENT_WRITE, self.clients[client_address])
        
    def run(self):
        while not self.event.is_set():
            for key, mask in self.selector.select(1):
                if callable(key.data):
                    key.data()
                else:
                    key.data.handle(key.data.conn, mask)
            

    def start(self):
        self.sock.bind((self.host, self.port))
        self.sock.listen()
        self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
        
    def shutdown(self):
        self.event.set()
        for conn, _, _ in list(self.clients.values()):
            self.disconnect(conn.getpeername())
        self.selector.unregister(self.sock)
        self.sock.close()
        

In [47]:
chat_server = ChatServer()

In [48]:
try:
    chat_server.start()
except KeyboardInterrupt:
    chat_server.shutdown()

In [45]:
chat_server.shutdown()

KeyError: "<socket.socket fd=53, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 1234)> is not registered"

### asyncio

协程是一种用户空间实现调度的并发方法，python的协程是非抢占式调度，只有一个协程主动让出控制权，另外一个协程才会被调度

In [49]:
import asyncio

In [50]:
@asyncio.coroutine
def sleep(x):
    yield asyncio.sleep(x)

In [51]:
async def sleep(x):
    await asyncio.sleep(x)

threading.Thread(target=func, args=(), name='', daemon=True).start()

event loop

In [52]:
loop = asyncio.get_event_loop()

In [53]:
loop

<_UnixSelectorEventLoop running=False closed=False debug=False>

In [54]:
loop.run_until_complete(sleep(3))

In [58]:
class AsyncContextManager:
    async def __aenter__(self):
        print('aenter')
        
    async def __aexit__(self, *args):
        print('async exit')

In [59]:
async def co():
    async with AsyncContextManager():
        print('body')

In [60]:
loop.run_until_complete(co())

aenter
body
async exit


- 协程只能通过event loop 来跑
- async with 只能在协程里

In [61]:
help(loop.call_soon)

Help on method call_soon in module asyncio.base_events:

call_soon(callback, *args) method of asyncio.unix_events._UnixSelectorEventLoop instance
    Arrange for a callback to be called as soon as possible.
    
    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.
    
    Any positional arguments after the callback will be passed to
    the callback when it is called.



In [62]:
h = loop.call_soon(lambda x: x*x, 3)

In [63]:
h = loop.call_later(3, lambda x: x*x, 3)

In [None]:
loop.call_at

In [64]:
async def echo_handle(r, w):
    message = await r.read(1024)
    print(message)
    w.write(message)
    await w.drain()
    w.close()

In [67]:
server  = asyncio.start_server(echo_handle, host='127.0.0.1', port=9001, loop=loop)

In [68]:
loop.run_until_complete(server)

<Server sockets=[<socket.socket fd=53, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 9001)>]>

In [69]:
loop.run_forever()

b'sdfdsf\r\n'


KeyboardInterrupt: 