## Async Instructor - A few notes before we go on
* Notebook by Adam Lang
* Date: 3/19/2024
* This notebook works through examples from the Instructor documentation on asynchronous programming for structured LLM output with the `openai` library.
    * These examples are part of the work I have been doing on structured LLM outputs.
* Source: https://python.useinstructor.com/blog/2023/11/13/learn-async/#for-loop-running-tasks-sequentially
1. The Instructor documentation recommends using `AsyncOpenAI` from the `openai` library. We need to import it, but there is nothing extra to install: it ships with the `openai` package (version 1.x).
    * We will need to either write a new function or change the existing `azure_openai_client()` function in the config file.
    * I am not going to change the current function in the config file; I rewrite it in this notebook for now.
2. The Instructor docs also recommend the `apatch` function, but it has since been deprecated in favor of `patch`. `apatch` enables `response_model` in the `create` method. This is the line of code I am referring to:
    * `client = instructor.apatch(AsyncOpenAI())`

In [3]:
!pip install instructor

Collecting instructor
  Downloading instructor-0.6.4-py3-none-any.whl.metadata (10 kB)
Collecting aiohttp<4.0.0,>=3.9.1 (from instructor)
  Downloading aiohttp-3.9.3-cp311-cp311-win_amd64.whl.metadata (7.6 kB)
Collecting docstring-parser<0.16,>=0.15 (from instructor)
  Downloading docstring_parser-0.15-py3-none-any.whl.metadata (2.4 kB)
Collecting openai<2.0.0,>=1.1.0 (from instructor)
  Downloading openai-1.14.2-py3-none-any.whl.metadata (19 kB)
Collecting rich<14.0.0,>=13.7.0 (from instructor)
  Downloading rich-13.7.1-py3-none-any.whl.metadata (18 kB)
Collecting tenacity<9.0.0,>=8.2.3 (from instructor)
  Downloading tenacity-8.2.3-py3-none-any.whl.metadata (1.0 kB)
Collecting httpx<1,>=0.23.0 (from openai<2.0.0,>=1.1.0->instructor)
  Downloading httpx-0.27.0-py3-none-any.whl.metadata (7.2 kB)
Collecting markdown-it-py>=2.2.0 (from rich<14.0.0,>=13.7.0->instructor)
  Downloading markdown_it_py-3.0.0-py3-none-any.whl.metadata (6.9 kB)
Collecting httpcore==1.* (from httpx<1,>=0.23.0->o

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
semantic-kernel 0.3.1.dev0 requires openai<0.28.0,>=0.27.0, but you have openai 1.14.2 which is incompatible.


In [39]:
# Set the OpenAI API key as an environment variable (replace the placeholder with your key)
import os
os.environ["OPENAI_API_KEY"] = '<your-api-key>'

In [40]:
import time
import asyncio
import instructor
from pydantic import BaseModel
from openai import AsyncOpenAI

Note: for Azure OpenAI, use `AsyncAzureOpenAI` in place of `AsyncOpenAI`.

In [41]:
class Timer:
    def __init__(self, name):
        self.name = name
        self.start = None
        self.end = None

    async def __aenter__(self):
        self.start = time.time()

    async def __aexit__(self, *args, **kwargs):
        self.end = time.time()
        print(f"{self.name} took {(self.end - self.start):.2f} seconds")
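The `Timer` above can be tried without any API call. The following is a minimal sketch, assuming a small variation of the class that returns `self` from `__aenter__` (so the elapsed time can be read afterwards) and uses `asyncio.sleep` as a stand-in for a request. It is written as a plain script with `asyncio.run`; in a notebook, a top-level `await main()` would be used instead.

```python
import asyncio
import time


class Timer:
    """Async context manager that prints how long its body took."""

    def __init__(self, name):
        self.name = name
        self.start = None
        self.end = None

    async def __aenter__(self):
        self.start = time.perf_counter()
        return self  # assumption: returning self allows `async with Timer(...) as t`

    async def __aexit__(self, *args):
        self.end = time.perf_counter()
        print(f"{self.name} took {(self.end - self.start):.2f} seconds")


async def main() -> float:
    async with Timer("sleep") as t:
        await asyncio.sleep(0.1)  # stand-in for an API call
    return t.end - t.start


elapsed = asyncio.run(main())
```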


In [42]:
import instructor
from pydantic import BaseModel, Field
from openai import AsyncOpenAI

# Patch the async client so that `create` accepts a `response_model`
client = instructor.patch(AsyncOpenAI())  # using patch instead of apatch


# Pydantic model describing the structured output we want back
class Person(BaseModel):
    name: str
    age: int
    email: str
    skills: list[str] = Field(..., title="List of skills")


# function to extract person information
async def extract_person(text: str) -> Person:
    return await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "user", "content": text}
        ],
        response_model=Person,
    )

In [46]:
dataset = [
        "My name is John and I am 20 years old, my email is john@gmail.com, my skills are Python, SQL and C#.",
        "My name is Mary and I am 21 years old, my email is mary@yahoo.com, my skills are Go, Rust and C++.",
        "My name is Bob and I am 22 years old, my email is bob@aol.com, my skills are node.js, react and angular.",
        "My name is Alice and I am 23 years old, my email is Alice@aol.com, my skills are data science, machine learning and deep learning",
        "My name is Jane and I am 24 years old, my email is Jane@aol.com, my skills are statistics, calculus and algebra",
        "My name is Joe and I am 25 years old, my email is Joe@google.com, my skills are Java, C++ and JavaScript",
        "My name is Jill and I am 26 years old my email is jill@mail.com, my skills are product management, project management and agile",
    ]


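## 4 comparison methods
1. `for` loop that awaits each coroutine in turn (sequential; labelled `asyncio.create_task` in the timer output)
2. `asyncio.gather`
3. `asyncio.as_completed`
4. Rate-limited `gather` and `as_completed` using `asyncio.Semaphore`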

In [47]:
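## 1. "for loop" - simplest way but slowest: each coroutine only starts when it is awaited, so the requests run sequentially.
## Note: despite the timer label, asyncio.create_task() is never called in this cell.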
async with Timer("asyncio.create_task"):
        all_persons = []
        tasks_get_persons = [extract_person(text) for text in dataset]
        for task in tasks_get_persons:
            all_persons.append(await task)
        print("asyncio.create_task:", all_persons)


asyncio.create_task: [Person(name='John', age=20, email='john@gmail.com', skills=['Python', 'SQL', 'C#']), Person(name='Mary', age=21, email='mary@yahoo.com', skills=['Go', 'Rust', 'C++']), Person(name='Bob', age=22, email='bob@aol.com', skills=['node.js', 'react', 'angular']), Person(name='Alice', age=23, email='Alice@aol.com', skills=['data science', 'machine learning', 'deep learning']), Person(name='Jane', age=24, email='Jane@aol.com', skills=['statistics', 'calculus', 'algebra']), Person(name='Joe', age=25, email='Joe@google.com', skills=['Java', 'C++', 'JavaScript']), Person(name='Jill', age=26, email='jill@mail.com', skills=['product management', 'project management', 'agile'])]
asyncio.create_task took 7.06 seconds
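The 7-second run above is sequential because a bare coroutine does not start until it is awaited. The sketch below contrasts that pattern with one that actually wraps each coroutine in `asyncio.create_task()`, which schedules all of them immediately. `fake_request` is a hypothetical stand-in for `extract_person` that just sleeps, so the numbers only illustrate the scheduling behaviour, not real API latency.

```python
import asyncio
import time


async def fake_request(i: int) -> int:
    """Hypothetical stand-in for an API call: waits 0.2 s, then returns its input."""
    await asyncio.sleep(0.2)
    return i


async def sequential(n: int) -> list[int]:
    results = []
    for coro in [fake_request(i) for i in range(n)]:
        results.append(await coro)  # each coroutine starts only when awaited
    return results


async def with_tasks(n: int) -> list[int]:
    # create_task schedules every coroutine on the event loop right away
    tasks = [asyncio.create_task(fake_request(i)) for i in range(n)]
    return [await task for task in tasks]


async def timed(fn, n: int):
    start = time.perf_counter()
    results = await fn(n)
    return results, time.perf_counter() - start


seq_results, seq_time = asyncio.run(timed(sequential, 5))
task_results, task_time = asyncio.run(timed(with_tasks, 5))
print(f"sequential: {seq_time:.2f}s, create_task: {task_time:.2f}s")
```

Both versions return the results in input order; only the total wall-clock time differs.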


In [48]:
## 2. "asyncio.gather" - Batch processing - runs concurrently
async with Timer("asyncio.gather"):
        tasks_get_persons = [extract_person(text) for text in dataset]
        all_persons = await asyncio.gather(*tasks_get_persons)
        print("asyncio.gather:", all_persons)

asyncio.gather: [Person(name='John', age=20, email='john@gmail.com', skills=['Python', 'SQL', 'C#']), Person(name='Mary', age=21, email='mary@yahoo.com', skills=['Go', 'Rust', 'C++']), Person(name='Bob', age=22, email='bob@aol.com', skills=['node.js', 'react', 'angular']), Person(name='Alice', age=23, email='Alice@aol.com', skills=['data science', 'machine learning', 'deep learning']), Person(name='Jane', age=24, email='Jane@aol.com', skills=['statistics', 'calculus', 'algebra']), Person(name='Joe', age=25, email='Joe@google.com', skills=['Java', 'C++', 'JavaScript']), Person(name='Jill', age=26, email='jill@mail.com', skills=['product management', 'project management', 'agile'])]
asyncio.gather took 1.32 seconds


In [None]:
## 3. "asyncio.as_completed" - runs concurrently like asyncio.gather, but yields each result as soon as it finishes (completion order, not input order)
async with Timer("asyncio.as_completed"):
        all_persons = []
        tasks_get_persons = [extract_person(text) for text in dataset]
        for person in asyncio.as_completed(tasks_get_persons):
            all_persons.append(await person)
        print("asyncio.as_completed:", all_persons)

asyncio.as_completed: [Person(name='Jane', age=24), Person(name='Joe', age=25), Person(name='John', age=20), Person(name='Mary', age=21), Person(name='Alice', age=23), Person(name='Bob', age=22), Person(name='Jill', age=26)]
asyncio.as_completed took 1.18 seconds
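The output above comes back in a different order than the dataset. That is expected: `asyncio.gather` returns results in input order, while `asyncio.as_completed` yields them in the order they finish. A minimal sketch of the difference, using a hypothetical `fake_request` with fixed delays instead of a real API call:

```python
import asyncio


async def fake_request(name: str, delay: float) -> str:
    """Hypothetical stand-in for an API call with a known latency."""
    await asyncio.sleep(delay)
    return name


async def main():
    jobs = [("slow", 0.3), ("medium", 0.2), ("fast", 0.1)]

    # gather: results follow the order of the inputs
    gathered = await asyncio.gather(*(fake_request(n, d) for n, d in jobs))

    # as_completed: results follow the order in which the calls finish
    completed = []
    for future in asyncio.as_completed([fake_request(n, d) for n, d in jobs]):
        completed.append(await future)

    return gathered, completed


gathered, completed = asyncio.run(main())
print("gather:      ", gathered)
print("as_completed:", completed)
```

If results must be matched back to their inputs when using `as_completed`, one option is to have each coroutine return its input alongside its result.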


In [None]:
## 4. Rate limiting - a semaphore caps the number of concurrent requests at 2
sem = asyncio.Semaphore(2)

In [None]:
# Wrap extract_person so that at most 2 calls hold the semaphore at any one time

async def rate_limited_extract_person(text: str) -> Person:
        async with sem:
            return await extract_person(text)

async with Timer("asyncio.gather (rate limited)"):
        tasks_get_persons = [rate_limited_extract_person(text) for text in dataset]
        resp = await asyncio.gather(*tasks_get_persons)
        print("asyncio.gather (rate limited):", resp)

async with Timer("asyncio.as_completed (rate limited)"):
        all_persons = []
        tasks_get_persons = [rate_limited_extract_person(text) for text in dataset]
        for person in asyncio.as_completed(tasks_get_persons):
            all_persons.append(await person)
        print("asyncio.as_completed (rate limited):", all_persons)



asyncio.gather (rate limited): [Person(name='John', age=20), Person(name='Mary', age=21), Person(name='Bob', age=22), Person(name='Alice', age=23), Person(name='Jane', age=24), Person(name='Joe', age=25), Person(name='Jill', age=26)]
asyncio.gather (rate limited) took 2.83 seconds
asyncio.as_completed (rate limited): [Person(name='Joe', age=25), Person(name='Jane', age=24), Person(name='Jill', age=26), Person(name='Mary', age=21), Person(name='Bob', age=22), Person(name='John', age=20), Person(name='Alice', age=23)]
asyncio.as_completed (rate limited) took 2.93 seconds
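To check that the semaphore really caps concurrency, the number of calls in flight can be counted. The sketch below is an illustration under assumptions: `limited_request` is a hypothetical stand-in for `rate_limited_extract_person`, and the counters `in_flight` and `peak` exist only for this demonstration.

```python
import asyncio


async def main(limit: int, n: int) -> int:
    sem = asyncio.Semaphore(limit)  # at most `limit` holders at once
    in_flight = 0  # calls currently inside the semaphore
    peak = 0       # highest value in_flight ever reached

    async def limited_request(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.05)  # stand-in for the API call
            in_flight -= 1
            return i

    results = await asyncio.gather(*(limited_request(i) for i in range(n)))
    assert list(results) == list(range(n))  # gather still preserves input order
    return peak


peak = asyncio.run(main(limit=2, n=7))
print("peak concurrent requests:", peak)
```

With 7 requests and a limit of 2, the work proceeds in roughly four waves, which matches the rate-limited runs above taking about twice as long as the unrestricted ones.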


### Summary:
1. `asyncio.gather`: run multiple independent tasks concurrently and get the results back in input order
2. `asyncio.as_completed`: for LARGE datasets, process each result as soon as its task completes
3. Rate limiting (semaphore): avoid flooding or overwhelming the MongoDB server or API endpoints => this is probably what we want to use

### Example function for using AsyncAzureOpenAI

In [None]:
from openai import AzureOpenAI, AsyncAzureOpenAI

In [None]:
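from typing import Any

# Method intended for the config class, which is assumed to provide the
# openai_api_base, openai_api_key and openai_api_version attributes.
def async_azure_openai_client(self) -> AsyncAzureOpenAI:
    kwargs: dict[str, Any] = {
        # Only stringify the endpoint when it is set; str(None) would hide a missing value
        "azure_endpoint": str(self.openai_api_base) if self.openai_api_base is not None else None,
        "api_key": self.openai_api_key,
        "api_version": self.openai_api_version,
    }
    empty_kwargs = {k for k, v in kwargs.items() if v is None}
    if empty_kwargs:
        raise ValueError(f"Cannot create Azure OpenAI connection: missing required args: {empty_kwargs}")
    # Return the async Azure client, matching the declared return type
    return AsyncAzureOpenAI(**kwargs)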
