\
            # 39. asyncio 实战（asyncio Practical Guide）

            围绕任务调度、取消、超时、队列、并发限制与“桥接阻塞代码”，给出可复用的 asyncio 模板。
注：Notebook 通常支持顶层 await；如在脚本中请改为 asyncio.run(main())。

            > 约定：Python 3.8；示例尽量只用标准库；代码块可直接运行（第三方依赖会做可选降级）。


## 前置知识

- 协程与 async/await
- 异常基础
- 并发基础


## 知识点地图

- 1. Task：并发执行的基本单位
- 2. 超时与取消：必须显式处理
- 3. Queue + worker：背压与吞吐控制
- 4. 并发限制：Semaphore / 限流模式
- 5. 桥接阻塞代码：run_in_executor


## 自检清单（学完打勾）

- [ ] 会用 create_task/gather 组织并发
- [ ] 知道取消（CancelledError）与超时（wait_for）怎么处理
- [ ] 会用 asyncio.Queue 做生产者-消费者与背压
- [ ] 知道如何把阻塞函数放进线程池（run_in_executor）


In [None]:
\
from pathlib import Path

ART = Path('_nb_artifacts')
ART.mkdir(exist_ok=True)
print('artifacts dir:', ART.resolve())


## 知识点 1：Task：并发执行的基本单位

- 协程对象要么被 `await`，要么被封装成 Task 才会运行。
- `asyncio.create_task(coro())` 立即安排执行；`await task` 获取结果。
- `asyncio.gather(*tasks)` 汇总结果；可用 `return_exceptions=True` 收集异常。


In [None]:
import asyncio


async def job(i):
    await asyncio.sleep(0.03)
    return i


async def main():
    tasks = [asyncio.create_task(job(i)) for i in range(5)]
    print(await asyncio.gather(*tasks))


await main()


## 知识点 2：超时与取消：必须显式处理

- `asyncio.wait_for(coro, timeout)` 超时会取消内部任务并抛 `TimeoutError`。
- 取消会抛 `CancelledError`（3.8 中它继承自 Exception，要注意捕获范围）。
- 最佳实践：在 finally 里释放资源（连接、文件、锁）。


In [None]:
import asyncio


async def slow():
    try:
        await asyncio.sleep(10)
    finally:
        print('cleanup')


async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.05)
    except asyncio.TimeoutError:
        print('timeout')


await main()


## 知识点 3：Queue + worker：背压与吞吐控制

- `asyncio.Queue(maxsize=...)`：队列满时 `put` 会等待 -> 背压。
- 固定数量 worker 消费队列，相当于“并发上限”。
- 用 sentinel（例如 None）通知 worker 退出。


In [None]:
import asyncio


async def worker(name, q: asyncio.Queue):
    while True:
        item = await q.get()
        try:
            if item is None:
                return
            await asyncio.sleep(0.02)
            print(name, 'handled', item)
        finally:
            q.task_done()


async def main():
    q = asyncio.Queue(maxsize=3)
    workers = [asyncio.create_task(worker(f'w{i}', q)) for i in range(2)]

    for i in range(10):
        await q.put(i)
    for _ in workers:
        await q.put(None)

    await q.join()
    await asyncio.gather(*workers)
    print('done')


await main()


## 知识点 4：并发限制：Semaphore / 限流模式

- `Semaphore(N)` 常用于“同一时间最多 N 个请求”。
- 和 Queue-worker 相比：更适合“直接并发发起”的场景（gather 很多任务）。
- 限流不是为了慢，而是为了稳定（避免打爆下游）。


## 知识点 5：桥接阻塞代码：run_in_executor

在 asyncio 中直接调用阻塞函数会卡住事件循环。

3.8 推荐：
- `loop.run_in_executor(None, blocking_fn, ...)`：扔到默认线程池执行。

注意：线程池也要限流，避免无限提交导致内存暴涨。


In [None]:
import asyncio
import time


def blocking(i):
    time.sleep(0.05)
    return i * 2


async def main():
    loop = asyncio.get_running_loop()
    results = await asyncio.gather(*[loop.run_in_executor(None, blocking, i) for i in range(5)])
    print(results)


await main()


## 常见坑

- 忘记 await：协程对象创建了但从未执行
- 一把抓 Exception 把 CancelledError 吃掉：导致无法取消
- 无限 create_task：任务堆积导致内存/句柄爆
- 在事件循环里跑 CPU 重任务：吞吐骤降、延迟飙升


## 综合小案例：“并发抓取（模拟）+ 重试 + 超时 + 限流”模板

把它当作“批量调用外部 API/爬虫”的骨架：
- 最大并发 5
- 单次请求超时 80ms
- 可重试 2 次（指数退避）


In [None]:
import asyncio
import random


async def fake_fetch(url: str):
    await asyncio.sleep(random.uniform(0.01, 0.06))
    if random.random() < 0.25:
        raise TimeoutError('network glitch')
    return {'url': url, 'status': 200}


async def fetch_with_retry(url: str, sem: asyncio.Semaphore, *, retries=2):
    for attempt in range(1, retries + 2):
        async with sem:
            try:
                return await asyncio.wait_for(fake_fetch(url), timeout=0.08)
            except (asyncio.TimeoutError, TimeoutError):
                await asyncio.sleep(0.02 * (2 ** (attempt - 1)))
    return {'url': url, 'status': 'failed'}


async def main():
    sem = asyncio.Semaphore(5)
    urls = [f'https://example/{i}' for i in range(20)]
    results = await asyncio.gather(*[fetch_with_retry(u, sem) for u in urls])
    ok = sum(1 for r in results if r['status'] == 200)
    print('ok', ok, 'failed', len(results) - ok)


await main()


## 自测题（不写代码也能回答）

- Task 和 coroutine object 的区别是什么？
- 为什么超时通常会触发取消？怎么保证资源被释放？
- 如何避免 asyncio 中“无限创建任务”的问题？


## 练习题（建议写代码）

- 给小案例加“结果缓存”（同一个 URL 只抓一次）。
- 为小案例加入“熔断”：连续失败 N 次暂停一段时间。
- 把 fake_fetch 换成真实 HTTP 请求，并区分连接超时/读取超时（了解）。
