## Asyncio

**You can only use `await` inside functions created with `async def`.**

Multiprocessing

### Difference between Threads and Coroutines
Threads and coroutines in Python are both mechanisms for achieving concurrency, but they are fundamentally different in how they operate and are managed.

**Threads** are managed by the operating system. Each thread represents a separate flow of execution with its own stack, while sharing the memory space of its process with the other threads. Threads can run in parallel (especially on multi-core CPUs), and the OS schedules when each thread runs. In general, this makes threads suitable for tasks that require true parallelism, such as CPU-bound operations; in standard CPython, however, the Global Interpreter Lock (GIL) limits this for Python code. Threads also come with overhead due to context switching and require careful synchronization when accessing shared resources.

**Coroutines** are a programming construct managed at the language level (not by the OS). In Python, coroutines are special functions that can pause their execution and resume later, allowing other coroutines to run in the meantime. This is known as cooperative multitasking: only one coroutine runs at a time within a **single thread**, and each one explicitly yields control to the others. Coroutines are lightweight, have much lower overhead than threads, and are especially well-suited for I/O-bound scenarios, where they make efficient use of resources and keep concurrent tasks easy to manage.

| Feature         | Threads                                 | Coroutines                                   |
|-----------------|-----------------------------------------|-----------------------------------------------|
| **Management**  | OS-level                               | Language/runtime-level                        |
| **Parallelism** | Can run in parallel (multi-core CPUs)  | Run cooperatively within a single thread      |
| **Overhead**    | High (context switching, memory)       | Low (lightweight, minimal context switching)  |
| **Use case**    | CPU-bound, true parallelism            | I/O-bound, high concurrency, async tasks      |
| **Control**     | Preemptive (OS decides when to switch) | Cooperative (programmer decides when to yield)|


In summary, threads are for parallel execution managed by the OS, while coroutines are for cooperative multitasking managed by Python itself. Coroutines are not a replacement for threads, but they offer a more efficient and simpler way to handle many concurrent I/O-bound tasks in Python.
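
As a rough illustration of the comparison above, the sketch below runs three simulated I/O waits first with threads and then with coroutines. The helper names (`blocking_io`, `async_io`, `run_threads`, `run_coroutines`) and the 0.2-second delay are assumptions made up for this example; the sleeps stand in for real I/O.

```python
import asyncio
import threading
import time

DELAY = 0.2  # seconds each simulated I/O wait takes


def blocking_io(results: list, index: int) -> None:
    time.sleep(DELAY)  # blocks only this thread; the OS runs the others meanwhile
    results[index] = threading.current_thread().name


async def async_io(index: int) -> str:
    await asyncio.sleep(DELAY)  # yields to the event loop instead of blocking
    return threading.current_thread().name


def run_threads(n: int) -> tuple:
    results = [None] * n
    threads = [
        threading.Thread(target=blocking_io, args=(results, i)) for i in range(n)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, time.perf_counter() - start


async def run_coroutines(n: int) -> tuple:
    start = time.perf_counter()
    results = await asyncio.gather(*(async_io(i) for i in range(n)))
    return results, time.perf_counter() - start


thread_names, thread_elapsed = run_threads(3)
coro_names, coro_elapsed = asyncio.run(run_coroutines(3))

print(len(set(thread_names)), "distinct threads,", round(thread_elapsed, 2), "s")
print(len(set(coro_names)), "thread shared by all coroutines,", round(coro_elapsed, 2), "s")
```

Both variants overlap the waits, so each finishes in roughly one delay rather than three; the difference is that the thread version uses three OS threads while the coroutine version stays on one.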

### Concurrency and Parallelism: Key Concepts

#### What Is Concurrency?

Concurrency in computing means the ability of a system to handle multiple tasks or operations at (almost) the same time. These tasks may not actually run simultaneously; instead, they are managed so that progress is made on each over a period of time. The operating system or language runtime switches between them efficiently, giving the illusion that they are happening together.

* Concurrency is about dealing with many things at once, not necessarily doing them at the same instant.
* Examples: Handling multiple network requests in a web server, or responsive user interfaces where several activities seem active together.

#### What Is Parallelism?

Parallelism refers to actually performing multiple computations or tasks at the same time. This is only possible on hardware with multiple processors or cores.

* Parallelism is about doing many things simultaneously, leveraging the hardware’s ability to run code on different cores or machines.
* Examples: Processing parts of a large image on different CPU cores, or running multiple calculations at the same time in scientific computing.

#### How Do Threads and Coroutines Relate?

**Threads**

* Threads are a way to achieve both concurrency and, on multi-core systems, parallelism.
* In Python, threads allow multiple parts of a program to appear to run independently. Operating systems can truly run threads in parallel on different CPU cores.
* However, due to Python’s Global Interpreter Lock (GIL), true parallelism is limited for CPU-bound tasks in standard CPython. For I/O-bound programs (like waiting for files or network), threads are very effective.

| Feature                | Threads                                   |
|------------------------|-------------------------------------------|
| Managed by             | Operating system                          |
| Can provide parallelism| Yes, on multi-core CPUs                   |
| Use-case               | Both concurrency and parallelism          |
| Typical tasks          | I/O- or CPU-bound                         |


**Coroutines**

* Coroutines are a lightweight form of concurrency, managed by Python itself (not the OS).
* They enable the program to switch between tasks at certain points (like when one is waiting for data), but only one coroutine runs at a time within a single thread.
* Coroutines do not give true parallelism (all run on the same thread) but can provide very high concurrency for I/O-bound programs, such as thousands of simultaneous web requests.

| Feature                | Coroutines                                |
|------------------------|-------------------------------------------|
| Managed by             | Python interpreter (language-level)       |
| Can provide parallelism| No (single thread, cooperative switching) |
| Use-case               | High concurrency, I/O-bound workloads     |
| Typical tasks          | Asynchronous network/server programming   |


#### Comparing Concurrency and Parallelism in Python

| Mechanism    | Concurrency | Parallelism | Where Used       | When to Use                                   |
|--------------|-------------|-------------|------------------|-----------------------------------------------|
| Threads      | Yes         | Limited*    | OS-level tasks   | I/O-bound (good), CPU-bound (limited by GIL)  |
| Coroutines   | Yes         | No          | async/await code | High I/O concurrency, very lightweight        |

*In standard CPython, threads can run Python code on multiple cores only in builds without the GIL; for CPU-bound parallelism, use processes (for example, the `multiprocessing` module) instead.

**Summary**

* Concurrency is about managing many tasks efficiently, switching between them.
* Parallelism is about running multiple tasks at the same time, on separate processors/cores.
* Threads can give both, but Python’s GIL restricts true parallelism for CPU-bound Python code; threads remain effective for I/O-bound tasks because the GIL is released while a thread waits on I/O.
* Coroutines offer high concurrency by pausing and resuming tasks in the same thread, with minimal resource use, but do not provide actual parallel execution.

The `asyncio` module does not work, or is not available, in Python environments that run on WebAssembly platforms.

The main reason is that these platforms do not support many of the low-level system features that `asyncio` relies on, such as certain system calls and full event loop support.

Therefore, run the examples in this guide in a regular terminal environment where all necessary system features are available.


## Streaming in OpenAI API
Streaming makes large language model (LLM) apps feel faster and more interactive. Instead of waiting for the full response to be generated, the API sends it in small parts — token by token — as soon as they’re ready. This improves user experience, reduces memory load, allows early cancellation, and handles long responses more efficiently.

In OpenAI's API, this is enabled by setting `stream=True` in the request. When streaming is enabled, the HTTP response is kept open and the data is delivered incrementally as **Server-Sent Events (SSE)** using chunked transfer encoding.

Below, we’ll look at how responses differ with and without streaming, then explore how tools like LangChain and LangGraph build on this concept using Python's async and await.

### Let's learn a bit about SSE

Server-Sent Events (SSE) is a web standard for sending real-time updates from a server to a client over a single HTTP connection. SSE lets servers push data to web clients as soon as it is available, making it useful for live data feeds like chat messages or notifications.

When you set the `stream` parameter to `True` in OpenAI’s API, it enables streaming responses. This means the API sends back the model’s output in small pieces as soon as each part is ready, instead of waiting for the whole response. This allows clients to show partial results right away, reducing wait time and making applications more interactive.

#### How `stream=True` works and its link to SSE

- With `stream=True`, the OpenAI API uses Server-Sent Events (SSE). This keeps the HTTP connection open and sends data in chunks, one after another.
- Each chunk is sent as a separate event, often called a "data-only server-sent event."
- Every chunk contains a small part (called `delta`) of the generated text.
- On the client side, you receive these chunks as they arrive. In Python, you iterate over the stream to process each chunk as soon as it is received: a regular `for` loop with the synchronous client, or `async for` with the asynchronous client.

SSE is simple, reliable, and works well for streaming data from the server to the client in real time. In the context of OpenAI’s API, it helps build responsive apps that can display or process model outputs as soon as they are generated.
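
To make the wire format described above more concrete, the sketch below parses a hand-written SSE payload. The payload, its JSON shape (`{"delta": ...}`), and the helper name `iter_sse_data` are assumptions made up for this example; real OpenAI chunks carry more fields, and the official client does this parsing for you.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each data-only SSE event.

    An event is a run of `data:` lines terminated by a blank line.
    """
    buffer = []
    for raw in lines:
        line = raw.rstrip("\n")
        if line.startswith("data:"):
            # Strip the field name and one optional leading space.
            value = line[len("data:"):]
            if value.startswith(" "):
                value = value[1:]
            buffer.append(value)
        elif line == "" and buffer:
            # Blank line: dispatch the event collected so far.
            yield "\n".join(buffer)
            buffer = []


# Hypothetical payload: three text deltas followed by an end-of-stream marker.
payload = (
    'data: {"delta": "Double"}\n\n'
    'data: {"delta": " bubble"}\n\n'
    'data: {"delta": " bath"}\n\n'
    "data: [DONE]\n\n"
)

text = ""
for data in iter_sse_data(payload.splitlines()):
    if data == "[DONE]":  # sentinel used in this example to mark the end
        break
    text += json.loads(data)["delta"]

print(text)  # Double bubble bath
```

The client only has to split on blank lines and strip the `data:` prefix, which is why SSE is easy to consume from almost any HTTP library.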

To build an API that streams its response via Server-Sent Events (SSE), the server keeps an HTTP connection open and pushes data to the client in a plain-text format. Here is how SSE works and how you would implement it, in particular with FastAPI.

The server sets these headers:

* `Content-Type: text/event-stream`
* `Cache-Control: no-cache`
* `Connection: keep-alive`

It keeps the HTTP response open and writes text-formatted events, each a `data: <payload>` line terminated by a blank line (for example, `data: Event 1` followed by an empty line). The server emits new data whenever it is available, and the client receives each event without sending a new request. No special protocol is needed: SSE is plain HTTP, designed for one-way push from server to browser.

A simple example of SSE support in FastAPI through its `StreamingResponse`:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def sse_generator():
    counter = 1
    while counter < 10:
        yield f"data: Event {counter}\n\n"
        counter += 1
        await asyncio.sleep(2)  # simulate pushing data every 2 seconds

# cmd: fastapi dev fastapi_streaming.py
@app.get('/sse')
async def sse_endpoint():
    return StreamingResponse(sse_generator(), media_type='text/event-stream')
```
Launch Postman, send a `GET` request to the `/sse` endpoint, and observe the events being received in real time.

```python
import dotenv

# Load environment variables (for example, the API key) from a local .env file
dotenv.load_dotenv()  # Output: True
```

### Synchronous OpenAI API Call **without** Streaming
This code invokes the OpenAI API using the synchronous Python client and does not use streaming. The entire model output is returned in one HTTP response, and the call is blocking: program execution pauses until the response arrives. This approach is straightforward and suitable for scripts or applications where concurrent handling of multiple requests or real-time partial output is not required.
* “Synchronous” means the program waits for the API call to complete before moving forward.
* “Blocking” means no other statements after the API call are executed until the full response is available.


```python
from openai import OpenAI
import json  # used later to pretty-print streamed chunks

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times slowly.",
        },
    ],
)
print("typeof(response):", type(response), "\n")
print(response.choices[0].message.content)
```

Output:

```
typeof(response): <class 'openai.types.chat.chat_completion.ChatCompletion'> 

Sure! Double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath.
```


### Synchronous OpenAI API Call **with** Streaming
When `stream=True` is set, the OpenAI API keeps the HTTP connection open and transmits the model’s output incrementally as it is generated. This allows your program to receive and process each chunk of the response in real time, rather than waiting for the complete output. Internally, the API sends these chunks using server-sent events and chunked transfer encoding, enabling your application to start displaying or utilizing the model’s reply as soon as the first data arrives.


```python
# This script demonstrates how to use the OpenAI Python client to create a response with stream=True

from openai import OpenAI
from types import GeneratorType
from collections.abc import Iterator

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[
        {
            "role": "user",
            "content": "Say 'double bubble bath' ten times slowly.",
        },
    ],
    stream=True,
)
print("typeof(stream):", type(stream), " Is Generator: ", isinstance(stream, GeneratorType), " Is Iterator: ", isinstance(stream, Iterator), "\n")
```

Output:

```
typeof(stream): <class 'openai.Stream'>  Is Generator:  False  Is Iterator:  True 
```



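```python
# The stream is an iterator, so chunks can be pulled one at a time with next()
print(next(stream).choices[0].delta.content)
print(next(stream).choices[0].delta.content)
print(next(stream).choices[0].delta.content)
```

Output (the first chunk carries an empty `content` string, so the first line is blank):

```

Sure
!
```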


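```python
for chunk in stream:
    # The final chunk has delta.content set to None; print an empty string instead
    print(chunk.choices[0].delta.content or "", end="", flush=True)
```

Output (the remainder of the response, since the first three chunks were already consumed):

```
 Here it is:

Double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath, double bubble bath.
```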

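To inspect the full structure of each chunk, iterate over a fresh stream (a stream can be consumed only once, so the one above is already exhausted) and dump every event:

```python
import json

# Requires a newly created `stream`; see the stream=True request above.
for event in stream:
    print("****" * 20)
    print(json.dumps(event.model_dump(), indent=2))
    print("@@@@" * 20)
    print(event)
    print("----" * 20)

    print(event.choices[0].delta.content or "", end="", flush=True)
```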

### Asynchronous OpenAI API Call


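```python
import asyncio
from openai import AsyncOpenAI

# With no api_key argument, the client reads OPENAI_API_KEY from the environment
client = AsyncOpenAI()


async def main():
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": "Write a one-sentence bedtime story about a unicorn."}
        ],
        stream=True,
    )

    # Each event is one chunk, delivered as soon as it arrives
    async for event in stream:
        print(event)


# In a script. Inside a Jupyter notebook, use `await main()` instead,
# because the notebook already runs an event loop.
asyncio.run(main())
```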

#### `AsyncOpenAI` with the Responses API

```python
import os
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    # This is the default and can be omitted
    api_key=os.environ.get("OPENAI_API_KEY"),
)


async def main() -> None:
    response = await client.responses.create(
        model="gpt-4o", input="Explain disestablishmentarianism to a smart five year old."
    )
    print(response.output_text)


asyncio.run(main())
```

### Generator 'yield' and Generator expression

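The `asyncio` building blocks covered in the numbered sections that follow:

* `async def`
* `await`
* `asyncio.run`
* `asyncio.gather`
* `asyncio.create_task`
* `asyncio.sleep`
* `async for`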

### Summary: sync vs. async, with and without streaming
|Client call|`stream=False`|`stream=True`|
|---|---|---|
|Synchronous (blocking call)|Entire response returned at once.|Response streamed in chunks (blocking loop).|
|Asynchronous (non-blocking call)|Entire response available after awaiting.|Response streamed in chunks (most efficient and responsive approach).|

### Example: asynchronous, non-blocking call with streaming off

You can use the `AsyncOpenAI` client without setting `stream=True`. In this case, the operation remains asynchronous, meaning you use `async` and `await` syntax, but the response is delivered all at once after processing is complete rather than streamed incrementally in chunks.

Key points:
* With `AsyncOpenAI`, if you omit `stream=True` or set `stream=False`, the call behaves like a regular async HTTP request.
* You `await` the completion and get the entire response in one object, not as a stream.
* This is functionally similar to the synchronous (blocking) call, except that your code does not block the event loop; other async tasks can run while it waits for the response.
* Example script: `openai_asyncio_stream_off.py`

So, using `AsyncOpenAI` without streaming is valid and useful when you want non-blocking API calls but do not need real-time chunked output.
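
The non-blocking behavior can be sketched without calling the API at all. In the example below, `fake_completion` is a hypothetical stand-in for an awaited, non-streaming `AsyncOpenAI` request, and `heartbeat` is an assumed second task; neither is part of the OpenAI client.

```python
import asyncio


async def fake_completion(prompt: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for an awaited, non-streaming API call."""
    await asyncio.sleep(delay)  # simulates waiting for the full HTTP response
    return f"response to {prompt!r}"


async def heartbeat(log: list, interval: float = 0.05) -> None:
    """Another task that keeps running while the 'API call' is pending."""
    while True:
        log.append("tick")
        await asyncio.sleep(interval)


async def main() -> tuple:
    log = []
    ticker = asyncio.create_task(heartbeat(log))
    reply = await fake_completion("hello")  # the whole reply arrives at once
    ticker.cancel()  # stop the background task once the reply is in
    return reply, len(log)


reply, ticks = asyncio.run(main())
print(reply)  # response to 'hello'
print("heartbeat ticks while waiting:", ticks)
```

The heartbeat keeps ticking while `main()` is suspended at the `await`, which is the practical difference from the synchronous client, where nothing else in the thread would run during the wait.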


## 1. `async def` in asyncio Python (asynchronous programming)

Asynchronous programming lets your programs handle many tasks at once without blocking the main thread. This is particularly useful for I/O-bound operations like network requests, file I/O, database queries, or waiting for external resources that can slow down your application.

Coroutines are at the heart of asyncio programming in Python. They allow you to write asynchronous code that can pause and resume execution, enabling efficient handling of I/O-bound tasks without blocking the main thread.

Key concept: `async def` defines a coroutine function, a special kind of function whose execution can be paused and resumed. Calling it returns a coroutine object instead of running the code immediately. To run the coroutine, `await` it or pass it to `asyncio.run()`, `asyncio.create_task()`, or `asyncio.gather()`.

The code below results in `RuntimeWarning: coroutine 'greet' was never awaited`:
``` python
async def greet():
    print("Hello")
if __name__ == "__main__":
    greet()
```

Use `asyncio.run()` to run the coroutine:
```python
import asyncio
async def greet():
    print("Hello")
if __name__ == "__main__":
    asyncio.run(greet())
```
You can define an `async def` function without using `await` inside it. This is valid Python, though uncommon in practice. The function is still a coroutine function and must be run as one, but its body executes like a regular function, without pausing for any asynchronous operations.

To inspect whether a function is a coroutine function, `inspect.iscoroutinefunction()` can be used. It returns True if the function is defined with `async def`, indicating it is a coroutine function.
```python
import inspect
async def greet():
    print("Hello")

if __name__ == "__main__":
    print(inspect.iscoroutinefunction(greet)) # Output: True
```

Similarly, to check if an object is a coroutine object (the result of calling a coroutine function), use `inspect.iscoroutine()` which returns `True` if the object is a coroutine.

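```python
# Continues the previous snippet (uses `inspect` and `greet` from above)
cor_obj = greet()  # calling a coroutine function returns a coroutine object
print(inspect.iscoroutine(cor_obj))  # Output: True
cor_obj.close()  # close it so Python does not warn that it was never awaited
```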
Additionally, `inspect.isawaitable()` can be used to check if an object can be awaited (i.e., can be used with `await`), which includes coroutine objects, but also other awaitables like Tasks or Futures.


|Check|Use|
|---|---|
|Check if a function is coroutine|`inspect.iscoroutinefunction()`|
|Check if an object is coroutine|`inspect.iscoroutine()`|
|Check if an object is awaitable|`inspect.isawaitable()`|
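
The three checks can be tried side by side. The sketch below is a minimal illustration; the `checks` dictionary and its labels are made up for this example.

```python
import asyncio
import inspect


async def greet() -> str:
    return "Hello"


async def main() -> dict:
    coro = greet()
    task = asyncio.create_task(greet())
    future = asyncio.get_running_loop().create_future()
    future.set_result("done")

    checks = {
        "coroutine function": inspect.isawaitable(greet),  # a function, not awaitable
        "coroutine object": inspect.isawaitable(coro),
        "Task": inspect.isawaitable(task),
        "Future": inspect.isawaitable(future),
        "plain int": inspect.isawaitable(42),
    }
    # Await everything so nothing is left pending or "never awaited".
    await coro
    await task
    await future
    return checks


checks = asyncio.run(main())
for name, result in checks.items():
    print(f"{name}: {result}")
```

Note that the coroutine function itself is not awaitable; only the object returned by calling it is.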

## 2. `await` in asyncio
1. It works only inside `async def` functions.
2. `await` pauses the current coroutine until the result of an awaitable object, such as another coroutine, a Task, or a Future, is ready.
3. It makes your code non-blocking by yielding control back to the event loop until the awaited result is available.

In summary,

|Concept|What it means|
|---|---|
|`async def`|Declares a coroutine function (returns a coroutine object)|
|`await`|Pauses current coroutine until awaitable is finished|

An **event loop** in Python’s `asyncio` is the central component that manages and schedules multiple asynchronous tasks (coroutines) to run efficiently in a single thread. It repeatedly cycles through a queue of tasks, running those that are ready, and pausing those waiting for operations like I/O, timers, or other events. This allows your program to run multiple operations concurrently without blocking. Thus, Python’s `asyncio` event loop efficiently switches between tasks that can make progress, pausing on `await`s until awaited results are ready, achieving concurrency without multi-threading.

``` python
import asyncio

async def say_after(delay, greeting_msg):
    await asyncio.sleep(delay)
    print(greeting_msg)

async def main():
    print("Start")
    # Schedule multiple coroutines concurrently
    await asyncio.gather(
        say_after(1, "Hello"),
        say_after(2, "World")
    )
    print("End")

if __name__ == "__main__":
    asyncio.run(main())
```

## 3. `asyncio.run()` in asyncio
1. This function is a simple way to start an asyncio program. It runs the main entry-point coroutine and manages the event loop.
2. It creates a new event loop, runs the specified coroutine until it completes, and then closes the loop.
3. You can check out `asyncio.new_event_loop()` and `asyncio.set_event_loop()` for more advanced use cases (manual loop management), but `asyncio.run()` is the recommended way to run a top-level coroutine in most cases.

Note: You can call `asyncio.run()` multiple times in the same Python program — each call creates a fresh event loop and closes it after use. However, it’s generally recommended to call it only once per program execution, typically at the entry point of your asynchronous application (e.g., `asyncio.run(main())`). Repeated use is valid but discouraged in complex or performance-critical applications due to inefficiency and potential side effects. For running multiple coroutines, manage them inside a single async function using tools like `asyncio.gather()` or `asyncio.create_task()`.
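
A small sketch of these points, with hypothetical coroutines `compute` and `nested`: `asyncio.run()` returns the coroutine's result, can be called again afterwards with a fresh loop, and refuses to start when a loop is already running in the same thread.

```python
import asyncio


async def compute() -> int:
    await asyncio.sleep(0.01)
    return 42


async def nested() -> str:
    # Calling asyncio.run() while a loop is already running is an error.
    coro = compute()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine was never started; close it explicitly
        return type(exc).__name__
    return "no error"


result = asyncio.run(compute())  # creates a loop, runs compute(), closes the loop
outcome = asyncio.run(nested())  # a second call gets a fresh loop
print(result)   # 42
print(outcome)  # RuntimeError
```

The nested case is what happens when `asyncio.run()` is called from code that is itself running inside an event loop, so inside a coroutine you `await` other coroutines directly instead.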

## 4. `asyncio.gather()` in asyncio
1. It takes multiple awaitable objects (coroutines, Tasks, or Futures) and runs them concurrently, returning their results as a list in the same order as the arguments.
2. `asyncio.gather()` itself only returns an awaitable that aggregates the provided awaitables (coroutines are wrapped in Tasks); it does not create or run an event loop.
3. To actually execute the coroutines passed to `asyncio.gather()`, an event loop must already be running.
4. Inside a running event loop, the wrapped Tasks are scheduled as soon as `gather()` is called, but you still need to `await` its result to wait for completion and to collect return values and exceptions.
5. So, you need to be inside a coroutine running on an event loop (typically started with `asyncio.run()` or manually) to `await asyncio.gather()` and get its results.
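
The result ordering and error handling of `asyncio.gather()` can be sketched as follows. The `fetch` coroutine is a hypothetical stand-in for a network call; `return_exceptions=True` is a real `gather()` parameter.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for a network call
    if name == "bad":
        raise ValueError("simulated failure")
    return name.upper()


async def main() -> tuple:
    # Results come back in argument order, not completion order.
    ordered = await asyncio.gather(fetch("slow", 0.2), fetch("fast", 0.05))

    # With return_exceptions=True, exceptions are returned as results
    # instead of being raised at the await.
    mixed = await asyncio.gather(
        fetch("ok", 0.05), fetch("bad", 0.05), return_exceptions=True
    )
    return ordered, mixed


ordered, mixed = asyncio.run(main())
print(ordered)  # ['SLOW', 'FAST']
print(mixed)    # ['OK', ValueError('simulated failure')]
```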

## 5. `asyncio.create_task()` in asyncio
1. `asyncio.create_task()` takes a coroutine object and wraps it in a Task, scheduling it to run on the event loop as soon as the loop gets control (for example, at the caller's next `await`). In contrast, simply calling an `async def` function returns a coroutine object but does not start its execution until it is awaited or scheduled as a Task.
2. Once scheduled, the Task runs concurrently alongside other tasks and coroutines; the caller does not have to `await` it for it to make progress.
3. You can later `await` the Task to get its result, or let it run independently if you don’t need the result right away. Keep a reference to the Task in either case, since the event loop holds only a weak reference to it.

``` python
import asyncio

async def say_after(delay, message):
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Create tasks to run concurrently
    task1 = asyncio.create_task(say_after(2, "Hello"))
    task2 = asyncio.create_task(say_after(1, "World"))

    print("Tasks started")
    # Do other things here while the tasks run...

    # Await the tasks to get their result (wait until they finish)
    await task1
    await task2
    print("Both tasks finished")

if __name__ == "__main__":
    asyncio.run(main())
```

Try commenting out `await task1` or `await task2` in the code above to see how it affects the output. A task that is not awaited still runs concurrently, but `main()` no longer waits for it; when `main()` returns, `asyncio.run()` cancels any task that has not finished.
Now add `await asyncio.sleep(10)` after `print("Both tasks finished")` to see how the program behaves when `main()` stays alive after the tasks are done. This keeps the event loop running, so any remaining tasks can complete before the program exits.
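
The effect of not awaiting a task can be sketched in a few lines. The `worker` coroutine and the `wait_for_task` flag are hypothetical names used only for this illustration.

```python
import asyncio


async def worker(log: list, delay: float) -> None:
    await asyncio.sleep(delay)
    log.append("worker finished")


async def main(wait_for_task: bool) -> list:
    log = []
    task = asyncio.create_task(worker(log, 0.1))  # keep a reference to the Task
    if wait_for_task:
        await task
    log.append("main finished")
    return log


awaited = asyncio.run(main(wait_for_task=True))
not_awaited = asyncio.run(main(wait_for_task=False))
print(awaited)      # ['worker finished', 'main finished']
print(not_awaited)  # ['main finished']
```

In the second run the worker is cancelled when `main()` returns, so its line never appears in the log.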

## 6. `asyncio.sleep()` in asyncio
This is an async version of `time.sleep()`. It lets a coroutine pause without blocking the whole program.
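
The difference is easy to measure. In the sketch below, `polite`, `rude`, and `timed` are hypothetical helper names, and the 0.2-second delay is arbitrary; two coroutines are run together with each kind of sleep.

```python
import asyncio
import time

DELAY = 0.2


async def polite(delay: float) -> None:
    await asyncio.sleep(delay)  # suspends this coroutine; the loop runs others


async def rude(delay: float) -> None:
    time.sleep(delay)  # blocks the whole thread, and the event loop with it


async def timed(factory) -> float:
    start = time.perf_counter()
    await asyncio.gather(factory(DELAY), factory(DELAY))
    return time.perf_counter() - start


async_elapsed = asyncio.run(timed(polite))
blocking_elapsed = asyncio.run(timed(rude))
print(f"asyncio.sleep: {async_elapsed:.2f}s, time.sleep: {blocking_elapsed:.2f}s")
```

With `asyncio.sleep()` the two waits overlap and the total is about one delay; with `time.sleep()` they run back to back and the total is about two.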

## 7. `async for` in asyncio
1. `async for` iterates over asynchronous iterators, such as those that fetch data in chunks. One example is using the `AsyncOpenAI` client to stream data from the OpenAI API.
2. It allows you to process items as they become available, without blocking the event loop.
3. Asynchronous iterators are objects that implement the `__aiter__()` and `__anext__()` methods, the counterparts of `__iter__()` and `__next__()` in regular iterators.
4. `__aiter__()` returns an asynchronous iterator object, typically `self`.
5. `__anext__()` must return an awaitable object. `async for` awaits the awaitables returned by the iterator's `__anext__()` method until it raises a `StopAsyncIteration` exception.

```python
import asyncio


class Counter:
    """Asynchronous iterator that counts from low to high (inclusive)."""

    def __init__(self, low, high):
        self.current = low
        self.high = high

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current > self.high:
            raise StopAsyncIteration
        await asyncio.sleep(1)  # Simulate an asynchronous operation
        value = self.current
        self.current += 1
        return value

async def main():
    async for number in Counter(1, 5):
        print(number)

if __name__ == "__main__":
    asyncio.run(main())
```
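
The same iteration can usually be written more compactly as an async generator, that is, an `async def` function containing `yield`; Python then supplies `__aiter__()` and `__anext__()` for you. The sketch below is an alternative to the class above, not a replacement prescribed by this guide; the `counter` function and its `delay` parameter are assumptions for illustration.

```python
import asyncio


async def counter(low: int, high: int, delay: float = 0.01):
    """Async generator: `yield` inside `async def`."""
    for number in range(low, high + 1):
        await asyncio.sleep(delay)  # simulate an asynchronous operation
        yield number


async def main() -> tuple:
    seen = []
    async for number in counter(1, 5):
        seen.append(number)
    # Async comprehensions consume the same protocol.
    doubled = [n * 2 async for n in counter(1, 3)]
    return seen, doubled


seen, doubled = asyncio.run(main())
print(seen)     # [1, 2, 3, 4, 5]
print(doubled)  # [2, 4, 6]
```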

`AsyncOpenAI` and `stream=True` are two different concepts.

* `AsyncOpenAI` is the asynchronous version of the OpenAI Python client. It provides async methods to call OpenAI APIs, allowing you to use `async/await` syntax, run many requests concurrently, and generally leverage Python’s `asyncio` concurrency functionality. It’s about how you interact with the OpenAI API (async client vs. sync client).
* `stream=True` is an option you pass in a request to enable streaming responses from the API. Normally, the API sends you the whole response after it’s fully generated. When you use `stream=True`, you receive the response incrementally as it’s being generated: as a regular iterator with the synchronous client, or as an asynchronous iterator with `AsyncOpenAI`. Streaming reduces perceived latency and lets you process partial results early.

You can use `AsyncOpenAI` without `stream=True` to get full responses asynchronously, or with `stream=True` to get incremental streaming responses asynchronously.

So they serve different roles:

* `AsyncOpenAI` is about how you call the API (async client).
* `stream=True` is about how the API returns the data (streaming mode).
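
To show how the two roles combine, the sketch below consumes two simulated streams concurrently. `fake_stream` is a hypothetical stand-in for an `AsyncOpenAI` request made with `stream=True`; the labels, tokens, and delays are invented for the example.

```python
import asyncio


async def fake_stream(label: str, tokens: list, delay: float):
    """Hypothetical stand-in for an async streaming API response."""
    for token in tokens:
        await asyncio.sleep(delay)  # simulates waiting for the next chunk
        yield f"{label}:{token}"


async def consume(stream, log: list) -> None:
    async for chunk in stream:
        log.append(chunk)  # handle each chunk as soon as it arrives


async def main() -> list:
    log = []
    await asyncio.gather(
        consume(fake_stream("A", ["x", "y"], 0.15), log),
        consume(fake_stream("B", ["1", "2", "3"], 0.03), log),
    )
    return log


log = asyncio.run(main())
print(log)
```

Chunks from the faster stream show up in the log before the slower stream has produced its first chunk: the async client provides the concurrency, and streaming provides the early partial results.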