#### threading

---

##### Event

In [None]:
from threading import Event, Thread
import time


def countdown(number: int, event: Event) -> None:
    """Count down from *number* to 1, one tick per second, then set *event*.

    Args:
        number: How many ticks to count; 0 (or less) finishes immediately.
        event: Signalled once the countdown is over so waiters can resume.
    """
    print(f"Countdown from {number} started")
    for remaining in range(number, 0, -1):
        print(f"Countdown: {remaining}")
        time.sleep(1)
    print(f"Countdown from {number} finished")
    # Wake up every thread blocked in event.wait().
    event.set()


event = Event()

# Run the countdown in a background thread; it sets `event` when done.
thread = Thread(target=countdown, args=(5, event))
thread.start()

# Block the main thread until the worker signals completion.
event.wait()
print("Countdown completed, main thread continues")

# The worker returns right after event.set(); join it so it is not left dangling.
thread.join()

# Reset the flag so the same Event could be reused for another round.
event.clear()

##### Condition

In [None]:
from threading import Condition, Thread
import time


def produce(
    condition: Condition, items: list, count: int = 5, delay: float = 2
) -> None:
    """Append ``0 .. count-1`` to *items*, notifying one waiter per item.

    Args:
        condition: Guards *items*; consumers wait on it.
        items: Shared buffer the consumer pops from.
        count: Number of items to produce (defaults to the original 5).
        delay: Seconds to sleep after each item (defaults to the original 2).
    """
    for item in range(count):
        with condition:  # hold the lock only while mutating the shared list
            items.append(item)
            print(f"Produced {item}")
            condition.notify()
        # Sleep without holding the lock so the consumer can run meanwhile.
        time.sleep(delay)


def consume(condition: Condition, items: list) -> None:
    """Pop and print items from *items* until a ``None`` sentinel arrives.

    Without a stop signal this loop never returns, and the non-daemon
    consumer thread would keep the interpreter alive forever.
    """
    while True:
        with condition:
            # Re-check in a loop: wait() may return without a new item.
            while not items:
                print("等待生产...")
                condition.wait()
            item = items.pop(0)
            if item is None:  # sentinel from the main thread: stop consuming
                break
            print(f"Consumed {item}")


items = []
condition = Condition()

producer_thread = Thread(target=produce, args=(condition, items))
consumer_thread = Thread(target=consume, args=(condition, items))

producer_thread.start()
consumer_thread.start()

producer_thread.join()

# Tell the consumer to stop, then wait for it so the program can exit.
with condition:
    items.append(None)
    condition.notify()
consumer_thread.join()

##### Lock

In [None]:
from threading import Lock, Thread
import time
from typing import Any


def process(source: Any, lock: Lock) -> None:
    """Hold *lock* for five seconds while pretending to process *source*."""
    lock.acquire()
    try:
        print(f"Processing {source}")
        time.sleep(5)
        print(f"Finished processing {source}")
    finally:
        lock.release()


def get(source: Any, lock: Lock) -> None:
    """Wait until *lock* can be acquired, then access *source* while holding it.

    The original polled ``lock.locked()`` and then acted without taking the
    lock, which is a race: another thread could grab it between the check
    and the access. ``acquire(timeout=1)`` waits for the lock and takes it
    atomically, printing a "Wait" line after each second it stays busy.
    """
    while not lock.acquire(timeout=1):
        print(f"Wait {source}")
    try:
        print(f"Get {source}")
    finally:
        lock.release()


lock = Lock()
source = "Data Source"

# The processor is started first, so it usually takes the lock first and
# the getter has to wait for it.
process_thread = Thread(target=process, args=(source, lock))
get_thread = Thread(target=get, args=(source, lock))
for th in (process_thread, get_thread):
    th.start()
for th in (process_thread, get_thread):
    th.join()

##### RLock

In [None]:
from threading import RLock, Thread
import time


def process_data(source: str, lock: RLock) -> None:
    """Process *source* under *lock*, saving the result while still holding it.

    The nested save_result() call acquires the same lock again from the same
    thread. That only works because *lock* is re-entrant (RLock).
    """
    lock.acquire()  # first (outer) acquisition
    try:
        print(f"🔒 [Process] 开始处理 {source}")
        time.sleep(1)
        # Nested call that takes the same lock again (recursion level 2).
        save_result(source, lock)
        print(f"✅ [Process] 完成处理 {source}")
    finally:
        lock.release()


def save_result(source: str, lock: RLock) -> None:
    """Save the result for *source* while holding *lock*.

    May be called while the current thread already owns *lock*. An RLock
    lets that thread acquire it again instead of deadlocking.
    """
    lock.acquire()
    try:
        print(f"💾 [Save] 保存 {source} 的结果")
        time.sleep(0.5)
    finally:
        lock.release()


# Use an RLock instead of a Lock: process_data() re-acquires it via save_result().
lock = RLock()
source = "关键数据"

# Start the worker thread and wait for it to finish.
worker_thread = Thread(target=process_data, args=(source, lock))
worker_thread.start()
worker_thread.join()

##### Lock/RLock

|特性|`Lock` (普通锁)|`RLock` (可重入锁)|
|---|---|---|
|**重入性**|❌ 同一线程无法重复获取锁|✅ 同一线程可多次获取锁|
|**锁的持有者**|不记录持有线程|记录持有线程和递归深度|
|**释放机制**|必须由获取锁的线程释放|必须由获取锁的线程释放，且释放次数需匹配|
|**性能**|更高（简单实现）|略低（需要维护递归计数）|
|**适用场景**|简单的互斥操作|嵌套/递归调用的代码|

##### 安全使用锁

- 所有线程获取锁的顺序要一致
- 使用超时机制 `acquire(timeout=3)`

##### 同时按照顺序获取多个锁的简洁写法

```python
with lock_a, lock_b:
    # 等同于 with lock_a: with lock_b:
    ...
```

##### Barrier

In [None]:
import threading
import time
import random


# One Barrier per phase boundary. Each action is called once, by one of the
# 3 waiting threads, after all parties have arrived.
phase1_barrier = threading.Barrier(
    3, action=lambda: print("\n== 所有线程完成数据加载，开始清洗 ==\n")
)
phase2_barrier = threading.Barrier(
    3, action=lambda: print("\n== 所有线程完成数据清洗，开始分析 ==\n")
)


def data_processing(thread_id):
    """Run the load -> clean -> analyse pipeline for one worker thread.

    Each phase sleeps for a random 2-4 s to simulate work. After loading and
    after cleaning, the thread blocks on the matching barrier, so no thread
    starts the next phase before all three have finished the current one.
    """
    stages = (
        ("加载", phase1_barrier),
        ("清洗", phase2_barrier),
        ("分析", None),  # last phase: nothing to synchronise afterwards
    )
    for stage, barrier in stages:
        elapsed = random.uniform(2, 4)
        time.sleep(elapsed)
        print(f"线程 {thread_id} 数据{stage}完成（耗时 {elapsed:.1f}s）")
        if barrier is not None:
            barrier.wait()


# One thread per Barrier party (3), then wait for all of them.
threads = [threading.Thread(target=data_processing, args=(n,)) for n in range(1, 4)]
for t in threads:
    t.start()

for t in threads:
    t.join()

print("所有处理阶段完成！")

##### Semaphore

In [None]:
import threading
import time
import random

# At most 3 threads may hold the semaphore at the same time.
semaphore = threading.Semaphore(3)


def access_resource(thread_id):
    """Use the shared resource for a random 2-4 s while holding a semaphore slot.

    Blocks until one of the 3 slots is free. The slot is returned on exit,
    even if the body raises.
    """
    semaphore.acquire()
    try:
        print(f"⌛️ Thread-{thread_id} has acquired the resource.")
        time.sleep(random.uniform(2, 4))  # simulated time spent using the resource
        print(f"✅ Thread-{thread_id} has released the resource.")
    finally:
        semaphore.release()


# Five threads compete for the three semaphore slots.
threads = [threading.Thread(target=access_resource, args=(idx,)) for idx in range(5)]
for th in threads:
    th.start()

# Block until every thread has released its slot and exited.
for th in threads:
    th.join()

print("All threads have finished.")

##### BoundedSemaphore

In [None]:
import threading


# Initial (and maximum) value is 3.
bounded_semaphore = threading.BoundedSemaphore(3)

# Releasing without a matching acquire would push the counter above 3,
# which BoundedSemaphore rejects with ValueError.
try:
    bounded_semaphore.release()
except ValueError as e:
    print(f"Semaphore released failed, {e}")

# A balanced acquire/release pair is always accepted. Only release what was
# actually acquired: if acquire() timed out, releasing would over-release.
if bounded_semaphore.acquire(timeout=0.1):
    bounded_semaphore.release()
    print("Semaphore released successfully")
else:
    print("Semaphore acquire timed out")

##### Semaphore/BoundedSemaphore

|特性|`Semaphore`|`BoundedSemaphore`|
|---|---|---|
|**最大信号量数**|可以随意增加释放次数，不会抛出异常|不允许释放次数超过初始化的信号量数|
|**`release()` 的限制**|没有限制，计数可以超过初始值|`release()` 后计数不能超过初始值，否则抛出 `ValueError`（即 `release()` 次数不能多于 `acquire()`）|
|**适用场景**|适合对释放次数没有严格要求的情况|适合需要严格控制释放次数的场景，如防止程序中释放信号量过多导致的问题|

##### Timer

In [None]:
import time
import threading
from typing import Any


def do_something(name: Any, queue: list) -> None:
    """Timer callback: append *name* to the end of *queue* in place."""
    queue.insert(len(queue), name)


pool = []
# Call do_something("Alice", pool) once, 2 seconds from now, in a new thread.
timer = threading.Timer(2, do_something, args=("Alice", pool))
timer.start()

# Poll until the timer thread has appended to the list. The loop only exits
# once that has happened, so the result is printed right after it (a
# `while ... else` without `break` would run unconditionally anyway).
while not pool:
    print("Waiting for Alice...")
    time.sleep(1)
print(pool)

# Make sure the timer thread has fully finished.
timer.join()

#### communication tools

---

##### Queue

- `queue.Queue` 普通的 FIFO 队列
- `queue.LifoQueue` LIFO 队列（后进先出）
- `queue.PriorityQueue` 带优先级的队列

In [None]:
import threading
import queue
import time


def produce(production: queue.Queue, count: int = 5, delay: float = 1) -> None:
    """Put ``0 .. count-1`` into *production*, sleeping *delay* s before each put.

    Annotated as ``queue.Queue`` (matching consume()) because any subclass
    works, including ``LifoQueue`` and ``PriorityQueue``.

    Args:
        production: Target queue.
        count: Number of items to produce (defaults to the original 5).
        delay: Seconds to wait before each put (defaults to the original 1).
    """
    for goods in range(count):
        time.sleep(delay)
        print(f"生产者生产了数据: {goods}")
        production.put(goods)


def consume(production: queue.Queue):
    """Print items from *production* until a ``None`` sentinel is received.

    Every successful get() is matched by task_done(), including the one for
    the sentinel, so ``production.join()`` can never block forever because
    of this consumer.
    """
    while True:
        item = production.get()
        try:
            if item is None:  # shutdown signal from the main thread
                break
            print(f"消费者消费了数据: {item}")
        finally:
            production.task_done()


production = queue.LifoQueue()
producer_thread = threading.Thread(target=produce, args=(production,))
producer_thread.start()
consumer_thread = threading.Thread(target=consume, args=(production,))
consumer_thread.start()
producer_thread.join()
# Wait until every produced item has been marked done before sending the
# sentinel. With a LIFO queue, a None put while items are still queued would
# be popped first and stop the consumer early, leaving data unprocessed.
production.join()
production.put(None)
consumer_thread.join()
print("所有线程结束")

##### threading.local

In [None]:
import threading
import time
import random

# Thread-local storage: each thread sees its own attributes on this object.
local_data = threading.local()


def thread_task(thread_id):
    """Store *thread_id* in thread-local storage, sleep, then read it back.

    All threads write ``local_data.value``, yet each one reads back the value
    it stored itself.
    """
    setattr(local_data, "value", thread_id)
    time.sleep(random.randrange(1, 3))
    stored = local_data.value
    print(f"线程 {thread_id} 的局部数据: {stored}")


# Create and start several threads.
threads = [threading.Thread(target=thread_task, args=(n,)) for n in range(5)]
for th in threads:
    th.start()

# Wait for every thread to finish.
for th in threads:
    th.join()

print("所有线程结束")

#### multiprocessing

---

##### Pipe

In [None]:
import multiprocessing
import time


def worker(pipe):
    """Child-process entry point: receive one message and reply with a processed copy."""
    incoming = pipe.recv()  # blocks until the other end sends something
    print(f"Received: {incoming}")
    reply = f"Processed {incoming}"
    pipe.send(reply)


def main():
    """Start a worker process, exchange one message over a Pipe, then join it."""
    parent_conn, child_conn = multiprocessing.Pipe()

    proc = multiprocessing.Process(target=worker, args=(child_conn,))
    proc.start()

    # The child is blocked in recv() until this message arrives.
    parent_conn.send("Data from main process")
    reply = parent_conn.recv()
    print(f"Received from worker: {reply}")

    # Wait for the child process to exit.
    proc.join()


if __name__ == "__main__":
    # Select the "spawn" start method explicitly. force=True avoids
    # "RuntimeError: context has already been set" when this runs more than
    # once in the same interpreter (e.g. re-running a notebook cell).
    # NOTE(review): with spawn, the child must be able to import `worker`;
    # functions defined interactively may not be found. Confirm for notebooks.
    multiprocessing.set_start_method("spawn", force=True)
    main()

##### Manager

- `Manager` 下的数据结构是进程间共享的

In [None]:
import multiprocessing
import random
import time


def work(share_list, item):
    """Sleep 1-3 s, print the shared list as currently seen, then append *item*."""
    pause = random.randint(1, 3)
    time.sleep(pause)
    print(f"I am {item} process, Current list: {share_list}")
    share_list.append(item)


if __name__ == "__main__":
    # Use the Manager as a context manager so its server process is shut down
    # deterministically, instead of relying on the temporary being collected.
    with multiprocessing.Manager() as manager:
        # A list proxy that can be shared between processes
        share_list = manager.list()

        processes = [
            multiprocessing.Process(target=work, args=(share_list, i))
            for i in range(5)
        ]
        for p in processes:
            p.start()

        # Wait for all processes to finish
        for p in processes:
            p.join()

        # Read the result while the manager (and thus the proxy) is still alive.
        print(f"Final list: {share_list}")

##### Pool

| 方法                 | 描述                                   | 返回值                     | 是否异步执行 |
| ------------------ | ------------------------------------ | ----------------------- | ------ |
| `apply()`          | 同步执行单个任务，返回结果                        | 任务的结果                   | ❌      |
| `apply_async()`    | 异步执行单个任务，返回 `AsyncResult` 对象         | `AsyncResult` 对象，稍后获取结果 | ✅      |
| `imap()`           | 并行处理可迭代对象，按顺序返回结果                    | 迭代器                  | ❌      |
| `imap_unordered()` | 并行处理可迭代对象，返回无序结果                     | 迭代器                  | ❌      |
| `join()`           | 等待所有工作进程退出（必须先调用 `close()` 或 `terminate()`） | 无                       | ❌      |
| `map()`            | 并行处理可迭代对象，返回结果（按顺序返回）                | 列表               | ❌      |
| `map_async()`      | 异步执行 `map()`，返回 `AsyncResult` 对象     | `AsyncResult` 对象，稍后获取结果 | ✅      |
| `Process()`        | 注意：这是 `multiprocessing.Process`，不是 `Pool` 的方法；创建单独的进程，需调用 `start()` 启动 | `Process` 对象            | ✅      |
| `starmap()`        | 并行处理需要多个参数的任务，解包元组参数                 | 列表               | ❌      |
| `starmap_async()`  | 异步执行 `starmap()`，返回 `AsyncResult` 对象 | `AsyncResult` 对象，稍后获取结果 | ✅      |
| `terminate()`      | 立即终止所有进程，强制结束任务                      | 无                       | ✅      |
| `close()`          | 关闭池，不再接受新的任务                         | 无                       | ❌      |

In [None]:
import multiprocessing
import time


def square(x):
    """Return x * x after a 1-second pause that simulates slow work."""
    time.sleep(1)
    squared = x * x
    return squared


if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5]
    pool = multiprocessing.Pool(processes=4)

    # imap_unordered yields each result as soon as its worker finishes, so the
    # printed order is not guaranteed to match `numbers`.
    for res in pool.imap_unordered(square, numbers):
        print(res)

    # Stop accepting work, then wait for the worker processes to exit.
    pool.close()
    pool.join()

##### Value/RawValue

|属性|`Value`|`RawValue`|
|---|---|---|
|**类型**|按 typecode/ctypes 类型存储（如 `"i"`、`"d"`）|同左，两者存储方式相同|
|**同步机制**|默认 `lock=True`，返回带锁的同步包装器，可调用 `get_lock()`；`lock=False` 时返回原始 ctypes 对象（没有 `get_lock()`）|不带锁，返回原始 ctypes 对象，需要自行加锁|
|**使用场景**|多个进程并发读写共享的基本类型（整数、浮点数等）|只读数据，或已经由外部锁保护的数据|
|**性能**|稍慢（每次访问需要加锁）|更快（无锁开销），但需要自己保证进程安全|


In [None]:
import multiprocessing
import time


def increment(shared_value):
    """Add 1 to *shared_value* five times, once per second, under its lock.

    Requires a synchronized ``multiprocessing.Value`` (one created with a
    lock), because it calls ``get_lock()``.
    """
    for _ in range(5):
        time.sleep(1)
        guard = shared_value.get_lock()
        with guard:
            shared_value.value += 1
            print(f"Shared value in process: {shared_value.value}")


if __name__ == "__main__":
    # Keep the automatic lock (lock=True, also the default): increment()
    # calls get_lock(), which only the synchronized wrapper provides. With
    # lock=False, Value returns a raw ctypes object without get_lock(), and
    # every worker process would fail with AttributeError.
    shared_value = multiprocessing.Value("i", 0, lock=True)

    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=increment, args=(shared_value,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    # 3 processes x 5 increments each
    print(f"Final shared value: {shared_value.value}")


##### Array/RawArray

|属性|`Array`|`RawArray`|
|---|---|---|
|**类型**|按 typecode/ctypes 类型存储（两者相同）|按 typecode/ctypes 类型存储（两者相同）|
|**同步机制**|默认带锁（`lock=True`），单次元素读写是进程安全的，可用 `get_lock()`|不提供自动同步，需要手动加锁|
|**适用场景**|适用于需要进程安全访问的共享数组|适用于只读数据，或需要低级控制、自行同步的共享数组|
|**性能**|稍慢（因为有同步机制）|更快（因为没有同步机制，适合对数据进行更精细的控制）|

##### 自动锁

- 复合语句依旧需要加锁
- 多个变量同步也需要加锁

##### 加锁场景
```python
# 非原子操作
with v.get_lock():
    if v.value > 10:
        v.value -= 5

# 多个共享变量
with v1.get_lock(), v2.get_lock():
    v1.value += v2.value
```

In [None]:
import multiprocessing
import time


def increment(shared_array, index):
    """Bump ``shared_array[index]`` five times, one second apart.

    The read-modify-write is not atomic as a whole. That is fine here
    because, in this demo, each process is given its own index.
    """
    for _ in range(5):
        time.sleep(1)
        shared_array[index] = shared_array[index] + 1
        print(f"Shared array[{index}] in process: {shared_array[index]}")


if __name__ == "__main__":
    # Synchronized array (lock=True): element reads and writes take its lock.
    shared_array = multiprocessing.Array("i", [0, 0, 0], lock=True)

    # One process per slot; process idx only touches shared_array[idx].
    processes = [
        multiprocessing.Process(target=increment, args=(shared_array, idx))
        for idx in range(3)
    ]
    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()

    print(f"Final shared array: {shared_array[:]}")

##### SimpleQueue

In [None]:
import multiprocessing
import time


def producer(q):
    """Put the integers 0-4 on *q*, waiting one second before each put."""
    for item in range(5):
        time.sleep(1)
        print(f"Producer putting item {item} into queue")
        q.put(item)


def consumer(q):
    """Take exactly five items from *q*, spending 2 s processing each one."""
    remaining = 5
    while remaining:
        item = q.get()  # blocks until the producer has put something
        print(f"Consumer got item {item} from queue")
        time.sleep(2)  # simulated processing time
        remaining -= 1


if __name__ == "__main__":
    # A SimpleQueue shared by the producer and consumer processes
    q = multiprocessing.SimpleQueue()

    workers = [
        multiprocessing.Process(target=producer, args=(q,)),
        multiprocessing.Process(target=consumer, args=(q,)),
    ]
    for proc in workers:
        proc.start()

    # Join the producer first, then the consumer.
    for proc in workers:
        proc.join()

##### Other

- `multiprocessing.Barrier()`
- `multiprocessing.Condition()`
- `multiprocessing.Semaphore()`
- `multiprocessing.BoundedSemaphore()`
- `multiprocessing.Lock()`
- `multiprocessing.RLock()`
- `multiprocessing.Event()`

#### concurrent.futures

---

##### ThreadPoolExecutor

In [None]:
import concurrent.futures
import time


def fetch_data(x):
    """Pretend to fetch data for *x* (2 s of simulated I/O) and return a label."""
    print(f"Fetching data for {x}...")
    time.sleep(2)
    label = f"Data {x}"
    return label


if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    # At most 3 fetches run at once; map() yields results in input order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        result = [*executor.map(fetch_data, inputs)]

    print(result)

##### ProcessPoolExecutor

- 等效于 `multiprocessing.Pool()`
- API 更加简洁

In [None]:
import concurrent.futures
import time


def compute_square(x):
    """Return x * x after a 2-second pause (executed in a worker process)."""
    print(f"Computing square for {x}...")
    time.sleep(2)
    squared = x * x
    return squared


if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    # The __main__ guard matters: worker processes may re-import this module.
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        result = [*executor.map(compute_square, inputs)]

    print(result)


#### asyncio

---

##### async/await

In [None]:
import asyncio


# A minimal coroutine
async def hello_world():
    """Print "Hello", yield to the event loop for 1 s, then print "World"."""
    print("Hello")
    # Non-blocking pause that simulates an I/O wait.
    await asyncio.sleep(1)
    print("World")


# Entry coroutine, driven by asyncio.run()
async def main():
    """Await hello_world() and return its (None) result."""
    return await hello_world()


# asyncio.run() creates an event loop, runs main() to completion and closes it.
asyncio.run(main())

In [None]:
import asyncio


async def fetch_data(x):
    """Simulate a 2-second non-blocking fetch for *x* and return a label."""
    print(f"Fetching data for {x}...")
    await asyncio.sleep(2)
    label = f"Data {x}"
    return label


async def main():
    """Run five fetch_data() coroutines concurrently and print their results."""
    # gather() returns the results in the same order as its arguments.
    results = await asyncio.gather(*(fetch_data(i) for i in range(5)))
    print(results)


asyncio.run(main())

In [None]:
import asyncio


async def task(name, delay):
    """Sleep *delay* seconds and report completion; ``delay == 2`` simulates a failure.

    Raises:
        ValueError: when *delay* is 2 (raised before any sleeping).
    """
    if delay != 2:
        await asyncio.sleep(delay)
        return f"Task {name} completed"
    raise ValueError(f"Task {name} encountered an error!")


async def main():
    """Gather three tasks and report the first exception raised by any of them."""
    coros = [task("A", 1), task("B", 2), task("C", 3)]
    # Without return_exceptions=True, gather() propagates the first exception.
    try:
        results = await asyncio.gather(*coros)
    except Exception as exc:
        print(f"Error occurred: {exc}")
    else:
        print(results)


asyncio.run(main())

In [None]:
import asyncio


async def periodic_task():
    """Print a message, then wait 3 s, forever (until the task is cancelled)."""
    while True:
        print("Task executed")
        # Cancellation is delivered as CancelledError at this await.
        await asyncio.sleep(3)


async def main():
    """Run periodic_task() in the background for 10 s, then cancel it."""
    background = asyncio.create_task(periodic_task())
    # The periodic task prints roughly at t = 0, 3, 6 and 9 s.
    await asyncio.sleep(10)
    background.cancel()


asyncio.run(main())

In [None]:
import asyncio

import aiohttp


async def fetch(url, session=None):
    """Return the body of *url* as text.

    Args:
        url: Address to GET.
        session: Optional shared ``aiohttp.ClientSession``. If omitted, a
            temporary session is created (and closed) just for this request,
            which matches the original behaviour.
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await fetch(url, own_session)
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = ["https://www.example.com", "https://www.python.org"]
    # aiohttp recommends one ClientSession per application rather than one per
    # request, so connections are pooled and reused.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(url, session) for url in urls))
    for result in results:
        # Print only a short preview; the trailing "..." signals truncation.
        print(f"Fetched data: {result[:100]}...")


asyncio.run(main())

##### Event loop

- 事件循环就是一个在后台不停运行、等待和执行任务的循环
- 事件循环是其核心机制，它负责调度和管理
- 它使得程序能够在执行某个任务时，不会被阻塞，能够同时执行其他任务
- `run_forever()` 必须手动停止
- `call_later()` 的回调函数不能是协程

In [None]:
import asyncio


async def task(name, delay):
    """Announce start, sleep *delay* s without blocking the loop, then announce the end."""
    print(f"Task {name} started")
    await asyncio.sleep(delay)  # other tasks run while this one waits
    print(f"Task {name} completed after {delay} seconds")


# Create a fresh event loop explicitly. asyncio.get_event_loop() without a
# running loop is deprecated (DeprecationWarning in recent versions,
# RuntimeError from Python 3.14). It also keeps returning the loop it created,
# so after loop.close() later code would receive a closed loop.
loop = asyncio.new_event_loop()
try:
    # Schedule both tasks on this loop.
    task1 = loop.create_task(task("A", 2))
    task2 = loop.create_task(task("B", 1))

    # Drive the loop until both tasks have finished.
    loop.run_until_complete(asyncio.gather(task1, task2))
finally:
    # Always release the loop's resources.
    loop.close()

In [None]:
import asyncio


async def infinite_task(interval: float = 1.0):
    """Print a message forever, pausing *interval* seconds between prints.

    The ``await`` is essential. A coroutine that loops without awaiting never
    yields control, so the event loop could never run callbacks such as a
    ``loop.stop`` scheduled with call_later(), and the program would spin
    forever.
    """
    while True:
        print("Running forever...")
        await asyncio.sleep(interval)


# Create a fresh loop instead of the deprecated implicit get_event_loop().
loop = asyncio.new_event_loop()

# Schedule the never-ending task.
task = loop.create_task(infinite_task())

# Stop the event loop after 3 seconds (call_later takes a plain callable).
loop.call_later(3, loop.stop)

try:
    # Runs until loop.stop() is called.
    loop.run_forever()
finally:
    # The task is still pending after stop(). Cancel it and let the loop
    # process the cancellation, so closing does not warn
    # "Task was destroyed but it is pending!".
    task.cancel()
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
    loop.close()

##### Queue

In [None]:
import asyncio


# Producer coroutine
async def producer(queue):
    """Put 0..4 on *queue*, pausing 1 s after each put.

    ``put`` waits while the queue is full.
    """
    for value in range(5):
        print(f"Producer: producing {value}")
        await queue.put(value)
        await asyncio.sleep(1)


# Consumer coroutine
async def consumer(queue):
    """Consume items from *queue* until the ``None`` sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:  # the producer side is done: stop consuming
            return
        print(f"Consumer: consumed {item}")
        queue.task_done()  # mark this item as processed
        await asyncio.sleep(2)


async def main():
    """Run one producer and one consumer over a bounded queue, then shut down."""
    # At most 3 items can wait in the queue at once.
    queue = asyncio.Queue(maxsize=3)

    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))

    # Wait for the producer, send the stop signal, then wait for the consumer.
    await producer_task
    await queue.put(None)
    await consumer_task


# Start the event loop and run main()
asyncio.run(main())

##### Other

- `asyncio.Barrier()`
- `asyncio.Condition()`
- `asyncio.Semaphore()`
- `asyncio.BoundedSemaphore()`
- `asyncio.Lock()`
- ~~`asyncio.RLock()`~~（asyncio 没有 `RLock`；`asyncio.Lock` 不可重入）
- `asyncio.Event()`

#### [celery](https://docs.celeryq.dev)

---

##### work

- 假如当前为 `work.py`

In [None]:
from celery import Celery

# Broker and result backend share the same Redis database here.
# NOTE(review): hard-coded public host with no credentials; consider moving
# this to configuration.
REDIS_URL = "redis://43.156.74.18:6379/0"

app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)


@app.task
def add(x, y):
    """Celery task: return x + y (executed by a worker process)."""
    return x + y

##### job

- 当前是 `job.py`

In [None]:
from work import add

# Enqueue add(4, 6); apply_async returns an AsyncResult immediately.
result = add.apply_async((4, 6))
print("Task ID:", result.id)
# Bound the wait: without a timeout, get() blocks forever when no worker is
# running. Celery raises celery.exceptions.TimeoutError if it expires.
print("Result:", result.get(timeout=10))

##### work/job 必须要分开

- worker 进程启动时会导入 `work.py`，它负责监听队列并执行任务，不应该在导入时提交任务
- producer（你写的提交任务的逻辑）应该是单独运行的进程，去 `apply_async()` 提交任务到队列。
- 放在一起会阻塞：如果 `work.py` 在模块级别提交任务并调用 `result.get()`，worker 在导入阶段就会卡在等待结果上，还没开始消费队列

#### [ray](https://www.ray.io)

---

##### cluster

```zsh
# 主节点运行
ray start --head --port=6379

# 工作节点连接到主节点
ray start --address='192.168.1.100:6379'
ray start --address='192.168.1.100:6379' --redis-password='5241590000000000'
```

##### sample

In [None]:
import ray


ray.init()  # start (or attach to) a local Ray runtime
# ray.init(address="192.168.1.100:6379")  # connect to an existing cluster instead


@ray.remote
def multiply(x):
    """Remote task: return x * x (the square of x, despite the name)."""
    return x * x


# .remote() returns object references right away; ray.get() waits for the values.
futures = [multiply.remote(n) for n in range(100)]
results = ray.get(futures)
print(results)

ray.shutdown()

#### [dask](https://www.dask.org)

---

##### local cluster

In [2]:
import dask
from dask.delayed import delayed
from dask.distributed import Client, LocalCluster

# Start a local cluster first (4 workers, 1 thread each) and connect to it.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

# Show the current connection status.
print(client)


# Plain Python functions
def add(x, y):
    return x + y


def multiply(x, y):
    return x * y


def sum_list(numbers):
    return sum(numbers)


try:
    # Wrap with delayed: nothing runs yet, Dask only records the task graph.
    a = delayed(add)(1, 2)
    b = delayed(multiply)(a, 10)
    c = delayed(sum_list)([b, 5, 6])

    print(c)  # prints a Delayed object, not the value

    # Execute the graph; with an active Client it runs on the local cluster.
    result = c.compute()

    print("Result:", result)
finally:
    # Close the client AND the cluster. Closing a client does not shut down a
    # cluster object that was passed to it, so the worker processes would
    # otherwise keep running.
    client.close()
    cluster.close()

<Client: 'tcp://127.0.0.1:51842' processes=4 threads=4, memory=24.00 GiB>
Delayed('sum_list-9205217b-d908-4400-aa96-6a90a5ebbfdd')
Result: 41


##### remote

|角色|IP地址|
|---|---|
|Scheduler|192.168.1.10|
|Worker 1|192.168.1.11|
|Worker 2|192.168.1.12|

##### 搭建集群

```zsh
# scheduler, display: "Scheduler at: tcp://192.168.1.10:8786"
dask-scheduler

# worker 1/worker 2
dask-worker tcp://192.168.1.10:8786
```

##### 代码连接集群

```python
from dask.distributed import Client

client = Client('tcp://192.168.1.10:8786')

...
```

#### [dramatiq](https://dramatiq.io)

---

##### work

- 建立 `work.py` 文件，定义了 `add()` 方法
- 执行 `dramatiq work`

In [None]:
import dramatiq
from dramatiq.brokers.redis import RedisBroker

# Use Redis as the message broker. Without a url, RedisBroker would connect to
# redis://localhost:6379/0; here an explicit remote URL is given.
redis_broker = RedisBroker(url="redis://43.156.74.18:6379/0")
dramatiq.set_broker(redis_broker)


@dramatiq.actor
def add(x, y):
    """Dramatiq actor: print the sum of x and y inside the worker process."""
    total = x + y
    print(f"Result: {total}")

##### job

- 创建 `job.py` 文件
- 调用方法并运行

In [None]:
from work import add

# send() only enqueues the message and returns at once; a worker runs add().
message = add.send(3, 5)

#### [arq](https://arq-docs.helpmanual.io)

---

##### work

- `work.py` 文件，定义 `add()` 方法
- 远程连接配置需要和 `job.py` 相同

In [None]:
# worker.py
from arq import func
from arq.connections import RedisSettings


async def add(ctx, a, b):
    """arq job: return a + b. *ctx* is the job context supplied by arq."""
    total = a + b
    return total


class WorkerSettings:
    """Settings object the ``arq`` CLI loads to start a worker."""

    # Jobs this worker is allowed to execute.
    functions = [add]
    # Must match the RedisSettings used by the code that enqueues jobs.
    redis_settings = RedisSettings(host="43.156.74.18", port=6379, database=0)

##### job

- `job.py` 文件，调用 `add()` 方法
- 远程连接配置需要和 `work.py` 相同

In [None]:
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def main():
    """Enqueue add(3, 7) via arq and print the result once a worker finishes it."""
    settings = RedisSettings(host="43.156.74.18", port=6379, database=0)
    redis = await create_pool(settings)
    job = await redis.enqueue_job("add", 3, 7)
    # Wait at most 5 s for the worker to complete the job.
    result = await job.result(timeout=5)
    print(f"Got result: {result}")


asyncio.run(main())

#### conclusion

---

##### About Queue

- `queue.Queue()` 是线程队列
- `multiprocessing.Queue()` 是进程间队列
- `multiprocessing.SimpleQueue()` 是进程间的简单队列
- `multiprocessing.Manager().Queue()` 是进程间共享队列
- `asyncio.Queue()` 协程队列

##### 进程/线程/协程

| **技术** | **资源开销** | **并行能力** | **适用场景**       | **典型用例**        |
| ------ | -------- | -------- | -------------- | --------------- |
| **进程** | 高        | 是（多核）    | CPU 密集型、隔离性要求高 | 视频编码、科学计算       |
| **线程** | 中        | 否（受 GIL） | I/O 密集型、简单数据共享 | 数据库查询、GUI 事件处理  |
| **协程** | 极低       | 否        | 超高并发 I/O、低延迟需求 | Web 服务器、爬虫、聊天服务 |

##### 线程/协程（协程为主，线程为辅）

|**维度**|**线程（Thread）**|**协程（Coroutine）**|
|---|---|---|
|**调度机制**|由操作系统内核调度，抢占式（可能被强制切换）|用户态协作式调度（需主动 `yield`/`await`）|
|**切换开销**|高（需内核态/用户态切换，保存寄存器等）|极低（通常只是函数调用级别的上下文保存）|
|**内存占用**|每个线程需分配 MB 级栈内存（默认 1-8MB）|协程栈通常为 KB 级（甚至动态增长）|
|**并行能力**|受限于 GIL（如 CPython），多核利用率低|单线程内并发，无法利用多核|
|**编程复杂度**|需处理竞态条件（锁、信号量等）|无锁编程（但需避免阻塞操作）|

##### celery/ray/dask


| 框架         | 适合场景                               | 核心特点                                       |
| ---------- | ---------------------------------- | ------------------------------------------ |
| **Celery** | 异步任务队列（比如发送邮件、处理视频转码）              | 分布式任务调度，简单耐用，但主要是 I/O 型任务（不是很强调 CPU 密集计算）       |
| **Ray**    | 分布式计算（高性能机器学习、大规模 Python 函数调度）       | 支持超大规模分布式、远超 Celery 性能，超简单的远程函数（@ray.remote） |
| **Dask**   | 分布式数据处理（特别适合 Pandas、Numpy 那种表格/数组运算） | 并行化大数据处理（局部任务调度 + 延迟计算），轻量版的 Spark            |

##### dramatiq/arq

| 框架           | 适合什么情况                | 特点                            |
| ------------ | --------------------- | ----------------------------- |
| **dramatiq** | 想要比 Celery 更简单的任务队列（任务函数为普通同步函数） | 类似 Celery 但更优雅，支持 RabbitMQ/Redis |
| **arq**      | asyncio 项目里需要高性能异步任务队列 | 完全 async/await，超轻量，只用 Redis    |