
<div dir="rtl" align="right">

# 📘 اسلات A-3: Asyncio — Event Loop, Task, Await/Cancel/Timeout

**هدف:** درک loop، ساخت Task، اجرای concurrent، لغو و timeout؛ ربط به FastAPI (IO-bound vs CPU-bound).

</div>


In [None]:
# A3_asyncio_basics.py
import asyncio

async def io_job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def demo_gather() -> None:
    results = await asyncio.gather(
        io_job("A", 1.0),
        io_job("B", 0.5),
        io_job("C", 0.2),
    )
    print("gather results:", results)

async def demo_cancel() -> None:
    task = asyncio.create_task(io_job("slow", 5))
    await asyncio.sleep(0.7)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("slow task cancelled safely")

async def demo_timeout() -> None:
    try:
        res = await asyncio.wait_for(io_job("maybe-slow", 2.0), timeout=0.8)
        print(res)
    except asyncio.TimeoutError:
        print("timeout!")

async def main() -> None:
    await demo_gather()
    await demo_cancel()
    await demo_timeout()

if __name__ == "__main__":
    asyncio.run(main())

In [None]:
# A3_asyncio_advanced.py (اختیاری)
import asyncio

async def bounded_job(sem: asyncio.Semaphore, i: int, delay: float) -> str:
    async with sem:
        await asyncio.sleep(delay)
        return f"job {i} after {delay}s"

async def demo_as_completed() -> None:
    tasks = [asyncio.create_task(asyncio.sleep(d, result=f"res {d}")) for d in (0.3, 0.1, 0.2)]
    for fut in asyncio.as_completed(tasks):
        print("finished:", await fut)

async def demo_semaphore() -> None:
    sem = asyncio.Semaphore(2)
    jobs = [bounded_job(sem, i, delay=(i % 3) * 0.2 + 0.1) for i in range(6)]
    results = await asyncio.gather(*jobs)
    print("bounded results:", results)

async def main() -> None:
    await demo_as_completed()
    await demo_semaphore()

if __name__ == "__main__":
    asyncio.run(main())

In [None]:
# A3_cpu_bound_note.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def cpu_heavy(n: int) -> int:
    s = 0
    for i in range(10_000_00):
        s += (i * n) % 7
    return s

async def run_cpu_in_thread(n: int) -> int:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        return await loop.run_in_executor(pool, cpu_heavy, n)

async def main() -> None:
    start = time.perf_counter()
    r1, r2 = await asyncio.gather(run_cpu_in_thread(3), run_cpu_in_thread(5))
    dur = time.perf_counter() - start
    print("results:", r1, r2, "took:", round(dur, 3), "s")

if __name__ == "__main__":
    asyncio.run(main())


<div dir="rtl" align="right">

## 🧪 تمرین‌ها
1) سه `io_job` با تاخیرهای 0.1/0.2/0.3 را با `gather` اجرا کن و ترتیب پایان را بررسی کن.  
2) یک task با تاخیر 3s را پس از 0.5s `cancel()` کن و `CancelledError` را هندل کن.  
3) `wait_for(..., timeout=0.8)` روی job با تاخیر 1.2s و مشاهدهٔ Timeout.  
4) توضیح FastAPI: چرا برای I/O `async def` بهتر است و برای CPU-bound باید از executor/صف کار استفاده کرد؟

## ▶️ A-4: ASGI vs WSGI + اپ ASGI حداقلی (اسلات بعدی)

</div>
