#### 目的
实现真正的异步调用
```ts
import { readFile } from 'fs';
readFile('/etc/hosts', (err, data) => {
  console.log(1);
});
console.log(2);

// output 2 1
```

#### 下层协程调度器包含以下几个部分
- 事件循环(Event Loop)
- 事件队列(Event Queue)
  - 定时器(Timer)
  - I/O(async I/O)

#### 用阻塞的 socket 实现一次网络 IO
- socket.connect() blocking call
- socket.send()
- socket.recv() blocking call

#### 一个用非阻塞 socket 发起一个 HTTP 请求的例子

In [None]:
import socket
import urllib.parse
import selectors
from selectors import EVENT_WRITE, EVENT_READ

url = "http://antispam-gaojunkang.devops.sl.sit.xiaohongshu.com/moss/api/algorithms"

# One selector for the whole demo: it multiplexes readiness events of sockets.
selector = selectors.DefaultSelector()
# Split the URL into scheme / host / port / path / query components.
urlParsed = urllib.parse.urlparse(url)
# A TCP socket in non-blocking mode: connect/send/recv return (or raise
# BlockingIOError) immediately instead of waiting.
sock = socket.socket()
sock.settimeout(0.0)  # equivalent to sock.setblocking(False)

def buildGetRequest(parsed: urllib.parse.ParseResult) -> bytes:
  """Return the raw HTTP/1.0 GET request for an already-parsed URL.

  An empty path becomes "/", the query string is appended as "?" + query
  when present, and a non-default port is carried in the Host header.
  """
  path = parsed.path or '/'
  query = f"?{parsed.query}" if parsed.query else ''
  host = parsed.hostname if parsed.port is None else f"{parsed.hostname}:{parsed.port}"
  return f"GET {path}{query} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode('ascii')


# on connected
def connected():
  """Selector callback run when the non-blocking connect has completed.

  Switches the socket's registration from "writable" to "readable", so
  `responded` runs when response bytes arrive. Then it sends the request.
  """
  # unregister write event (connected)
  selector.unregister(sock.fileno())
  # register read event, it will be triggered when socket is readable (response data)
  selector.register(sock.fileno(), EVENT_READ, responded)
  # send the whole request; sendall never silently sends only part of it
  sock.sendall(buildGetRequest(urlParsed))

# Response buffer lives at module (global) scope, hence `global` below.
responseData = bytes()
# on responded
def responded():
  """Selector callback run whenever the socket becomes readable.

  Appends each received chunk to `responseData`. An empty read means the
  server closed the connection, i.e. the response is complete. In that case
  the socket is unregistered and closed, and the raw response (status line,
  headers and body) is printed.
  """
  global responseData
  # read a chunk of data
  chunk = sock.recv(4096)
  if chunk:
    # response data is not complete yet, append it to the buffer
    responseData += chunk
    return
  # EOF: stop watching the socket and release its file descriptor
  selector.unregister(sock.fileno())
  sock.close()
  # process response data; undecodable bytes are replaced instead of raising
  print(responseData.decode('utf-8', errors='replace'))

# Register interest in "writable": a non-blocking connect signals completion
# by making the socket writable, which triggers `connected`.
selector.register(sock.fileno(), EVENT_WRITE, connected)
try:
  sock.connect((urlParsed.hostname, urlParsed.port if urlParsed.port is not None else 80))
except BlockingIOError:
  # "Operation now in progress" (EINPROGRESS; the errno number is platform
  # dependent). In non-blocking mode connect returns before the handshake is
  # done, so this exception is the expected outcome, not an error.
  pass
except OSError:
  # A genuine failure (e.g. DNS lookup error, refused connection): don't
  # leave a dead socket registered in the selector.
  selector.unregister(sock.fileno())
  sock.close()
  raise




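在 selector 上等待三次事件发生（连接建立时 socket 变为可写 → `connected`；响应数据到达时变为可读 → `responded`；服务端关闭连接时再次可读，`recv` 返回空字节表示响应结束。若响应分成多块到达，可读事件会更多）: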

In [None]:
# First wait: only EVENT_WRITE is registered, so this is the connect
# completing -> the registered callback is `connected`.
events = selector.select()
for readyKey, _mask in events:
  handler = readyKey.data
  handler()

In [None]:
# Second and third waits: the socket is registered for EVENT_READ, so each
# ready event runs `responded` (typically a data chunk, then the empty read
# that signals the server closed the connection).
for _round in range(2):
  events = selector.select()
  for readyKey, _mask in events:
    readyKey.data()

但更优雅的方式是像下面这样做，而这就是事件循环:

In [None]:
# The event loop: keep dispatching ready events until nothing is registered.
while len(selector.get_map()) > 0:
  # block until at least one registered socket is ready, then run its callback
  for readyKey, _mask in selector.select():
    readyKey.data()

封装一下我们的事件循环:

In [None]:
import selectors

class Loop:
  """A minimal event loop that owns one selector.

  `loop()` keeps waiting on the selector and invoking each ready key's
  callback (stored in `key.data`) until no file objects remain registered.
  """

  def __init__(self) -> None:
    self.selector = selectors.DefaultSelector()

  def loop(self) -> None:
    """Dispatch I/O callbacks until the selector has nothing registered."""
    while len(self.selector.get_map()) > 0:
      ready = self.selector.select()
      for key, _mask in ready:
        key.data()


globalLoop = Loop()

封装一下我们的 HTTP GET 方法:

In [None]:
import socket
import urllib.parse
from selectors import EVENT_WRITE, EVENT_READ
from typing import Callable

def asyncGet(url: str, callback: Callable[[str], None]):
  """Issue a non-blocking HTTP/1.0 GET that is driven by `globalLoop`.

  Returns right after starting the connect. While `globalLoop.loop()` runs:
  - the request is sent once the socket becomes writable;
  - the response is read chunk by chunk;
  - when the server closes the connection, the socket is closed and
    `callback` receives the whole response (status line, headers and body)
    decoded as UTF-8, with undecodable bytes replaced.

  Raises OSError (e.g. socket.gaierror) if the connect fails immediately.
  """
  urlParsed = urllib.parse.urlparse(url)
  # difference: reuse the event loop's selector instead of creating our own
  selector = globalLoop.selector
  sock = socket.socket()
  sock.setblocking(False)

  def connected():
    # connect finished: now wait for response bytes instead of writability
    selector.unregister(sock.fileno())
    selector.register(sock.fileno(), EVENT_READ, responded)
    path = urlParsed.path or '/'
    # '?' + query only when the URL actually has a query string
    query = f"?{urlParsed.query}" if urlParsed.query else ''
    # a non-default port belongs in the Host header
    host = urlParsed.hostname if urlParsed.port is None else f"{urlParsed.hostname}:{urlParsed.port}"
    sock.sendall(f"GET {path}{query} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode('ascii'))

  responseData = bytes()
  def responded():
    nonlocal responseData
    chunk = sock.recv(4096)
    if chunk:
      responseData += chunk
      return
    # empty read: the server closed the connection, the response is complete
    selector.unregister(sock.fileno())
    sock.close()
    # difference: hand the result to the caller instead of printing it
    callback(responseData.decode('utf-8', errors='replace'))

  selector.register(sock.fileno(), EVENT_WRITE, connected)
  try:
    sock.connect((urlParsed.hostname, urlParsed.port if urlParsed.port is not None else 80))
  except BlockingIOError:
    # EINPROGRESS: expected for a non-blocking connect; completion is
    # signalled by the socket becoming writable (-> connected)
    pass
  except OSError:
    # genuine failure: don't leave a dead socket registered with the loop
    selector.unregister(sock.fileno())
    sock.close()
    raise


我们实现了吗？
```ts
import { readFile } from 'fs';
readFile('/etc/hosts', (err, data) => {
  console.log(1);
});
console.log(2);

// output 2 1
```

In [None]:
# Start the request: asyncGet only registers the socket with globalLoop's
# selector and starts the connect, then returns immediately.
asyncGet("http://antispam-gaojunkang.devops.sl.sit.xiaohongshu.com/moss/api/algorithms", lambda data: print(1))
# Runs synchronously, before any response callback -> 2 is printed first.
print(2)

# difference between js and python: here the event loop has to be started
# explicitly; the response callback (printing 1) only runs inside it.
globalLoop.loop()

#### 我们为什么需要事件队列

- 在复杂的调度场景下，借助事件队列可以更容易地组织调度逻辑
  - 宏任务
    - **setTimeout / setInterval, 定时器优先队列(最近时间最小堆)**
    - **I/O callback, I/O 事件队列**
  - 微任务队列(promise callback/ MutationObserver callback)


In [None]:
from collections import deque
from typing import Callable

class EventQueue:
  """FIFO of zero-argument callbacks waiting to be run by the event loop."""

  def __init__(self) -> None:
    # public on purpose: the loop inspects `queue` to see whether work is pending
    self.queue = deque()

  def enqueue(self, callback: Callable[[], None]):
    """Append `callback` at the tail of the queue."""
    pending = self.queue
    pending.append(callback)

  def dequeue(self) -> Callable[[], None]:
    """Remove and return the oldest callback (IndexError if the queue is empty)."""
    oldest = self.queue.popleft()
    return oldest


globalEventQueue = EventQueue()

**第一个例子**: 为了避免护航效应（convoy effect）——如果在 I/O 分发过程中直接同步执行开发者的回调，一个耗时的回调会拖住同一批中其他已就绪事件的处理

在异步接口 I/O 结束后, 将开发者的回调函数推到事件队列:

In [None]:
import socket
import urllib.parse
from selectors import EVENT_WRITE, EVENT_READ
from typing import Callable

def asyncGet(url: str, callback: Callable[[str], None]):
  """Issue a non-blocking HTTP/1.0 GET that is driven by `globalLoop`.

  Returns right after starting the connect. While `globalLoop.loop()` runs,
  the request is sent, the response is read chunk by chunk, and once the
  server closes the connection:
  - the socket is closed;
  - a call to `callback` with the UTF-8-decoded response (undecodable bytes
    replaced) is pushed onto `globalEventQueue`, instead of being run inside
    the I/O dispatch.

  Raises OSError (e.g. socket.gaierror) if the connect fails immediately.
  """
  urlParsed = urllib.parse.urlparse(url)
  selector = globalLoop.selector
  sock = socket.socket()
  sock.setblocking(False)

  def connected():
    # connect finished: now wait for response bytes instead of writability
    selector.unregister(sock.fileno())
    selector.register(sock.fileno(), EVENT_READ, responded)
    path = urlParsed.path or '/'
    # '?' + query only when the URL actually has a query string
    query = f"?{urlParsed.query}" if urlParsed.query else ''
    # a non-default port belongs in the Host header
    host = urlParsed.hostname if urlParsed.port is None else f"{urlParsed.hostname}:{urlParsed.port}"
    sock.sendall(f"GET {path}{query} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode('ascii'))

  responseData = bytes()
  def responded():
    nonlocal responseData
    chunk = sock.recv(4096)
    if chunk:
      responseData += chunk
      return
    # empty read: the server closed the connection, the response is complete
    selector.unregister(sock.fileno())
    sock.close()
    data = responseData.decode('utf-8', errors='replace')
    # here is the difference: defer the user's callback to the event queue
    # rather than running it during I/O dispatch (avoids the convoy effect)
    globalEventQueue.enqueue(lambda: callback(data))

  selector.register(sock.fileno(), EVENT_WRITE, connected)
  try:
    sock.connect((urlParsed.hostname, urlParsed.port if urlParsed.port is not None else 80))
  except BlockingIOError:
    # EINPROGRESS: expected for a non-blocking connect; completion is
    # signalled by the socket becoming writable (-> connected)
    pass
  except OSError:
    # genuine failure: don't leave a dead socket registered with the loop
    selector.unregister(sock.fileno())
    sock.close()
    raise


我们的事件循环也要负责调度事件队列中的回调函数了:

In [None]:
import selectors

class Loop:
  """Event loop that runs queued callbacks and dispatches selector I/O.

  Each turn does three things:
  - runs at most one callback from `globalEventQueue`;
  - handles ready I/O, without blocking while callbacks are still queued;
  - stops when nothing is registered and the queue is empty.
  """

  def __init__(self) -> None:
    self.selector = selectors.DefaultSelector()

  def loop(self) -> None:
    while True:
      # difference: check the event queue
      if len(globalEventQueue.queue) > 0:
        globalEventQueue.dequeue()()

      # I/O waiting: only poll (timeout 0) while callbacks are still queued;
      # a blocking select() would hold them back until some socket is ready
      if len(self.selector.get_map()) > 0:
        timeout = 0 if len(globalEventQueue.queue) > 0 else None
        for eventKey, _ in self.selector.select(timeout):
          eventKey.data()

      if len(self.selector.get_map()) == 0 and len(globalEventQueue.queue) == 0:
        break


globalLoop = Loop()

**第二个例子**: 最近时间最小堆实现的 timer 优先队列

抽象一个 timer，它只保存一个回调函数和一个到期时间戳:

In [None]:
import time

class Timer:
  """A one-shot timer: a callback plus the wall-clock time it becomes due."""

  def __init__(self, timeout: float, callback: Callable) -> None:
    # absolute deadline in seconds, on the time.time() clock
    self.end = time.time() + timeout
    self.callback = callback

  def isTimeout(self) -> bool:
    """True once the current time has reached the deadline."""
    return self.end <= time.time()

  def getCallback(self):
    """Return the function to run when the timer fires."""
    return self.callback

  def __lt__(self, other: 'Timer') -> bool:
    # heapq orders timers by deadline: the earliest one ends up on top
    return other.end > self.end


为了让 timer 能够被调度, 我们需要一个 timer 优先队列:

In [None]:
# timer heap impl
from typing import Callable, Any, List
import heapq

class MinHeap:
  """Thin wrapper over `heapq`: a binary min-heap of mutually comparable items."""

  def __init__(self):
    self.__data: List[Any] = []

  def push(self, item: Any) -> None:
    """Insert `item`; items must support `<` against each other."""
    heapq.heappush(self.__data, item)

  def pop(self) -> Any:
    """Remove and return the smallest item, or None if the heap is empty."""
    if not self.__data:
      return None
    return heapq.heappop(self.__data)

  def peek(self) -> Any:
    """Return the smallest item without removing it, or None if the heap is empty."""
    if not self.__data:
      return None
    return self.__data[0]


class TimerHeap:
  """Shared priority queue of timers, ordered by their `<` (earliest deadline first)."""

  __single = None

  @classmethod
  def getInstance(cls) -> "TimerHeap":
    """Return the shared instance, creating it on first use."""
    if TimerHeap.__single is None:
      TimerHeap.__single = TimerHeap()
    return TimerHeap.__single

  def __init__(self) -> None:
    self.__heap: MinHeap = MinHeap()

  def pushTimer(self, timer: "Timer") -> "TimerHeap":
    """Add `timer` and return this heap, as the return annotation declares."""
    self.__heap.push(timer)
    return self

  def popTimer(self) -> "Timer | None":
    """Remove and return the earliest timer, or None when no timer is pending."""
    return self.__heap.pop()

  def peekTimer(self) -> "Timer | None":
    """Return the earliest timer without removing it, or None when empty."""
    return self.__heap.peek()


globalTimerHeap = TimerHeap.getInstance()

修改我们的事件循环, 使得它能够调度 timer 队列中的 timer:

In [None]:
import selectors

class Loop:
  """Event loop that drives timers, queued callbacks and selector I/O.

  Each turn does the following:
  - fires at most one due timer;
  - runs at most one queued callback;
  - waits for I/O, but never longer than the nearest timer allows, and not
    at all while callbacks are still queued.

  The loop ends when there is no registered I/O, no queued callback and no
  pending timer.
  """

  def __init__(self) -> None:
    self.selector = selectors.DefaultSelector()

  def _waitTimeout(self) -> "float | None":
    """Seconds the loop may block waiting for I/O.

    0 if callbacks are queued; the time until the nearest timer if one is
    pending; None (block until I/O) otherwise.
    """
    import time  # local import: this cell only imports selectors
    if len(globalEventQueue.queue) > 0:
      return 0.0
    nextTimer = globalTimerHeap.peekTimer()
    if nextTimer is None:
      return None
    # Timer.end is a time.time() deadline; an overdue timer means "don't wait"
    return max(0.0, nextTimer.end - time.time())

  def loop(self) -> None:
    import time  # local import: this cell only imports selectors
    while True:
      # check timer queue: fire the earliest timer if it is due
      nextTimer = globalTimerHeap.peekTimer()
      if nextTimer is not None and nextTimer.isTimeout():
        globalTimerHeap.popTimer().getCallback()()

      # check event queue
      if len(globalEventQueue.queue) > 0:
        globalEventQueue.dequeue()()

      hasIO = len(self.selector.get_map()) > 0
      if not hasIO and len(globalEventQueue.queue) == 0 and globalTimerHeap.peekTimer() is None:
        break

      timeout = self._waitTimeout()
      if hasIO:
        # I/O waiting, bounded by the nearest timer deadline so timers fire on time
        for eventKey, _ in self.selector.select(timeout):
          eventKey.data()
      elif timeout:
        # only timers are left: sleep until the next one is due instead of spinning
        time.sleep(timeout)


globalLoop = Loop()

实现一个 api 用来向 timer 队列中添加 timer:
类似 JavaScript 的 setTimeout
```ts
setTimeout(() => {
  console.log('setTimeout');
}, 0);
```

In [None]:
def setTimeout(callback: Callable, timeout: int) -> None:
  """Schedule `callback` to run once, `timeout` milliseconds from now.

  Mirrors JavaScript's setTimeout(callback, ms). The delay is converted to
  seconds (the unit Timer works in), and the timer is pushed onto the global
  timer heap, which the event loop checks on every turn.
  """
  seconds = timeout / 1000
  timer = Timer(seconds, callback)
  globalTimerHeap.pushTimer(timer)

试一试:

In [None]:
# Start an HTTP request and a timer; the event loop interleaves both callbacks.
asyncGet("http://antispam-gaojunkang.devops.sl.sit.xiaohongshu.com/moss/api/algorithms", lambda data: print('http request response'))
# 5000 ms -> the timer fires about 5 seconds after being scheduled
setTimeout(lambda: print("timer 5s timeout"), 5000)

globalLoop.loop()

#### 参考
- 达夫设备，在 C 中实现协程 https://www.chiark.greenend.org.uk/~sgtatham/coroutines.html

- 知乎灵剑 https://www.zhihu.com/people/ling-jian-94

- 500 lines or less，实现单线程并发爬虫 http://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html

- stackful 和 stackless 协程 https://www.zhihu.com/question/65647171/answer/233495694

- 完整实现, 包含较完整的 promise 实现 https://github.com/Drincann/py-coroutine