<a href="https://colab.research.google.com/github/Kvnhooman/AAI520_Final-Project_Group5/blob/tommy-dev-1/Financial_Review_AI_Agents_v_10_5_2212.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Title
**Group 5**

Final Project




___
## Outline


**Core Agent Functions**

* Planning Agent: Plans research steps for stock analysis


* Tool Use Agent: Integrates APIs (Yahoo Finance, SEC EDGAR, News APIs)


* Self-Reflection Agent: Evaluates output quality and iterates


* Learning Agent: Maintains memory across analysis runs


**Workflow Patterns**
* Prompt Chaining: News ingestion → preprocessing → classification → extraction → summarization


* Routing: Directs content to specialist analyzers (earnings, news, market)


* Evaluator-Optimizer: Generates analysis → evaluates quality → refines using feedback

In [4]:
# Install dependencis
# Attribution: Geek for Geeks tutorial - https://www.geeksforgeeks.org/artificial-intelligence/introduction-to-langchain/

# -- CORE LANGCHAIN AND PLUGINS --
!pip install -U langchain langchain-openai langchain-community langchain-google-genai

# -- LARGE MODEL PROVIDERS/SKILL ADAPTERS --
!pip install -U google-generativeai huggingface_hub openai

# -- COMMON TOOLING AND UTILITIES --
!pip install -U python-dotenv yfinance duckduckgo-search

# -- DUCKDUCKGO SEARCH API WRAPPER --
!pip install -U ddgs

# -- OPTIONAL: For advanced memory (semantic search/vector db) --
!pip install -U faiss-cpu  # For in-memory vector DBs (Lightweight)
# If you want persistent/production memory, add chromadb or qdrant-client

# -- Install LangGraph for advanced agent orchestration --
!pip install -U langgraph

# -- (OPTIONAL) For running Python tool actions securely --
!pip install -U restrictedpython


!pip install wikipedia

# Restart the runtime after running this cell if prompted!

Collecting langchain-openai
  Downloading langchain_openai-0.3.34-py3-none-any.whl.metadata (2.4 kB)
Collecting langchain-community
  Downloading langchain_community-0.3.30-py3-none-any.whl.metadata (3.0 kB)
Collecting langchain-google-genai
  Downloading langchain_google_genai-2.1.12-py3-none-any.whl.metadata (7.1 kB)
Collecting requests<3,>=2 (from langchain)
  Downloading requests-2.32.5-py3-none-any.whl.metadata (4.9 kB)
Collecting dataclasses-json<0.7.0,>=0.6.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting google-ai-generativelanguage<1,>=0.7 (from langchain-google-genai)
  Downloading google_ai_generativelanguage-0.7.0-py3-none-any.whl.metadata (10 kB)
Collecting filetype<2,>=1.2 (from langchain-google-genai)
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7.0,>=0.6.7->langchain-community)
  Downloading marshmallow-3.26.1-py3-none-any.w

Collecting openai
  Downloading openai-2.1.0-py3-none-any.whl.metadata (29 kB)
Collecting google-ai-generativelanguage==0.6.15 (from google-generativeai)
  Downloading google_ai_generativelanguage-0.6.15-py3-none-any.whl.metadata (5.7 kB)
Downloading google_ai_generativelanguage-0.6.15-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m28.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading openai-2.1.0-py3-none-any.whl (964 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m964.9/964.9 kB[0m [31m37.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: openai, google-ai-generativelanguage
  Attempting uninstall: openai
    Found existing installation: openai 1.109.1
    Uninstalling openai-1.109.1:
      Successfully uninstalled openai-1.109.1
  Attempting uninstall: google-ai-generativelanguage
    Found existing installation: google-ai-generativelanguage 0.7.0
    Uninstalling google-ai-generativela

Collecting duckduckgo-search
  Downloading duckduckgo_search-8.1.1-py3-none-any.whl.metadata (16 kB)
Collecting primp>=0.15.0 (from duckduckgo-search)
  Downloading primp-0.15.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading duckduckgo_search-8.1.1-py3-none-any.whl (18 kB)
Downloading primp-0.15.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m41.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: primp, duckduckgo-search
Successfully installed duckduckgo-search-8.1.1 primp-0.15.0
Collecting ddgs
  Downloading ddgs-9.6.0-py3-none-any.whl.metadata (18 kB)
Collecting lxml>=6.0.0 (from ddgs)
  Downloading lxml-6.0.2-cp312-cp312-manylinux_2_26_x86_64.manylinux_2_28_x86_64.whl.metadata (3.6 kB)
Collecting socksio==1.* (from httpx[brotli,http2,socks]>=0.28.1->ddgs)
  Downloading socksio-1.0.0-py3-none-any.whl.metadata (6.1 kB)
Downloadin

In [21]:
#Import key libraries
import os
import requests
import re

from langchain_openai import OpenAI
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts.chat import ChatPromptTemplate, MessagesPlaceholder
from langchain.agents import create_react_agent, AgentExecutor, initialize_agent, Tool, AgentType
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains import LLMChain
from langchain.tools import tool  # newer import for @tool decorator
from langchain.llms import OpenAI

from langgraph.prebuilt import create_react_agent

#Correct import path for YahooFinanceAPI
import yfinance as yf
from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool
from langchain.prompts import PromptTemplate

#Memory
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Any, Union

In [2]:
#Set up LLM API Calls
from google.colab import userdata

gemini_key = userdata.get('GEMINI')
hf_key = userdata.get('HF_TOKEN')
openai_key = userdata.get('OPENAI')
fin_news = userdata.get('FIN_API_KEY')
tavily_key = userdata.get('TAVILY_API_KEY')

In [3]:
# Setup Gemini to use in Agents
from langchain_google_genai import ChatGoogleGenerativeAI
llm_gemini = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    google_api_key=gemini_key
)

In [4]:
# Test if Gemini LLM is reachable
try:
    response = llm_gemini.invoke("Say 'GoogleAPI-success-check'")
    print("Gemini API Response:", response)
except Exception as e:
    print("Error reaching Gemini API:", e)


Gemini API Response: content='GoogleAPI-success-check' additional_kwargs={} response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-2.0-flash', 'safety_ratings': []} id='run--426b0b71-d3e5-49c8-9080-6df4542d314a-0'


In [5]:
from langchain_openai import ChatOpenAI

llm_openai = ChatOpenAI(
    model="gpt-3.5-turbo",  # "gpt-3.5-turbo", "gpt-4-mini", "gpt-5-mini" ...
    openai_api_key=openai_key,  # Your OpenAI API key
    temperature=0.0             # (optional) set as needed
)

In [6]:
#Test OpenAI API
#response = llm_openai.invoke("Explain how neural networks work in 10 words or less.")
response = llm_gemini.invoke("Explain how neural networks work in 10 words or less.")
print(response.content)

Learns patterns from data to make predictions.


## User Input Context Extractor Function

In [86]:
def create_initial_state(user_input):
    return {
        "user_input": user_input,
        "symbol": extract_symbol_simple(user_input),  # Simple regex
        "date": datetime.now().date().isoformat(),
        "tools_output": "",
        "evaluation_output": "",
        "final_output": ""
    }

In [87]:
def extract_symbol_simple(text):
    """Just grab first 2-5 char uppercase word"""
    match = re.search(r"\b([A-Z]{2,5})\b", text)
    return match.group(1) if match else "UNKNOWN"

In [27]:
def extract_context(user_input):
    """
    Extracts key context fields from user input for agent API use.

    Fields:
      - input (str): raw user text
      - symbol (str): detected uppercase ticker (2-5 chars)
      - company_name (str): placeholder for future NLP extraction
      - date (str): today's date in ISO format (YYYY-MM-DD)
      - agent_scratchpad (str): empty, reserved for intermediate agent notes
      - research_notes (str): empty, reserved for additional notes
    """
    # Extract uppercase ticker symbol (2-5 chars)
    match = re.search(r"\b([A-Z]{2,5})\b", user_input)
    symbol = match.group(1) if match else ""

    # Placeholder for company name extraction
    company_name = ""  # Optional: implement NER for this

    # Today's date in ISO format
    today = datetime.now().date().isoformat()

    # Build context dictionary
    context = {
        'input': user_input,
        'symbol': symbol,
        'company_name': company_name,
        'date': today,
        'agent_scratchpad': '',
        'research_notes': ''
    }
    return context

In [85]:
#Input extraction output
context = extract_context(user_input)
print(context)

{'input': 'Analyze the AAPL stock', 'symbol': 'AAPL', 'company_name': '', 'date': '2025-10-06', 'agent_scratchpad': '', 'research_notes': ''}


## Tool Agent
Integrates APIs (Yahoo Finance, SEC EDGAR, News APIs)

List of Tools:
- Web Search
- API call - Yahoo Finance
- to be updated...

### Tool Functions


In [None]:
#For potential future use
#from fredapi import Fred
##fred = Fred(api_key='YOUR_API_KEY')
#data = fred.get_series('SP500')

In [28]:
#Yahoo Finance Tool
def get_stock_price(ticker: str) -> str:
    try:
        price = yf.Ticker(ticker).info['regularMarketPrice']
        return f"{ticker} price is {price}"
    except Exception as e:
        return f"Error fetching price for {ticker}: {e}"

yahoo_api_tool = Tool(
    name="YahooFinanceAPI",
    func=get_stock_price,
    description="Queries Yahoo Finance for stock price and financials"
)

In [31]:
#SEC Filings
def get_sec_filings(ticker: str) -> str:
    cik_url = "https://www.sec.gov/files/company_tickers.json"
    headers = {"User-Agent": "tpool@sandiego.edu"}  # Use your real email!
    cik_resp = requests.get(cik_url, headers=headers)
    print(f"cik_resp: {cik_resp.status_code}, {cik_resp.text[:100]}")

    if cik_resp.status_code != 200:
        return f"SEC.gov rejected our request: {cik_resp.status_code}\n{cik_resp.text[:200]}"

    try:
        cik_data = cik_resp.json()
        cik_lookup = {item['ticker']: item['cik_str'] for item in cik_data.values()}
        cik = cik_lookup.get(ticker.upper())
    except Exception as e:
        return f"Error parsing CIK data: {e}"

    if not cik:
        return f"CIK for {ticker} not found."

    filings_url = f"https://data.sec.gov/submissions/CIK{cik:0>10}.json"
    filings_resp = requests.get(filings_url, headers=headers)

    try:
        data = filings_resp.json() if filings_resp.status_code == 200 else {}
        filings = data.get('filings', {}).get('recent', {})
        if filings:
            forms = filings.get('form', [])[:3]
            filing_dates = filings.get('filingDate', [])[:3] #If successful, get filing dates
            filing_links = filings.get('primaryDocument', [])[:3] #Also get three primary documents for agent
            result = []
            for f, d, l in zip(forms, filing_dates, filing_links):
                result.append(f"{f} on {d}: {l}")
            return f"Latest filings for {ticker}:\n" + "\n".join(result)
        else:
            return f"No filings found for {ticker}."
    except Exception as e:
        return f"Error loading filings for {ticker}: {e}"


sec_api_tool = Tool(
    name="SECEDGARAPI",
    func=get_sec_filings,
    description="Retrieves SEC filings on stock symbol"
)

In [32]:
#For debugging
#print(get_sec_filings("TSLA"))

In [33]:
def get_fin_news(symbol: str) -> str:
    api_key = fin_news
    url = "https://newsapi.org/v2/everything"
    params = {
        "q": symbol,
        "apiKey": api_key,
        "sortBy": "publishedAt",
        "language": "en"
    }
    response = requests.get(url, params=params)
    if response.status_code != 200:
        return f"API error {response.status_code}: {response.text[:200]}"
    data = response.json()
    articles = data.get("articles", [])
    if not articles:
        return f"No news found for {symbol}. Full message: {data.get('message', '')}"
    return "\n".join([a["description"] or a["title"] for a in articles[:3]])


news_api_tool = Tool(
    name="NewsAPI",
    func=get_fin_news,  #Assumes you've defined this class
    description="Finds recent financial news on stock symbol"
)

In [34]:
def get_fin_news_tavily(symbol: str) -> str:
    api_key = tavily_key
    url = "https://api.tavily.com/search"
    # You can optimize the query for financial news by including keywords:
    query = f"{symbol} financial news"
    payload = {
        "query": query,
        "api_key": api_key,
        "max_results": 3,  # You can set number of results as you prefer
    }
    response = requests.post(url, json=payload)
    if response.status_code == 200:
        data = response.json()
        results = data.get("results", [])
        # Each result contains title, link, snippet, etc.
        return "\n".join([r.get("title", "No title") for r in results]) if results else "No news found."
    else:
        return f"Error from Tavily: {response.status_code} {response.text}"

tavily_news_tool = Tool(
    name="TavilyNewsSearch",
    func=get_fin_news_tavily,
    description="Searches web for recent financial news about a stock symbol using Tavily API."
)

In [35]:
#Wikipedia Search Tool
from wikipedia import summary

def search_wikipedia(query: str) -> str:
    # If query looks like a ticker (e.g., all caps, 2-5 chars), get company name
    if query.isupper() and 2 <= len(query) <= 5:
        company_name = get_company_name(query)
        search_term = f"{company_name} stock"
    else:
        search_term = query

    try:
        return summary(search_term, sentences=2)
    except Exception:
        # Fallback, try just company name (without "stock")
        if search_term != query:
            try:
                return summary(company_name, sentences=2)
            except Exception:
                pass
        return "I couldn't find any information on that."

wikipedia_tool = Tool(
    name="WikipediaSearch",
    func=search_wikipedia,
    description="Searches Wikipedia and returns a summary for a stock symbol or company name."
)

In [36]:
from transformers import pipeline

class HuggingFaceSentimentTool:
    def __init__(self):
        self.classifier = pipeline("sentiment-analysis")

    def analyze(self, text):
        result = self.classifier(text)[0]
        # result['label'] is 'POSITIVE' or 'NEGATIVE'
        return 1 if result['label'] == "POSITIVE" else -1

sentiment_api_tool = Tool (
    name="HuggingFaceSentiment",
    func=HuggingFaceSentimentTool().analyze,
    description="Analyzes sentiment of text using HuggingFace"
)

No model was supplied, defaulted to distilbert/distilbert-base-uncased-finetuned-sst-2-english and revision 714eb0f (https://huggingface.co/distilbert/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.


config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

Device set to use cpu


In [37]:
#Analyzer specialist tool
class EarningsAnalyzer:
    def __init__(self, yahoo_tool, sec_tool):
        self.yahoo_tool = yahoo_tool
        self.sec_tool = sec_tool

    def analyze(self, symbol, messages=None):
        # 1. Try to find earnings info in past messages
        yahoo_data = None
        if messages:
            for msg in reversed(messages):
                # ToolMessage expected structure
                # Check by tool name and relevant symbol
                if (
                    hasattr(msg, "name") and msg.name == "YahooFinanceAPI"
                    and hasattr(msg, "content") and symbol in msg.content
                ):
                    yahoo_data = msg.content  # or parse accordingly
                    break
        # 2. If not found, call API
        if yahoo_data is None:
            yahoo_data = self.yahoo_tool.fetch_earnings(symbol)
        # ... do similar for sec filings ...
        # 3. Build and return your summary
        return f"Analysis based on earnings: {yahoo_data}"

earnings_specialist = Tool(
    name="EarningsAnalyzer",
    func=lambda symbol, messages: EarningsAnalyzer(yahoo_api_tool, sec_api_tool).analyze(symbol, messages=messages),
    description="Expert analysis of earnings via Yahoo Finance and SEC filings"
)

In [38]:
class SentimentAnalyzer:
    def __init__(self, sentiment_tool, news_tool):
        self.sentiment_tool = sentiment_tool  # expects .analyze(text)
        self.news_tool = news_tool            # expects .get_news(symbol)

    def analyze(self, symbol, messages=None):
        # 1. Find any recent news/sentiment in agent messages
        headlines = []
        if messages:
            for msg in reversed(messages):
                # If you have ToolMessage for news and symbol in content
                if hasattr(msg, "name") and msg.name in ["NewsAPI", "TavilyNewsSearch"]:
                    if hasattr(msg, "content") and symbol in msg.content:
                        # Try splitting headlines (change this parsing as needed)
                        headlines += msg.content.split("\n")
                # If you cache sentiment scores/results, you can parse those too!

        # 2. If no headlines found, call news API/tool
        if not headlines:
            news_items = self.news_tool.get_news(symbol)
            headlines = [item['headline'] for item in news_items]

        # 3. Get sentiment scores for each headline
        if headlines:
            scores = [self.sentiment_tool.analyze(h) for h in headlines]
            if scores:
                mean_score = sum(scores) / len(scores)
                recommendation = self._interpret_score(mean_score)
                return (f"Average news sentiment for {symbol}: {mean_score:.2f}\n"
                        f"Recommendation: {recommendation}")
            else:
                return "No sentiment scores could be computed."
        else:
            return "No recent news headlines found."

    def _interpret_score(self, score):
        # Customize thresholds to your model's scoring scale
        if score > 0.2:
            return "Buy"
        elif score > -0.2:
            return "Hold"
        else:
            return "Sell"

sentiment_anaylsis_specialist = Tool(
    name="SentimentAnalyzer",
    func=lambda symbol, messages: SentimentAnalyzer(sentiment_api_tool, news_api_tool).analyze(symbol, messages=messages),
    description="Aggregates news sentiment for a stock and suggests buy/hold/sell."
)

In [None]:
#simpler way to create tool?
#def get_weather(city: str) -> str:
#    """Get weather for a given city."""
#    return f"It's always sunny in {city}!"

In [39]:
#Create the tools list
tools = [
    yahoo_api_tool,
    sec_api_tool,
    news_api_tool,
    tavily_news_tool,
    wikipedia_tool,
    sentiment_api_tool,
    earnings_specialist,
    sentiment_anaylsis_specialist,
    wikipedia_tool
]

In [80]:
'''
tools_prompt = PromptTemplate(
    input_variables=["input", "symbol", "company_name", "date", "agent_scratchpad", "research_notes"],
    template="""You are an autonomous investment research agent.
Today's Date: {date}
User Input: {input}
Stock Symbol: {symbol}

Company: {company_name}
Scratchpad: {agent_scratchpad}
Research Notes: {research_notes}

You are an autonomous investment research agent.
Instructions:
1. Identify the relevant stock symbol for {input}. If {symbol} is not provided, use the tools or reasoning to determine it from the company name.
2. Plan your research steps dynamically for {input} ({symbol}) (select relevant data & order).
3. Use available tools for financials, news, and economics as required.
4. Route to specialist modules (earnings, market sentiment, news) when appropriate.
5. Evaluate your results and refine if necessary to improve analysis quality.
6. Save lessons or notes in research_notes for future runs.
Output a complete investment report with evidence and recommendations.

{agent_scratchpad}
Persistent notes: {research_notes}

Start your workflow:

"""
)


### Tools Agent Function

In [61]:
tools_agent = create_react_agent(
    model=llm_openai,
    tools=tools,
    prompt=prompt="You are a financial research agent. Analyze the user's request and use your tools to gather information."
)

In [None]:
#For testing
#Example:
#tools_output = tools_agent.invoke({"messages": [{'role': 'user', 'content': 'Research TSLA'}]})
#print(result)

## Self Reflection & Evaluation Agent
Evaluates output quality

In [93]:
# Define Self Evaluation agent

EVAL_PROMPT = """
You are an expert evaluator. Ensure the result is no more than 100 words and is formatted for easy reading including bulleted lists, and a clear title at the top.

In addition, review the analysis/summary below for:

- Completeness (all important steps/points covered)
- Succinctness (concise, minimal repetition)
- Accuracy (supported by real or tool-sourced information)
- Clarity (well organized, easy to follow)

If human feedback is provided, incorporate any specific suggestions or corrections if appropriate.

Give clear, specific suggestions if any improvement is needed.
Return the revised summary/answer if changes are warranted.
Otherwise, state that the answer is adequate.

--- Analysis To Evaluate ---
{input}

--- Human Feedback ---
{human_feedback}
"""

evaluator_agent = create_react_agent(
    model=llm_openai, # Or gemini_model if that's your variable
    tools = [],
    prompt=EVAL_PROMPT
)

In [None]:
#Run for testing
#evaluator_output = self_evaluator.invoke({"input": tools_output})
#print(evaluator_output)

## Optimization Agent

In [44]:
# Define optimizer agent

OPTIMIZER_PROMPT = """
You are an optimization agent. Given the evaluator's feedback and the initial summary below,
produce a revised version that fixes any weaknesses cited (completeness, succinctness, accuracy, clarity).
Ensure the answer is relevant, including the stock symobl and data retrieved.

Limit response to 100 words with a simple bulleted format for key info like stock symobl, company name, today's date, etc.. Only write what you have found, don't make anything up.

Evaluator Feedback:
{feedback}
Initial Answer:
{answer}

---
Optimized Revised Answer:


"""

optimizer_agent = create_react_agent(
    model=llm_openai, # Or gemini_model if that's your variable
    tools = [],
    prompt=OPTIMIZER_PROMPT
)

In [2]:
#For testing
#answer = tools_output
#feedback = evaluator_output
#print(answer)
#print(feedback)
#optimized_result = optimizer_agent.invoke({
#    "answer": answer,
#    "feedback": feedback
#})
#print(optimized_result)

## Learning Agent
Maintains memory across analysis runs

In [83]:
#MEMORY AGENT CELL 1
#Session-scoped memory

@dataclass
class MemoryItem:
    symbol: str
    question: str
    answer: str
    created_at: str
    meta: Dict[str, Any]

class SessionMemory:
    def __init__(self, max_items: int = 200, max_per_symbol: int = 10):
        self._store: Dict[str, List[MemoryItem]] = {}
        self.max_items = max_items
        self.max_per_symbol = max_per_symbol

    def remember(self, symbol: str, question: str, answer: str, **meta) -> None:
        symbol = (symbol or "GENERIC").upper().strip()
        item = MemoryItem(
            symbol=symbol,
            question=(question or "").strip(),
            answer=(answer or "").strip(),
            created_at=datetime.utcnow().isoformat(timespec="seconds"),
            meta=meta or {}
        )
        bucket = self._store.setdefault(symbol, [])
        bucket.append(item)
        if len(bucket) > self.max_per_symbol:
            del bucket[0 : len(bucket) - self.max_per_symbol]
        self._cap_global()

    def recall(self, symbol: str, question: Optional[str] = None) -> Optional[str]:
        symbol = (symbol or "GENERIC").upper().strip()
        bucket = self._store.get(symbol, [])
        if not bucket:
            return None
        if not question:
            return bucket[-1].answer
        q = (question or "").strip()
        for item in reversed(bucket):
            if item.question == q:
                return item.answer
        return None

    def latest(self, symbol: str) -> Optional[MemoryItem]:
        symbol = (symbol or "GENERIC").upper().strip()
        bucket = self._store.get(symbol, [])
        return bucket[-1] if bucket else None

    def _cap_global(self):
        all_items = []
        for sym, bucket in self._store.items():
            for it in bucket:
                all_items.append((it.created_at, sym, it))
        if len(all_items) <= self.max_items:
            return
        all_items.sort(key=lambda x: x[0])  # oldest first
        to_drop = len(all_items) - self.max_items
        cutoff = set(id(it) for _, _, it in all_items[:to_drop])
        for sym in list(self._store.keys()):
            self._store[sym] = [it for it in self._store[sym] if id(it) not in cutoff]

SESSION_MEMORY = SessionMemory()

def extract_symbol(text: str) -> str:
    """
    Grab a likely ticker from the user_input like 'Analyze the SPY stock ticker'.
    Simple heuristic: first ALL-CAPS token 1-5 chars (e.g., AAPL, MSFT, SPY).
    Falls back to 'GENERIC' if none found.
    """
    if not text:
        return "GENERIC"
    candidates = re.findall(r"\b[A-Z]{1,5}\b", text)
    # Light filter for common English words
    stop = {"THE","AND","FOR","WITH","FROM","THIS","THAT","YOUR","HAVE","HOLD"}
    for c in candidates:
        if c not in stop:
            return c
    return "GENERIC"

def as_text(x: Any) -> str:
    """
    Normalize whatever comes back from planner/tools/evaluator/optimizer into a string.
    Works with LangChain AgentExecutor outputs (dict), AIMessage, or raw str.
    """
    try:
        # AIMessage / ChatMessage
        if hasattr(x, "content"):
            return str(x.content)
        # Agent-like dicts
        if isinstance(x, dict):
            if "output" in x and isinstance(x["output"], str):
                return x["output"]
            if "messages" in x and isinstance(x["messages"], list):
                return "\n\n".join(
                    (m.content if hasattr(m, "content") else str(m))
                    for m in x["messages"]
                )
        # plain string
        if isinstance(x, str):
            return x
        return str(x)
    except Exception:
        return str(x)

## Multiple Agent Setup

In [81]:
user_input = "Analyze the AAPL stock" #Example user input.

In [94]:
# 1. Start with a simple state dictionary
def create_initial_state(user_input):
    return {
        "user_input": user_input,
        "symbol": extract_symbol_simple(user_input),  # Simple regex
        "date": datetime.now().date().isoformat(),
        "tools_output": "",
        "evaluation_output": "",
        "final_output": ""
    }

def extract_symbol_simple(text):
    """Just grab first 2-5 char uppercase word"""
    match = re.search(r"\b([A-Z]{2,5})\b", text)
    return match.group(1) if match else "UNKNOWN"

# 2. Simple agent creation - let the agent handle its own prompting
tools_agent = create_react_agent(
    model=llm_openai,
    tools=tools,
    # Use a simple string prompt, not a template
    prompt="You are a financial research agent. Analyze the user's request and use your tools to gather information."
)

# 3. Simplified execution pipeline
def run_analysis_pipeline(user_input):
    # Create state
    state = create_initial_state(user_input)
    print(f"Analyzing: {user_input}")
    print(f"Symbol: {state['symbol']}")

    # Tools agent - pass the user input directly
    try:
        tools_result = tools_agent.invoke({"messages": [{"role": "user", "content": user_input}]})
        state["tools_output"] = str(tools_result)
        print("✓ Tools agent completed")
    except Exception as e:
        print(f"✗ Tools agent failed: {e}")
        return state

    # Evaluator agent
    try:
        eval_result = evaluator_agent.invoke({"messages": [{"role": "user", "content": f"Evaluate this analysis: {state['tools_output']}"}]})
        state["evaluation_output"] = str(eval_result)
        print("✓ Evaluator completed")
    except Exception as e:
        print(f"✗ Evaluator failed: {e}")
        return state

    # Optimizer agent
    try:
        optimizer_result = optimizer_agent.invoke({"messages": [{"role": "user", "content": f"Optimize this analysis based on evaluation:\nAnalysis: {state['tools_output']}\nEvaluation: {state['evaluation_output']}"}]})
        state["final_output"] = str(optimizer_result)
        print("✓ Optimizer completed")
    except Exception as e:
        print(f"✗ Optimizer failed: {e}")
        return state

    return state

In [95]:
#Execute code above
result = run_analysis_pipeline(user_input)

Analyzing: Analyze the AAPL stock
Symbol: AAPL
cik_resp: 200, {"0":{"cik_str":1045810,"ticker":"NVDA","title":"NVIDIA CORP"},"1":{"cik_str":789019,"ticker":"MSFT"
✓ Tools agent completed
✓ Evaluator completed
✓ Optimizer completed


In [82]:
print("Evaluating user input...")

# Run the context extraction function directly
context_output = extract_context(user_input)
print(context_output)

#Tools Agent
print("Tools Agent Reasoning...")


try:
    tools_output = tools_agent.invoke(context_output)
    print("Tools Agent Output:", tools_output)
except Exception as e:
    print("Tools Agent Exception:", type(e), e)
print()

#Evaluator Agent
print("Evaluator Agent Reasoning...")
evaluation_output = self_evaluator.invoke({"input": tools_output})
print("Evaluator Agent Output:", evaluation_output)
print()

#Optimizer Agent
print("Optimizer Agent Reasoning...")
optimizer_input = {
    "tools_output": tools_output,
    "evaluation_output": evaluation_output
}
optimizer_output = optimizer_agent.invoke(optimizer_input)
print()

def print_optimizer_output(optimizer_output):
    print("Raw optimizer_output:")
    print(optimizer_output)
    print("\n---\n")

    if isinstance(optimizer_output, str):
        for para in optimizer_output.split('\n\n'):
            print(para)
            print()
    elif isinstance(optimizer_output, dict):
        # Check if 'messages' is present and is a list of message objects
        if "messages" in optimizer_output:
            messages = optimizer_output["messages"]
            for idx, msg in enumerate(messages):
                # Handles AIMessage or generic object with .content
                try:
                    content = msg.content
                except AttributeError:
                    content = str(msg)
                print(f"Message {idx+1} content:\n")
                for para in content.split('\n\n'):
                    print(para)
                    print()
        else:
            print("optimizer_output is a dict but no 'messages' key found. Printing dict keys/values:")
            for k, v in optimizer_output.items():
                print(f"{k}: {v}\n")
    else:
        print("optimizer_output type could not be handled:")
        print(type(optimizer_output))

# Usage:
print_optimizer_output(optimizer_output)

Evaluating user input...
{'input': 'Analyze the AAPL stock', 'symbol': 'AAPL', 'company_name': '', 'date': '2025-10-06', 'agent_scratchpad': '', 'research_notes': ''}
Tools Agent Reasoning...
Tools Agent Exception: <class 'KeyError'> "Input to PromptTemplate is missing variables {'research_notes', 'agent_scratchpad', 'input', 'symbol'}.  Expected: ['agent_scratchpad', 'input', 'research_notes', 'symbol'] Received: ['messages', 'remaining_steps']\nNote: if you intended {research_notes} to be part of the string and not a variable, please escape it with double curly braces like: '{{research_notes}}'.\nFor troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_PROMPT_INPUT "

Evaluator Agent Reasoning...
Evaluator Agent Output: {'messages': [AIMessage(content='### Evaluation of Analysis on Evaluating a Business Idea\n\nThe analysis provided a comprehensive guide on evaluating a business idea, covering key steps such as market research, identifying target c

In [84]:
# After running the user query through your whole pipeline:
user_question = user_input
symbol = extract_symbol(user_input)
final_answer = as_text(optimizer_output)   # Use your utility function

SESSION_MEMORY.remember(symbol, user_question, final_answer)

# Later, you can recall the latest answer for "AAPL":
prev = SESSION_MEMORY.recall("AAPL")
if prev:
    print("Previous answer for AAPL:", prev)

Previous answer for AAPL: - Stock Symbol: AAPL
- Company Name: Apple Inc.
- Date: October 15, 2021
- Closing Price: $143.76


  created_at=datetime.utcnow().isoformat(timespec="seconds"),
