# Building an Agentic AI System with OpenAI

This notebook demonstrates how to create an intelligent agent using OpenAI's API. We'll build a system that can understand tasks, plan their execution, and maintain context through memory.

## 1. Setup and Installation

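First, we'll import the required packages and configure the OpenAI client. The third-party dependencies are:
- `openai`: For interacting with OpenAI's API (the `openai.chat.completions.create` calls below require version 1.0 or later)
- `python-dotenv`: For loading environment variables from a `.env` file
- `numpy`: For numerical operations

The `typing`, `json`, and `datetime` modules ship with the Python standard library and need no installation.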

In [1]:
import os
from typing import Dict, List, Optional, Any
import openai
from dotenv import load_dotenv
import numpy as np
import json
from datetime import datetime, date

# Load environment variables
load_dotenv()

# Configure OpenAI API
openai.api_key = os.getenv('OPENAI_API_KEY')

In [2]:
# Simple API Test
print("Current API key configuration:")
api_key = os.getenv('OPENAI_API_KEY')
if api_key:
    print(f"API key starts with: {api_key[:7]}")
    print(f"API key length: {len(api_key)} characters")
    if not api_key.startswith('sk-'):
        print("⚠️ Warning: API key should start with 'sk-'")
else:
    print("❌ No API key found in environment variables")

Current API key configuration:
API key starts with: sk-proj
API key length: 164 characters
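
Printing a key prefix helps with debugging, but notebook outputs are often shared. A minimal sketch of a masking helper follows; `mask_secret` is a hypothetical name introduced here, not part of the `openai` package, and the key in the example is a made-up placeholder.

```python
def mask_secret(secret: str, visible: int = 4) -> str:
    """Keep the first `visible` characters and replace the rest with asterisks."""
    if not secret:
        return "<missing>"
    # Clamp `visible` so short secrets and negative values are handled safely.
    visible = max(0, min(visible, len(secret)))
    return secret[:visible] + "*" * (len(secret) - visible)


# Placeholder value for illustration only, not a real key.
print(mask_secret("sk-proj-example-not-a-real-key"))
```

The masked string keeps the original length, so a length check like the one above still works on it.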


In [3]:
# Test OpenAI API connection
try:
    # Print the first few characters of the API key (safely)
    api_key = os.getenv('OPENAI_API_KEY')
    if api_key:
        print(f"API key found: {api_key[:6]}...")
    else:
        print("No API key found in environment variables")
        
    # Test API connection
    response = openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello, is the API working?"}],
        max_tokens=10
    )
    print("API Connection successful!")
    print("Response:", response.choices[0].message.content)
except Exception as e:
    print("Error testing API connection:")
    print(str(e))

API key found: sk-pro...
API Connection successful!
Response: Hello! I am an AI assistant and I do


## 2. Define Agent Class

Now we'll create our base Agent class that will serve as the foundation for our agentic AI system. This class will include:
- Initialization with OpenAI model settings
- Basic conversation handling
- State management

In [4]:
class Agent:
    def __init__(
        self,
        name: str,
        model: str = "gpt-3.5-turbo",  # Changed from gpt-4 to gpt-3.5-turbo
        temperature: float = 0.7,
        max_tokens: int = 1000
    ):
        self.name = name
        self.model = model
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.conversation_history: List[Dict[str, str]] = []
        self.state: Dict[str, Any] = {}
        
    def __str__(self) -> str:
        return f"Agent(name={self.name}, model={self.model})"
    
    def get_completion(self, prompt: str) -> str:
        """Get a completion from the OpenAI API."""
        messages = self.conversation_history + [{"role": "user", "content": prompt}]
        
        response = openai.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )
        
        return response.choices[0].message.content
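
One detail of `get_completion` is easy to miss: it sends the stored history plus the new prompt, but it does not write the prompt or the reply back to `conversation_history`. The sketch below makes that visible without calling the API. `OfflineAgent` and `echo_model` are hypothetical stand-ins introduced only for this illustration; the completion function is injected so the example runs without a key.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


class OfflineAgent:
    """Hypothetical stand-in for Agent: same message assembly, injectable completion function."""

    def __init__(self, name: str, complete: Callable[[List[Message]], str]):
        self.name = name
        self.conversation_history: List[Message] = []
        self._complete = complete

    def get_completion(self, prompt: str) -> str:
        # Same assembly as Agent.get_completion: history first, new prompt last.
        messages = self.conversation_history + [{"role": "user", "content": prompt}]
        return self._complete(messages)


def echo_model(messages: List[Message]) -> str:
    """Fake model that reports how many messages it received."""
    return f"received {len(messages)} message(s); last: {messages[-1]['content']}"


agent = OfflineAgent("demo", echo_model)
first = agent.get_completion("hello")
second = agent.get_completion("hello again")

print(first)
print(second)
print(len(agent.conversation_history))
```

Both calls see a single message because nothing is appended to the history. Callers that want multi-turn context have to record the exchange themselves.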

## 3. Implement Core Agent Functions

Let's implement the core functionality that will allow our agent to:
- Process and understand input
- Generate appropriate responses
- Make decisions based on context
- Update its internal state

In [5]:
class AgenticAI(Agent):
    def __init__(self, name: str, **kwargs):
        super().__init__(name, **kwargs)
        self.state["tasks"] = []
        self.state["memory"] = []
        
    def process_input(self, user_input: str) -> Dict[str, Any]:
        """Process user input and extract key information."""
        prompt = f"""
        Please analyze the following user input and extract key information.
        Return ONLY a JSON object with these exact fields:
        {{
            "main_objective": "main goal of the task",
            "required_steps": ["step1", "step2"],
            "key_constraints": ["constraint1", "constraint2"],
            "priority_level": 3
        }}

        User Input:
        {user_input}

        Remember: Return ONLY the JSON object, no other text.
        """
        
        response = self.get_completion(prompt)
        self.conversation_history.append({"role": "user", "content": user_input})
        self.conversation_history.append({"role": "assistant", "content": response})
        
        try:
            response_text = response.strip()
            # Remove any potential markdown code block markers
            response_text = response_text.replace('```json', '').replace('```', '').strip()
            return json.loads(response_text)
        except json.JSONDecodeError as e:
            print("Error parsing response:", response_text)
            raise Exception("Failed to parse the model's response as JSON. Please try again.") from e
    
    def plan_execution(self, task_info: Dict[str, Any]) -> List[str]:
        """Create a plan to execute the task."""
        prompt = f"""
        Based on this task information, create a step-by-step plan.
        Return ONLY a JSON array of strings, like this: ["step1", "step2", "step3"]

        Task information:
        {json.dumps(task_info, indent=2)}
        """
        
        response = self.get_completion(prompt)
        try:
            response_text = response.strip()
            # Remove any potential markdown code block markers
            response_text = response_text.replace('```json', '').replace('```python', '').replace('```', '').strip()
            steps = json.loads(response_text)
            self.state["tasks"].extend(steps)
            return steps
        except json.JSONDecodeError as e:
            print("Error parsing response:", response_text)
            raise Exception("Failed to parse the model's response as JSON. Please try again.") from e
    
    def execute_step(self, step: str) -> str:
        """Execute a single step and return the result."""
        prompt = f"""
        Execute this step and provide the result:
        {step}
        
        Consider the current context and previous steps.
        """
        
        result = self.get_completion(prompt)
        self.state["memory"].append({"step": step, "result": result})
        return result
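
The parsing pattern used in `process_input` and `plan_execution` (strip Markdown fences, then `json.loads`) can be factored into one helper. This is a sketch under assumptions: `parse_json_reply`, `missing_task_fields`, and `REQUIRED_TASK_FIELDS` are hypothetical names, and the field list is copied from the prompt in `process_input`.

```python
import json
from typing import Any, Dict, List

FENCE = "`" * 3  # Markdown code-fence marker

REQUIRED_TASK_FIELDS = ("main_objective", "required_steps", "key_constraints", "priority_level")


def parse_json_reply(reply: str) -> Any:
    """Strip an optional Markdown fence from a model reply and parse it as JSON."""
    text = reply.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line, which may carry a language tag such as "json".
        text = text.split("\n", 1)[1] if "\n" in text else ""
        # Drop the closing fence if present.
        if text.rstrip().endswith(FENCE):
            text = text.rstrip()[: -len(FENCE)]
    return json.loads(text.strip())


def missing_task_fields(task_info: Dict[str, Any]) -> List[str]:
    """Return the required fields that are absent from a parsed task description."""
    return [field for field in REQUIRED_TASK_FIELDS if field not in task_info]


reply = FENCE + 'json\n{"main_objective": "plan", "required_steps": ["a"]}\n' + FENCE
info = parse_json_reply(reply)
print(info["main_objective"])
print(missing_task_fields(info))
```

Checking for missing fields right after parsing surfaces an incomplete reply at the point where it can still be retried, rather than as a `KeyError` later on.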

## 4. Task Planning System

Now we'll implement a more sophisticated task planning system that can:
- Break down complex tasks into subtasks
- Prioritize tasks
- Handle dependencies between tasks
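
Before subtasks can be ordered, each one needs the expected fields, and every dependency should name a subtask that exists. A minimal sketch of such a check, assuming the subtask shape used in this notebook (`subtask_id`, `description`, `estimated_time`, `dependencies`); `find_subtask_problems` is a hypothetical helper, not part of the planner below.

```python
from typing import Any, Dict, List

SUBTASK_FIELDS = ("subtask_id", "description", "estimated_time", "dependencies")


def find_subtask_problems(subtasks: List[Dict[str, Any]]) -> List[str]:
    """Report missing fields, duplicate ids, and dependencies on unknown ids."""
    problems: List[str] = []
    seen_ids = set()
    for index, subtask in enumerate(subtasks):
        for field in SUBTASK_FIELDS:
            if field not in subtask:
                problems.append(f"subtask #{index} is missing '{field}'")
        subtask_id = subtask.get("subtask_id")
        if subtask_id is None:
            continue  # already reported as a missing field
        if subtask_id in seen_ids:
            problems.append(f"duplicate subtask_id '{subtask_id}'")
        seen_ids.add(subtask_id)
    # Second pass: every dependency must refer to a known id.
    for subtask in subtasks:
        for dep in subtask.get("dependencies", []):
            if dep not in seen_ids:
                problems.append(f"'{subtask.get('subtask_id')}' depends on unknown '{dep}'")
    return problems


subtasks = [
    {"subtask_id": "task1", "description": "gather data", "estimated_time": "30min", "dependencies": []},
    {"subtask_id": "task2", "description": "analyze", "estimated_time": "1h", "dependencies": ["task1", "task9"]},
]
for problem in find_subtask_problems(subtasks):
    print(problem)
```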

In [6]:
class TaskPlanner:
    def __init__(self):
        self.tasks: List[Dict[str, Any]] = []
        
    def decompose_task(self, task: str) -> List[Dict[str, Any]]:
        """Break down a complex task into subtasks."""
        example = {
            "subtask_id": "task1",
            "description": "description of the task",
            "estimated_time": "30min",
            "dependencies": []
        }
        
        prompt = f"""
        Decompose this task into subtasks. Return ONLY a JSON array of subtask objects.
        Each object must have these exact fields:
        {json.dumps(example, indent=2)}

        Task to decompose:
        {task}

        Remember: Return ONLY the JSON array, no other text.
        """
        
        response = openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{
                "role": "system",
                "content": "You are a task planning assistant. Always respond with valid JSON arrays containing subtask objects."
            },
            {
                "role": "user",
                "content": prompt
            }],
            temperature=0.7
        )
        
        try:
            response_text = response.choices[0].message.content.strip()
            # Remove any potential markdown code block markers
            response_text = response_text.replace('```json', '').replace('```', '').strip()
            subtasks = json.loads(response_text)
            self.tasks.extend(subtasks)
            return subtasks
        except json.JSONDecodeError as e:
            print("Error parsing response:", response_text)
            raise Exception("Failed to parse the model's response as JSON. Please try again.") from e
    
    def prioritize_tasks(self) -> List[Dict[str, Any]]:
        """Prioritize tasks based on dependencies and importance."""
        if not self.tasks:
            return []
        
        # Sort tasks by dependencies
        independent_tasks = [t for t in self.tasks if not t["dependencies"]]
        dependent_tasks = [t for t in self.tasks if t["dependencies"]]
        
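        # Repeatedly schedule tasks whose dependencies are already scheduled
        sorted_tasks = independent_tasks
        while dependent_tasks:
            scheduled_ids = {t["subtask_id"] for t in sorted_tasks}
            resolved = [
                task for task in dependent_tasks
                if all(dep in scheduled_ids for dep in task["dependencies"])
            ]
            if not resolved:
                # Circular or unknown dependencies: stop instead of looping forever
                unresolved = ", ".join(t["subtask_id"] for t in dependent_tasks)
                raise ValueError(f"Cannot resolve dependencies for: {unresolved}")
            for task in resolved:
                sorted_tasks.append(task)
                dependent_tasks.remove(task)
                
        return sorted_tasks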
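
Dependency-aware ordering is a topological sort. As an alternative sketch (not the notebook's own method), the standard library's `graphlib.TopologicalSorter`, available from Python 3.9, can produce the order and report cycles. `order_subtasks` is a hypothetical name, and ignoring dependencies on unknown ids is an assumption made for this example.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List


def order_subtasks(tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return tasks ordered so that every task comes after its dependencies."""
    by_id = {task["subtask_id"]: task for task in tasks}
    # Map each id to the ids it depends on, dropping unknown ids (assumption).
    graph = {
        task_id: {dep for dep in task.get("dependencies", []) if dep in by_id}
        for task_id, task in by_id.items()
    }
    try:
        ordered_ids = list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception's args.
        raise ValueError(f"Circular dependency among subtasks: {exc.args[1]}") from exc
    return [by_id[task_id] for task_id in ordered_ids]


chain = [
    {"subtask_id": "c", "dependencies": ["b"]},
    {"subtask_id": "b", "dependencies": ["a"]},
    {"subtask_id": "a", "dependencies": []},
]
print([task["subtask_id"] for task in order_subtasks(chain)])
```

Unlike a hand-written loop, this version fails fast with an explicit error when two subtasks depend on each other.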

## 5. Memory Management

Let's implement a memory system that allows our agent to:
- Store and retrieve past interactions
- Learn from previous experiences
- Maintain context across conversations
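
The capacity rule can be illustrated without any model call. The sketch below swaps LLM-based consolidation for a plain first-in, first-out policy: when short-term memory is full, the oldest entry moves to long-term storage. `FifoMemory` is a hypothetical class for illustration only, not the `MemorySystem` defined in this notebook.

```python
from collections import deque
from typing import Any, Deque, Dict, List


class FifoMemory:
    """Bounded short-term buffer that spills its oldest entries into long-term storage."""

    def __init__(self, capacity: int = 3):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self.short_term: Deque[Dict[str, Any]] = deque()
        self.long_term: List[Dict[str, Any]] = []

    def add(self, memory: Dict[str, Any]) -> None:
        self.short_term.append(memory)
        # Evict from the left (oldest first) until the buffer fits again.
        while len(self.short_term) > self.capacity:
            self.long_term.append(self.short_term.popleft())


memory = FifoMemory(capacity=2)
for step in range(1, 5):
    memory.add({"step": step})

print(list(memory.short_term))
print(memory.long_term)
```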

In [7]:
class MemorySystem:
    def __init__(self, capacity: int = 1000):
        self.capacity = capacity
        self.short_term: List[Dict[str, Any]] = []
        self.long_term: List[Dict[str, Any]] = []
        self.embeddings: Dict[str, np.ndarray] = {}
        
    def add_memory(self, memory: Dict[str, Any], memory_type: str = "short_term"):
        """Add a new memory to the system."""
        if memory_type == "short_term":
            self.short_term.append(memory)
            if len(self.short_term) > self.capacity:
                self._consolidate_memories()
        else:
            self.long_term.append(memory)
            
    def _consolidate_memories(self):
        """Move important memories from short-term to long-term memory."""
        if not self.short_term:
            return
            
        prompt = f"""
        Review these memories and identify which are important for long-term retention:
        {self.short_term}
        
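        Return ONLY a JSON array of zero-based indices for memories to keep, like [0, 2].
        """
        
        response = openai.chat.completions.create(
            model="gpt-3.5-turbo",  # Changed from gpt-4 to gpt-3.5-turbo
            messages=[{"role": "user", "content": prompt}]
        )
        
        # Parse the reply as JSON rather than eval(), which would execute arbitrary model output
        indices_to_keep = json.loads(response.choices[0].message.content)
        
        # Move important memories to long-term, ignoring out-of-range indices
        for idx in sorted(set(indices_to_keep), reverse=True):
            if 0 <= idx < len(self.short_term):
                self.long_term.append(self.short_term.pop(idx))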
            
        # Clear remaining short-term memories
        self.short_term.clear()
        
    def retrieve_relevant_memories(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Retrieve memories relevant to the current query."""
        prompt = f"""
        Find memories relevant to this query:
        {query}
        
        Current memories:
        Short-term: {self.short_term}
        Long-term: {self.long_term}
        
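        Return ONLY a JSON array of up to {limit} index pairs [short_term_idx, long_term_idx],
        using -1 for a slot that does not apply, like [[0, -1], [-1, 2]].
        """
        
        response = openai.chat.completions.create(
            model="gpt-3.5-turbo",  # Changed from gpt-4 to gpt-3.5-turbo
            messages=[{"role": "user", "content": prompt}]
        )
        
        # Parse the reply as JSON rather than eval(), which would execute arbitrary model output
        relevant_indices = json.loads(response.choices[0].message.content)
        
        relevant_memories = []
        for st_idx, lt_idx in relevant_indices:
            # Bounds checks guard against indices the model made up
            if 0 <= st_idx < len(self.short_term):
                relevant_memories.append(self.short_term[st_idx])
            if 0 <= lt_idx < len(self.long_term):
                relevant_memories.append(self.long_term[lt_idx])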
                
        return relevant_memories
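
Model replies are untrusted text, so index lists are safer to parse with `json.loads` plus validation than with `eval`. A minimal sketch, assuming the model is asked for a JSON array of integers; `parse_indices` is a hypothetical helper, not part of `MemorySystem`.

```python
import json
from typing import List


def parse_indices(reply: str, size: int) -> List[int]:
    """Parse a JSON array of integers, keeping unique in-range indices in reply order."""
    try:
        data = json.loads(reply)
    except json.JSONDecodeError:
        return []
    if not isinstance(data, list):
        return []
    valid: List[int] = []
    for item in data:
        # bool is a subclass of int in Python, so exclude it explicitly.
        is_index = isinstance(item, int) and not isinstance(item, bool)
        if is_index and 0 <= item < size and item not in valid:
            valid.append(item)
    return valid


print(parse_indices("[0, 2, 2, 7, -1]", size=3))
print(parse_indices("__import__('os').getcwd()", size=3))
```

The second call shows the point of the exercise: text that `eval` would execute is simply rejected as invalid JSON.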

## 6. Test Agent Interaction

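Let's demonstrate the system on a practical task: two agents, one for retirement planning and one for education savings, work from the same user profile and write their results to a shared memory.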

In [8]:
# Create specialized financial planning agents
retirement_agent = AgenticAI(
    name="RetirementPlanner",
    model="gpt-3.5-turbo",
    temperature=0.7
)

education_agent = AgenticAI(
    name="EducationPlanner",
    model="gpt-3.5-turbo",
    temperature=0.7
)

# Create supporting systems for each agent
retirement_planner = TaskPlanner()
education_planner = TaskPlanner()
shared_memory = MemorySystem()  # Shared memory system for both agents

# Example financial planning scenario
user_profile = {
    "age": 35,
    "current_savings": 100000,
    "annual_income": 85000,
    "retirement_age": 65,
    "children": [
        {"age": 5, "education_goal": "college"},
        {"age": 2, "education_goal": "college"}
    ]
}

# Retirement planning task
retirement_task = f"""
Analyze retirement planning needs and create a comprehensive strategy based on:
- Current age: {user_profile['age']}
- Current savings: ${user_profile['current_savings']}
- Annual income: ${user_profile['annual_income']}
- Target retirement age: {user_profile['retirement_age']}
Consider inflation, investment strategies, and risk management.
"""

# Education planning task
education_task = f"""
Develop an education savings strategy for two children:
- Child 1: Age {user_profile['children'][0]['age']}, Goal: {user_profile['children'][0]['education_goal']}
- Child 2: Age {user_profile['children'][1]['age']}, Goal: {user_profile['children'][1]['education_goal']}
Consider college cost inflation, 529 plans, and timeline planning.
"""

# Process retirement planning
print("=== Retirement Planning Analysis ===")
retirement_info = retirement_agent.process_input(retirement_task)
print("\nRetirement Planning Overview:")
print(json.dumps(retirement_info, indent=2))

retirement_steps = retirement_planner.decompose_task(retirement_task)
print("\nRetirement Planning Steps:")
for step in retirement_steps:
    print(f"- {step['description']} (Est. time: {step['estimated_time']})")

print("\n=== Education Planning Analysis ===")
education_info = education_agent.process_input(education_task)
print("\nEducation Planning Overview:")
print(json.dumps(education_info, indent=2))

education_steps = education_planner.decompose_task(education_task)
print("\nEducation Planning Steps:")
for step in education_steps:
    print(f"- {step['description']} (Est. time: {step['estimated_time']})")

# Execute retirement planning steps and store in shared memory
print("\n=== Executing Retirement Planning Steps ===")
for step in retirement_planner.prioritize_tasks():
    result = retirement_agent.execute_step(step["description"])
    shared_memory.add_memory({
        "agent": "RetirementPlanner",
        "task_id": step["subtask_id"],
        "description": step["description"],
        "result": result
    })
    print(f"\nCompleted: {step['description']}")
    print(f"Result: {result[:200]}...")

# Execute education planning steps and store in shared memory
print("\n=== Executing Education Planning Steps ===")
for step in education_planner.prioritize_tasks():
    result = education_agent.execute_step(step["description"])
    shared_memory.add_memory({
        "agent": "EducationPlanner",
        "task_id": step["subtask_id"],
        "description": step["description"],
        "result": result
    })
    print(f"\nCompleted: {step['description']}")
    print(f"Result: {result}")

=== Retirement Planning Analysis ===

Retirement Planning Overview:
{
  "main_objective": "Create a comprehensive retirement planning strategy",
  "required_steps": [
    "Analyze current financial situation",
    "Develop investment strategies"
  ],
  "key_constraints": [
    "Inflation",
    "Risk management"
  ],
  "priority_level": 3
}

Retirement Planning Steps:
- Analyze retirement planning needs (Est. time: 30min)
- Create a comprehensive strategy based on current age, savings, income, and target retirement age (Est. time: 30min)
- Consider inflation impact on retirement planning (Est. time: 30min)
- Develop investment strategies for retirement funds (Est. time: 30min)
- Implemen

In [9]:
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import numpy as np

# Set template for better-looking plots
import plotly.io as pio
pio.templates.default = "plotly_dark"
pio.renderers.default = "notebook"  # Set default renderer to notebook

In [11]:
# Create visualizations for retirement and education planning

# 1. Retirement Planning Visualization
current_age = user_profile['age']
retirement_age = user_profile['retirement_age']
years_until_retirement = retirement_age - current_age
current_savings = user_profile['current_savings']
annual_income = user_profile['annual_income']

# Assume 6% average annual return and 15% savings rate
annual_return_rate = 0.06
savings_rate = 0.15
annual_savings = annual_income * savings_rate

# Calculate retirement savings projection
years = np.arange(current_age, retirement_age + 1)
savings = []
current = current_savings

for _ in range(len(years)):
    savings.append(current)
    current = current * (1 + annual_return_rate) + annual_savings

# Create retirement savings projection plot with Plotly
fig1 = go.Figure()

fig1.add_trace(go.Scatter(
    x=years,
    y=savings,
    mode='lines+markers',
    name='Projected Savings',
    line=dict(color='#00ff00', width=3),
    marker=dict(size=8, symbol='circle')
))

fig1.update_layout(
    title='Projected Retirement Savings Growth',
    title_x=0.5,
    xaxis_title='Age',
    yaxis_title='Savings ($)',
    showlegend=True,
    hovermode='x unified',
    plot_bgcolor='rgba(0,0,0,0)',
    yaxis=dict(
        tickformat='$,.0f',
        gridcolor='rgba(128,128,128,0.2)',
    ),
    xaxis=dict(
        gridcolor='rgba(128,128,128,0.2)',
    ),
)

# Add annotation for final savings
fig1.add_annotation(
    x=retirement_age,
    y=savings[-1],
    text=f'Projected savings at retirement:<br>${savings[-1]:,.0f}',
    showarrow=True,
    arrowhead=1,
    ax=50,
    ay=-50
)

fig1.show()  # Show first plot

# 2. Education Planning Visualization
children_data = user_profile['children']
current_year = 2025
college_start_age = 18
years_to_college = [(college_start_age - child['age']) for child in children_data]
estimated_annual_college_cost = 35000  # Base cost
college_inflation_rate = 0.05  # 5% annual increase

# Calculate projected college costs for each child
current_costs = [estimated_annual_college_cost * 4] * len(children_data)  # 4 years of college
projected_costs = [estimated_annual_college_cost * 4 * (1 + college_inflation_rate) ** years 
                  for years in years_to_college]

# Create education costs comparison with Plotly
education_df = pd.DataFrame({
    'Child': [f'Child {i+1}<br>(Age: {child["age"]})' for i, child in enumerate(children_data)],
    'Current Cost': current_costs,
    'Projected Cost': projected_costs
})

fig2 = go.Figure()

# Add bars for current costs
fig2.add_trace(go.Bar(
    name='Current Cost',
    x=education_df['Child'],
    y=education_df['Current Cost'],
    marker_color='#3366cc',
    text=education_df['Current Cost'].apply(lambda x: f'${x:,.0f}'),
    textposition='inside',
))

# Add bars for projected costs
fig2.add_trace(go.Bar(
    name='Projected Cost',
    x=education_df['Child'],
    y=education_df['Projected Cost'],
    marker_color='#dc3912',
    text=education_df['Projected Cost'].apply(lambda x: f'${x:,.0f}'),
    textposition='inside',
))

fig2.update_layout(
    title='Projected 4-Year College Costs by Child',
    title_x=0.5,
    xaxis_title='Child',
    yaxis_title='Total Cost ($)',
    barmode='group',
    plot_bgcolor='rgba(0,0,0,0)',
    yaxis=dict(
        tickformat='$,.0f',
        gridcolor='rgba(128,128,128,0.2)',
    ),
)

fig2.show()  # Show second plot

# 3. Monthly Savings Requirements
retirement_monthly = annual_savings / 12
college_monthly = []

# Use a distinct loop variable so the `years` array from the retirement plot is not overwritten
for n_years in years_to_college:
    future_cost = estimated_annual_college_cost * 4 * (1 + college_inflation_rate) ** n_years
    monthly_required = future_cost / (n_years * 12) if n_years > 0 else future_cost / 12
    college_monthly.append(monthly_required)

# Create monthly savings visualization with Plotly
goals = ['Retirement'] + [f'Child {i+1}<br>Education' for i in range(len(children_data))]
monthly_savings = [retirement_monthly] + college_monthly

fig3 = go.Figure()

# Add bars with gradient colors
fig3.add_trace(go.Bar(
    x=goals,
    y=monthly_savings,
    marker=dict(
        color=monthly_savings,
        colorscale='Viridis',
    ),
    text=[f'${x:,.0f}' for x in monthly_savings],
    textposition='inside',
))

fig3.update_layout(
    title='Required Monthly Savings by Financial Goal',
    title_x=0.5,
    xaxis_title='Financial Goal',
    yaxis_title='Monthly Savings Required ($)',
    plot_bgcolor='rgba(0,0,0,0)',
    yaxis=dict(
        tickformat='$,.0f',
        gridcolor='rgba(128,128,128,0.2)',
    ),
    showlegend=False,
)

fig3.show()  # Show third plot

# 4. Combined Monthly Savings Sunburst Chart
total_monthly = sum(monthly_savings)
labels = ['Total'] + goals
parents = [''] + ['Total'] * len(goals)
values = [total_monthly] + monthly_savings

fig4 = go.Figure(go.Sunburst(
    labels=labels,
    parents=parents,
    values=values,
    branchvalues="total",
    textinfo="label+value",
    texttemplate="%{label}<br>$%{value:,.0f}",
    hovertemplate="Goal: %{label}<br>Monthly Savings: $%{value:,.0f}<extra></extra>"
))

fig4.update_layout(
    title='Monthly Savings Distribution',
    title_x=0.5,
    width=800,
    height=800,
)

fig4.show()  # Show fourth plot
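
The retirement projection applies `current = current * (1 + r) + c` once per year, which has the closed form `P * (1 + r)**n + c * ((1 + r)**n - 1) / r`. The sketch below checks the loop against that formula using the notebook's own assumptions (6% annual return, 15% of an $85,000 income saved per year, 30 years until retirement). The function names are hypothetical.

```python
def project_by_loop(principal: float, annual_contribution: float, rate: float, years: int) -> float:
    """Apply one year of growth plus one contribution, `years` times."""
    balance = principal
    for _ in range(years):
        balance = balance * (1 + rate) + annual_contribution
    return balance


def project_closed_form(principal: float, annual_contribution: float, rate: float, years: int) -> float:
    """Closed form of the same recurrence (geometric series for the contributions)."""
    if rate == 0:
        return principal + annual_contribution * years
    growth = (1 + rate) ** years
    return principal * growth + annual_contribution * (growth - 1) / rate


loop_value = project_by_loop(100_000, 85_000 * 0.15, 0.06, 30)
formula_value = project_closed_form(100_000, 85_000 * 0.15, 0.06, 30)

print(f"loop:        ${loop_value:,.0f}")
print(f"closed form: ${formula_value:,.0f}")
```

The two values agree up to floating-point rounding. The closed form is convenient for testing other rates or horizons without rebuilding the year-by-year list.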