# [STARTER] Udaplay Project

## Part 02 - Agent

In this part of the project, you'll use your VectorDB to be part of your Agent as a tool.

You're building UdaPlay, an AI Research Agent for the video game industry. The agent will:
1. Answer questions using internal knowledge (RAG)
2. Search the web when needed
3. Maintain conversation state
4. Return structured outputs
5. Store useful information for future use

### Setup

In [None]:
# # Only needed for Udacity workspace

# import importlib.util
# import sys

# # Check if 'pysqlite3' is available before importing
# if importlib.util.find_spec("pysqlite3") is not None:
#     import pysqlite3
#     sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

In [6]:
!pip install tavily-python pdfplumber

Defaulting to user installation because normal site-packages is not writeable
Collecting pdfplumber
  Downloading pdfplumber-0.11.8-py3-none-any.whl.metadata (43 kB)
Collecting pdfminer.six==20251107 (from pdfplumber)
  Downloading pdfminer_six-20251107-py3-none-any.whl.metadata (4.2 kB)
Collecting pypdfium2>=4.18.0 (from pdfplumber)
  Downloading pypdfium2-5.0.0-py3-none-win_amd64.whl.metadata (67 kB)
Collecting cryptography>=36.0.0 (from pdfminer.six==20251107->pdfplumber)
  Downloading cryptography-46.0.3-cp311-abi3-win_amd64.whl.metadata (5.7 kB)
Collecting cffi>=2.0.0 (from cryptography>=36.0.0->pdfminer.six==20251107->pdfplumber)
  Downloading cffi-2.0.0-cp312-cp312-win_amd64.whl.metadata (2.6 kB)
Collecting pycparser (from cffi>=2.0.0->cryptography>=36.0.0->pdfminer.six==20251107->pdfplumber)
  Downloading pycparser-2.23-py3-none-any.whl.metadata (993 bytes)
Downloading pdfplumber-0.11.8-py3-none-any.whl (60 kB)
Downloading pdfminer_six-20251107-py3-none-any.whl (5.6 MB)
   ----


[notice] A new release of pip is available: 24.3.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [36]:
import os
import json
import uuid
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List

import chromadb
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from tavily import TavilyClient
from typing_extensions import TypedDict

from lib.vector_db import VectorStore
from lib.state_machine import StateMachine, Step, EntryPoint, Termination, Run
from lib.llm import LLM
from lib.messages import SystemMessage, UserMessage
from lib.tooling import tool, Tool


In [37]:
ENV_PATH = Path(".env")

if not ENV_PATH.exists():
    raise FileNotFoundError(
        f"Expected environment file at {ENV_PATH.resolve()} with API keys."
    )

load_dotenv(ENV_PATH)

required_keys = ["OPENAI_API_KEY", "TAVILY_API_KEY"]
missing = [key for key in required_keys if not os.getenv(key)]
if missing:
    raise EnvironmentError(
        f"Missing required environment variables: {', '.join(missing)}"
    )

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
CHROMA_OPENAI_API_KEY = os.getenv("CHROMA_OPENAI_API_KEY") or OPENAI_API_KEY
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://openai.vocareum.com/v1")

if not CHROMA_OPENAI_API_KEY:
    raise EnvironmentError(
        "Either CHROMA_OPENAI_API_KEY or OPENAI_API_KEY must be defined."
    )

print("Environment loaded. Keys detected for OpenAI/Tavily.")


Environment loaded. Keys detected for OpenAI/Tavily.


In [38]:
class PersistentMemoryStore:
    def __init__(self, path: Path, embedding_function):
        self.path = path
        self.path.mkdir(parents=True, exist_ok=True)
        self.client = chromadb.PersistentClient(path=str(self.path))
        self.collection = self.client.get_or_create_collection(
            name="udaplay_long_term_memory",
            embedding_function=embedding_function,
            metadata={"description": "Web knowledge captured by UdaPlay"},
        )

    def add(self, content: str, source: str, metadata: Dict[str, Any] | None = None) -> Dict[str, Any]:
        doc_id = str(uuid.uuid4())
        payload = {
            "memory_id": doc_id,
            "source": source,
            "captured_at": datetime.utcnow().isoformat(),
        }
        if metadata:
            payload.update(metadata)

        self.collection.add(ids=[doc_id], documents=[content], metadatas=[payload])
        return payload

    def search(self, query: str, limit: int = 3) -> List[Dict[str, Any]]:
        if self.collection.count() == 0:
            return []

        results = self.collection.query(
            query_texts=[query],
            n_results=limit,
            include=["documents", "metadatas", "distances"],
        )
        documents = results.get("documents", [[]])[0]
        metadatas = results.get("metadatas", [[]])[0]
        distances = results.get("distances", [[]])[0]
        ids = results.get("ids", [[]])[0]

        matches = []
        for doc_id, doc, meta, distance in zip(ids, documents, metadatas, distances):
            matches.append(
                {
                    "id": doc_id,
                    "content": doc,
                    "metadata": meta,
                    "similarity": 1 - distance if distance is not None else None,
                    "source": meta.get("source", "memory"),
                }
            )
        return matches


CHROMA_DB_PATH = Path("chromadb")
MEMORY_DB_PATH = CHROMA_DB_PATH / "memory_store"
CHROMA_DB_PATH.mkdir(exist_ok=True)
MEMORY_DB_PATH.mkdir(parents=True, exist_ok=True)

EMBEDDING_MODEL = "text-embedding-3-small"
vector_embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
    api_key=CHROMA_OPENAI_API_KEY,
    model_name=EMBEDDING_MODEL,
    api_base=OPENAI_BASE_URL,
)
memory_embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
    api_key=CHROMA_OPENAI_API_KEY,
    model_name=EMBEDDING_MODEL,
    api_base=OPENAI_BASE_URL,
)

chroma_client = chromadb.PersistentClient(path=str(CHROMA_DB_PATH))
collection = chroma_client.get_or_create_collection(
    name="udaplay_games",
    embedding_function=vector_embedding_fn,
    metadata={"description": "UdaPlay core knowledge base"},
)

vector_store = VectorStore(collection)
memory_store = PersistentMemoryStore(MEMORY_DB_PATH, memory_embedding_fn)

tavily_client = TavilyClient(api_key=TAVILY_API_KEY)

answer_llm = LLM(model="gpt-4o-mini", temperature=0.2, api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
evaluation_llm = LLM(model="gpt-4o-mini", temperature=0.0, api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)

print(f"Vector store ready with {collection.count()} documents.")
print("Long-term memory namespace initialized.")


Vector store ready with 22 documents.
Long-term memory namespace initialized.


### Tools

Build at least 3 tools:
- retrieve_game: To search the vector DB
- evaluate_retrieval: To assess the retrieval performance
- game_web_search: If no good, search the web


#### Retrieve Game Tool

In [39]:
@tool(name="retrieve_game", description="Search the local knowledge base for video game facts.")
def retrieve_game(question: str, limit: int = 4) -> Dict[str, Any]:
    results = vector_store.query(query_texts=[question], n_results=limit)
    documents = results.get("documents", [[]])[0]
    metadatas = results.get("metadatas", [[]])[0]
    distances = results.get("distances", [[]])[0]

    matches = []
    for idx, (doc, meta, distance) in enumerate(zip(documents, metadatas, distances), start=1):
        matches.append(
            {
                "rank": idx,
                "id": meta.get("Name", f"doc_{idx}"),
                "document": doc,
                "metadata": meta,
                "distance": distance,
                "similarity": 1 - distance if distance is not None else None,
            }
        )

    return {
        "query": question,
        "matches": matches,
        "source": "vector_db",
    }


#### Evaluate Retrieval Tool

In [40]:
@tool(name="evaluate_retrieval", description="Score the quality of retrieved documents and decide on fallback needs.")
def evaluate_retrieval(question: str, matches: List[Dict[str, Any]]) -> Dict[str, Any]:
    if not matches:
        return {
            "summary": "No local matches were returned.",
            "confidence": 0.0,
            "needs_web_search": True,
            "supporting_fact_ids": [],
        }

    context_lines = [
        f"{item['rank']}. {item['metadata'].get('Name')} ("
        f"{item['metadata'].get('Platform')}) - {item['metadata'].get('Publisher')}"
        f" | {item['document']}"
        for item in matches
    ]

    evaluation_prompt = (
        "You are the quality gate for a retrieval pipeline. "
        "Return valid JSON with fields summary (string), confidence (0-1 float), "
        "needs_web_search (boolean), and supporting_fact_ids (list of integers referencing the provided context order). "
        "Choose needs_web_search=True when the question is unanswered or context is missing critical facts."
    )

    messages = [
        SystemMessage(content=evaluation_prompt),
        UserMessage(
            content=json.dumps(
                {"question": question, "context": context_lines},
                ensure_ascii=False,
                indent=2,
            )
        ),
    ]

    ai_message = evaluation_llm.invoke(messages)

    try:
        evaluation = json.loads(ai_message.content)
    except json.JSONDecodeError:
        evaluation = {
            "summary": ai_message.content,
            "confidence": max(matches[0].get("similarity") or 0.4, 0.4),
            "needs_web_search": True,
            "supporting_fact_ids": [],
        }

    evaluation["confidence"] = float(evaluation.get("confidence", 0.4))
    evaluation["needs_web_search"] = bool(evaluation.get("needs_web_search", True))
    evaluation.setdefault("supporting_fact_ids", [])
    return evaluation


In [41]:
@tool(name="game_web_search", description="Use Tavily to gather fresh information when local knowledge is insufficient.")
def game_web_search(question: str, max_results: int = 4) -> Dict[str, Any]:
    try:
        response = tavily_client.search(
            query=question,
            max_results=max_results,
            include_answer=True,
            include_domains=True,
            search_depth="basic",
        )
    except Exception as exc:
        return {
            "query": question,
            "error": str(exc),
            "results": [],
        }

    simplified_results = [
        {
            "title": item.get("title"),
            "url": item.get("url"),
            "content": item.get("content"),
            "score": item.get("score"),
        }
        for item in response.get("results", [])
    ]

    return {
        "query": question,
        "answer": response.get("answer"),
        "results": simplified_results,
    }


### Agent

In [42]:
class GameAgentState(TypedDict, total=False):
    question: str
    retrieval: Dict[str, Any]
    evaluation: Dict[str, Any]
    needs_web_search: bool
    web_results: Dict[str, Any]
    memory_context: List[Dict[str, Any]]
    answer: str
    citations: List[Dict[str, str]]
    confidence: float
    history: List[Dict[str, Any]]


class GameResearchAgent:
    def __init__(
        self,
        retrieve_tool: Tool,
        evaluate_tool: Tool,
        web_tool: Tool,
        answer_llm: LLM,
        memory_store: PersistentMemoryStore,
    ):
        self.retrieve_tool = retrieve_tool
        self.evaluate_tool = evaluate_tool
        self.web_tool = web_tool
        self.answer_llm = answer_llm
        self.memory_store = memory_store
        self.history: List[Dict[str, Any]] = []
        self.traces: List[Run] = []
        self.workflow = self._build_workflow()

    def _build_workflow(self) -> StateMachine[GameAgentState]:
        machine = StateMachine(GameAgentState)
        entry = EntryPoint[GameAgentState]()
        retrieve = Step[GameAgentState]("retrieve", self._retrieve_step)
        evaluate = Step[GameAgentState]("evaluate", self._evaluate_step)
        web_search = Step[GameAgentState]("web_search", self._web_search_step)
        finalize = Step[GameAgentState]("finalize", self._finalize_step)
        termination = Termination[GameAgentState]()

        machine.add_steps([entry, retrieve, evaluate, web_search, finalize, termination])
        machine.connect(entry, retrieve)
        machine.connect(retrieve, evaluate)

        def route(state: GameAgentState):
            return web_search if state.get("needs_web_search") else finalize

        machine.connect(evaluate, [web_search, finalize], route)
        machine.connect(web_search, finalize)
        machine.connect(finalize, termination)
        return machine

    def _retrieve_step(self, state: GameAgentState) -> GameAgentState:
        question = state["question"]
        retrieval = self.retrieve_tool(question=question)
        memory_matches = self.memory_store.search(question)
        return {
            "retrieval": retrieval,
            "memory_context": memory_matches,
        }

    def _evaluate_step(self, state: GameAgentState) -> GameAgentState:
        matches = state.get("retrieval", {}).get("matches", [])
        evaluation = self.evaluate_tool(question=state["question"], matches=matches)
        return {
            "evaluation": evaluation,
            "needs_web_search": evaluation.get("needs_web_search", False),
        }

    def _web_search_step(self, state: GameAgentState) -> GameAgentState:
        question = state["question"]
        web_results = self.web_tool(question=question, max_results=4)

        for item in web_results.get("results", []):
            snippet = item.get("content")
            if not snippet:
                continue
            self.memory_store.add(
                content=snippet,
                source=item.get("url", "web"),
                metadata={
                    "title": item.get("title"),
                    "origin": "tavily",
                    "question": question,
                },
            )

        return {"web_results": web_results}

    def _finalize_step(self, state: GameAgentState) -> GameAgentState:
        question = state["question"]
        matches = state.get("retrieval", {}).get("matches", [])
        memory_matches = state.get("memory_context", [])
        web_results = state.get("web_results", {})
        evaluation = state.get("evaluation", {})

        context_segments: List[str] = []
        citation_sources: List[str] = []

        for match in matches:
            meta = match.get("metadata", {})
            citation = meta.get("Name") or match.get("id")
            snippet = (
                f"Local | {meta.get('Name')} ({meta.get('Platform')}, {meta.get('YearOfRelease')}) "
                f"- Publisher: {meta.get('Publisher')} | {match.get('document')}"
            )
            context_segments.append(snippet)
            citation_sources.append(citation)

        for memory_item in memory_matches:
            citation_sources.append(memory_item.get("source"))
            context_segments.append(
                f"Memory | Source: {memory_item.get('source')} | {memory_item.get('content')}"
            )

        for item in web_results.get("results", []):
            context_segments.append(
                f"Web | {item.get('title')} ({item.get('url')}): {item.get('content')}"
            )
            citation_sources.append(item.get("url"))

        if web_results.get("answer"):
            context_segments.append(f"Web Summary | {web_results['answer']}")

        if not context_segments:
            context_segments.append("No supporting context was available.")

        unique_sources: List[str] = []
        for source in citation_sources:
            if source and source not in unique_sources:
                unique_sources.append(source)

        source_catalog = (
            "\\n".join(
                [f"[{idx}] {source}" for idx, source in enumerate(unique_sources, start=1)]
            )
            if unique_sources
            else "[1] No supporting sources captured"
        )

        recent_history = self.history[-3:]
        history_text = (
            "\\n".join(
                [f"Q: {item['question']}\\nA: {item['answer']}" for item in recent_history]
            )
            if recent_history
            else "No previous turns."
        )

        payload = {
            "question": question,
            "context": "\\n\\n".join(context_segments),
            "source_catalog": source_catalog,
            "previous_turns": history_text,
        }

        system_prompt = (
            "You are UdaPlay, an analyst for the video game industry. "
            "Answer the question using ONLY the provided context. "
            "Cite sources with the provided labels (e.g., [1]) and clearly state "
            "when information is unavailable. End with a short bullet list of key facts."
        )

        messages = [
            SystemMessage(content=system_prompt),
            UserMessage(content=json.dumps(payload, ensure_ascii=False)),
        ]
        ai_message = self.answer_llm.invoke(messages)

        confidence = float(evaluation.get("confidence", 0.4))
        citations = [
            {"label": f"[{idx}]", "source": source}
            for idx, source in enumerate(unique_sources, start=1)
        ] if unique_sources else []

        answer = ai_message.content.strip()

        history_entry = {
            "question": question,
            "answer": answer,
            "citations": citations,
            "confidence": confidence,
        }
        self.history.append(history_entry)

        return {
            "answer": answer,
            "citations": citations,
            "confidence": confidence,
            "history": self.history,
        }

    def ask(self, question: str) -> Dict[str, Any]:
        initial_state: GameAgentState = {"question": question}
        run = self.workflow.run(initial_state)
        final_state = run.get_final_state() or {}
        final_state.setdefault("question", question)
        self.traces.append(run)
        return final_state


In [43]:
agent = GameResearchAgent(
    retrieve_tool=retrieve_game,
    evaluate_tool=evaluate_retrieval,
    web_tool=game_web_search,
    answer_llm=answer_llm,
    memory_store=memory_store,
)

sample_questions = [
    "Who developed FIFA 21?",
    "When was God of War Ragnarok released?",
    "What platform was Pokemon Red launched on?",
    "What is Rockstar Games working on right now?",
    "Who made Dark Souls 3?",
    "Who wrote the story for Nier Automata?",
    "What is the most recent Star Ocean game?",
    "Who is the main character of FFVII?",
]

agent_outputs = []
for question in sample_questions:
    print(f" >>> {question}")
    result = agent.ask(question)
    agent_outputs.append(result)
    print(result.get("answer", "No answer generated."))
    print("Confidence:", result.get("confidence"))
    print("Citations:", result.get("citations"))

agent_outputs


 >>> Who developed FIFA 21?
[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve
[StateMachine] Executing step: evaluate
[StateMachine] Executing step: web_search
[StateMachine] Executing step: finalize
[StateMachine] Terminating: __termination__
The provided context does not include information about the developer of FIFA 21. Therefore, I cannot answer your question based on the available data.

Key facts:
- No information on FIFA 21 developer is available in the context provided.
Confidence: 0.0
Citations: [{'label': '[1]', 'source': 'Halo Infinite'}, {'label': '[2]', 'source': 'Minecraft'}, {'label': '[3]', 'source': "Marvel's Spider-Man 2"}, {'label': '[4]', 'source': 'Mario Kart 8 Deluxe'}]
 >>> When was God of War Ragnarok released?
[StateMachine] Starting: __entry__
[StateMachine] Executing step: retrieve
[StateMachine] Executing step: evaluate
[StateMachine] Executing step: web_search
[StateMachine] Executing step: finalize
[StateMachine] Terminating: __te

[{'question': 'Who developed FIFA 21?',
  'retrieval': {'query': 'Who developed FIFA 21?',
   'matches': [{'rank': 1,
     'id': 'Halo Infinite',
     'document': "[Xbox Series X|S] Halo Infinite (2021) - The latest installment in the Halo franchise, featuring Master Chief's return in a new open-world setting.",
     'metadata': {'Publisher': 'Xbox Game Studios',
      'Description': "The latest installment in the Halo franchise, featuring Master Chief's return in a new open-world setting.",
      'YearOfRelease': 2021,
      'Platform': 'Xbox Series X|S',
      'Name': 'Halo Infinite',
      'Genre': 'First-person shooter'},
     'distance': 0.6915041208267212,
     'similarity': 0.3084958791732788},
    {'rank': 2,
     'id': 'Minecraft',
     'document': '[Xbox One] Minecraft (2014) - A sandbox game that allows players to build and explore infinite worlds, fostering creativity and adventure.',
     'metadata': {'Publisher': 'Mojang Studios',
      'Genre': 'Sandbox, Survival',
     

### (Optional) Advanced

In [None]:
# Optional: Inspect long-term memory fragments after running the agent.
memory_store.search("Rockstar Games")
