https://jxnl.github.io/instructor/blog/2023/11/13/learn-async/
- `OpenAI()` vs. `AsyncOpenAI()`
- Asynchronous IO (async IO): a language-agnostic paradigm

https://realpython.com/async-io-python/

- **Concurrency** is a slightly broader term than parallelism. It suggests that multiple tasks have the ability to run in an overlapping manner. (There’s a saying that concurrency does not imply parallelism.)
    - **Parallelism** consists of performing multiple operations at the same time. 
        - **Multiprocessing** is a means to effect parallelism: it spreads tasks over a computer's central processing units (CPUs, or cores).
            - CPU-bound tasks
            - `multiprocessing`, `concurrent.futures.ProcessPoolExecutor`
    - **Threading** is a concurrent execution model whereby multiple threads take turns executing tasks. One process can contain multiple threads. 
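        - GIL in CPython: only one thread at a time executes Python bytecode, so threads do not speed up CPU-bound pure-Python code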
        - IO-bound tasks
        - `threading`, `concurrent.futures.ThreadPoolExecutor`
    - **Asynchronous IO**
        - async IO is a style of concurrent programming, but it is **not parallelism**
        - single-threaded, single-process design: it uses **cooperative multitasking**
        - ability to pause: a coroutine suspends at `await` and lets other coroutines run until it can resume
        - `asyncio`

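The difference between threading and async IO for IO-bound work shows up with simulated waits. This is a minimal sketch using only the standard library, where `time.sleep` / `asyncio.sleep` stand in for real network calls (an assumption: real IO behaves similarly, just with noisier timings). The helper names are made up for illustration. Multiprocessing is left out because it targets CPU-bound work.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(delay: float) -> int:
    # Simulated IO-bound call (e.g. a network request) that blocks its thread.
    time.sleep(delay)
    return threading.get_ident()


async def async_io(delay: float) -> int:
    # Simulated IO-bound call that hands control back to the event loop while waiting.
    await asyncio.sleep(delay)
    return threading.get_ident()


def run_with_threads(n: int, delay: float) -> tuple[float, set[int]]:
    # Threading: each call occupies a worker thread; time.sleep releases the GIL.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        thread_ids = set(pool.map(blocking_io, [delay] * n))
    return time.perf_counter() - start, thread_ids


async def run_with_asyncio(n: int, delay: float) -> tuple[float, set[int]]:
    # Async IO: all coroutines share one thread and take turns at each `await`.
    start = time.perf_counter()
    thread_ids = set(await asyncio.gather(*(async_io(delay) for _ in range(n))))
    return time.perf_counter() - start, thread_ids


# Five 0.2 s waits: both models overlap them (~0.2 s total instead of ~1 s),
# but only threading needs more than one thread to do it.
# In Jupyter, replace asyncio.run(...) with a top-level await.
threads_time, threads_ids = run_with_threads(5, 0.2)
async_time, async_ids = asyncio.run(run_with_asyncio(5, 0.2))
print(f"threads: {threads_time:.2f} s on {len(threads_ids)} thread(s)")
print(f"asyncio: {async_time:.2f} s on {len(async_ids)} thread(s)")
```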
**asyncio**
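- `asyncio` changed a lot between Python 3.4 and Python 3.7
- `await asyncio.sleep(1)` is non-blocking (it suspends only the current coroutine), while `time.sleep(1)` is blocking (it stalls the whole event loop)
- `async def` introduces either a native **coroutine** or, if its body contains `yield`, an **asynchronous generator**
- `async with` and `async for` are available inside `async def` functions
- an object defining an `.__await__()` dunder method that returns an iterator is awaitable
- `await asyncio.gather(*list_of_tasks)` runs the awaitables concurrently and returns their results as a list, in input order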

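A small, self-contained tour of the points above. `Ticket`, `Connection`, `countdown`, `nap`, and `timed_naps` are hypothetical names made up for illustration; only the `asyncio` and `time` calls are real APIs.

```python
import asyncio
import time


class Ticket:
    # Hypothetical custom awaitable: __await__ returns an iterator (here, a generator).
    def __init__(self, value: int) -> None:
        self.value = value

    def __await__(self):
        yield from asyncio.sleep(0).__await__()  # let the event loop run others once
        return self.value  # becomes the result of `await Ticket(...)`


class Connection:
    # Hypothetical async context manager, used with `async with`.
    def __init__(self) -> None:
        self.is_open = False

    async def __aenter__(self) -> "Connection":
        await asyncio.sleep(0)  # e.g. an asynchronous handshake
        self.is_open = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await asyncio.sleep(0)  # e.g. an asynchronous close
        self.is_open = False


async def countdown(n: int):
    # `async def` + `yield` = asynchronous generator, consumed with `async for`.
    while n > 0:
        await asyncio.sleep(0)
        yield n
        n -= 1


async def nap(seconds: float, blocking: bool) -> None:
    if blocking:
        time.sleep(seconds)  # blocks the whole event loop: nothing else runs
    else:
        await asyncio.sleep(seconds)  # suspends only this coroutine


async def timed_naps(blocking: bool, n: int = 3, seconds: float = 0.1) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(nap(seconds, blocking) for _ in range(n)))
    return time.perf_counter() - start


async def main() -> None:
    print(await Ticket(42))  # 42
    async with Connection() as conn:
        print(conn.is_open)  # True
    print(conn.is_open)  # False
    print([i async for i in countdown(3)])  # [3, 2, 1]
    overlapped = await timed_naps(blocking=False)  # roughly 0.1 s: naps overlap
    sequential = await timed_naps(blocking=True)   # roughly 0.3 s: naps run one by one
    print(f"asyncio.sleep: {overlapped:.2f} s, time.sleep: {sequential:.2f} s")


# In Jupyter, use `await main()` instead of asyncio.run(...).
asyncio.run(main())
```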

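Outdated things:
- `@asyncio.coroutine` (generator-based coroutines) has been replaced by the `async`/`await` keywords; the decorator was deprecated in Python 3.8 and removed in Python 3.11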

Further reading: https://realpython.com/async-io-python/#async-io-design-patterns

```python
import asyncio
```

```python
import instructor
from pydantic import BaseModel
from openai import AsyncOpenAI

# Patch the async client so that `create` accepts `response_model`
client = instructor.patch(AsyncOpenAI())


class Person(BaseModel):
    name: str
    age: int


async def extract_person(text: str) -> Person:
    return await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "user", "content": text},
        ],
        response_model=Person,
    )
```

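To try the concurrency patterns in the following cells without an API key or network access, `extract_person` can be swapped for an offline stub of the same shape. This is a hypothetical helper (`fake_extract_person`), not part of `instructor`: it parses the text with a regex instead of calling an LLM, uses a plain dataclass instead of the pydantic `Person`, and sleeps for a random delay to imitate variable network latency.

```python
import asyncio
import random
import re
from dataclasses import dataclass


@dataclass
class Person:
    # Plain dataclass standing in for the pydantic model (assumption: same fields).
    name: str
    age: int


_PATTERN = re.compile(r"My name is (\w+) and I am (\d+) years old")


async def fake_extract_person(text: str, max_delay: float = 0.05) -> Person:
    # Hypothetical offline stub: a random sleep mimics latency, a regex replaces the LLM.
    await asyncio.sleep(random.uniform(0, max_delay))
    match = _PATTERN.search(text)
    if match is None:
        raise ValueError(f"could not parse: {text!r}")
    return Person(name=match.group(1), age=int(match.group(2)))


# In Jupyter, use `await fake_extract_person(...)` instead of asyncio.run(...).
print(asyncio.run(fake_extract_person("My name is John and I am 20 years old")))
# Person(name='John', age=20)
```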
```python
dataset = [
    "My name is John and I am 20 years old",
    "My name is Mary and I am 21 years old",
    "My name is Bob and I am 22 years old",
    "My name is Alice and I am 23 years old",
    "My name is Jane and I am 24 years old",
    "My name is Joe and I am 25 years old",
    "My name is Jill and I am 26 years old",
]
```

```python
# Returns only after all tasks have completed.
# Results come back in input order; the calls themselves may finish in any order.
async def gather(dataset):
    tasks_get_persons = [extract_person(text) for text in dataset]
    all_persons = await asyncio.gather(*tasks_get_persons)
    return all_persons
```

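A sketch of the ordering guarantee of `asyncio.gather`, with a hypothetical `job` coroutine whose `asyncio.sleep` stands in for an API call of known duration: results follow the input order even though the jobs finish in a different order. By default, the first exception raised by any awaitable propagates to whoever awaits `gather`; passing `return_exceptions=True` returns exceptions as items of the result list instead, which helps when one bad input should not hide the other results.

```python
import asyncio

completion_order: list[int] = []  # filled in as jobs actually finish


async def job(index: int, delay: float) -> int:
    # Hypothetical stand-in for one API call that takes `delay` seconds.
    await asyncio.sleep(delay)
    completion_order.append(index)
    return index


async def main() -> list[int]:
    delays = [0.3, 0.1, 0.2]  # job 0 is the slowest, job 1 the fastest
    return await asyncio.gather(*(job(i, d) for i, d in enumerate(delays)))


# In Jupyter, use `await main()` instead of asyncio.run(...).
results = asyncio.run(main())
print(results)           # [0, 1, 2]: input order
print(completion_order)  # [1, 2, 0]: finish order
```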
```python
# Streaming: each result is available as soon as its call finishes.
# Results arrive in completion order, not input order.
# Useful for large datasets, where results can be processed incrementally.
async def as_completed(dataset):
    all_persons = []
    tasks_get_persons = [extract_person(text) for text in dataset]
    for future in asyncio.as_completed(tasks_get_persons):
        all_persons.append(await future)
    return all_persons

# In a notebook, top-level await works:
# persons = await as_completed(dataset)
```

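Because `asyncio.as_completed` yields results in completion order, there is no way to tell which input produced a result unless the result carries that information. A minimal sketch, assuming a hypothetical `job` coroutine that returns its own input index alongside the result, so each result can be handled as it arrives and the original order can still be restored afterwards.

```python
import asyncio


async def job(index: int, delay: float) -> tuple[int, float]:
    # Hypothetical stand-in for one API call; it returns its own input index
    # together with the result so the caller knows which input it belongs to.
    await asyncio.sleep(delay)
    return index, delay


async def stream(delays: list[float]) -> list[tuple[int, float]]:
    received = []
    for future in asyncio.as_completed([job(i, d) for i, d in enumerate(delays)]):
        index, result = await future  # the earliest-finishing job comes out first
        print(f"job {index} done")    # process each result as soon as it arrives
        received.append((index, result))
    return received


# In Jupyter, use `await stream(...)` instead of asyncio.run(...).
received = asyncio.run(stream([0.3, 0.1, 0.2]))
print([index for index, _ in received])            # [1, 2, 0]: completion order
print([result for _, result in sorted(received)])  # [0.3, 0.1, 0.2]: input order restored
```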
```python
# Semaphores
# (PW EiTI dr Kruk <3)
# A semaphore caps how many calls run at once (here: at most 2).
# This limits concurrency, not requests per second.
# The same wrapper works with as_completed.

sem = asyncio.Semaphore(2)


async def rate_limited_extract_person(text: str, sem: asyncio.Semaphore) -> Person:
    async with sem:
        return await extract_person(text)


async def rate_limited_gather(dataset: list[str], sem: asyncio.Semaphore):
    tasks_get_persons = [rate_limited_extract_person(text, sem) for text in dataset]
    return await asyncio.gather(*tasks_get_persons)
```

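A sketch that checks the semaphore really caps concurrency: the hypothetical `limited_job` counts how many jobs are inside the `async with sem:` block at once, and `asyncio.sleep` stands in for the API call. The semaphore is created inside the coroutine that `asyncio.run` executes, so it is used on the running event loop. The second half reuses the same wrapper with `as_completed`.

```python
import asyncio

in_flight = 0  # jobs currently inside the semaphore-protected block
peak = 0       # highest value in_flight ever reached


async def limited_job(index: int, sem: asyncio.Semaphore) -> int:
    global in_flight, peak
    async with sem:  # waits here while `limit` jobs already hold the semaphore
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.05)  # hypothetical stand-in for the API call
        in_flight -= 1
    return index


async def main(n: int = 6, limit: int = 2) -> tuple[list[int], list[int]]:
    sem = asyncio.Semaphore(limit)
    # gather: results in input order, at most `limit` jobs running at a time
    gathered = await asyncio.gather(*(limited_job(i, sem) for i in range(n)))
    # as_completed: same wrapper, results in completion order
    streamed = []
    for future in asyncio.as_completed([limited_job(i, sem) for i in range(n)]):
        streamed.append(await future)
    return gathered, streamed


# In Jupyter, use `await main()` instead of asyncio.run(...).
gathered, streamed = asyncio.run(main())
print(gathered)  # [0, 1, 2, 3, 4, 5]
print(peak)      # 2
```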
![](data/asyncio_times.png)