# Async Python

## A briefing on asynchronous Python coding, essential in Agent engineering

Here is a masterful tutorial written by ChatGPT (shared conversation linked below), with exercises and comparisons.

https://chatgpt.com/share/680648b1-b0a0-8012-8449-4f90b540886c

This includes how to run async code from a Python module.

### And now some examples:

In [None]:
# Let's define an async function

import asyncio

async def do_some_work():
    """Simulate one second of non-blocking work, printing when it starts and ends."""
    print("Starting work")
    pause_seconds = 1
    # Awaiting asyncio.sleep suspends this coroutine for the duration of the pause.
    await asyncio.sleep(pause_seconds)
    print("Work complete")


In [None]:
# What will this do?

do_some_work()

In [None]:
# OK let's try that again!

await do_some_work()

In [None]:
# What's wrong with this?

async def do_a_lot_of_work():
    """Call do_some_work() three times WITHOUT awaiting any of the calls.

    This is the deliberately broken version that the "What's wrong with this?"
    question refers to; the three separate un-awaited calls are kept as-is.
    """
    # Each call is missing `await`: calling an `async def` function only
    # creates a coroutine object, it does not run the function body.
    do_some_work()
    do_some_work()
    do_some_work()

await do_a_lot_of_work()

In [None]:
# Interesting warning! Let's fix it

async def do_a_lot_of_work():
    """Run do_some_work() three times, one after another."""
    for _ in range(3):
        # Each run is awaited to completion before the next one starts.
        await do_some_work()

await do_a_lot_of_work()

In [None]:
# And now let's run them concurrently ("in parallel")
# It's important to recognize that this is not "multi-threading" in the way that you may be used to
# The asyncio library runs on a single thread; its event loop switches to another task whenever the current one is waiting

async def do_a_lot_of_work_in_parallel():
    """Run three do_some_work() coroutines concurrently and wait for all of them."""
    pending = [do_some_work() for _ in range(3)]
    # gather schedules all three coroutines and resumes once every one has finished.
    await asyncio.gather(*pending)

await do_a_lot_of_work_in_parallel()

### Finally - try writing a Python module that calls `do_a_lot_of_work_in_parallel`

See the link at the top; you'll need something like this in your module:

```python
if __name__ == "__main__":
    asyncio.run(do_a_lot_of_work_in_parallel())
```

## Exemplos práticos: bloqueante vs não-bloqueante e medidas de tempo

As células abaixo demonstram: (1) diferença entre `asyncio.sleep` (não-bloqueante) e `time.sleep` (bloqueante),
(2) comportamento de `create_task` vs `await` sequencial vs `gather`, e (3) como delegar trabalho bloqueante para threads/processos
para evitar bloquear o event loop. Execute cada célula em um notebook (IPython aceita `await`).

In [None]:
# Exemplo 1: asyncio.sleep (não-bloqueante) vs time.sleep (bloqueante)
import asyncio
import time

async def async_work(i):
    """Print a timestamped start line, wait one second without blocking, print a done line.

    Args:
        i: Label included in the printed messages.
    """
    print(f"{time.time():.3f} start async {i}")
    delay = 1
    # asyncio.sleep suspends only this coroutine; other tasks can run meanwhile.
    await asyncio.sleep(delay)
    print(f"{time.time():.3f} done async {i}")

def blocking_work(i):
    """Print a timestamped start line, block for one second, print a done line.

    Args:
        i: Label included in the printed messages.
    """
    print(f"{time.time():.3f} start block {i}")
    delay = 1
    # time.sleep blocks the calling thread for the whole delay.
    time.sleep(delay)
    print(f"{time.time():.3f} done block {i}")

async def run_async_gather():
    """Run three async_work coroutines concurrently and print the elapsed time.

    async_work waits on asyncio.sleep, so the three one-second waits overlap
    and the total is expected to be about 1 second rather than 3.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() is wall-clock time and can jump if the system
    # clock is adjusted.
    started = time.perf_counter()
    await asyncio.gather(async_work(1), async_work(2), async_work(3))
    print('elapsed gather (asyncio.sleep):', time.perf_counter() - started)

async def worker_blocking(i):
    """Coroutine wrapper that runs the synchronous blocking_work(i) inline.

    Args:
        i: Label passed to blocking_work and included in the printed messages.
    """
    print(f"{time.time():.3f} start worker_blocking {i}")
    # Plain synchronous call with no await: the event loop is blocked until it returns.
    blocking_work(i)
    print(f"{time.time():.3f} end worker_blocking {i}")

async def run_blocking_gather():
    """Gather three worker_blocking coroutines and print the elapsed time.

    worker_blocking calls blocking_work synchronously and never awaits, so
    each one blocks the event loop in turn; the total is expected to be about
    3 seconds even though gather is used.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    await asyncio.gather(worker_blocking(1), worker_blocking(2), worker_blocking(3))
    print('elapsed gather (blocking):', time.perf_counter() - started)

# Execute (no notebook, use await):
# await run_async_gather()  # esperado ~1s
# await run_blocking_gather()  # esperado ~3s (cada blocking_work bloqueia o loop)

In [None]:
await run_async_gather()

In [None]:
await run_blocking_gather()

In [None]:
# Exemplo 2: sequencial vs create_task vs gather (medindo tempo)
import asyncio, time

async def job(i):
    """Print a timestamped start line, wait one second without blocking, print an end line.

    Args:
        i: Label included in the printed messages.
    """
    print(f"{time.time():.3f} start {i}")
    delay = 1
    # Non-blocking wait: other tasks on the event loop can run during the sleep.
    await asyncio.sleep(delay)
    print(f"{time.time():.3f} end {i}")

async def seq():
    """Await job(1), job(2) and job(3) one after another and print the elapsed time.

    Each job is awaited to completion before the next one is started, so the
    total is expected to be about 3 seconds.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    for i in (1, 2, 3):
        await job(i)
    print('seq elapsed', time.perf_counter() - started)

async def tasks_then_await():
    """Schedule three job() tasks up front, await each one, and print the elapsed time.

    asyncio.create_task schedules every job on the event loop before any of
    them is awaited, so the one-second waits overlap and the total is expected
    to be about 1 second.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    tasks = [asyncio.create_task(job(i)) for i in (1, 2, 3)]
    # All three tasks are already scheduled; awaiting them one by one just
    # waits for each to finish, none of them blocks the others.
    for task in tasks:
        await task
    print('tasks_then_await elapsed', time.perf_counter() - started)

async def gather_example():
    """Run job(1), job(2) and job(3) concurrently via gather and print the elapsed time.

    gather schedules all three jobs together and no job blocks another, so the
    total is expected to be about 1 second.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    await asyncio.gather(job(1), job(2), job(3))
    print('gather elapsed', time.perf_counter() - started)

async def blocking_job(i):
    """Coroutine that blocks for one second with time.sleep instead of awaiting.

    Args:
        i: Label included in the printed messages.
    """
    print(f"{time.time():.3f} start blocking job {i}")
    delay = 1
    # time.sleep is synchronous: it blocks the event loop for the whole delay.
    time.sleep(delay)
    print(f"{time.time():.3f} end blocking job {i}")
    
async def blocking_tasks_then_await():
    """Schedule three blocking_job() tasks, await each one, and print the elapsed time.

    create_task does not create threads; it only schedules the coroutines on
    the event loop. Because blocking_job blocks the loop with time.sleep, the
    tasks run one after another and the total is expected to be about 3 seconds.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    tasks = [asyncio.create_task(blocking_job(i)) for i in (1, 2, 3)]
    for task in tasks:
        await task
    print('blocking tasks_then_await elapsed', time.perf_counter() - started)

# Run in a notebook: await seq(); await tasks_then_await(); await gather_example(); await blocking_tasks_then_await()
# Expected: seq ~3s, tasks_then_await ~1s, gather_example ~1s, blocking_tasks_then_await ~3s

In [None]:
await seq()

In [None]:
await tasks_then_await()

In [None]:
await gather_example()

In [None]:
await blocking_tasks_then_await()

In [32]:
# Exemplo 3: delegar bloqueante para threads/processos
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, math

from cpu_tasks import cpu_bound


def blocking_io(x):
    """Simulate a blocking I/O call: sleep for one second, then return x doubled.

    Args:
        x: Value to double.

    Returns:
        x * 2.
    """
    time.sleep(1)
    doubled = x * 2
    return doubled


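# Reference copy of cpu_bound, which is imported from cpu_tasks above.
# NOTE(review): presumably it lives in a module so that ProcessPoolExecutor
# worker processes can import it — confirm.
# def cpu_bound(n):
#     # simple CPU-intensive work
#     s = 0
#     for i in range(2000000):
#         s += (i * i) % (n + 1)
#     return s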


async def using_to_thread():
    """Run blocking_io for 1, 2 and 3 in worker threads and print elapsed time and results.

    asyncio.to_thread runs each blocking call in a separate thread, so the
    one-second sleeps overlap and the total is expected to be about 1 second.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    res = await asyncio.gather(
        asyncio.to_thread(blocking_io, 1),
        asyncio.to_thread(blocking_io, 2),
        asyncio.to_thread(blocking_io, 3),
    )
    print("to_thread elapsed", time.perf_counter() - started, res)


async def using_threadpool():
    """Run blocking_io for 0, 1 and 2 on a ThreadPoolExecutor and print elapsed time and results.

    Each call is submitted with loop.run_in_executor and runs on a pool
    thread, so the one-second sleeps overlap and the total is expected to be
    about 1 second.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        futures = [loop.run_in_executor(pool, blocking_io, i) for i in range(3)]
        res = await asyncio.gather(*futures)
        # Printed inside the `with` block, so pool shutdown is not part of the measurement.
        print("threadpool elapsed", time.perf_counter() - started, res)


async def using_processpool(n_tasks=30):
    """Run cpu_bound(0..n_tasks-1) on a ProcessPoolExecutor and print elapsed time and results.

    Args:
        n_tasks: Number of cpu_bound calls to submit; call i receives i as its argument.
    """
    # perf_counter is a monotonic clock intended for measuring intervals,
    # unlike wall-clock time.time().
    started = time.perf_counter()
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # cpu_bound is the function imported from cpu_tasks (not a lambda):
        # callables sent to a process pool must be picklable, which a lambda is not.
        futures = [loop.run_in_executor(pool, cpu_bound, i) for i in range(n_tasks)]
        res = await asyncio.gather(*futures)
        print("processpool elapsed", time.perf_counter() - started, "results:", res)


# Execute: await using_to_thread(); await using_threadpool(); await using_processpool()
# Observações: ProcessPoolExecutor cria processos (overhead) e evita GIL (bom para CPU-bound).

In [33]:
await using_to_thread()

to_thread elapsed 1.0005359649658203 [2, 4, 6]


In [34]:
await using_threadpool()

threadpool elapsed 1.0007672309875488 [0, 2, 4]


In [35]:
# Time a single cpu_bound call (baseline to compare with the process-pool run below):
t0 = time.time()
cpu_bound(1)
print('single cpu_bound elapsed', time.time() - t0)

single cpu_bound elapsed 0.14500784873962402


In [None]:
await using_processpool(n_tasks=30) # aumentando número de tarefas para ver benefício

processpool elapsed 0.816753625869751 results: [0, 1000000, 1333333, 1000000, 4000000, 4333330, 3999997, 3000000, 5333329, 9000000, 7999993, 6333328, 11999989, 10999990, 9333325, 7000000, 15999992, 14333320, 15999981, 13000000, 17333317, 18999982, 15999996, 12333328, 20000000, 24999976, 21333313, 20999980, 27999993, 24333340]


**Instruções de execução:**

- Em ambiente Jupyter/IPython: execute as células com `Shift+Enter`. Use `await` diretamente nas células para chamar as funções `async` (p.ex. `await run_async_gather()`).
- Em módulo/script: defina uma função `async def main()` que chame os exemplos com `await` e execute-a com `if __name__ == "__main__": asyncio.run(main())` (fora do notebook não é possível usar `await` no nível superior).