# Building an AI Agent with the OpenAI Assistants API in Streaming mode with access to documents and a database

In this tutorial, we will build an AI agent that finds the best job offers for candidates in our HR database using the [OpenAI Assistants API](https://platform.openai.com/docs/assistants/overview?context=with-streaming). Natural-language communication with the database will be provided by [**db-ally**](https://db-ally.deepsense.ai/), and our final agent will be capable of:
1. Listing job offers,
2. Matching candidates to offers,
3. Drafting an email to the candidate, informing them about job opportunities.

At the end of the tutorial, we will be able to have a conversation similar to this one:

<img src="https://drive.google.com/uc?export=view&id=1yjzd1z56KwZg-6BsYCeDEIESZEXShE0p" alt="Example conversation with the agent" width="1000" height="auto">

The architecture of our system looks as follows:

<img src="https://drive.google.com/uc?export=view&id=161znWIfkWe5ehyXyVtmFakxthjyyJFdq" alt="System architecture" width="800" height="auto">

## Setup

Install [**db-ally**](https://db-ally.deepsense.ai/) with the OpenAI extension and [nest-asyncio](https://pypi.org/project/nest-asyncio/), which is needed to run nested event loops.


In [None]:
!pip install dbally[openai] nest_asyncio

We start by defining the OpenAI client.

Remember to set your `OPENAI_API_KEY`. If something is not working at any point, start by checking whether this variable is set.

In [None]:
from openai import OpenAI

OPENAI_API_KEY = ""
OPENAI_CLIENT = OpenAI(api_key=OPENAI_API_KEY)
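
Hard-coding the key in a notebook makes it easy to leak. As a minimal sketch, assuming the key is exported as an environment variable named `OPENAI_API_KEY`, a small helper can fail early with a clear message. The name `read_api_key` is introduced here for illustration and is not part of the OpenAI SDK.

```python
import os


def read_api_key(env_var: str = "OPENAI_API_KEY") -> str:
    """Return the API key stored in `env_var`, or raise if it is missing or blank."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"{env_var} is not set; export it before running the notebook.")
    return key
```

With such a helper, `OPENAI_API_KEY = read_api_key()` could replace the empty string in the cell above.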

Next, we write a function that will create our [Assistant](https://platform.openai.com/docs/assistants/overview?context=with-streaming) with the provided instruction, [tools](https://platform.openai.com/docs/assistants/tools), name, and LLM model.

In [None]:
from typing import Optional

from openai.types.beta import Assistant


def get_assistant(
    instructions: str,
    tools: Optional[list[dict]] = None,
    name: str = "HR assistant",
    model: str = "gpt-4-turbo-preview",
) -> Assistant:
    # A None default avoids sharing one mutable list between calls.
    return OPENAI_CLIENT.beta.assistants.create(
        name=name,
        instructions=instructions,
        tools=tools or [],
        model=model,
    )

For the purposes of this tutorial, two parameters matter most:
* `instructions` - plays a role similar to the [system prompt](https://docs.anthropic.com/claude/docs/system-prompts).
* [`tools`](https://platform.openai.com/docs/assistants/tools) - a list of external functions, defined in JSON format, that the Assistant can use. Currently, the Assistants API supports [code-interpreter](https://platform.openai.com/docs/assistants/tools/code-interpreter), [knowledge retrieval](https://platform.openai.com/docs/assistants/tools/knowledge-retrieval), and [function calling](https://platform.openai.com/docs/guides/function-calling).

Let's inspect the returned object.

In [None]:
BASIC_INSTRUCTION = "You are a helpful AI assistant that helps in finding jobs and matching candidates to them."
first_assistant = get_assistant(BASIC_INSTRUCTION)
first_assistant

Next, we move on and define a chat function that initializes a conversation with an agent. To do so, we need:
* a [Thread](https://platform.openai.com/docs/api-reference/threads/object) representing our conversation,
* [Messages](https://platform.openai.com/docs/api-reference/messages/object),
* and a [Run](https://platform.openai.com/docs/api-reference/runs/object).

The code below does three things:
1. Defines an event handler that processes every event streamed by the OpenAI API. It is required for the [streaming mode](https://community.openai.com/t/what-is-this-new-streaming-parameter/391558/2).
2. Creates components necessary for interaction with the Assistant.
3. Initializes and streams our run inside a context manager.

In [None]:
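from typing import Callable, Optional

from openai import AssistantEventHandler
from typing_extensions import override


class EventHandler(AssistantEventHandler):
    @override
    def on_text_created(self, text) -> None:
        # Called once when the assistant starts a new text message.
        print("\nassistant > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot) -> None:
        # Called for every streamed chunk of text.
        print(delta.value, end="", flush=True)

    @override
    def on_tool_call_done(self, tool_call) -> None:
        # Called when a tool call has been fully streamed; for now we only print it.
        print(tool_call, end="", flush=True)


def chat(assistant: Assistant, handler_builder: Callable, questions_to_ask: Optional[list[str]] = None):
    thread = OPENAI_CLIENT.beta.threads.create()
    # Scripted questions are asked first; afterwards the user is prompted interactively.
    questions_gen = iter(questions_to_ask or [])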

    while True:
        try:
            msg = next(questions_gen)
            print(f"User: {msg}")
        except StopIteration:
            msg = input("\nProvide next message or 'q' to finish the conversation: ")

        if msg.lower() == "q":
            break

        OPENAI_CLIENT.beta.threads.messages.create(thread_id=thread.id, role="user", content=msg)

        handler = handler_builder()
        handler.thread_id = thread.id
        with OPENAI_CLIENT.beta.threads.runs.create_and_stream(
            thread_id=thread.id,
            assistant_id=assistant.id,
            event_handler=handler,
        ) as stream:
            stream.until_done()
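
The loop in `chat` first replays the scripted questions and only then falls back to interactive input. The sketch below isolates that pattern so it can be tried without calling the API. `next_messages` and its `ask_user` parameter are hypothetical names introduced for this illustration.

```python
from typing import Callable, Iterable, Iterator

PROMPT = "\nProvide next message or 'q' to finish the conversation: "


def next_messages(scripted: Iterable[str], ask_user: Callable[[str], str] = input) -> Iterator[str]:
    """Yield scripted questions first, then prompt the user, stopping at 'q'."""
    queue = iter(scripted)
    while True:
        try:
            # Replay the predefined questions in order.
            msg = next(queue)
        except StopIteration:
            # No scripted questions left: ask interactively.
            msg = ask_user(PROMPT)
        if msg.strip().lower() == "q":
            return
        yield msg
```

Passing a custom `ask_user` callable instead of `input` makes the loop easy to test with canned answers.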

Let's test the initial version of our Assistant. Unfortunately, without any tools, the assistant will not list offers; it will only guide us on how to find them.

**Remark:** At any point in this tutorial, feel free to modify the `questions_to_ask` argument, or set it to an empty list for a completely interactive conversation.

In [None]:
chat(
    first_assistant,
    lambda: EventHandler(),
    questions_to_ask=["What jobs are available for Software Engineer right now? Please be concise."],
)

The agent needs some refinement. Our next task is to allow the agent to access the recruitment database.

<img src="https://drive.google.com/uc?export=view&id=1A5yPt-pIyXGV94c6cMIkMf8AhiP6Nnq6" alt="Database schema" width="500" height="auto">

First, download the [database file](https://drive.google.com/file/d/1A5yPt-pIyXGV94c6cMIkMf8AhiP6Nnq6/view?usp=drive_link).

In [None]:
!wget -O recruitment.db 'https://drive.google.com/uc?export=download&id=1zo3j8x7qH8opTKyQ9qFgRpS3yqU6uTRs'

And then:
1. Create the database engine (`ENGINE`). Thanks to [SQLite](https://www.sqlite.org/inmemorydb.html), we can load the file directly.
2. Define the ORM model (`RECRUITMENT_MODEL`) with [sqlalchemy automap](https://docs.sqlalchemy.org/en/20/orm/extensions/automap.html).
3. Check that everything went well by [selecting](https://docs.sqlalchemy.org/en/20/tutorial/data_select.html) and printing 5 rows of every table.

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy import select

ENGINE = create_engine("sqlite:///recruitment.db")

RECRUITMENT_MODEL = automap_base()
# Passing autoload_with already enables reflection; the deprecated reflect flag is not needed.
RECRUITMENT_MODEL.prepare(autoload_with=ENGINE)

# Verify that all tables were loaded by selecting from each of them
stmts = [
    select(RECRUITMENT_MODEL.classes.candidate),
    select(RECRUITMENT_MODEL.classes.application),
    select(RECRUITMENT_MODEL.classes.offer),
]

with ENGINE.connect() as conn:
    for stmt in stmts:
        for row in conn.execute(stmt.limit(5)):
            print(row)
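
To double-check the downloaded file independently of SQLAlchemy, the sketch below uses only the standard-library `sqlite3` module to count the rows in each table. `table_row_counts` is a hypothetical helper written for this tutorial, not part of db-ally or SQLAlchemy.

```python
import sqlite3
from contextlib import closing


def table_row_counts(path: str) -> dict[str, int]:
    """Return {table_name: row_count} for every user table in the SQLite file at `path`."""
    with closing(sqlite3.connect(path)) as conn:
        names = [
            row[0]
            for row in conn.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
            )
        ]
        # The names come from sqlite_master itself; double quotes mark them as identifiers.
        return {name: conn.execute(f'SELECT COUNT(*) FROM "{name}"').fetchone()[0] for name in names}
```

For the file downloaded above, `table_row_counts("recruitment.db")` should list the `candidate`, `application`, and `offer` tables.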

Next, we create 2 [views](https://db-ally.deepsense.ai/concepts/views/): `JobOfferView` and `CandidateView`.

Please revisit the [first tutorial](https://colab.research.google.com/drive/1xUblkrUM3dtVJvQiug_IN_MTNX4bUX-4?usp=sharing) if you are unfamiliar with the View concept.

In [None]:
from dbally import SqlAlchemyBaseView, decorators
import sqlalchemy


class JobOfferView(SqlAlchemyBaseView):
    """
    View meant for answering questions about job offers.
    """

    def get_select(self) -> sqlalchemy.Select:
        return select(RECRUITMENT_MODEL.classes.offer)

    @decorators.view_filter()
    def filter_job_offers_by_position(self, position: str) -> sqlalchemy.ColumnElement:
        return RECRUITMENT_MODEL.classes.offer.position == position


class CandidateView(SqlAlchemyBaseView):
    """
    View meant for answering questions about candidates.
    """

    def get_select(self) -> sqlalchemy.Select:
        return select(RECRUITMENT_MODEL.classes.candidate)

    @decorators.view_filter()
    def filter_candidates_by_experience(self, years: int) -> sqlalchemy.ColumnElement:
        return RECRUITMENT_MODEL.classes.candidate.years_of_experience >= years

Notice that we've created 2 filters, so db-ally can now select job offers for a given position and return candidates who have at least the given number of years of experience.

Next, we create a [Collection](https://db-ally.deepsense.ai/concepts/collections/) and register the created Views.

In [None]:
import dbally

dbally.use_openai_llm(openai_api_key=OPENAI_API_KEY)

recruitment_db = dbally.create_collection("recruitment")
recruitment_db.add(JobOfferView, lambda: JobOfferView(ENGINE))
recruitment_db.add(CandidateView, lambda: CandidateView(ENGINE))

How about testing the `recruitment_db` Collection?

After running the cell below, you should see an `ExecutionResult` object with Software Engineer job offers.

In [None]:
await recruitment_db.ask("How many Software Engineer offers do we have?")

## Defining an assistant with access to tools

We are almost there! Now, we need to tell our assistant that it can use db-ally. To do so, we provide a tool definition in JSON format, compliant with [OpenAI function calling](https://platform.openai.com/docs/guides/function-calling), that describes how db-ally can be used.

`OpenAIAdapter` from db-ally generates this definition for us. Let's initialize it with the previously created collection.



In [None]:
from dbally.assistants.openai import OpenAIAdapter
from pprint import pprint

adapter = OpenAIAdapter(recruitment_db)

tool_json = adapter.generate_function_json()
tool_json["function"]["description"]
pprint(tool_json, width=120)

Remember that:
* `description` field contains information on when and how to use db-ally:
  * It states that db-ally has access to the views defined in the collection.
  * It demonstrates how to ask questions to db-ally.
  * It asks to split questions on different topics into separate function calls. Yes, the Assistant can call one tool multiple times during one step.
* Next, it defines `parameters`. In this case, it is just `query: str`.

So we expect GPT to use the function in the following way: `use_dbally("Query about job offers")`
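
In practice, the call does not arrive as Python code. The API returns the function name together with its arguments encoded as a JSON string, which is why the handler later in this tutorial decodes `tool_call.function.arguments` with `json.loads`. The sketch below decodes such a payload. The query text is made up for illustration.

```python
import json

# Hypothetical payload, shaped like the `arguments` string of a function tool call.
raw_arguments = '{"query": "Find job offers for the Software Engineer position"}'

arguments = json.loads(raw_arguments)  # JSON string -> dict
print(arguments["query"])
```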

Next, we create our second assistant, this time providing it with tools to call.



In [None]:
second_assistant = get_assistant(BASIC_INSTRUCTION, tools=[tool_json])

And run it.

In [None]:
chat(
    second_assistant,
    lambda: EventHandler(),
    questions_to_ask=["What jobs are available for Software Engineer right now? Please be concise."],
)

Notice the `FunctionToolCall` in the output. Unfortunately, the Assistant will not execute the function by itself; it only returns a JSON description of the call that we can use to execute the function ourselves.

So, the next step is to write code that will call functions returned by GPT.

To achieve this, we must extend the `on_tool_call_done` method. `AssistantEventHandler` receives the GPT response token by token, so the complete function call is available only once it has been fully streamed. That is why we handle it inside `on_tool_call_done`.

Additionally, we need the `ids` of:
* Run,
* Assistant,
* and Thread

to respond back with our function results.

In [None]:
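import asyncio
import json
from typing import Callable, Optional

import nest_asyncio

# Allow asyncio.run() inside the notebook's already running event loop.
nest_asyncio.apply()


class EventHandler(AssistantEventHandler):
    def __init__(self, assistant_id: str, functions: Optional[dict[str, Callable]] = None, adapter=None):
        super().__init__()
        # Plain Python functions the assistant may call, keyed by function name.
        self.functions = functions or {}
        self.adapter = adapter
        self.run_id = None
        self.thread_id = None
        self.assistant_id = assistant_id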

    @override
    def on_text_created(self, text) -> None:
        print(f"\nassistant > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot):
        print(delta.value, end="", flush=True)

    @override
    def on_run_step_created(self, run_step) -> None:
        self.run_id = run_step.run_id

    def on_tool_call_done(self, tool_call):
        if tool_call.type == "function":
            response = asyncio.run(self.adapter.process_response([tool_call]))
            response_parsed_for_gpt = self.adapter.process_functions_execution(response)

            function_name = tool_call.function.name
            if function_name in self.functions:
                function_args = json.loads(tool_call.function.arguments)
                response = self.functions[function_name](**function_args)
                response_parsed_for_gpt.append({"tool_call_id": tool_call.id, "output": response})

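            # The follow-up stream needs its own handler with the same configuration,
            # so that any further tool calls can be processed as well.
            next_handler = EventHandler(self.assistant_id, functions=self.functions, adapter=self.adapter)
            next_handler.thread_id = self.thread_id
            with OPENAI_CLIENT.beta.threads.runs.submit_tool_outputs_stream(
                thread_id=self.thread_id,
                run_id=self.run_id,
                tool_outputs=response_parsed_for_gpt,
                event_handler=next_handler,
            ) as stream:
                stream.until_done()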
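
The part of the handler that calls local Python functions can be exercised without the API. The following sketch reproduces that dispatch step on its own. `FakeToolCall`, `FakeFunction`, and `run_local_tool` are stand-ins invented for this example, not OpenAI or db-ally classes. Unlike the handler above, it reports unknown functions and malformed arguments back as tool output instead of raising, which is a design choice rather than a requirement.

```python
import json
from dataclasses import dataclass
from typing import Callable


@dataclass
class FakeFunction:
    name: str
    arguments: str  # JSON-encoded keyword arguments


@dataclass
class FakeToolCall:
    id: str
    function: FakeFunction


def run_local_tool(tool_call: FakeToolCall, functions: dict[str, Callable[..., str]]) -> dict[str, str]:
    """Execute the requested function and wrap the result as a tool output entry."""
    name = tool_call.function.name
    if name not in functions:
        output = f"Unknown function: {name}"
    else:
        try:
            kwargs = json.loads(tool_call.function.arguments)
            output = str(functions[name](**kwargs))
        except (json.JSONDecodeError, TypeError) as exc:
            # Malformed JSON, or a TypeError such as arguments not matching the signature.
            output = f"Invalid arguments for {name}: {exc}"
    return {"tool_call_id": tool_call.id, "output": output}
```

The returned dictionary has the same `tool_call_id` and `output` keys that the handler appends to the tool outputs.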

Remember that:
1. db-ally is an asynchronous library. Therefore, you always need to either `await` calls to it or run them in an event loop, for example with `asyncio.run`.
2. After obtaining a response, you can process it with `OpenAIAdapter.process_functions_execution`. In return, you get a list of tool outputs that can be sent back to the OpenAI Assistant.
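
To illustrate the first point, the sketch below calls a coroutine from synchronous code with `asyncio.run`, the same way the handler calls the adapter. `fake_ask` is a placeholder coroutine standing in for a db-ally call; it does not use db-ally at all. Inside a notebook, where an event loop is already running, `nest_asyncio.apply()` is what makes such a nested `asyncio.run` call possible.

```python
import asyncio


async def fake_ask(question: str) -> str:
    """Placeholder for an asynchronous db-ally call."""
    await asyncio.sleep(0)  # yield control once, as a real I/O call would
    return f"answer to: {question}"


def sync_callback(question: str) -> str:
    """Synchronous code (such as an event-handler method) driving a coroutine to completion."""
    return asyncio.run(fake_ask(question))
```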

Finally, we test our agent capabilities.

In [None]:
handler_builder = lambda: EventHandler(second_assistant.id, adapter=adapter)
chat(
    second_assistant,
    handler_builder,
    questions_to_ask=[
        "What jobs are available for Software Engineer right now?",
        "Is there a candidate with enough experience for the first google job offer?",
    ],
)

## Bonus: Adding more functionalities

Let's experiment a little and add LinkedIn and email APIs to function calling to build even more complex interactions with our assistant.

**Remark: for the purposes of this tutorial, we mock access to the LinkedIn and Gmail APIs, as this is not the most crucial part of it. Feel free to replace the mocks with real API calls.**

As before, we need to create JSON definitions of the function calls.

In [None]:
def linkedin_api(position: str) -> str:
    return """
      position: Software Engineer
      expected_years_of_experience: 10
      company: Amazon
      salary: $100000

      position: Software Engineer
      expected_years_of_experience: 2
      company: Netflix
      salary: $60000
  """


def gmail_api(job_description: str, recipient: str):
    return f"""
        Hello {recipient},

        We have found a perfect job offer for you {job_description}
  """


li_definition = {
    "type": "function",
    "function": {
        "name": "linkedin_api",
        "description": "Use it to get most recent job offers from linkedin",
        "parameters": {
            "type": "object",
            "properties": {
                "position": {
                    "type": "string",
                    "description": "Position for which you want to get job offers",
                }
            },
            "required": ["position"],
        },
    },
}

gmail_definition = {
    "type": "function",
    "function": {
        "name": "gmail_api",
        "description": "Use it to send email with job offer to the recipient",
        "parameters": {
            "type": "object",
            "properties": {
                "recipient": {
                    "type": "string",
                    "description": "name of the recipient",
                },
                "job_description": {
                    "type": "string",
                    "description": "Description of offered job",
                },
            },
            "required": ["recipient", "job_description"],
        },
    },
}

functions = {"linkedin_api": linkedin_api, "gmail_api": gmail_api}
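
A mismatch between `required` and `properties` is easy to introduce when writing these definitions by hand. As a sketch, the hypothetical helper `undeclared_required` below lists every required parameter that is not declared in `properties`. It is a plain-Python check, not part of the OpenAI SDK, and `example_tool` is an invented definition used only for the demonstration.

```python
def undeclared_required(definition: dict) -> list[str]:
    """Return required parameter names that are missing from the declared properties."""
    parameters = definition["function"]["parameters"]
    declared = parameters.get("properties", {})
    return [name for name in parameters.get("required", []) if name not in declared]


example_definition = {
    "type": "function",
    "function": {
        "name": "example_tool",  # hypothetical tool, for illustration only
        "description": "Example tool",
        "parameters": {
            "type": "object",
            "properties": {"position": {"type": "string"}},
            "required": ["position", "query"],
        },
    },
}

print(undeclared_required(example_definition))  # ['query']
```

Running the same check on each definition before creating the assistant gives an early warning about parameters the model would be told to supply but that no property describes.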

After this, we just provide all the tools to our agent and test them. Feel free to modify `questions_to_ask` and extend the conversation.

In [None]:
second_assistant = get_assistant(BASIC_INSTRUCTION, tools=[tool_json, li_definition, gmail_definition])

handler_builder = lambda: EventHandler(second_assistant.id, adapter=adapter, functions=functions)
chat(
    second_assistant,
    handler_builder,
    questions_to_ask=["Send an email to sarah.carrey@gmail.com about the Google job for Software Engineer"],
)

That's it! You have built an AI agent capable of:
1. Listing job offers,
2. Matching candidates to offers,
3. Drafting an email to the candidate, informing them about job opportunities.

Congratulations!