In [118]:
import time
from typing import Callable
from collections import deque
import itertools
import heapq
import random
import threading

# 自制 `Event Loop`

In [119]:
class EventLoop:
    """A minimal event loop with a FIFO ready queue and a timer heap.

    Callbacks are plain callables. ``call_soon`` queues one for the next
    pass of the loop; ``call_later`` queues one to become ready once a
    delay has elapsed. ``run_forever`` keeps calling ``run_once`` until
    ``stop`` has been requested. The loop busy-polls: it never sleeps
    between passes.
    """

    def __init__(self):
        # FIFO of (callback, args) pairs that are ready to run.
        self._ready = deque()
        # Min-heap of (deadline, sequence, callback, args) entries.
        self._scheduled = []
        # Monotonic tie-breaker so two timers with the same deadline are
        # ordered by insertion and the heap never has to compare callbacks
        # (functions are not orderable and would raise TypeError).
        self._timer_seq = itertools.count()
        self._stopping = False

    def call_soon(self, callback: Callable, *args):
        """Queue ``callback(*args)`` to run on an upcoming loop pass."""
        # NOTE: this is also reached from worker threads through
        # Future.set_result; deque.append is documented as thread-safe.
        self._ready.append((callback, args))

    def call_later(self, delay: float, callback: Callable, *args):
        """Queue ``callback(*args)`` to run once ``delay`` seconds have passed."""
        deadline = time.time() + delay
        entry = (deadline, next(self._timer_seq), callback, args)
        # The heap keeps the entry with the earliest deadline at index 0.
        heapq.heappush(self._scheduled, entry)

    def stop(self):
        """Ask ``run_forever`` to return after the current pass."""
        self._stopping = True

    def run_once(self):
        """Run a single pass: promote due timers, then drain the ready queue."""
        now = time.time()
        # Promote every timer whose deadline has been reached, not just one,
        # so several timers that expire together do not lag behind.
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, callback, args = heapq.heappop(self._scheduled)
            self._ready.append((callback, args))

        # Only run what was ready at the start of this pass; callbacks queued
        # while running are left for the next pass so one pass always ends.
        for _ in range(len(self._ready)):
            callback, args = self._ready.popleft()
            callback(*args)

    def run_forever(self):
        """Run passes until ``stop`` is called, then reset the stop flag."""
        try:
            while True:
                self.run_once()
                if self._stopping:
                    break
        finally:
            # Clear the flag so the same loop object can be run again.
            self._stopping = False

In [120]:
# Helper class that wraps what a coroutine yields, so that `await` (yield from)
# can hand a pending result up to whoever is driving the coroutine.
class Future:
    """A placeholder for a result that will be supplied later.

    Done-callbacks are not invoked directly; they are handed to the event
    loop's ``call_soon`` once the result has been set. ``set_result`` and
    ``add_done_callback`` are serialized with a lock because this notebook
    completes futures from a worker thread while the loop thread registers
    callbacks.

    Args:
        loop: Object providing ``call_soon(callback)``. When omitted, the
            module-level ``loop`` variable is used.

    Raises:
        NameError: If ``loop`` is omitted and no module-level ``loop`` exists.
    """

    def __init__(self, loop=None):
        if loop is None:
            try:
                loop = globals()["loop"]
            except KeyError:
                raise NameError("name 'loop' is not defined") from None
        self._loop = loop
        self._result = None
        self._done = False
        self._callbacks = []
        # Guards _done / _result / _callbacks against the completing thread.
        self._lock = threading.Lock()

    def set_result(self, result):
        """Store the result, mark the future done and schedule its callbacks.

        Raises:
            RuntimeError: If a result has already been set.
        """
        with self._lock:
            if self._done:
                raise RuntimeError('future is already done')
            self._result = result
            self._done = True
            # Detach the list so callbacks are scheduled exactly once.
            callbacks, self._callbacks = self._callbacks, []

        for callback in callbacks:
            self._loop.call_soon(callback)

    def result(self):
        """Return the stored result.

        Raises:
            RuntimeError: If the result has not been set yet.
        """
        if self._done:
            return self._result
        else:
            raise RuntimeError('future is not done yet')

    def add_done_callback(self, callback):
        """Register ``callback`` (called with no arguments) for completion.

        If the future is already done the callback is scheduled on the loop
        right away instead of being stored and never fired.
        """
        with self._lock:
            if not self._done:
                self._callbacks.append(callback)
                return
        self._loop.call_soon(callback)

    def __await__(self):
        # Hand this future to the driver of the coroutine; when the driver
        # resumes the coroutine, the stored result becomes the await value.
        yield self
        return self._result

In [121]:
def fake_io_read(future):
    """Simulate an IO read on a separate thread so the caller is not blocked.

    The background thread sleeps for a random fraction of a second and then
    calls ``future.set_result`` with a random integer between 0 and 100.
    """

    def _worker():
        delay = random.random()
        time.sleep(delay)  # stands in for blocking IO
        value = random.randint(0, 100)
        future.set_result(value)  # publish the result

    worker_thread = threading.Thread(target=_worker)
    worker_thread.start()

In [122]:
async def small_step():
    """Create a Future, start a fake IO read that will fill it, and await it.

    Returns the value the future was completed with.
    """
    pending = Future()

    # Whoever will call set_result() has to be arranged before the await,
    # otherwise nothing would ever complete the future.
    fake_io_read(pending)

    return await pending

In [123]:
async def big_step():
    """A larger step that awaits ``small_step`` and scales its result by 1000."""
    # (other small steps would go here)
    print("     begin small_step")
    inner_value = await small_step()
    print("     end small_step")

    scaled = inner_value * 1000
    return scaled

In [124]:
async def one_task():
    """Example task: print progress markers around a single ``big_step``."""
    print("begin task")
    # (other steps would go here)
    print("     begin big_step")

    outcome = await big_step()
    print(f"     end big_step with {outcome}")

    # (other steps would go here)
    print("end task")

In [125]:
# Shared counter yielding 1, 2, 3, ...; each call to next() gives a fresh task number.
task_id_counter = itertools.count(start=1)

In [126]:
class Task(Future):
    """A Future that drives a coroutine to completion on the event loop.

    Creating a Task schedules its first ``run`` on the loop. Each ``run``
    advances the coroutine by one step; when the coroutine yields a Future,
    the Task asks that Future to schedule ``run`` again once it is done.
    When the coroutine returns, its return value becomes the Task's result.
    """

    def __init__(self, coro) -> None:
        super().__init__()
        self._id = f"Task-{next(task_id_counter)}"
        self.coro = coro
        self._loop.call_soon(self.run)
        # Reference point for the elapsed time accumulated when the task finishes.
        self._start_time = time.time()

    def run(self):
        """Advance the wrapped coroutine by one step."""
        global total_block_time

        print(f"----- { self._id } starting -----")
        if self._done:
            print("Task is done")
        else:
            try:
                awaited = self.coro.send(None)
            except StopIteration as finished:
                # The coroutine returned: its return value is the task result.
                self.set_result(finished.value)
                # Add this task's creation-to-completion time to the global total.
                total_block_time += time.time() - self._start_time
            else:
                assert isinstance(awaited, Future)
                # Resume this task (self.run) once the awaited Future completes.
                awaited.add_done_callback(self.run)
        print(f"----- { self._id } ending -----")


In [127]:
def unitl_all_done(tasks):
    """Stop the global loop once every task in ``tasks`` is done.

    While some tasks are still unfinished, this re-queues itself on the loop
    with only the unfinished ones, so it is checked again on a later pass.
    """
    unfinished = list(filter(lambda task: not task._done, tasks))
    if not unfinished:
        loop.stop()
        return
    loop.call_soon(unitl_all_done, unfinished)

In [128]:
# Future (and therefore Task) reads this module-level `loop`, so it must exist
# before any Task is created.
loop = EventLoop()

# Creating a Task queues its first step on `loop` immediately.
all_tasks = [Task(one_task()) for i in range(10)]

# Accumulator that Task.run adds to whenever a task finishes
# (time from that task's creation to its completion).
total_block_time = 0
start_time = time.time()

# After 1 second, start checking the tasks; the loop is stopped only once
# all of them are done (it does not stop unconditionally after 1 second).
loop.call_later(1, unitl_all_done, all_tasks)

loop.run_forever()

# Sum of the per-task elapsed times vs. the overall wall-clock time of the run.
print(total_block_time, time.time() - start_time)
# The output shows that tasks start in creation order, but later they resume
# in the order in which their simulated sleeps finish.

----- Task-1 starting -----
begin task
     begin big_step
     begin small_step
----- Task-1 ending -----
----- Task-2 starting -----
begin task
     begin big_step
     begin small_step
----- Task-2 ending -----
----- Task-3 starting -----
begin task
     begin big_step
     begin small_step
----- Task-3 ending -----
----- Task-4 starting -----
begin task
     begin big_step
     begin small_step
----- Task-4 ending -----
----- Task-5 starting -----
begin task
     begin big_step
     begin small_step
----- Task-5 ending -----
----- Task-6 starting -----
begin task
     begin big_step
     begin small_step
----- Task-6 ending -----
----- Task-7 starting -----
begin task
     begin big_step
     begin small_step
----- Task-7 ending -----
----- Task-8 starting -----
begin task
     begin big_step
     begin small_step
----- Task-8 ending -----
----- Task-9 starting -----
begin task
     begin big_step
     begin small_step
----- Task-9 ending -----
----- Task-10 starting -----
begin ta