## Asyncio

#### Reminder: tasks and gather
`asyncio.create_task` is used to create tasks that run in the background..

In [3]:
import asyncio


async def fire_after(seconds: float) -> None:
    await asyncio.sleep(seconds)
    print("Fired")


t = asyncio.create_task(fire_after(0.1))
await asyncio.sleep(0.2)
print("after")
await t

Fired
after


`asyncio.gather` is generally the easiest way to create parallel tasks.

In [11]:
import asyncio


async def task(n: int) -> int:
    await asyncio.sleep(random.uniform(0, 0.5))
    return n


await asyncio.gather(*(task(n) for n in range(20)))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

#### TaskGroup

TaskGroup is a new feature in 3.11, it has a lot of similarities with gather. The task group will not exit until all tasks are completed or interrupted. 

The example above can be done quite simply by:

In [16]:
async def task_group():
    async with asyncio.TaskGroup() as g:
        for n in range(20):
            g.create_task(task(n))

await task_group()

Here we can dispatch 20 tasks quite easily, the task group context only exits after all 20 are complete.

But how do we return the values as in `asyncio.gather`?

`TaskGroup.create_task` does return a task that can be awaited for the results. We can await each of the tasks and obtain the result:

In [17]:
async def task_group2():
    async with asyncio.TaskGroup() as g:
        futs = [g.create_task(task(n)) for n in range(20)]

    return [await f for f in futs]


await task_group2()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Note that the line:
```python
[await f for f in futs]
```

is actually equivalent to:
```python
await asyncio.gather(*futs)
```
In this situation.

Another and perhaps clearer way to write this is
```python
return [f.result() for f in futs]
```

As the task has already been completed and we simply require the results.

#### TaskGroup Benefits

##### Better Structure

TaskGroups essentially define areas of parallel code using the context manager, this makes it easier to spot these regions.

```python
async with asyncio.TaskGroup() as g:
    ...  # Something parallel running here

await blah  # somethings running in serial

async with asyncio.TaskGroup() as g:
    ...  # Another set of parallel tasks
```

Task groups may also be nested. This may provide more semantic meaning to your groups as well as better error handling we'll talk about later.

For example:

```python
async with asyncio.TaskGroup() as server:
    server.create_task(background_job())
    async with asyncio.TaskGroup() as services:
        services.create_task(service1.start())
        services.create_task(service2.start())
```

This runs a `background_job` and starts `services`. Since the services are grouped up, then if one dies the others are also stopped.

A service stopping also causes the `server` group to stop, which will terminate the background job.

All in all, this can improve the readability of parallel code.


##### Better Cancellation
Using gather, by default, all tasks are run to completion. If one task errors that error is propagated, and the rest of the tasks run in the background to completion.

In [33]:
class AbortError(Exception):
    pass


async def task_abort(wait: float, abort: bool = False):
    await asyncio.sleep(wait)
    if abort:
        raise AbortError("Aborting the task")

    print("Task complete")


await asyncio.gather(
    task_abort(0.1, True),
    task_abort(0.2, False),
    task_abort(0.2, False),
)

AbortError: Aborting the task

Task complete
Task complete


In a task group, if one task is cancelled or aborted, all other tasks are cancelled.

In [37]:
async def task_group_abort():
    async with asyncio.TaskGroup() as g:
        g.create_task(task_abort(0.1, True))
        g.create_task(task_abort(0.1, True))
        g.create_task(task_abort(0.2, False))
        g.create_task(task_abort(0.2, False))


await task_group_abort()

ExceptionGroup: unhandled errors in a TaskGroup (2 sub-exceptions)

##### PEP 654 – Exception Groups and except*


Note that exceptions are no longer individually raised but raised as a group in this case. This is a new feature in 3.11 allowing exceptions to be bundled up and handled properly.

In [39]:
try:
    await task_group_abort()
except ExceptionGroup as eg:
    sub_group = eg.subgroup(AbortError)
    if sub_group:
        for e in sub_group.exceptions:
            print(f"Got {e}")

Got Aborting the task
Got Aborting the task


A new except* is introduced so we can handle this more easily.

In [38]:
try:
    await task_group_abort()
except* AbortError as eg:
    for e in eg.exceptions:
        print(f"Got {e}")


Got Aborting the task
Got Aborting the task
