In [1]:
# First, let's install the library
%pip install langgraph

# Now, import the basics we will need
import os
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END

print("LangGraph installed and libraries imported. Ready for next step!")

Collecting langgraph
  Downloading langgraph-0.6.10-py3-none-any.whl.metadata (6.8 kB)
Collecting langchain-core>=0.1 (from langgraph)
  Downloading langchain_core-0.3.79-py3-none-any.whl.metadata (3.2 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.1.0 (from langgraph)
  Downloading langgraph_checkpoint-2.1.2-py3-none-any.whl.metadata (4.2 kB)
Collecting langgraph-prebuilt<0.7.0,>=0.6.0 (from langgraph)
  Downloading langgraph_prebuilt-0.6.4-py3-none-any.whl.metadata (4.5 kB)
Collecting langgraph-sdk<0.3.0,>=0.2.2 (from langgraph)
  Downloading langgraph_sdk-0.2.9-py3-none-any.whl.metadata (1.5 kB)
Collecting xxhash>=3.5.0 (from langgraph)
  Downloading xxhash-3.6.0-cp311-cp311-macosx_11_0_arm64.whl.metadata (13 kB)
Collecting ormsgpack>=1.10.0 (from langgraph-checkpoint<3.0.0,>=2.1.0->langgraph)
  Downloading ormsgpack-1.11.0-cp311-cp311-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl.metadata (1.2 kB)
Collecting langsmith<1.0.0,>=0.3.45 (from langchain-core>=0.1->lan

In [None]:
import os

os.environ["OPENAI_API_KEY"] = "OpenAI API key goes here"


# --- Check ---
assert "OPENAI_API_KEY" in os.environ, "Please set your OPENAI_API_KEY"
print("✅ API key is ready.")

✅ API key is ready.


In [4]:
from pathlib import Path
import pandas as pd
import chromadb

def retrieve_documents(target_date: str, query_text: str = "Apple AAPL", window_days: int = 2, top_k: int = 8) -> list:
    """
    This function finds relevant news articles from the ChromaDB database for a specific date.
    It takes a date as an ingredient and returns a list of documents as its finished dish.
    """
    print(f"--- Running Retriever for date: {target_date} ---")

    # --- Define file paths ---
    DATA_DIR = Path("../data").resolve()
    INDEX_DIR = (DATA_DIR / "chroma_index" / "why-move-v1").resolve()
    COLLECTION_NAME = "why-move-v1"

    # --- Connect to the database ---
    client = chromadb.PersistentClient(path=str(INDEX_DIR))
    collection = client.get_collection(name=COLLECTION_NAME) # Use get_collection, assuming it exists

    # --- Build the date window to search within ---
    td = pd.to_datetime(target_date)
    window = pd.date_range(td - pd.Timedelta(days=window_days),
                           td + pd.Timedelta(days=window_days), freq="D")
    window_str = window.strftime("%Y-%m-%d").tolist()

    # --- Query the database with a filter for the date window ---
    res = collection.query(
        query_texts=[query_text],
        n_results=top_k,
        where={"date": {"$in": window_str}},
    )

    # The "documents" key contains the list of news article texts we found.
    documents = res.get("documents", [[]])[0]

    print(f"Retrieved {len(documents)} documents.")

    # This is the "finished dish" that our function serves.
    return documents

In [5]:
# Tool 2: The Generator

import json
import re
from typing import List, Literal
from pydantic import BaseModel, Field, ValidationError, constr, confloat
from openai import OpenAI

# --- Pydantic schema. It defines the structure of our desired JSON output. ---
class Citation(BaseModel):
    title: constr(min_length=2)
    url: str = Field(default="")

class WhyMove(BaseModel):
    date: constr(pattern=r"\d{4}-\d{2}-\d{2}")
    ticker: constr(min_length=1)
    explanation: constr(min_length=10, max_length=120)
    sentiment: Literal["positive", "neutral", "negative"]
    confidence: confloat(ge=0.0, le=1.0)
    citations: List[Citation] = Field(min_length=1)


def generate_answer(retrieved_docs: list, target_date: str, ticker: str = "AAPL") -> dict:
    """
    This function takes a list of documents and generates a structured JSON answer using an AI model.
    Its ingredients are the documents from the Retriever, and its dish is the final, validated answer.
    """
    print("--- Running Generator ---")

    # --- If no documents were found, return a default "insufficient context" message ---
    if not retrieved_docs:
        print("Generator received no documents. Returning default answer.")
        default_explanation = {
            "date": target_date,
            "ticker": ticker,
            "explanation": "Insufficient context; no documents were found to analyze.",
            "sentiment": "neutral",
            "confidence": 0.1,
            "citations": [{"title": "No documents found", "url": ""}],
        }
        return default_explanation

    # --- Build the context string from the documents ---
    context_str = "\n".join(f"- {doc}" for doc in retrieved_docs)

    # --- Create the prompt for the AI ---
    prompt = f"""
    You are a careful financial analyst. Your task is to return STRICT JSON answering: "Why did {ticker} move on {target_date}?"
    Use ONLY the following context:
    {context_str}

    Rules:
    - If context is insufficient, say so briefly in the explanation.
    - The explanation must be 120 characters or less.
    - sentiment must be one of: "positive", "neutral", or "negative".
    - confidence is a score from 0.0 to 1.0.
    - The citations list must have at least one item. You can use the document text as the "title".

    Return ONLY a valid JSON object matching this schema:
    {{
      "date": "{target_date}",
      "ticker": "{ticker}",
      "explanation": "...",
      "sentiment": "...",
      "confidence": 0.0,
      "citations": [{{"title":"...", "url":""}}]
    }}
    """

    # --- Call the AI model ---
    client = OpenAI()
    resp = client.chat.completions.create(
        model="gpt-4o-mini", # Using a more modern and reliable model
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )

    raw_json_output = resp.choices[0].message.content

    # --- Validate the output with our Pydantic schema ---
    try:
        parsed_output = WhyMove.model_validate_json(raw_json_output)
        print("Generator successfully created a valid answer.")
        return parsed_output.model_dump()
    except ValidationError as e:
        print(f"ERROR: AI returned invalid JSON. Error: {e}")
        # In case of an error, we return a structured error message
        error_explanation = {
            "date": target_date,
            "ticker": ticker,
            "explanation": "Error: The AI model returned a malformed response.",
            "sentiment": "neutral",
            "confidence": 0.0,
            "citations": [{"title": "Malformed AI response", "url": ""}],
        }
        return error_explanation

In [16]:
# STEP 2 - DESIGNING THE AGENT
# Action 2.1: Define the Agent's Memory (the "State")

class AgentState(TypedDict):
    # The original question, the date we are asking about.
    target_date: str

    # A list to hold the news articles our retriever finds.
    retrieved_docs: list

    # A dictionary to hold the final JSON answer from the generator.
    generation: dict
        # NEW: Add a counter for retries
    retries: int 

In [18]:
# Action 2.2: Define the Agent's Jobs (the "Nodes")

# This is the first job: retrieving documents.
# UPGRADE to the retrieve_node
def retrieve_node(state: AgentState) -> dict:
    print("--- Node: RETRIEVE ---")
    # NEW: Increment the retry counter each time this node is run.
    retries = state.get("retries", 0) + 1 

    target_date = state["target_date"]
    documents = retrieve_documents(target_date)

    # Write both the documents and the new retry count back to the clipboard.
    return {"retrieved_docs": documents, "retries": retries}

# This is the second job: generating the answer.
def generate_node(state: AgentState) -> dict:
    """
    This node calls our generator tool.
    It reads the documents from the state, runs the generator,
    and writes the final JSON answer back to the state.
    """
    print("--- Node: GENERATE ---")
    # Get the ingredients from the state's clipboard.
    target_date = state["target_date"]
    retrieved_docs = state["retrieved_docs"]

    # Run our "toolbox" function from Step 1.
    generation = generate_answer(retrieved_docs, target_date)

    # Write the final dish back to the clipboard.
    return {"generation": generation}

In [19]:
# This is the third job: critiquing the answer.
def critique_node(state: AgentState) -> dict:
    """
    This node checks the answer generated in the previous step.
    It looks for keywords that suggest the answer is not good.
    For now, it just prints its finding. It doesn't return anything.
    """
    print("--- Node: CRITIQUE ---")
    # Get the generated answer from the clipboard.
    generation = state["generation"]

    # Check if the explanation contains "insufficient" or "error".
    if "insufficient" in generation["explanation"].lower() or "error" in generation["explanation"].lower():
        # If it does, we consider the answer to be bad.
        print("Critique: Generation is not acceptable. Needs a retry.")
    else:
        # Otherwise, the answer looks good.
        print("Critique: Generation looks good.")

    # This node doesn't need to change the state, so we return an empty dictionary.
    return {}

In [20]:
# STEP 3 - CONNECTING THE DOTS
# Action 3.1: Create the Whiteboard (Initialize the Graph)

# We create an instance of the StateGraph class.
# We pass it our AgentState blueprint to tell it how to structure its memory.
workflow = StateGraph(AgentState)

In [21]:
# Action 3.2: Add the Jobs to the Whiteboard (Add the Nodes)

# We add each job (node) to our workflow.
# We give each node a simple name (a string) and tell it which function to run for that job.
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)
workflow.add_node("critique", critique_node)

<langgraph.graph.state.StateGraph at 0x30b122010>

In [22]:
# Action 3.3: Draw the Arrows (Add the Edges)

# First, the simple arrows.
# After "retrieve", it should always go to "generate".
workflow.add_edge("retrieve", "generate")
# After "generate", it should always go to "critique".
workflow.add_edge("generate", "critique")

<langgraph.graph.state.StateGraph at 0x30b122010>

In [23]:
# This is our "traffic controller" function.
# UPGRADE to the decision logic
def decide_to_finish_or_retry(state: AgentState) -> str:
    print("--- Node: DECIDE ---")
    generation = state["generation"]
    retries = state["retries"]

    # NEW: Check the retry counter first.
    if retries > 2:
        print(f"Decision: Reached max retries ({retries}). Finishing.")
        return "finish"

    if "insufficient" in generation["explanation"].lower() or "error" in generation["explanation"].lower():
        print(f"Decision: Answer not good. Retrying (attempt {retries}).")
        return "retry"
    else:
        print("Decision: Answer is good. Finishing.")
        return "finish"

In [24]:
# Now we add the special conditional arrow to the workflow.
workflow.add_conditional_edges(
    # The starting point of the decision is the "critique" node.
    "critique",
    # The function that makes the decision is our "traffic controller".
    decide_to_finish_or_retry,
    # This is the "map" that tells the agent where to go based on the decision.
    {
        "retry": "retrieve", # If the decision is "retry", go back to the "retrieve" job.
        "finish": END,       # If the decision is "finish", end the entire process.
    }
)

<langgraph.graph.state.StateGraph at 0x30b122010>

In [25]:
# Action 3.4: Set the Start Line

# We tell our workflow that the "retrieve" node is always the first step.
workflow.set_entry_point("retrieve")

<langgraph.graph.state.StateGraph at 0x30b122010>

In [26]:
# STEP 4 - BUILD AND RUN
# Action 4.2: Compile the workflow into a runnable app

app = workflow.compile()
print("Agent workflow compiled successfully!")

Agent workflow compiled successfully!


In [27]:
# Action 4.3: Run the agent and stream the events

# This is the initial "clipboard" we give the agent. 
# It only contains the question we want to ask.
inputs = {
    "target_date": "2011-08-09"
}

# We run the app by streaming its events.
# This lets us see the output of each node as it runs.
for event in app.stream(inputs):
    for key, value in event.items():
        # Print the name of the node that just ran
        print(f"--- Event: Node '{key}' Finished ---")
        # Print the content of the agent's clipboard (the state)
        print(value)
        print("\n")

Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given


--- Node: RETRIEVE ---
--- Running Retriever for date: 2011-08-09 ---


Failed to send telemetry event CollectionQueryEvent: capture() takes 1 positional argument but 3 were given


Retrieved 8 documents.
--- Event: Node 'retrieve' Finished ---
{'retrieved_docs': ["Nokia's U.S. N9 Plans; AOL Tanks", 'Smartphones Drive U.S. Cellular 2Q - Analyst Blog', 'AOL Plans $250 Million Stock Buyback', 'TV on Tablets - A Reality - Analyst Blog', 'AOL to Buy Back $250M in Stock', 'Activision Promotes PROTOTYPE 2 - Analyst Blog', '5 Stocks to Watch: Bank of America, Cisco', 'StanCorp Financial (SFG) - Bear of the Day'], 'retries': 1}


--- Node: GENERATE ---
--- Running Generator ---


Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given


Generator successfully created a valid answer.
--- Event: Node 'generate' Finished ---
{'generation': {'date': '2011-08-09', 'ticker': 'AAPL', 'explanation': 'Insufficient context to determine reasons for AAPL movement.', 'sentiment': 'neutral', 'confidence': 0.2, 'citations': [{'title': "Nokia's U.S. N9 Plans; AOL Tanks", 'url': ''}]}}


--- Node: CRITIQUE ---
Critique: Generation is not acceptable. Needs a retry.
--- Node: DECIDE ---
Decision: Answer not good. Retrying (attempt 1).
--- Event: Node 'critique' Finished ---
None


--- Node: RETRIEVE ---
--- Running Retriever for date: 2011-08-09 ---
Retrieved 8 documents.
--- Event: Node 'retrieve' Finished ---
{'retrieved_docs': ["Nokia's U.S. N9 Plans; AOL Tanks", 'Smartphones Drive U.S. Cellular 2Q - Analyst Blog', 'AOL Plans $250 Million Stock Buyback', 'TV on Tablets - A Reality - Analyst Blog', 'AOL to Buy Back $250M in Stock', 'Activision Promotes PROTOTYPE 2 - Analyst Blog', '5 Stocks to Watch: Bank of America, Cisco', 'StanCorp

In [28]:
# The Final Test

inputs = {
    "target_date": "2011-10-05"  # The day Steve Jobs passed away, a major news event for Apple.
}

# Run the app again with this new date
for event in app.stream(inputs):
    for key, value in event.items():
        print(f"--- Event: Node '{key}' Finished ---")
        print(value)
        print("\n")

Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given


--- Node: RETRIEVE ---
--- Running Retriever for date: 2011-10-05 ---
Retrieved 8 documents.
--- Event: Node 'retrieve' Finished ---
{'retrieved_docs': ["They Just Don't Get Apple!", "'Mad Money Lightning Round': Hold on Apple", '6 Stocks to Watch: Apple, Yahoo!', "Analysts' Actions: AAPL, NFLX, HD, VZ, S", 'Nuance Acquires Swype; iPhone 4S Pre-Orders Begin', 'Philips Electronics NV - ADR (PHG) - Bear of the Day', 'Netflix, AOL: Tech Winners & Losers', 'A Bright Stock in Display Technology'], 'retries': 1}


--- Node: GENERATE ---
--- Running Generator ---
Generator successfully created a valid answer.
--- Event: Node 'generate' Finished ---
{'generation': {'date': '2011-10-05', 'ticker': 'AAPL', 'explanation': "Context does not provide specific reasons for AAPL's movement on this date.", 'sentiment': 'neutral', 'confidence': 0.2, 'citations': [{'title': "They Just Don't Get Apple!", 'url': ''}]}}


--- Node: CRITIQUE ---
Critique: Generation looks good.
--- Node: DECIDE ---
Decision: 