In [11]:
import asyncio
import random

In [1]:
# asyncio.run() 不能在已經運行的事件迴圈（event loop）中被呼叫，這通常會在 Jupyter Notebook 或 某些異步框架（如 FastAPI、JupyterLab） 中發生，因為它們已經有一個運行中的事件迴圈。

In [None]:
# 基本 async 和 await : 使用 async def 定義，並用 await 讓出控制權，讓其他異步任務可以運行。
async def say_hello():
    print("Hello")
    await asyncio.sleep(2)  # 模擬等待 2 秒
    print("World")

await say_hello()

Hello
World


In [5]:
# 多個異步函式 : 當有多個異步函式時，await 會讓它們順序執行，而不會並行運行。
async def task1():
    print("Task 1 開始")
    await asyncio.sleep(2)
    print("Task 1 完成")

async def task2():
    print("Task 2 開始")
    await asyncio.sleep(1)
    print("Task 2 完成")

async def main():
    await task1()
    await task2()

await main()

Task 1 開始
Task 1 完成
Task 2 開始
Task 2 完成


In [6]:
# 讓異步函式並行執行 (asyncio.gather()) : 同時 執行多個異步函式（並行執行），可以用 asyncio.gather()。
async def task1():
    print("Task 1 開始")
    await asyncio.sleep(2)
    print("Task 1 完成")

async def task2():
    print("Task 2 開始")
    await asyncio.sleep(1)
    print("Task 2 完成")

async def main():
    await asyncio.gather(task1(), task2())  # 同時執行 task1 和 task2

await main()

Task 1 開始
Task 2 開始
Task 2 完成
Task 1 完成


In [13]:
# 使用 asyncio.create_task() 讓任務並行 : 除了 asyncio.gather()，還可以用 asyncio.create_task() 來創建並行任務，讓 task1 和 task2 **幾乎同時開始執行**
async def task1():
    print("Task 1 開始")
    await asyncio.sleep(2)
    print("Task 1 完成")

async def task2():
    print("Task 2 開始")
    await asyncio.sleep(1)
    print("Task 2 完成")

async def main():
    t1 = asyncio.create_task(task1())  # 建立 task1 任務
    t2 = asyncio.create_task(task2())  # 建立 task2 任務
    await t1  # 等待 task1 完成
    await t2  # 等待 task2 完成

await main()

Task 1 開始
Task 2 開始
Task 2 完成
Task 1 完成


In [None]:
# 非同步爬取網頁 (aiohttp) : 當你需要大量網路請求時，async 可以大幅提升效能。這裡我們使用 aiohttp 來並行請求多個網站。
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(f"讀取 {url}，狀態碼：{response.status}")
            return await response.text()

async def main():
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.openai.com"
    ]
    
    tasks = [fetch(url) for url in urls]  # 創建任務列表
    await asyncio.gather(*tasks)  # 並行執行所有請求

await main()

讀取 https://www.python.org，狀態碼：200
讀取 https://www.openai.com，狀態碼：403
讀取 https://www.example.com，狀態碼：200


In [10]:
# 讓異步函式超時 (asyncio.wait_for()) : 有時候，我們希望某個異步函式 不能超過特定時間，否則就取消它。例如，設置 3 秒超時，如果 3 秒內沒有完成，就拋出錯誤。
async def slow_task():
    print("開始執行")
    await asyncio.sleep(5)  # 模擬一個很慢的操作
    print("任務完成")

async def main():
    try:
        await asyncio.wait_for(slow_task(), timeout=3)  # 設置 3 秒超時，函式最多運行 3 秒，超過就會中斷
    except asyncio.TimeoutError:
        print("超時！任務未完成")

await main()

開始執行
超時！任務未完成


In [12]:
# 限制並行數量 (asyncio.Semaphore()) : 如果我們想要並行運行很多任務，但不希望一次執行太多（例如避免爬取太多網站導致被封），可以使用 asyncio.Semaphore() 來限制同時運行的數量。
async def worker(semaphore, num):
    async with semaphore:  # 只有 N 個任務能同時運行
        print(f"Worker {num} 開始")
        await asyncio.sleep(random.randint(1, 3))  # 模擬不同的工作時間
        print(f"Worker {num} 完成")

async def main():
    semaphore = asyncio.Semaphore(2)  # 最多允許 2 個任務同時執行
    tasks = [worker(semaphore, i) for i in range(5)]  # 創建 5 個 worker
    await asyncio.gather(*tasks)

await main()

Worker 0 開始
Worker 1 開始
Worker 0 完成
Worker 2 開始
Worker 2 完成
Worker 3 開始
Worker 1 完成
Worker 4 開始
Worker 3 完成
Worker 4 完成
