# [STARTER] Udaplay Project

## Part 02 - Agent

In this part of the project, you'll plug your VectorDB into your Agent as a tool.

You're building UdaPlay, an AI Research Agent for the video game industry. The agent will:
1. Answer questions using internal knowledge (RAG)
2. Search the web when needed
3. Maintain conversation state
4. Return structured outputs
5. Store useful information for future use

### Setup

In [23]:
# Only needed for Udacity workspace

import importlib.util
import sys

# Check if 'pysqlite3' is available before importing
if importlib.util.find_spec("pysqlite3") is not None:
    import pysqlite3
    sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

In [24]:
# TODO: Import the necessary libs
# For example: 
import os

from lib.agents import Agent
from lib.llm import LLM
from lib.messages import UserMessage, SystemMessage, ToolMessage, AIMessage
from lib.tooling import tool
from dotenv import load_dotenv
import json
import chromadb
from chromadb.utils import embedding_functions
from tavily import TavilyClient
from datetime import datetime
from lib.vector_db import VectorStoreManager, CorpusLoaderService
from lib.rag import RAG
from pydantic import BaseModel, Field
from lib.parsers import PydanticOutputParser
from typing import TypedDict, Optional, List, Dict, Any, Union
from lib.state_machine import StateMachine, Step, EntryPoint, Termination, Resource
from lib.memory import LongTermMemory, MemoryFragment

In [25]:
# TODO: Load environment variables
load_dotenv()

# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

True
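The commented-out lines above hint that two API keys are expected in the environment. As a minimal sketch, assuming the key names `OPENAI_API_KEY` and `TAVILY_API_KEY` from those comments, a small check can report missing keys before any tool is called. The helper name `missing_env_vars` is hypothetical and not part of the project's `lib`.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Key names taken from the commented-out lines in the cell above.
REQUIRED_KEYS = ("OPENAI_API_KEY", "TAVILY_API_KEY")


def missing_env_vars(
    names: Iterable[str] = REQUIRED_KEYS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing environment variables:", ", ".join(missing))
```

Passing `env` explicitly keeps the helper easy to test without touching the real environment.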

### Tools

Build at least 3 tools:
- retrieve_game: To search the vector DB
- evaluate_retrieval: To assess the retrieval performance
- game_web_search: To search the web when the retrieved documents are not good enough
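The three tools in the list above form a fallback chain: retrieve, judge the retrieval, and search the web only if the judgment is negative. The sketch below shows that control flow with stub functions standing in for the real tools. `choose_context` and the `fake_*` stubs are hypothetical names used only for illustration; the actual tools are defined in the cells that follow.

```python
from typing import Any, Callable, Dict, List

Docs = List[Dict[str, Any]]


def choose_context(
    question: str,
    retrieve: Callable[[str], Docs],
    evaluate: Callable[[str, Docs], Dict[str, Any]],
    web_search: Callable[[str], Dict[str, Any]],
) -> Dict[str, Any]:
    """Use vector DB documents when they are judged useful, else fall back to the web."""
    docs = retrieve(question)
    report = evaluate(question, docs)
    if report.get("useful"):
        return {"source": "vector_db", "context": docs}
    return {"source": "web", "context": web_search(question)}


# Stub tools (assumptions for this sketch, not the real implementations).
def fake_retrieve(question: str) -> Docs:
    return [{"Name": "Super Mario 64", "YearOfRelease": 1996}]


def fake_evaluate(question: str, docs: Docs) -> Dict[str, Any]:
    useful = any(d["Name"].lower() in question.lower() for d in docs)
    return {"useful": useful, "description": "stub evaluation"}


def fake_web_search(question: str) -> Dict[str, Any]:
    return {"answer": "stub web answer", "results": []}


for q in ("When was Super Mario 64 released?",
          "Was Mortal Kombat X released for PlayStation 5?"):
    result = choose_context(q, fake_retrieve, fake_evaluate, fake_web_search)
    print(q, "->", result["source"])
```

With these stubs, the first question resolves from the vector DB and the second falls through to the web search.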


#### Retrieve Game Tool

In [32]:
# TODO: Create retrieve_game tool
# It should use chroma client and collection you created
# chroma_client = chromadb.PersistentClient(path="chromadb")
# collection = chroma_client.get_collection("udaplay")
# Tool Docstring:
#    Semantic search: Finds the most similar results in the vector DB
#    args:
#    - query: a question about the game industry.
#
#    You'll receive results as a list. Each element contains:
#    - Platform: e.g. Game Boy, PlayStation 5, Xbox 360
#    - Name: Name of the Game
#    - YearOfRelease: Year when that game was released for that platform
#    - Description: Additional details about the game

chroma_client = chromadb.PersistentClient(path="chromadb")
collection = chroma_client.get_collection("udaplay")

@tool
def retrieve_game(query: str) -> List[Dict]:
    """
    Semantic search: Finds the most similar results in the vector DB
        args:
        - query: a question about the game industry.

        You'll receive results as a list. Each element contains:
        - Platform: e.g. Game Boy, PlayStation 5, Xbox 360
        - Name: Name of the Game
        - YearOfRelease: Year when that game was released for that platform
        - Description: Additional details about the game
    """
    results = collection.query(
        query_texts = [query],
        n_results=5,
        include=["metadatas", "distances"]
    )
    games = []
    metas = results["metadatas"][0]
    dists = results["distances"][0]
    for meta, dist in zip(metas, dists):
        games.append({
            "Platform": meta.get("Platform"),
            "Name": meta.get("Name"),
            "YearOfRelease": meta.get("YearOfRelease"),
            "Description": meta.get("Description"),
            "distance": float(dist),
        })
    return games

In [35]:
print(collection.count())

15


In [33]:
demo_q = "first 3D platformer Mario game"
demo_results = retrieve_game(demo_q)

print("DEMO QUERY:", demo_q)
for i, r in enumerate(demo_results, 1):
    print(f"{i}. {r['Name']} ({r['YearOfRelease']}) [{r['Platform']}]  distance={r['distance']:.4f}")

DEMO QUERY: first 3D platformer Mario game
1. Super Mario 64 (1996) [Nintendo 64]  distance=0.1032
2. Super Mario World (1990) [Super Nintendo Entertainment System (SNES)]  distance=0.1278
3. Mario Kart 8 Deluxe (2017) [Nintendo Switch]  distance=0.1880
4. Super Smash Bros. Melee (2001) [GameCube]  distance=0.1920
5. Pokémon Gold and Silver (1999) [Game Boy Color]  distance=0.2042
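The demo output shows that every query returns five results, even when only the first one or two are close matches. One possible refinement is to drop results beyond a distance cutoff before passing them on. The sketch below assumes the result dictionaries produced by `retrieve_game` and uses the distances printed above; the helper `filter_by_distance` and the cutoff of 0.15 are illustrative assumptions, not tuned values.

```python
from typing import Any, Dict, List


def filter_by_distance(
    results: List[Dict[str, Any]], max_distance: float
) -> List[Dict[str, Any]]:
    """Keep only results whose distance is at or below the cutoff."""
    return [r for r in results if r["distance"] <= max_distance]


# Names and distances copied from the demo output above.
demo = [
    {"Name": "Super Mario 64", "distance": 0.1032},
    {"Name": "Super Mario World", "distance": 0.1278},
    {"Name": "Mario Kart 8 Deluxe", "distance": 0.1880},
    {"Name": "Super Smash Bros. Melee", "distance": 0.1920},
    {"Name": "Pokémon Gold and Silver", "distance": 0.2042},
]

close = filter_by_distance(demo, max_distance=0.15)
print([r["Name"] for r in close])
# ['Super Mario 64', 'Super Mario World']
```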


#### Evaluate Retrieval Tool

In [None]:
# TODO: Create evaluate_retrieval tool
# You might use an LLM as judge in this tool to evaluate the performance
# You need to prompt that LLM with something like:
# "Your task is to evaluate if the documents are enough to respond to the query. "
# "Give a detailed explanation, so it's possible to take an action to accept it or not."
# Use EvaluationReport to parse the result
# Tool Docstring:
#    Based on the user's question and on the list of retrieved documents, 
#    it will analyze the usability of the documents to respond to that question. 
#    args: 
#    - question: original question from user
#    - retrieved_docs: retrieved documents most similar to the user query in the Vector Database
#    The result includes:
#    - useful: whether the documents are useful to answer the question
#    - description: description about the evaluation result


class EvaluationReport(BaseModel):
    useful: bool
    description: str

@tool
def evaluate_retrieval(question: str, retrieved_docs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Based on the user's question and on the list of retrieved documents, 
    it will analyze the usability of the documents to respond to that question. 
        args: 
        - question: original question from user
        - retrieved_docs: retrieved documents most similar to the user query in the Vector Database
        The result includes:
        - useful: whether the documents are useful to answer the question
        - description: description about the evaluation result
    """
    docs_text = "\n\n".join(
        f"Doc {i+1}:\n"
        f"Name: {d.get('Name')}\n"
        f"Platform: {d.get('Platform')}\n"
        f"YearOfRelease: {d.get('YearOfRelease')}\n"
        f"Description: {d.get('Description')}"
        for i, d in enumerate(retrieved_docs or [])
    )

    prompt = f"""
    You are a strict evaluator for a RAG system.

    Task:
    Decide if the retrieved documents contain enough specific information to answer the question.

    Question: 
    
    {question}
    
    Retrieved Documents: 
    {docs_text}
    
    Return ONLY valid JSON in this schema:
    {{
    "useful": true/false,
    "description": "Explain what key info is present/missing and end with ACCEPT or REJECT"
    }}

    Rules:
    - useful=true ONLY if the docs directly contain the answer (not just vaguely related).
    - useful=false if missing dates/platform/version details or if the docs are irrelevant.
    """
    
    judge = LLM(model="gpt-4o-mini", temperature=0)
    ai_message = judge.invoke(prompt)
    parser = PydanticOutputParser(model_class=EvaluationReport)
    report = parser.parse(ai_message)
    return report.model_dump() 
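The judge prompt asks for JSON only, but a model may still wrap its reply in a code fence or add a sentence around it. The sketch below shows one defensive way to pull the JSON object out of such a reply and check the two expected fields. It uses only the standard library and makes no assumption about how `PydanticOutputParser` in `lib.parsers` behaves; `parse_evaluation_json` is a hypothetical helper.

```python
import json
import re
from typing import Any, Dict


def parse_evaluation_json(text: str) -> Dict[str, Any]:
    """Extract a JSON object from an LLM reply and validate its two fields."""
    # Greedy match from the first "{" to the last "}" in the reply.
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in reply")
    data = json.loads(match.group(0))
    if not isinstance(data.get("useful"), bool):
        raise ValueError("'useful' must be a boolean")
    if not isinstance(data.get("description"), str):
        raise ValueError("'description' must be a string")
    return {"useful": data["useful"], "description": data["description"]}


reply = 'Verdict: {"useful": false, "description": "No PS5 entry found. REJECT"}'
print(parse_evaluation_json(reply))
```

A stricter variant could reject replies with extra keys; this sketch simply ignores them.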

#### Game Web Search Tool

In [None]:
# TODO: Create game_web_search tool
# Please use Tavily client to search the web
# Tool Docstring:
#    Web search: Finds the most relevant results on the web
#    args:
#    - question: a question about game industry. 

@tool
def game_web_search(question: str, search_depth: str = "advanced") -> Dict:
    """
    Web search: Uses Tavily to search the web for game industry information
    when internal vector database results are insufficient.
        args:
        - question: a question about game industry.
        - search_depth (str): The depth of the search, either 'basic' or 'advanced' (default).
    """
    tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
    search_results = tavily_client.search(
        question,
        search_depth=search_depth,
        include_answer=True,
        include_raw_content=False,  # Tavily's keyword is include_raw_content
        include_images=False,
    )
    formatted_results = {
        "answer": search_results.get("answer", ""),
        "results": [
            {
                "title": r.get("title"),
                "url": r.get("url"),
                "content": r.get("content"),
            }
            for r in (search_results.get("results") or [])
        ],
        "search_metadata": {
            "query": question,
            "search_depth": search_depth,
            "timestamp": datetime.now().isoformat()
        }
    }
    return formatted_results
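Because `search_depth` is exposed as a tool argument, the calling model may pass a value other than the two documented in the docstring. A minimal sketch of normalizing the argument before it reaches the Tavily client is shown below. The helper `normalize_search_depth` is a hypothetical addition, and falling back to `"advanced"` mirrors the tool's default rather than any Tavily requirement.

```python
# The two depths named in the tool's docstring.
VALID_DEPTHS = ("basic", "advanced")


def normalize_search_depth(search_depth: str, default: str = "advanced") -> str:
    """Return a valid depth, falling back to the default for unknown values."""
    depth = (search_depth or "").strip().lower()
    return depth if depth in VALID_DEPTHS else default


print(normalize_search_depth(" Basic "))  # basic
print(normalize_search_depth("deep"))     # advanced
```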

In [29]:
## Long Term Memory Adapter


OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
db_manager = VectorStoreManager(openai_api_key=OPENAI_API_KEY)
ltm = LongTermMemory(db_manager)

class MemoryAdapter:
    """
    Adapter so your agent can do:
      - memory.search(question) -> returns a string or None
      - memory.store(question, answer)
    while internally using LongTermMemory's API.
    """

    def __init__(self, ltm:LongTermMemory, owner: str = "default_agent", namespace: str = "udaplay"):

        self.ltm = ltm
        self.owner = owner
        self.namespace = namespace
    
    def search(self, query: str = None, query_text: str = None, question: str = None, limit: int = 1):
        text = query_text or query or question
        if not text:
            return None

        results = self.ltm.search(
            query_text=text,
            owner=self.owner,
            limit=limit,
            namespace=self.namespace
        )
        if results.fragments:
            return results.fragments[0].content
        return None
    
    def store(self, question: str, answer: str):
        content = f"Q: {question}\nA: {answer}"
        self.ltm.register(MemoryFragment(content=content, owner=self.owner, namespace=self.namespace))

memory = MemoryAdapter(ltm)
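The adapter above only needs two methods, `search` and `store`, so the agent can be exercised without an OpenAI key by swapping in a simple stand-in with the same interface. The sketch below is a hypothetical in-memory replacement that matches questions by word overlap (Jaccard similarity) instead of embeddings; the class name `InMemoryMemory` and the 0.5 threshold are assumptions made for illustration.

```python
from typing import List, Optional, Set, Tuple


class InMemoryMemory:
    """Hypothetical stand-in exposing the same search/store interface as MemoryAdapter."""

    def __init__(self, min_overlap: float = 0.5):
        self._items: List[Tuple[Set[str], str]] = []
        self.min_overlap = min_overlap

    @staticmethod
    def _tokens(text: str) -> Set[str]:
        return set(text.lower().split())

    def store(self, question: str, answer: str) -> None:
        # Same "Q: ... A: ..." content format as the adapter above.
        self._items.append((self._tokens(question), f"Q: {question}\nA: {answer}"))

    def search(self, question: str) -> Optional[str]:
        query = self._tokens(question)
        if not query:
            return None
        best_score, best_content = 0.0, None
        for tokens, content in self._items:
            score = len(query & tokens) / len(query | tokens)  # Jaccard similarity
            if score > best_score:
                best_score, best_content = score, content
        return best_content if best_score >= self.min_overlap else None


test_memory = InMemoryMemory()
test_memory.store("Was Mortal Kombat X released for PlayStation 5?", "No.")
print(test_memory.search("Was Mortal Kombat X released for PlayStation 5?"))
print(test_memory.search("Who founded Nintendo?"))
```

Note that `MemoryAdapter.search` returns the top fragment whenever the underlying search yields one; the similarity threshold here is specific to this sketch.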

### Agent

In [None]:
# TODO: Create your Agent abstraction using StateMachine
# Equip with an appropriate model
# Craft a good set of instructions 
# Plug all Tools you developed

class AgentState(TypedDict, total=False):
    question: str
    history: List[Dict[str, str]]
    retrieved_docs: List[Dict[str, Any]]
    evaluation: Dict[str, Any]  # {useful: bool, description: str}
    web: Dict[str, Any]  # Tavily response
    final_context: str
    answer: str
    memory_hit: Optional[str]
    used_memory: bool



class ResearchAgent:
    def __init__(self, llm: LLM, memory):
        self.llm = llm
        self.workflow = self._create_state_machine()
        self.memory = memory
        self.history = []

    def _retrieve(self, state:AgentState) -> AgentState:
        """Retrieve relevant documents from the vector database """
        docs = retrieve_game(state["question"])
        return {"retrieved_docs": docs}
    
    def _evaluate(self, state:AgentState) -> AgentState:
        """Evaluate the usefulness of the retrieved documents """
        report = evaluate_retrieval(state["question"], state["retrieved_docs"])
        return {"evaluation": report}
    
    def _memory_check(self, state:AgentState) -> AgentState:
        """Check memory for relevant past interactions """
        hit = self.memory.search(state["question"])
        if hit:
            return {
                "memory_hit": hit,
                "used_memory": True
            }
        return {"used_memory": False}
    
    def _web_search(self, state:AgentState) -> AgentState:
        web_info = game_web_search(state["question"])
        return {"web": web_info}
    
    
    def _build_context(self, state:AgentState) -> AgentState:
        # 1) Check if memory hit exists, use it as context
        if state.get("used_memory") and state.get("memory_hit"):
            return {"final_context": state["memory_hit"]}
        
        # 2) If retrieved docs are useful, use DB docs as context

        if state.get("evaluation", {}).get("useful"):
            context = "\n\n".join(
                f"{d.get('Name')} ({d.get('YearOfRelease')}) [{d.get('Platform')}]: {d.get('Description')}"
                for d in state.get("retrieved_docs", [])
            )
            return {"final_context": context}
        
        # 3) Otherwise, use web search results as context
        parts = [state.get("web", {}).get("answer", "")]
        for r in state.get("web", {}).get("results", [])[:3]:
            parts.append(
                f"Title: {r.get('title')}\n"
                f"URL: {r.get('url')}\n"
                f"Snippet: {r.get('content', '')}"
            )
        return {"final_context": "\n\n".join(parts)}
    
    def _answer(self, state:AgentState) -> AgentState:

        history_text = ""
        for turn in state.get("history", [])[-6:]:  # last 3 exchanges
            history_text += f"{turn['role'].upper()}: {turn['content']}\n"
        message = [
            SystemMessage(content=(
                "You are a game industry research assistant. "
                "Use the conversation history ONLY if it helps resolve references. "
                "Answer using ONLY the provided context for factual claims. "
                "If URLs are present in the context, cite them as (Source: <url>)."
            )),
            UserMessage(content=(
                f"Conversation history:\n{history_text}\n"
                f"Question: {state['question']}\n\n"
                f"Context:\n{state.get('final_context','')}\n\nAnswer:"
            ))
        ]
        ai = self.llm.invoke(message)
        return {"answer": ai.content}
    
    def _store_memory(self, state:AgentState) -> AgentState:
        # Store ONLY when the web fallback was used (simple rule)
        if state.get("web") and state.get("answer"):
            self.memory.store(question=state["question"], answer=state["answer"])
        return {}
    
    def _create_state_machine(self) -> StateMachine[AgentState]:
        """Create the internal state machine for the agent"""
        machine = StateMachine[AgentState](AgentState)
        
        #Steps
        entry = EntryPoint[AgentState]()
        retrieve = Step[AgentState]("retrieve", self._retrieve)
        evaluate = Step[AgentState]("evaluate", self._evaluate)
        web_search = Step[AgentState]("web_search", self._web_search)
        memory_check = Step[AgentState]("memory_check", self._memory_check)
        context = Step[AgentState]("context", self._build_context)
        answer = Step[AgentState]("answer", self._answer)
        store_memory = Step[AgentState]("store_memory", self._store_memory)
        termination = Termination[AgentState]()

        machine.add_steps([
            entry,
            retrieve,
            evaluate,
            web_search,
            memory_check,
            context,
            answer,
            store_memory,
            termination,
            
        ])
        #Transitions

        machine.connect(entry, retrieve)
        machine.connect(retrieve, evaluate)

        def route_after_eval(state: AgentState) -> Union[Step[AgentState], str]: 
            """After evaluate: if useful -> context else -> memory_check"""
            if state.get("evaluation", {}).get("useful"):
                return context
            return memory_check
        machine.connect(evaluate,[memory_check, context], route_after_eval)

        def route_after_memory(state: AgentState) -> Union[Step[AgentState], str]: 
            """After memory check: if used_memory -> context else -> web_search"""
            if state.get("used_memory"):
                return context
            return web_search
        
        machine.connect(memory_check, [context, web_search], route_after_memory)
        machine.connect(web_search, context)
        machine.connect(context, answer)

        def route_after_answer(state: AgentState) -> Union[Step[AgentState], str]: 
            """After answer: if web used -> store_memory else -> termination"""
            if state.get("web"):
                return store_memory
            return termination
        machine.connect(answer, [store_memory, termination], route_after_answer)
        machine.connect(store_memory, termination)

        return machine
    
    def invoke(self, question: str):
        initial_state: AgentState = {"question": question, "history": self.history}
        run_object = self.workflow.run(initial_state)
        final_state = run_object.get_final_state()

        if final_state and final_state.get("answer"):
            self.history.append({"role": "user", "content": question})
            self.history.append({"role": "assistant", "content": final_state["answer"]})
        return run_object
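The three routing functions above mean a run ends in one of three ways: answered from the vector DB, answered from long-term memory, or answered from the web. The sketch below derives that label from a final state, assuming the keys defined in `AgentState`; `describe_path` is a hypothetical helper and not part of the agent.

```python
from typing import Any, Dict


def describe_path(final_state: Dict[str, Any]) -> str:
    """Label which branch of the workflow produced the answer."""
    # Checked in routing order: memory is only consulted when the
    # evaluation was negative, and the web only after a memory miss.
    if final_state.get("evaluation", {}).get("useful"):
        return "vector_db"
    if final_state.get("used_memory"):
        return "memory"
    if final_state.get("web"):
        return "web"
    return "unknown"


print(describe_path({"evaluation": {"useful": True}}))
print(describe_path({"evaluation": {"useful": False}, "used_memory": True}))
print(describe_path({"evaluation": {"useful": False}, "used_memory": False,
                     "web": {"answer": "stub"}}))
```

Calling it on the state returned by `run_object.get_final_state()` gives a one-word summary that is easier to scan than the step-by-step log.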

In [None]:
# TODO: Invoke your agent
# - When Pokémon Gold and Silver was released?
# - Which one was the first 3D platformer Mario game?
# - Was Mortal Kombat X released for Playstation 5?

research_agent = ResearchAgent(llm=LLM(model="gpt-4o-mini", temperature=0), memory=memory)
questions = [
    "When Pokémon Gold and Silver was released?",
    "Which one was the first 3D platformer Mario game?",
    "Was Mortal Kombat X released for Playstation 5?"
]
for q in questions:
    run = research_agent.invoke(q)
    final_state = run.get_final_state()
    print(f"Question: {q}\nAnswer: {final_state.get('answer')}\n{'-'*50}\n")   

# For debugging purposes, you can inspect the final state inside the loop
# to see which path was taken:
#     print("useful:", final_state.get("evaluation", {}).get("useful"))
#     print("used_web:", "web" in final_state)

[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve
[StateMachine] Executing step: evaluate
[StateMachine] Executing step: context
[StateMachine] Executing step: answer
[StateMachine] Terminating: __termination__
Question: When Pokémon Gold and Silver was released?
Answer: Pokémon Gold and Silver was released in 1999.
--------------------------------------------------

[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve
[StateMachine] Executing step: evaluate
[StateMachine] Executing step: context
[StateMachine] Executing step: answer
[StateMachine] Terminating: __termination__
Question: Which one was the first 3D platformer Mario game?
Answer: The first 3D platformer Mario game is Super Mario 64, released in 1996.
--------------------------------------------------

[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve
[StateMachine] Executing step: evaluate
[StateMachine] Executing step: memory_check
[StateMachine] Exe

### (Optional) Advanced

In [None]:
# TODO: Update your agent with long-term memory
# TODO: Convert the agent to be a state machine, with the tools being pre-defined nodes

# Both items are already implemented in the agent above.