# Introduction
Key Words: coroutines; event loops; tasks

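## Normal functions and coroutines
A cooking analogy helps here.
* Normal functions
1. Take baking a cake: you follow the recipe step by step from start to finish without stopping, and until the cake is done you cannot do anything else. Your attention stays entirely on the cake for the whole process.
2. Characteristics of a normal function: once it starts, it runs its statements in order without pausing to let other tasks run; only after the function returns does the code that follows it execute.

* Coroutines

1. A coroutine is like cooking a meal with several dishes. You start the first dish, and while it is cooking you can set it aside and start the second. You manage several dishes at once and switch between them as needed, making efficient use of time and resources.
2. Coroutines enable multitasking. Instead of running straight through like a normal function, a coroutine can pause while it waits for an operation (such as I/O) to complete, let other tasks run, and resume from where it paused once the operation finishes. Switching between tasks this way keeps the program busy instead of idle while it waits.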

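## Event loop
The event loop is like the conductor of an orchestra, making sure every musician (that is, every task) knows when to start playing and when to pause. In programming terms, the event loop supervises the execution of the different tasks and runs them one at a time, so that a task that is waiting does not block the whole program.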
![image.png](../assets/basic_event_loop.webp)
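
The turn-taking described above can be observed directly. The following is a minimal sketch, written as a standalone script (in Jupyter, replace `asyncio.run(main())` with `await main()`); the names `worker` and `order` are made up for illustration.

```python
import asyncio

order = []  # records which worker ran at each step

async def worker(name: str, steps: int) -> None:
    for i in range(steps):
        order.append(f"{name}{i}")
        # sleep(0) hands control back to the event loop without a real delay
        await asyncio.sleep(0)

async def main() -> None:
    # Both workers are scheduled on the same loop and take turns
    await asyncio.gather(worker("A", 2), worker("B", 2))

asyncio.run(main())
print(order)  # ['A0', 'B0', 'A1', 'B1']
```

Each `await` is a point where the loop may switch to another task; a worker that never awaits would keep the loop to itself until it finished.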

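## Promise-Future model
This is the default communication model in asyncio. In the general sense, Promise-Future consists of two objects, a Promise and a Future. It was originally a multithreading model that represents one asynchronous exchange: the sender delivers a message through the Promise object, and the receiver can pick up the result through the Future object at a suitable time, or wait for the result to arrive.

A Future is a read-only container for a value that may be available immediately or at some point in the future. A Promise is an object that can be written to only once. Each Promise is associated with one Future, and writing to the Promise makes the Future's value available. We only discuss the one-to-one case, in which the Future is the value and the Promise is the means of producing it.

Future and Promise originate from functional languages. Their purpose is to separate a value from the way it is produced, which simplifies asynchronous code.
Further reading (in Chinese): [Future与Promise](https://fuzhe1989.github.io/2018/01/30/future-promise/)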
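
To make the one-to-one Promise/Future pairing concrete, here is a minimal sketch using `asyncio.Future`. asyncio has no separate Promise class: the same object is written once with `set_result()` (the Promise role) and read by awaiting it (the Future role). The `producer` function and the value 42 are illustrative assumptions; in Jupyter, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio

async def producer(promise: asyncio.Future, value: int) -> None:
    # "Promise" side: write the value exactly once
    await asyncio.sleep(0.1)  # stand-in for real work
    promise.set_result(value)

async def main() -> int:
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # empty container, no value yet
    task = asyncio.create_task(producer(future, 42))
    result = await future          # "Future" side: wait until the value is written
    await task                     # let the producer finish cleanly
    return result

future_result = asyncio.run(main())
print(future_result)  # 42
```

Application code rarely creates futures by hand; `asyncio.create_task()` returns a `Task`, which is a `Future` subclass whose result is set when the wrapped coroutine returns.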




In [1]:
import asyncio

async def coroutine_func(number: int) -> int:
    return number * 2

def normal_func(number: int) -> int:
    return number * 2

norm_res = normal_func(3)

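# Calling a coroutine function returns a coroutine object; its body has not run yet.
# The event loop can schedule and execute this object later.
# Separating definition from execution is what makes task management flexible.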
coro_res = coroutine_func(3)

print(f"Normal result is {norm_res} and the type is {type(norm_res)}")
print(f"Coroutine result is {coro_res} and the type is {type(coro_res)}")

Normal result is 6 and the type is <class 'int'>
Coroutine result is <coroutine object coroutine_func at 0x7f8f00c0bcc0> and the type is <class 'coroutine'>
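
The coroutine object printed above was never executed. As a minimal sketch of the missing step, the standalone script below hands the object to an event loop with `asyncio.run()`; in a Jupyter cell the equivalent would be `value = await coro`.

```python
import asyncio

async def coroutine_func(number: int) -> int:
    return number * 2

coro = coroutine_func(3)   # nothing has run yet
value = asyncio.run(coro)  # the event loop drives the coroutine to completion
print(value)               # 6
```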


In [6]:
async def fetch_data(data_id: int) -> str:
    print(f"Fetch data for ID {data_id}")
    await asyncio.sleep(2)      # simulate network latency
    print(f"Data fetched for ID {data_id}")
    return f"Data {data_id}"

async def compute_result(value: int) -> int:
    await asyncio.sleep(1)      # simulate computation time
    return value * 2

async def process_data() -> None:
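    # When `await` is used, the coroutine is paused, allowing other operations to run.
    # Once the awaited task completes, execution resumes from where it left off.
    data = await fetch_data(1)
    # Note: `data` is a str, so `value * 2` inside compute_result repeats the string
    # ("Data 1Data 1") rather than doubling a number, despite the int annotation.
    result = await compute_result(data)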
    print(f"Result: {result}")
    print(f"Processed Data: {data}")

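# Jupyter already runs its own event loop, so asyncio.run() cannot be called directly in a notebook cell.
# In a standalone script, pass the coroutine object to asyncio.run() instead:
# asyncio.run(process_data())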
await process_data()

Fetch data for ID 1
Data fetched for ID 1
Result: Data 1Data 1
Processed Data: Data 1


In [7]:
async def serialized_fetch() -> None:
    await fetch_data(1)
    await fetch_data(2)
    await fetch_data(3)

import time
start_time = time.time()
await serialized_fetch()
print(f"It took {time.time() - start_time}")

Fetch data for ID 1
Data fetched for ID 1
Fetch data for ID 2
Data fetched for ID 2
Fetch data for ID 3
Data fetched for ID 3
It took 6.004991054534912


In [13]:
async def parallel_fetch() -> None:
    items = [1, 2, 3]
    tasks = [asyncio.create_task(fetch_data(item)) for item in items]
    # task1 = asyncio.create_task(fetch_data(1))
    # task2 = asyncio.create_task(fetch_data(2))
    # task3 = asyncio.create_task(fetch_data(3))

    # Wait for all tasks to finish
    # await asyncio.gather(task1, task2, task3)
    await asyncio.gather(*tasks)

    # or await each task individually
    # await task1
    # await task2
    # await task3

start_time = time.time()
await parallel_fetch()
print(f"It took {time.time() - start_time}")

Fetch data for ID 1
Fetch data for ID 2
Fetch data for ID 3
Data fetched for ID 1
Data fetched for ID 2
Data fetched for ID 3
It took 2.004282236099243
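
The cell above discards the values returned by the tasks. As a minimal sketch, `asyncio.gather()` also collects the return values, ordered by argument position rather than by completion time. The `delay` parameter and the short sleep times are assumptions added for illustration; in Jupyter, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio

async def fetch_data(data_id: int, delay: float) -> str:
    await asyncio.sleep(delay)  # simulated network latency
    return f"Data {data_id}"

async def main() -> list:
    # ID 1 finishes last, yet its result still comes first in the list
    return await asyncio.gather(
        fetch_data(1, 0.3),
        fetch_data(2, 0.1),
        fetch_data(3, 0.2),
    )

results = asyncio.run(main())
print(results)  # ['Data 1', 'Data 2', 'Data 3']
```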


In [24]:
import random
import time

class Resource:

    def __enter__(self):
        print(f"Initialize resources ...")
        time.sleep(2)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"Cleaning resources ...")
        time.sleep(2)

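    # Deliberately a plain (synchronous) method, to show the TypeError in the output:
    # process() runs immediately and returns a str, which create_task() rejects.
    # The fix is to declare it as `async def process(self, item):`
    def process(self, item):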
        print(f"Processing task: {item}")
        proc_time = random.uniform(1.0, 2.0)
        time.sleep(proc_time)
        return f"Finished task: {item}, It took {proc_time:.2f} seconds"

async def processor() -> None:
    items = ['Task1', 'Task2', 'Task3']
    with Resource() as rs:
        tasks = [asyncio.create_task(rs.process(item)) for item in items]
        await asyncio.gather(*tasks)

await processor()

Initialize resources ...
Processing task: Task1
Cleaning resources ...


TypeError: a coroutine was expected, got 'Finished task: Task1, It took 1.26 seconds'
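
When a blocking function cannot be rewritten as a coroutine, one option is to run it in a worker thread. The sketch below assumes Python 3.9+ for `asyncio.to_thread()`; `blocking_process` is a hypothetical stand-in for `Resource.process`, with a shorter sleep. In Jupyter, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio
import time

def blocking_process(item: str) -> str:
    # Plain synchronous function: time.sleep blocks the calling thread
    time.sleep(0.2)
    return f"Finished task: {item}"

async def main() -> list:
    items = ["Task1", "Task2", "Task3"]
    # to_thread() runs each call in a worker thread and returns an awaitable,
    # so the event loop stays free while the threads sleep
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_process, item) for item in items)
    )

thread_results = asyncio.run(main())
print(thread_results)
```

This keeps the event loop responsive, but the work is now spread across threads, so shared state needs the usual thread-safety care.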

In [None]:
import random
import time

# For async context managers and async iterators, see: https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-3.html
# For a deeper look at the implementation, read the source code
class AsyncResource:

    async def __aenter__(self):
        print(f"Initialize resources ...")
        await asyncio.sleep(2)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Cleaning resources ...")
        await asyncio.sleep(2)

    async def process(self, item):
        print(f"Processing task: {item}")
        proc_time = random.uniform(1.0, 2.0)
        await asyncio.sleep(proc_time)
        return f"Finished task: {item}, It took {proc_time:.2f} seconds"

async def processor() -> None:
    items = ['Task1', 'Task2', 'Task3']
    async with AsyncResource() as rs:
        tasks = [asyncio.create_task(rs.process(item)) for item in items]
        await asyncio.gather(*tasks)

await processor()

Initialize resources ...
Processing task: Task1
Processing task: Task2
Processing task: Task3
Cleaning resources ...
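
An async context manager does not have to be a class. As a minimal sketch of an alternative, `contextlib.asynccontextmanager` builds one from an async generator; the names `async_resource`, `use`, and `events` are invented for illustration, and the sleeps are shortened. In Jupyter, use `await main()` instead of `asyncio.run(main())`.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order of setup, work, and cleanup

@asynccontextmanager
async def async_resource():
    events.append("init")
    await asyncio.sleep(0.05)      # code before yield acts as __aenter__
    try:
        yield "resource"
    finally:
        events.append("cleanup")   # code after yield acts as __aexit__
        await asyncio.sleep(0.05)

async def use(rs: str, item: str) -> str:
    events.append(f"process {item}")
    await asyncio.sleep(0.05)
    return f"{rs} handled {item}"

async def main() -> list:
    async with async_resource() as rs:
        return await asyncio.gather(use(rs, "Task1"), use(rs, "Task2"))

cm_results = asyncio.run(main())
print(events)      # ['init', 'process Task1', 'process Task2', 'cleanup']
print(cm_results)
```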


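1. The demo below implements a "pause and resume" mechanism on top of generators, to show how `yield` hands over control.
2. Current Python versions implement coroutines natively with `async`/`await`, driven by the asyncio event loop.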

In [44]:
from collections import deque
import time
import types

# A scheduler that manages several coroutines and runs them in turn
class Scheduler:
    def __init__(self):
        self.ready = deque()
        self.sleeping = []
        self.current = None

    def new_task(self, coro):
        self.ready.append(coro)

    def run(self):
        """Run the event loop."""
        while self.ready or self.sleeping:
            if not self.ready:
                # print(f"No self.ready")
                self.sleeping.sort(key=lambda t: t[0])
                deadline, coro = self.sleeping.pop(0)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)
                self.ready.append(coro)

            self.current = self.ready.popleft()
            try:
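                # A generator can not only yield values but also receive values from outside via send()
                # send(None): equivalent to next(generator); used to start the generator
                # send(value): resumes the generator and passes in a value (which becomes the result of the yield expression)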
                result = self.current.send(None)

                if isinstance(result, types.GeneratorType):
                    # print(f"Enter types.GeneratorType")
                    self.new_task(result)
                elif isinstance(result, Sleep):
                    # print(f"Enter Sleep, result.util = {result.until}, self.current = {self.current}")
                    self.sleeping.append((result.until, self.current))
                else:
                    self.new_task(self.current)
            except StopIteration:
                pass

    def sleep(self, delay):
        return Sleep(time.time() + delay)

# Sleep plays the role of asyncio.sleep
class Sleep:
    """Wrapper for a sleep request."""
    def __init__(self, until):
        self.until = until

def countdown(n):
    while n > 0:
        print(f"Countdown: {n}")
        # yield plays the role of await: it pauses here and is resumed later
        # wake the countdown coroutine after 5 s
        yield sched.sleep(5)
        n -= 1

def countup(stop):
    x = 0
    while x < stop:
        print(f'Count: {x}')
        # wake the countup coroutine after 1 s
        yield sched.sleep(1)
        x += 1

# Create the scheduler instance
sched = Scheduler()

# Add tasks
sched.new_task(countup(5))
sched.new_task(countdown(5))

sched.run()


Count: 0
Countdown: 5
Count: 1
Count: 2
Count: 3
Count: 4
Countdown: 4
Countdown: 3
Countdown: 2
Countdown: 1
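
The scheduler only ever calls `send(None)`. To illustrate the other half of the `send()` comments in the code, here is a minimal sketch in which the caller passes values into a paused generator; `running_total` is a made-up example, not part of the scheduler.

```python
def running_total():
    total = 0
    while True:
        # yield hands `total` to the caller and pauses here;
        # the argument of the next send() becomes the value of this expression
        received = yield total
        if received is not None:
            total += received

gen = running_total()
first = gen.send(None)  # same as next(gen): runs up to the first yield
second = gen.send(10)   # resumes with received = 10
third = gen.send(5)     # resumes with received = 5
gen.close()
print(first, second, third)  # 0 10 15
```

A scheduler could use the same channel to deliver results to a paused task, which is roughly what happens when an awaited future completes.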
