# [STARTER] Udaplay Project

## Part 02 - Agent

In this part of the project, you'll use your VectorDB to be part of your Agent as a tool.

You're building UdaPlay, an AI Research Agent for the video game industry. The agent will:
1. Answer questions using internal knowledge (RAG)
2. Search the web when needed
3. Maintain conversation state
4. Return structured outputs
5. Store useful information for future use

### Setup

In [26]:
# Only needed for Udacity workspace

import importlib.util
import sys

# Check if 'pysqlite3' is available before importing
if importlib.util.find_spec("pysqlite3") is not None:
    import pysqlite3
    sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

In [27]:
# TODO: Import the necessary libs
# For example: 
import os
from typing import TypedDict, List, Optional, Union
from lib.agents import Agent
from lib.llm import LLM
from lib.messages import UserMessage, SystemMessage, ToolMessage, AIMessage
from lib.tooling import tool
from dotenv import load_dotenv
from lib.state_machine import Run
from lib.vector_db import VectorStoreManager, CorpusLoaderService
import chromadb
from chromadb.utils import embedding_functions
from lib.rag import RAG
from lib.tooling import Tool, ToolCall, tool
import json
from datetime import datetime
from typing import List, Dict
from dotenv import load_dotenv
from tavily import TavilyClient

In [28]:
# TODO: Load environment variables
load_dotenv()

OPENAI_API_KEY = os.getenv("CHROMA_OPENAI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

In [29]:
db = VectorStoreManager(OPENAI_API_KEY)
loader_service = CorpusLoaderService(db)
rag_llm = LLM(
    model="gpt-4o-mini",
    temperature=0.3,
)
store_name="gamesrepo"
vector_store = loader_service.load_pdf(
       store_name,
       pdf_path="TheGamingIndustry2024.pdf"
)
games_market_rag = RAG(
    rag_llm,
    vector_store
)

VectorStore `gamesrepo` ready!
Pages from `TheGamingIndustry2024.pdf` added!


### Tools

Build at least 3 tools:
- retrieve_game: To search the vector DB
- evaluate_retrieval: To assess the retrieval performance
- game_web_search: If no good, search the web


#### Retrieve Game Tool

In [30]:
# TODO: Create retrieve_game tool
# It should use chroma client and collection you created
@tool
def retrieve_game(game_name:str, collection: str, path: str) -> str:
    path = "gamestoredb"
    collectionname = "gamesrepo"
    #game_name = "Super Mario"
    chroma_client = chromadb.PersistentClient(path)
    collection = chroma_client.get_collection("gamesrepo")
    user_query = f"List the details of `{game_name}`"
    print(f"`{user_query}`")
    result:Run = games_market_rag.invoke(user_query)
    return (result.get_final_state()["answer"])

#### Evaluate Retrieval Tool

In [31]:
# TODO: Create evaluate_retrieval tool
# You might use an LLM as judge in this tool to evaluate the performance
# You need to prompt that LLM with something like:
# "Your task is to evaluate if the documents are enough to respond the query. "
# "Give a detailed explanation, so it's possible to take an action to accept it or not."
# Use EvaluationReport to parse the result
# Tool Docstring:
#    Based on the user's question and on the list of retrieved documents, 
#    it will analyze the usability of the documents to respond to that question. 
#    args: 
#    - question: original question from user
#    - retrieved_docs: retrieved documents most similar to the user query in the Vector Database
#    The result includes:
#    - useful: whether the documents are useful to answer the question
#    - description: description about the evaluation result
@tool
def evaluate_retrieval(game_name:str, collection: str, path: str) -> str:
 retrieve_game("Need For Speed","gamesrepo","gamestoredb")
# Tool Docstring:
#    Semantic search: Finds most results in the vector DB
#    args:
#    - query: a question about game industry. 
#
#    You'll receive results as list. Each element contains:
#    - Platform: like Game Boy, Playstation 5, Xbox 360...)
#    - Name: Name of the Game
#    - YearOfRelease: Year when that game was released for that platform
#    - Description: Additional details about the game



#### Game Web Search Tool

In [32]:
# TODO: Create game_web_search tool
# Please use Tavily client to search the web
# Tool Docstring:
#    Semantic search: Finds most results in the vector DB
#    args:
#    - question: a question about game industry. 
@tool
def game_web_search(query:str) -> str:
    client = TavilyClient(api_key=TAVILY_API_KEY)
    search_depth = "advanced"
    search_result = client.search(
        query=query,
        search_depth=search_depth,
        include_answer=True,
        include_raw_content=False,
        include_images=False
    )
    formatted_results = {
        "answer": search_result.get("answer", ""),
        "results": search_result.get("results", []),
        "search_metadata": {
            "timestamp": datetime.now().isoformat(),
            "query": query
        }
    }
    return formatted_results

print(game_web_search(query="Provide me details of the game Max Payne"))

{'answer': 'Max Payne is a 2001 third-person shooter developed by Remedy Entertainment. It follows NYPD detective Max Payne seeking revenge on drug dealers. The game features a complex narrative and intense combat.', 'results': [{'url': 'https://en.wikipedia.org/wiki/Max_Payne_(video_game)', 'title': 'Max Payne (video game) - Wikipedia', 'content': 'Max Payne is a 2001 third-person shooter game developed by Remedy Entertainment. It was originally released for Windows by Gathering of Developers in July 2001. The game centers on former NYPD detective Max Payne "Max Payne (character)"), who attempts to solve the murder of his family while investigating a mysterious new designer drug called "Valkyr". While doing so, Max becomes entangled in a large and complex conspiracy involving a major pharmaceutical company, organized crime, a secret [...] Max Payne is played from a third-person perspective "Third-person (video games)"), in which players assume the role of the game\'s titular character

### Agent

In [33]:
# class GameQueryAgent:
#     """An AI Agent that can use tools to help answer questions"""
    
#     def __init__(
#         self,
#         role: str = "Game Query Assistant",
#         instructions: str = "Help users with any question",
#         model: str = "gpt-4o-mini",
#         temperature: float = 0.2,
#         tools: List[Tool] = [retrieve_game,evaluate_retrieval,game_web_search]
#     ):
#         """Initialize the agent with its configuration and tools
        
#         Args:
#             role: The agent's role/persona
#             instructions: Basic instructions for the agent
#             model: The LLM model to use
#             temperature: Creativity parameter (0.0 = more 'deterministic')
#             tools: List of tools the agent can use
#         """
#         self.model = model
#         self.role = role
#         self.instructions = instructions
#         self.tools = tools

#         # Load environment variables (e.g. API keys)
#         load_dotenv()
        
#         # Initialize the LLM with tools if provided
#         self.llm = LLM(
#             model=model,
#             temperature=temperature,
#             tools=tools,
#         )

#     def invoke(self, user_message: str) -> str:
#         """Process a user message and return a response
        
#         Args:
#             user_message: The user's input message
            
#         Returns:
#             The agent's response after processing tools if needed
#         """
#         messages = [
#             SystemMessage(
#                 content=(
#                     f"You're an AI Agent and your role is {self.role}. "  
#                     f"Your instructions: {self.instructions}"
#                 )
#             )
#         ]
#         # Add user message to conversation
#         messages.append(UserMessage(content=user_message))
        
#         # Get AI response and add to conversation
#         ai_message = self.llm.invoke(messages)
#         messages.append(ai_message)

#         # Check if tools were required
#         while ai_message.tool_calls:
#             # Process each tool call                    
#             for call in ai_message.tool_calls:
#                 # Access tool call data correctly
#                 function_name = call.function.name
#                 function_args = json.loads(call.function.arguments)
#                 tool_call_id = call.id
#                 # Find the matching tool
#                 tool = next((t for t in self.tools if t.name == function_name), None)
#                 if tool:
#                     result = tool(**function_args)
#                     messages.append(
#                         ToolMessage(
#                             content=json.dumps(result), 
#                             tool_call_id=tool_call_id, 
#                             name=function_name, 
#                         )
#                     )
                
#             # Get final AI response after tool usage and add to conversation
#             ai_message = self.llm.invoke(messages)
#             messages.append(ai_message)

#         for m in messages:
#             print(m)
#         return ai_message.content

from typing import TypedDict, List, Optional, Union, TypeVar
import json

from lib.state_machine import StateMachine, Step, EntryPoint, Termination, Run
from lib.llm import LLM
from lib.messages import AIMessage, UserMessage, SystemMessage, ToolMessage
from lib.tooling import Tool, ToolCall
from lib.memory import ShortTermMemory
class AgentState(TypedDict):
    user_query: str  # The current user query being processed
    instructions: str  # System instructions for the agent
    messages: List[dict]  # List of conversation messages
    current_tool_calls: Optional[List[ToolCall]]  # Current pending tool calls
    total_tokens: int  # Track the cumulative total

class GameAgent:
    def __init__(self, 
                 model_name: str,
                 instructions: str, 
                 tools: List[Tool] = None,
                 temperature: float = 0.7):
        """
        Initialize an Agent
        
        Args:
            model_name: Name/identifier of the LLM model to use
            instructions: System instructions for the agent
            tools: Optional list of tools available to the agent
            temperature: Temperature parameter for LLM (default: 0.7)
        """
        self.instructions = instructions
        self.tools = tools if tools else []
        self.model_name = model_name
        self.temperature = temperature
        
        # Initialize memory and state machine
        self.memory = ShortTermMemory()
        self.workflow = self._create_state_machine()

    def _prepare_messages_step(self, state: AgentState) -> AgentState:
        """Step logic: Prepare messages for LLM consumption"""
        messages = state.get("messages", [])
        
        # If no messages exist, start with system message
        if not messages:
            messages = [SystemMessage(content=state["instructions"])]
            
        # Add the new user message
        messages.append(UserMessage(content=state["user_query"]))
        
        return {
            "messages": messages,
            "session_id": state["session_id"]
        }

    def _llm_step(self, state: AgentState) -> AgentState:
        """Step logic: Process the current state through the LLM"""
        # Initialize LLM
        llm = LLM(
            model=self.model_name,
            temperature=self.temperature,
            tools=self.tools
        )

        response = llm.invoke(state["messages"])
        tool_calls = response.tool_calls if response.tool_calls else None

        current_total = state.get("total_tokens", 0)
        if response.token_usage:
            current_total += response.token_usage.total_tokens

        # Create AI message with content and tool calls
        ai_message = AIMessage(
            content=response.content, 
            tool_calls=tool_calls,
        )

        return {
            "messages": state["messages"] + [ai_message],
            "current_tool_calls": tool_calls,
            "session_id": state["session_id"],
            "total_tokens": current_total,
        }

    def _tool_step(self, state: AgentState) -> AgentState:
        """Step logic: Execute any pending tool calls"""
        tool_calls = state["current_tool_calls"] or []
        tool_messages = []
        
        for call in tool_calls:
            # Access tool call data correctly
            function_name = call.function.name
            function_args = json.loads(call.function.arguments)
            tool_call_id = call.id
            # Find the matching tool
            tool = next((t for t in self.tools if t.name == function_name), None)
            if tool:
                result = str(tool(**function_args))
                tool_message = ToolMessage(
                    content=json.dumps(result), 
                    tool_call_id=tool_call_id, 
                    name=function_name, 
                )
                tool_messages.append(tool_message)
        
        # Clear tool calls and add results to messages
        return {
            "messages": state["messages"] + tool_messages,
            "current_tool_calls": None,
            "session_id": state["session_id"]
        }

    def _create_state_machine(self) -> StateMachine[AgentState]:
        """Create the internal state machine for the agent"""
        machine = StateMachine[AgentState](AgentState)
        
        # Create steps
        entry = EntryPoint[AgentState]()
        message_prep = Step[AgentState]("message_prep", self._prepare_messages_step)
        llm_processor = Step[AgentState]("llm_processor", self._llm_step)
        tool_executor = Step[AgentState]("tool_executor", self._tool_step)
        termination = Termination[AgentState]()
        
        machine.add_steps([entry, message_prep, llm_processor, tool_executor, termination])
        
        # Add transitions
        machine.connect(entry, message_prep)
        machine.connect(message_prep, llm_processor)
        
        # Transition based on whether there are tool calls
        def check_tool_calls(state: AgentState) -> Union[Step[AgentState], str]:
            """Transition logic: Check if there are tool calls"""
            if state.get("current_tool_calls"):
                return tool_executor
            return termination
        
        machine.connect(llm_processor, [tool_executor, termination], check_tool_calls)
        machine.connect(tool_executor, llm_processor)  # Go back to llm after tool execution
        
        return machine

    def invoke(self, query: str, session_id: Optional[str] = None) -> Run:
        """
        Run the agent on a query
        
        Args:
            query: The user's query to process
            session_id: Optional session identifier (uses "default" if None)
            
        Returns:
            The final run object after processing
        """
        session_id = session_id or "default"

        # Create session if it doesn't exist
        self.memory.create_session(session_id)

        # Get previous messages from last run if available
        previous_messages = []
        last_run: Run = self.memory.get_last_object(session_id)
        if last_run:
            last_state = last_run.get_final_state()
            if last_state:
                previous_messages = last_state["messages"]

        initial_state: AgentState = {
            "user_query": query,
            "instructions": self.instructions,
            "messages": previous_messages,
            "current_tool_calls": None,
            "session_id": session_id,
        }

        run_object = self.workflow.run(initial_state)
        
        # Store the complete run object in memory
        self.memory.add(run_object, session_id)
        
        return run_object

    def get_session_runs(self, session_id: Optional[str] = None) -> List[Run]:
        """Get all Run objects for a session
        
        Args:
            session_id: Optional session ID (uses "default" if None)
            
        Returns:
            List of Run objects in the session
        """
        return self.memory.get_all_objects(session_id)

    def reset_session(self, session_id: Optional[str] = None):
        """Reset memory for a specific session
        
        Args:
            session_id: Optional session to reset (uses "default" if None)
        """
        self.memory.reset(session_id)

In [34]:
# TODO: Invoke your agent
# - When Pokémon Gold and Silver was released?
# - Which one was the first 3D platformer Mario game?
# - Was Mortal Kombat X realeased for Playstation 5?
tools: List[Tool] = [retrieve_game,evaluate_retrieval,game_web_search]
agent = GameAgent(
    model_name="gpt-4o-mini",
    instructions=(
        "You are an assistant that can help with:\n"
        "1. Getting details of the game Super Mario\n"
        "Use the available tools to help answer questions about these topics.\n"
        "Maintain context across conversations within the same session."
    ),
    tools=tools
)
session_id = "project"
run1 = agent.invoke(
    query="Provide me the details of the game  Need for Speed ?"
)

print("\nMessages from run 1:")
messages = run1.get_final_state()["messages"]
print(messages)

[StateMachine] Starting: __entry__
[StateMachine] Executing step: message_prep
[StateMachine] Executing step: llm_processor
`List the details of `Need for Speed``
[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve
[StateMachine] Executing step: augment
[StateMachine] Executing step: generate
[StateMachine] Terminating: __termination__
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Executing step: tool_executor
[StateMachine] Executing step: llm_processor
[StateMachine] Terminating: __termination__

Messages from run 1:
[SystemMessage(role='system', content='You are an assistant that can help with:\n1. Getting details of the game Super Mario\nUse the available tools to help answer questions about these topics.\nMaintain context across conversations within the same session.'), UserMessage(role='user', content='Provide me the details of the game  Need for Speed ?'), AIMessage(role='assistant', content=None, 

### (Optional) Advanced

In [None]:
# TODO: Update your agent with long-term memory
# TODO: Convert the agent to be a state machine, with the tools being pre-defined nodes