# Building your first OpenAI Assistant

In [None]:
from dotenv import load_dotenv
import json
from openai import  AsyncOpenAI
from openai.types.beta import Thread
from openai.types.beta.threads import Run
from pydantic import BaseModel
from typing import Dict, List

In [None]:
load_dotenv(override=True)

True

## 1. Define the Assistant class

In [13]:
class AssistantResult(BaseModel):
    response: str
    thread_id: str

class MaxTurnsReachedException(Exception):
    def __init__(self):
        super().__init__("Reached maximum number of turns")

In [14]:
class Assistant:
    def __init__(self, assistant_id: str, tools: Dict[str, callable]):
        self.client = AsyncOpenAI()
        self.assistant_id = assistant_id
        self.assistant = None
        self.tools = tools
    
    async def retrieve_assistant(self):
        if self.assistant is None:
            self.assistant = await self.client.beta.assistants.retrieve(self.assistant_id)
        return self.assistant
    
    async def run(self, query: str, thread_id: str = None, max_turns: int = 5) -> AssistantResult:
        await self.retrieve_assistant()

        print(f"Running assistant with thread_id {thread_id}")

        if(thread_id is None):
            thread: Thread = await self.client.beta.threads.create()
            print(f"Created new thread with id {thread.id}")
        else:
            thread: Thread = await self.client.beta.threads.retrieve(thread_id)
            print(f"Retrieved thread with id {thread.id}")
        
        print(f"Sending query to thread {thread.id}: {query}")
        await self.client.beta.threads.messages.create(
            thread_id=thread.id, role="user", content=query
        )

        run: Run = await self.client.beta.threads.runs.create_and_poll(
            thread_id=thread.id,
            assistant_id=self.assistant_id,
        )

        for turn in range(max_turns):

            # Fetch the last message from the thread
            messages = await self.client.beta.threads.messages.list(
                thread_id=thread.id,
                run_id=run.id,
                order="desc",
                limit=1,
            )
            print(f"Fetched last message from thread {thread.id}: {messages}")

            # Check for the terminal state of the Run.
            # If state is "completed", exit agent loop and return the LLM response.
            if run.status == "completed":
                print(f"Run completed for thread {thread.id}")
                assistant_res: str = next(
                    (
                        content.text.value
                        for content in messages.data[0].content
                        if content.type == "text"
                    ),
                    None,
                )

                return AssistantResult(thread_id=thread.id, response=assistant_res)
            
            # If state is "requires_action", function calls are required. Execute the functions and send their outputs to the LLM.
            if run.status == "requires_action":
                func_tool_outputs = []

                # LLM can ask for multiple functions to be executed. Execute all function calls in loop and
                # append the results into `func_tool_outputs` list.
                for tool in run.required_action.submit_tool_outputs.tool_calls:
                    # parse the arguments required for the function call from the LLM response
                    args = (
                        json.loads(tool.function.arguments)
                        if tool.function.arguments
                        else {}
                    )

                    try:
                        print("Running function {} with args {} for thread {}".format(tool.function.name, args, thread.id))
                        func_output = await self.tools[tool.function.name](**args)
                        print("Function outputs: {}".format(func_output))
                    except Exception as e:
                        print("Error in running function {}: {}".format(tool.function.name, e))
                        func_output = f'Error in running function {tool.function.name}: {e}'

                    # OpenAI needs the output of the function call against the tool_call_id
                    func_tool_outputs.append(
                        {"tool_call_id": tool.id, "output": str(func_output)}
                    )

                # Submit the function call outputs back to OpenAI
                run = await self.client.beta.threads.runs.submit_tool_outputs_and_poll(
                    thread_id=thread.id, run_id=run.id, tool_outputs=func_tool_outputs
                )

                # Continue the agent loop.
                # Agent will check the output of the function output submission as part of next iteration.
                continue

            # Handle errors if terminal state is "failed"
            else:
                if run.status == "failed":
                    print(
                        f"OpenAIFunctionAgent turn-{turn+1} | Run failure reason: {run.last_error}"
                    )

                raise Exception(
                    f"Failed to generate text due to: {run.last_error}"
                )
        
        # Raise error if turn-limit is reached.
        await self.client.beta.threads.runs.cancel(run.id,thread_id=thread_id)
        raise MaxTurnsReachedException()
    
    async def cancel_thread_run(self, thread_id: str):
        thread: Thread = await self.client.beta.threads.retrieve(thread_id)
        run: Run = await self.client.beta.threads.runs.list(thread_id=thread.id).data[0]

        await self.client.beta.threads.runs.cancel(run.id,thread_id=thread_id)

## 2. Define the Assistant Tools

In [None]:
import subprocess
import os


# define an isolated working directory for the agent
AGENT_WORKING_DIRECTORY = "./workdir"

async def list_repositories(directory : str = AGENT_WORKING_DIRECTORY) -> str:
    """
    List all repositories in the given directory, by the last modified date
    """
    return os.popen("ls -lat " + directory + " | awk '{print $6, $7, $8, $9}'").read()


async def run_git_command(command_arguments: str) -> str:
    """
    Run a git command with the given arguments
    """
    try:
        result = subprocess.run("git " + command_arguments, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        return result.stdout.strip()
    except Exception as e:
        return str(e)

## 3. Create the OpenAI Assistant using the OpenAI Playground

1. Navigate to [OpenAI Assistant Playground](https://platform.openai.com/playground/assistants)
2. Create a new Assistant and name following the format: `<name_first_letter>_<surname>_git_assistant`
3. Define the System Instructions
4. Set gpt-4o-mini as model
5. Generate and add the function schemas (you can use the "generate" button and provide the function code above)

## 4. Instantiate the Assistant

In [None]:
ASSISTANT_ID = "<assistant-id>"

TOOLS = {
    "run_git_command": run_git_command,
    "list_repositories": list_repositories
    }

In [32]:
assistant = Assistant(ASSISTANT_ID, TOOLS)

## 5. Use the Assistant

In [None]:
query = "TBD"

res = await assistant.run(query=query, max_turns=5)
print("###########################################\n")
print(f"Thread ID: {res.thread_id}")
print(f"Response:\n{res.response}")