In [None]:
from __future__ import annotations
import os
import asyncio
from pydantic import BaseModel

# Import your agent framework components
from agents import (
    Agent,
    Runner,
    TResponseInputItem,
    function_tool,
    MessageOutputItem,
    ToolCallOutputItem,
    RunContextWrapper,
    ItemHelpers,
)

import agentops
from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()

True

In [3]:
# Initialise AgentOps with the key taken from the process environment
# (load_dotenv() has already been called in the imports cell).
agentops.init(api_key=os.environ.get("AGENTOPS_API_KEY"))

In [4]:
### CONTEXT
class DataAnalysisContext(BaseModel):
    """Mutable state shared between the runner and the agent's tools for one analysis run."""

    # Optional list of row dicts; defaults to None and is not populated by any code in this notebook.
    csv_data: list[dict] | None = None
    # Text of the latest analysis; written by advanced_analysis_tool and printed at the end of main().
    analysis_summary: str | None = None

In [5]:
### ASSISTANTS API CALL WRAPPER
def assistants_analysis(
    file_path: str,
    user_query: str,
    instructions: str,
    poll_interval: float = 1.0,
) -> str:
    """
    Uses the OpenAI assistants API (with code interpreter) to analyze CSV data.

    The CSV at ``file_path`` is uploaded, a new assistant and thread are created,
    ``user_query`` is posted with the file attached, and the run is polled until
    it finishes.

    Args:
        file_path: Path of the CSV file to upload.
        user_query: The question to ask about the uploaded data.
        instructions: System-level instructions given to the assistant.
        poll_interval: Seconds to wait between run-status checks.

    Returns:
        The text of the first text part of ``messages.data[0]`` for the thread.

    Raises:
        OSError: If ``file_path`` cannot be opened.
        RuntimeError: If the run stops in a state that cannot reach "completed",
            or if no text content is available to return.
    """
    import time  # local import: only the polling loop below needs it

    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    # Step 1: Upload the CSV file (the context manager closes the handle
    # as soon as the upload call returns).
    with open(file_path, "rb") as csv_file:
        file = client.files.create(
            file=csv_file,
            purpose="assistants"
        )

    # Step 2: Create an Assistant with Code Interpreter
    # NOTE(review): a new assistant and uploaded file are created on every call
    # and never deleted here -- confirm whether cleanup is wanted.
    assistant = client.beta.assistants.create(
        name="Data Analyst",
        instructions=instructions,
        tools=[{"type": "code_interpreter"}],
        model="gpt-4-turbo"
    )

    # Step 3: Create a Thread and Ask Questions
    thread = client.beta.threads.create()
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=user_query,
        attachments=[{
            "file_id": file.id,
            "tools": [{"type": "code_interpreter"}]
        }]
    )

    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id
    )

    # Poll for completion. These statuses will never turn into "completed" on
    # their own ("requires_action" waits for tool outputs nobody submits here),
    # so looping on them would hang forever.
    dead_end_statuses = {"failed", "cancelled", "expired", "incomplete", "requires_action"}
    while run.status != "completed":
        if run.status in dead_end_statuses:
            raise RuntimeError(
                f"Assistant run {run.id} ended with status {run.status!r}: "
                f"{getattr(run, 'last_error', None)}"
            )
        # Sleep between checks instead of hammering the API in a tight loop.
        time.sleep(poll_interval)
        run = client.beta.threads.runs.retrieve(
            thread_id=thread.id,
            run_id=run.id
        )

    messages = client.beta.threads.messages.list(thread_id=thread.id)
    if not messages.data:
        raise RuntimeError(f"Thread {thread.id} contains no messages after run {run.id} completed")

    # Return the result from the first message; skip non-text parts (e.g. images
    # produced by code interpreter), which have no ``.text`` attribute.
    for part in messages.data[0].content:
        if getattr(part, "type", None) == "text":
            return part.text.value
    raise RuntimeError(f"First message of thread {thread.id} has no text content")

In [6]:
### TOOL DEFINITION
@function_tool(
    name_override="advanced_analysis_tool",
    description_override="Uses the OpenAI assistants API with code interpreter to perform advanced CSV data analysis."
)
async def advanced_analysis_tool(
    context: RunContextWrapper[DataAnalysisContext], file_path: str, user_query: str
) -> str:
    """Run the assistants-API CSV analysis and store its result on the run context.

    Args:
        file_path: Path of the CSV file to analyze.
        user_query: The analysis question to answer about the CSV data.

    Returns:
        The analysis text returned by ``assistants_analysis``; the same text is
        saved to ``context.context.analysis_summary``.
    """
    instructions = (
        "Provide detailed feedback and suggestions on the following analysis output from the assistants_analysis function (assistants API)"
        # "Analyze CSV data. Provide insights, statistics, and visualizations if applicable."
    )
    # assistants_analysis is synchronous and blocks until the remote run finishes;
    # run it in a worker thread so this coroutine does not stall the event loop.
    result = await asyncio.to_thread(assistants_analysis, file_path, user_query, instructions)
    # Save the result in context
    context.context.analysis_summary = result
    return result

In [7]:
### AGENT DEFINITION
# Single agent of this notebook: it is given advanced_analysis_tool as its only tool
# and uses DataAnalysisContext as its run-context type.
advanced_analysis_agent = Agent[DataAnalysisContext](
    name="Advanced Data Analysis Agent",
    tools=[advanced_analysis_tool],
    instructions=(
        "You are an advanced data analysis agent. "
        "Use the advanced_analysis_tool to analyze CSV data using OpenAI's code interpreter. "
        "If the analysis is complete, report the insights."
    ),
    handoff_description="Agent for automated advanced CSV data analysis using the assistants API.",
)

In [None]:
### MAIN RUNNER
async def main():
    """Ask for a CSV path and a query, run the analysis agent, and print what it produced."""
    context = DataAnalysisContext()

    # Both values are typed in interactively by the user.
    file_path = input("Enter the CSV file path: ")
    user_query = input("Enter your analysis query (e.g., 'Give me insights and statistics about the data'): ")

    # One user message that tells the agent which file to analyze and what to answer.
    prompt = f"Perform advanced analysis on the provided CSV data {file_path} and answer the query: {user_query}."
    conversation: list[TResponseInputItem] = [{"content": prompt, "role": "user"}]

    outcome = await Runner.run(advanced_analysis_agent, conversation, context=context)

    # Echo agent messages and tool outputs; every other item kind is skipped.
    for item in outcome.new_items:
        if isinstance(item, MessageOutputItem):
            rendered = ItemHelpers.text_message_output(item)
        elif isinstance(item, ToolCallOutputItem):
            rendered = f"Tool call output: {item.output}"
        else:
            continue
        print(f"{item.agent.name}: {rendered}")

    # The tool stores its result on the context; show it when one was recorded.
    summary = context.analysis_summary
    if summary:
        print("Final Analysis Summary:", summary)

In [9]:
# Top-level `await` is accepted here because IPython/Jupyter executes cells inside a
# running event loop (autoawait). In a plain Python script this line is a syntax
# error; use the commented-out asyncio.run(...) form instead.
if __name__ == "__main__":
    await main()
    # asyncio.run(main())

Advanced Data Analysis Agent: Tool call output: The dataset appears to be a Human Resources (HR) dataset that includes details about employee attributes and their professional assessment within a company. Here are some of the columns in the dataset:

- `Attrition`: Employee's current status (Yes if they left the company).
- `Business Travel`: Frequency of business travel.
- `CF_age band`: Age category of the employee.
- `CF_attrition label`: Labels employees as Current or Ex-Employees.
- `Department`: The department where the employee works.
- `Education Field`: Field of education of the employee.
- `Gender`: Gender of the employee.
- `Job Role`: The role/job title of the employee.
- `Performance Rating`: A rating of the employee's performance.
- `Total Working Years`: Total years of work experience.
- `Years At Company`: Number of years the employee has worked at the company.
- `Years Since Last Promotion`: Years since the employee's last promotion.

Based on the first glance at the d