# Working with async

`openaivec.aio` is a experimental module that provides an async interface to OpenAI's API. It is designed to be used with Python's `asyncio` library, allowing you to make non-blocking requests to the OpenAI API.

In [1]:
import os
from openai import AsyncOpenAI
from openaivec.aio import pandas_ext
import pandas as pd
from pydantic import BaseModel, Field
from enum import Enum

pandas_ext.use(AsyncOpenAI())
pandas_ext.responses_model("gpt-4.1-mini")
pandas_ext.embeddings_model("text-embedding-3-small")

The model name 'gpt-4.1-mini' is not supported by tiktoken. Instead, using the 'o200k_base' encoding.


In [2]:
files_dict = {"path": []}
for root, dirs, files in os.walk('../../src/openaivec'):
    for file in files:
        path = os.path.join(root, file)
        if "__" not in path:
            files_dict["path"].append(os.path.join(root, file))

implements_df = pd.DataFrame(files_dict).assign(
    module=lambda df: df["path"].str.split("/")
        .map(lambda x: x[3:])
        .map(lambda x: ".".join(x))
        .map(lambda x: x.replace(".py", "")),
)

In [3]:
implements_df

Unnamed: 0,path,module
0,../../src/openaivec/serialize.py,openaivec.serialize
1,../../src/openaivec/responses.py,openaivec.responses
2,../../src/openaivec/log.py,openaivec.log
3,../../src/openaivec/util.py,openaivec.util
4,../../src/openaivec/embeddings.py,openaivec.embeddings
5,../../src/openaivec/pandas_ext.py,openaivec.pandas_ext
6,../../src/openaivec/spark.py,openaivec.spark
7,../../src/openaivec/prompt.py,openaivec.prompt
8,../../src/openaivec/beta/batch.py,openaivec.beta.batch
9,../../src/openaivec/beta/schema.py,openaivec.beta.schema


In [4]:
class OpjectType(str, Enum):
    FUNCTION = "function"
    CLASS = "class"
    VARIABLE = "variable"

class Question(BaseModel):
    question: str = Field(description="The specific question related to the code section.")
    answer: str = Field(description="The corresponding answer explaining the code aspect.")

class Section(BaseModel):
    name: str = Field(description="The name of the function or class being documented.")
    type: OpjectType = Field(description="The type of the code section, either a function or a class.")
    description: str = Field(description="A concise summary of the function or class's purpose and functionality.")
    questions: list[Question] = Field(description="A list of Q&A pairs clarifying aspects of this code section.")

class Document(BaseModel):
    sections: list[Section] = Field(description="A list of sections, each documenting a specific function or class.")

Note that async methods are not available in lambda functions. We can use `aio.assign` instead of `assign` to use an async function in a lambda function.
And top-level await is allowed in notebook cells.

In [5]:
docs_df = await implements_df.aio.assign(
    code=lambda df: df["path"].map(lambda x: open(x).read()),
    doc=lambda df: df["code"].aio.responses(
        instructions="Document the code in detail, including a summary, Q&A pairs, and explanations.",
        response_format=Document,
        batch_size=1,
    ),
)


In [6]:
docs_df

Unnamed: 0,path,module,code,doc
0,../../src/openaivec/serialize.py,openaivec.serialize,"from enum import Enum\nfrom typing import Any,...","sections=[Section(name='serialize_base_model',..."
1,../../src/openaivec/responses.py,openaivec.responses,"""""""Vectorized interaction helpers for OpenAI c...",sections=[Section(name='_vectorize_system_mess...
2,../../src/openaivec/log.py,openaivec.log,import functools\nimport json\nimport time\nim...,"sections=[Section(name='observe', type=<Opject..."
3,../../src/openaivec/util.py,openaivec.util,import asyncio\nimport functools\nimport re\ni...,sections=[Section(name='get_exponential_with_c...
4,../../src/openaivec/embeddings.py,openaivec.embeddings,"""""""Embedding utilities built on top of OpenAI’...","sections=[Section(name='BatchEmbeddings', type..."
5,../../src/openaivec/pandas_ext.py,openaivec.pandas_ext,"""""""Pandas Series / DataFrame extension for Ope...","sections=[Section(name='use', type=<OpjectType..."
6,../../src/openaivec/spark.py,openaivec.spark,"""""""Spark UDFs for the OpenAI and Azure OpenAI ...","sections=[Section(name='UDFBuilder', type=<Opj..."
7,../../src/openaivec/prompt.py,openaivec.prompt,"""""""\nThis module provides a builder for creati...","sections=[Section(name='Example', type=<Opject..."
8,../../src/openaivec/beta/batch.py,openaivec.beta.batch,import json\nfrom dataclasses import dataclass...,"sections=[Section(name='JsonlUDFBuilder', type..."
9,../../src/openaivec/beta/schema.py,openaivec.beta.schema,from pydantic import BaseModel\n\n__all__ = [\...,"sections=[Section(name='Entity', type=<OpjectT..."


In [7]:
questions_df = await docs_df.aio.pipe(
    lambda df: df
    .drop(columns=["code"])
    .aio.extract("doc")
    .explode("doc_sections")
    .aio.extract("doc_sections")
    .explode("doc_sections_questions")
    .aio.extract("doc_sections_questions")
    .reset_index(drop=True)
    .aio.assign(
        doc_sections_type=lambda df: df.doc_sections_type.map(lambda x: x.value),
        embedding=lambda df: df["doc_sections_questions_question"].aio.embeddings()
    )
)



In [8]:
questions_df

Unnamed: 0,path,module,doc_sections_name,doc_sections_type,doc_sections_description,doc_sections_questions_question,doc_sections_questions_answer,embedding
0,../../src/openaivec/serialize.py,openaivec.serialize,serialize_base_model,function,Serializes a Pydantic BaseModel class into its...,What does serialize_base_model do?,It converts a Pydantic BaseModel class into a ...,"[0.007050314, 0.024263242, 0.008111037, -0.012..."
1,../../src/openaivec/serialize.py,openaivec.serialize,serialize_base_model,function,Serializes a Pydantic BaseModel class into its...,What is the input type for serialize_base_model?,It expects a Pydantic BaseModel class type (no...,"[0.011857086, 0.044352297, -0.0061163246, -0.0..."
2,../../src/openaivec/serialize.py,openaivec.serialize,serialize_base_model,function,Serializes a Pydantic BaseModel class into its...,What is the output of serialize_base_model?,A dictionary representing the JSON schema of t...,"[0.024005463, 0.036001954, 0.03053425, -0.0335..."
3,../../src/openaivec/serialize.py,openaivec.serialize,dereference_json_schema,function,Recursively resolves all JSON Schema $ref refe...,What is the purpose of dereference_json_schema?,To replace all $ref references in a JSON schem...,"[0.044604924, 0.022849388, 0.021536764, -0.003..."
4,../../src/openaivec/serialize.py,openaivec.serialize,dereference_json_schema,function,Recursively resolves all JSON Schema $ref refe...,How does dereference_json_schema handle nested...,It recursively traverses dictionaries and list...,"[0.030828789, -0.0003850625, 0.051999792, -0.0..."
...,...,...,...,...,...,...,...,...
155,../../src/openaivec/aio/spark.py,openaivec.aio.spark,EmbeddingsUDFBuilder,class,A frozen dataclass builder for creating asynch...,What is the purpose of EmbeddingsUDFBuilder?,It builds asynchronous Spark pandas UDFs to ge...,"[-0.043948814, -0.0048122015, 0.047631405, 0.0..."
156,../../src/openaivec/aio/spark.py,openaivec.aio.spark,EmbeddingsUDFBuilder,class,A frozen dataclass builder for creating asynch...,How do you create an EmbeddingsUDFBuilder for ...,"Use the class method of_openai(api_key, model_...","[-0.032549456, -0.017758746, 0.06227928, 0.006..."
157,../../src/openaivec/aio/spark.py,openaivec.aio.spark,EmbeddingsUDFBuilder,class,A frozen dataclass builder for creating asynch...,How do you create an EmbeddingsUDFBuilder for ...,"Use the class method of_azure_openai(api_key, ...","[-0.044124752, -0.021921927, 0.066667005, 0.00..."
158,../../src/openaivec/aio/spark.py,openaivec.aio.spark,EmbeddingsUDFBuilder,class,A frozen dataclass builder for creating asynch...,What does the build method return?,It returns a Spark pandas UDF that asynchronou...,"[-0.010364348, 0.024629109, 0.04178817, -0.016..."
