In [53]:
from typing import List, Optional, Literal
from enum import Enum
from datetime import date
from pydantic import BaseModel, Field, model_validator
import re

class AnalysisIntent(str, Enum):
    """Kind of question an analysis plan is meant to answer.

    Mixes in ``str`` so every member compares equal to its plain string value.
    """
    PERFORMANCE = "performance"
    RISK = "risk"
    VOLATILITY = "volatility"
    LIQUIDITY = "liquidity"
    CORRELATION = "correlation"
    VALUATION = "valuation"
    EVENT_IMPACT = "event_impact"
    REGIME_DETECTION = "regime_detection"
    EXPLORATORY = "exploratory"

class MarketType(str, Enum):
    """Market an analysis plan targets (used as ``AnalysisPlan.market``).

    Mixes in ``str`` so every member compares equal to its plain string value.
    """
    EQUITIES = "equities"
    CRYPTO = "crypto"
    RATES = "rates"
    FX = "fx"
    COMMODITIES = "commodities"
    MIXED = "mixed"


class Metric(str, Enum):
    """Quantitative metrics a plan may request.

    The member values double as the keys under which results are stored
    by ``compute_market_metrics``.
    """
    # Return and risk-adjusted performance
    TOTAL_RETURN = "total_return"
    CAGR = "cagr"
    SHARPE_RATIO = "sharpe_ratio"
    MAX_DRAWDOWN = "max_drawdown"

    # Volatility
    REALIZED_VOLATILITY = "realized_volatility"
    ROLLING_VOLATILITY = "rolling_volatility"

    # Cross-asset metrics (computed across symbols, not per symbol)
    ROLLING_CORRELATION = "rolling_correlation"
    BETA = "beta"

    # Liquidity
    VOLUME = "volume"
    AMIHUD_ILLIQUIDITY = "amihud_illiquidity"

    # Regime analysis
    REGIME_SWITCHING = "regime_switching"


class Universe(BaseModel):
    """Instruments covered by an analysis, plus an optional benchmark.

    Symbols are normalized before field validation: a ticker written in
    parentheses (``"Apple (AAPL)"``, ``"Bitcoin (BTC-USD)"``) is extracted,
    anything else is stripped and upper-cased.
    """

    assets: List[str] = Field(min_length=1)
    benchmark: Optional[str] = None

    @model_validator(mode="before")
    @classmethod
    def normalize_symbols(cls, v):
        """Normalize ``assets`` and ``benchmark`` symbols on the raw input.

        Non-dict input and non-list ``assets`` are passed through untouched so
        pydantic reports the type error itself (iterating a bare string would
        otherwise split it into single characters). The caller's dict is
        copied, never mutated.
        """
        if not isinstance(v, dict):
            return v

        def normalize(s):
            if not isinstance(s, str):
                return s  # let field validation reject it
            # Ticker alphabet also covers digits and the "-", "^", "=", "."
            # characters used by symbols such as BTC-USD, ^GSPC, EURUSD=X, BRK.B.
            m = re.search(r"\(([A-Z0-9.^=-]+)\)", s)
            return m.group(1) if m else s.upper().strip()

        data = dict(v)

        assets = data.get("assets")
        if isinstance(assets, (list, tuple)):
            data["assets"] = [normalize(a) for a in assets]

        if data.get("benchmark"):
            data["benchmark"] = normalize(data["benchmark"])

        return data
    
class TimeWindow(BaseModel):
    """Analysis period, given either explicitly or as a lookback.

    Exactly one of the two forms must be used:
    - explicit: both ``start`` and ``end``;
    - relative: ``lookback_years`` (a positive number of years).
    """

    start: Optional[date] = None
    end: Optional[date] = None
    # default=None makes the field genuinely optional; without a default
    # pydantic treats an Optional field as required, which forces producers
    # (including the structured-output LLM) to always supply it.
    lookback_years: Optional[int] = Field(default=None, gt=0)

    @model_validator(mode="after")
    def validate_time_window(self):
        """Enforce that exactly one complete window form is provided.

        Raises:
            ValueError: if both forms are mixed, none is given, only one of
                start/end is set, or start is after end.
        """
        explicit = self.start is not None or self.end is not None
        relative = self.lookback_years is not None

        if explicit and relative:
            raise ValueError("Use either start/end or lookback_years, not both")

        if not explicit and not relative:
            raise ValueError("Time window must be specified")

        if explicit and (self.start is None or self.end is None):
            raise ValueError("Both start and end must be provided")

        if explicit and self.start > self.end:
            raise ValueError("start must not be after end")

        return self

class ConfidenceRequirements(BaseModel):
    """Optional statistical-confidence settings attached to an analysis plan.

    The two numeric fields are validated independently; nothing here enforces
    that ``alpha`` equals ``1 - confidence_level``.
    """
    # Must lie strictly between 0.5 and 1.0
    confidence_level: float = Field(default=0.95, gt=0.5, lt=1.0)
    # Must lie strictly between 0.0 and 0.2
    alpha: float = Field(default=0.05, gt=0.0, lt=0.2)
    require_confidence_intervals: bool = True



class AnalysisPlan(BaseModel):
    """Structured plan produced by the planner LLM and consumed by the graph nodes."""
    # Free-text research topic; at least 10 characters
    topic: str = Field(min_length=10)

    intent: AnalysisIntent
    market: MarketType

    universe: Universe
    # At least one metric must be requested
    metrics: List[Metric] = Field(min_length=1)

    time_window: TimeWindow
    # Sampling frequency; mapped to a download interval in fetch_market_data
    frequency: Literal["daily", "weekly", "monthly"]

    confidence_requirements: Optional[ConfidenceRequirements] = None




In [54]:
# state.py
from typing import Any, Dict, List, Optional
from typing_extensions import TypedDict
import os
from dotenv import load_dotenv

load_dotenv()


class MarketAnalysisState(TypedDict):
    """Shared state passed between the nodes of the market-analysis graph.

    Each node reads the keys it needs and returns a dict with the keys it
    updates.
    """
    # User input
    user_prompt: str

    # Planning output
    analysis_plan: Optional[AnalysisPlan]

    # Market data
    raw_market_data: Optional[Any]
    cleaned_market_data: Optional[Any]

    # Statistics
    computed_metrics: Optional[Dict[str, Any]]

    # Qualitative context: the context node writes one
    # {"symbol", "topic", "summary"} dict per asset, not plain strings
    external_context: List[Dict[str, Any]]

    # Validation
    validation_results: Optional[Dict[str, Any]]
    sufficient_data: Optional[bool]

    # Control flow
    iteration_count: int

    # Outputs
    # The report node stores the structured report object itself (its class is
    # defined later in the notebook), so this cannot be typed as a plain dict.
    final_report: Optional[Any]
    pdf_path: Optional[str]


In [55]:
# entrypoint.py
#from state import MarketAnalysisState


def initialize_state(user_prompt: str) -> dict:
    """
    Entry point for the entire graph.
    Initializes shared state with safe defaults.

    Every key declared on the shared state is present in the result, so nodes
    that index the state directly hit their own ``is None`` guards instead of
    failing with a KeyError on a key no upstream node has written yet.

    Args:
        user_prompt: The user's free-text analysis request.

    Returns:
        A new state dict; mutable defaults are created fresh on every call.
    """

    return {
        "user_prompt": user_prompt,
        "analysis_plan": None,
        "raw_market_data": None,
        "cleaned_market_data": None,
        "computed_metrics": None,
        "external_context": [],
        "validation_results": None,
        "sufficient_data": None,
        "iteration_count": 0,
        "final_report": None,
        "pdf_path": None,
    }


In [56]:
from typing import List, Literal, Optional, TypedDict
from pydantic import BaseModel
from langchain_openai import ChatOpenAI

# ------------------------
# 1. LLM INITIALIZATION
# ------------------------
# Chat model used by the planner; temperature is pinned to 0.
llm = ChatOpenAI(
    model="gpt-5-mini",
    temperature=0
)


# ------------------------
# 3. STRUCTURED LLM
# ------------------------
# Wrap the model so its output is returned as an AnalysisPlan; the
# ValidationError shown further down is raised when the produced plan
# violates the model's validators.
planner_llm = llm.with_structured_output(AnalysisPlan)

# ------------------------
# 5. PLANNER NODE
# ------------------------
def analysis_planner_node(state: MarketAnalysisState) -> dict:
    """
    Turn the user's prompt into a structured AnalysisPlan.

    Args:
        state: Graph state; only ``user_prompt`` is read.

    Returns:
        A partial state update containing just ``analysis_plan``.
    """
    # The time-window instruction addresses the validation failure seen when
    # the model fills both the explicit dates and lookback_years.
    plan = planner_llm.invoke(
        f"""
        You are a financial analysis planner.
        Produce a valid AnalysisPlan.

        For time_window, set EITHER both start and end OR lookback_years,
        never both; leave the unused fields null.

        User prompt:
        {state["user_prompt"]}
        """
    )

    # Return only the key this node owns, like every other node in the
    # notebook, instead of echoing the whole incoming state back.
    return {
        "analysis_plan": plan
    }


In [57]:
from typing import Optional, List
from typing_extensions import TypedDict

from pydantic import BaseModel
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

# =========================
# 5. GRAPH
# =========================
# Minimal graph used to smoke-test the planner: planner -> terminal -> END.
graph = StateGraph(MarketAnalysisState)

def terminal_node(state: MarketAnalysisState) -> MarketAnalysisState:
    """Pass-through node: returns the incoming state unchanged."""
    return state


graph.add_node("planner", analysis_planner_node)
graph.add_node("terminal", terminal_node)

graph.set_entry_point("planner")
graph.add_edge("planner", "terminal")
graph.add_edge("terminal", END)

app = graph.compile()


# =========================
# 6. TEST RUN
# =========================
# Only two of the keys declared on MarketAnalysisState are provided here.
initial_state: MarketAnalysisState = {
    "user_prompt": "Analyze how US mega-cap tech stocks reacted to rising interest rates",
    "analysis_plan": None
}

# NOTE: the recorded run of this cell failed inside the planner with the
# TimeWindow ValidationError shown in the output below.
final_state = app.invoke(initial_state)

print("\nFINAL STATE\n")
print(final_state)
print("\nANALYSIS PLAN\n")
print(final_state["analysis_plan"].model_dump())


ValidationError: 1 validation error for AnalysisPlan
time_window
  Value error, Use either start/end or lookback_years, not both [type=value_error, input_value={'start': '2016-01-01', '...e, 'lookback_years': 10}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.12/v/value_error

In [86]:
# tools/market_data.py
import yfinance as yf
import pandas as pd
from typing import List, Dict


def _flatten_yfinance_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Flatten yfinance MultiIndex columns to single level.
    Keeps only price field names (Open, Close, etc).

    Args:
        df: Frame as returned by ``yf.download``.

    Returns:
        ``df`` itself when its columns are already flat; otherwise a copy
        whose columns are the first level of the MultiIndex. The input frame
        is never modified.
    """

    if not isinstance(df.columns, pd.MultiIndex):
        return df

    flat = df.copy()
    # Drop the ticker level; also clear the leftover level name (e.g. "Price")
    # so it does not linger as the name of the column axis.
    flat.columns = df.columns.get_level_values(0).rename(None)

    return flat


def fetch_market_data(
    symbols: List[str],
    start_date: str,
    end_date: str,
    frequency: str
) -> Dict[str, pd.DataFrame]:
    """
    Download OHLCV history for each symbol.

    Args:
        symbols: Tickers to download, one request per ticker.
        start_date: Start of the range, passed to ``yf.download`` as ``start``.
        end_date: End of the range, passed to ``yf.download`` as ``end``.
        frequency: One of "daily", "weekly", "monthly".

    Returns:
        Mapping of symbol to a DataFrame with flattened columns.

    Raises:
        KeyError: if ``frequency`` is not one of the supported values.
        ValueError: if a download comes back empty.
    """

    yf_interval = {"daily": "1d", "weekly": "1wk", "monthly": "1mo"}[frequency]

    # Columns are emitted in this order, skipping any that are absent
    column_order = ("Open", "High", "Low", "Close", "Adj Close", "Volume")

    frames = {}

    for ticker in symbols:
        downloaded = yf.download(
            ticker,
            start=start_date,
            end=end_date,
            interval=yf_interval,
            auto_adjust=False,
            progress=False
        )

        if downloaded.empty:
            raise ValueError(f"No data returned for {ticker}")

        flattened = _flatten_yfinance_columns(downloaded)
        present = [name for name in column_order if name in flattened.columns]
        frames[ticker] = flattened[present]

    return frames


In [72]:
# utils/time.py.  This will evolve later. For now, it is enough.
from datetime import datetime, timedelta
from typing import Tuple

# from analysis_plan import TimeWindow  # wherever your model lives


def resolve_time_window(time_window: TimeWindow) -> Tuple[str, str]:
    """
    Resolve a structured TimeWindow into ISO start and end dates.

    A relative window ends today and starts on the same calendar day
    ``lookback_years`` years earlier (Feb 29 falls back to Feb 28), so the
    start does not drift by a day for every leap year in the range.

    Returns:
        ``(start, end)`` as ``YYYY-MM-DD`` strings.

    Raises:
        ValueError: if the window has neither form filled in.
    """

    # Relative window
    if time_window.lookback_years is not None:
        end = datetime.today()
        target_year = end.year - time_window.lookback_years
        try:
            start = end.replace(year=target_year)
        except ValueError:
            # Only the Feb 29 -> non-leap-year case is recoverable
            if (end.month, end.day) != (2, 29):
                raise
            start = end.replace(year=target_year, day=28)
        return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")

    # Explicit window
    if time_window.start is not None and time_window.end is not None:
        return (
            time_window.start.isoformat(),
            time_window.end.isoformat(),
        )

    # This should never happen due to model validation
    raise ValueError("Invalid TimeWindow state")


# from analysis_plan import TimeWindow


def format_time_window_for_context(time_window: TimeWindow) -> str:
    """
    Describe a TimeWindow in words suitable for a search query.

    Returns "last N years" for a relative window, "from <start> to <end>"
    for an explicit one, and an empty string when neither form is set.
    """

    years = time_window.lookback_years
    if years is not None:
        return f"last {years} years"

    first, last = time_window.start, time_window.end
    if not (first and last):
        return ""

    return f"from {first.isoformat()} to {last.isoformat()}"




In [66]:
#nodes/market_data_collection.py
# 
# from state import MarketAnalysisState
# from tools.market_data import fetch_market_data
# from utils.time import resolve_time_window


def market_data_collection_node(state: MarketAnalysisState) -> dict:
    """
    Collects raw market time series data based on AnalysisPlan.
    Safe for re-execution and looping.

    Returns:
        A partial state update with ``raw_market_data`` (symbol -> DataFrame).

    Raises:
        ValueError: if no analysis plan is available yet.
    """

    # .get() so a state without the key reaches the guard below instead of
    # failing with a KeyError.
    plan = state.get("analysis_plan")

    if plan is None:
        raise ValueError("analysis_plan is required before data collection")

    # Resolve structured time window
    start_date, end_date = resolve_time_window(plan.time_window)

    # Extract assets only (benchmarks handled separately later)
    symbols = plan.universe.assets

    raw_data = fetch_market_data(
        symbols=symbols,
        start_date=start_date,
        end_date=end_date,
        frequency=plan.frequency
    )

    return {
        "raw_market_data": raw_data
    }


In [87]:
from langgraph.graph import StateGraph, END

# from state import MarketAnalysisState
# from entrypoint import initialize_state
# from planner import analysis_planner_node
# from nodes.market_data_collection import market_data_collection_node


# Two-node pipeline: planner -> data -> END.
graph = StateGraph(MarketAnalysisState)

graph.add_node("planner", analysis_planner_node)
graph.add_node("data", market_data_collection_node)

graph.set_entry_point("planner")
graph.add_edge("planner", "data")
graph.add_edge("data", END)

app = graph.compile()

# The initial state carries only the user prompt.
state = initialize_state(
    "Analyze volatility of Apple and Microsoft over the last 3 years"
)

final_state = app.invoke(state)

# Symbols for which raw data was collected
print(final_state["raw_market_data"].keys())


dict_keys(['AAPL', 'MSFT'])


In [88]:
final_state["raw_market_data"]["AAPL"].head()

Price,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2023-02-13,150.949997,154.259995,150.919998,153.850006,151.653015,62199000
2023-02-14,152.119995,153.770004,150.860001,153.199997,151.012299,61707600
2023-02-15,153.110001,155.5,152.880005,155.330002,153.111862,65573800
2023-02-16,153.509995,156.330002,153.350006,153.710007,151.514999,68167900
2023-02-17,152.350006,153.0,150.850006,152.550003,150.371536,59144100


In [89]:
final_state["raw_market_data"]["MSFT"].columns

Index(['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume'], dtype='object', name='Price')

In [73]:
# tools/context_research.py
from ddgs import DDGS
from langchain_openai import ChatOpenAI


# Chat model used by the context-research helpers below.
# NOTE: this rebinds the notebook-level name `llm`, which the planner cell
# earlier bound to a "gpt-5-mini" client.
llm = ChatOpenAI(
    model="gpt-5-nano",
    temperature=0
)


def generate_symbol_topic(
    global_topic: str,
    symbol: str,
    market: str
) -> str:
    """
    Ask the LLM to narrow the global research topic down to one asset.

    Args:
        global_topic: Topic of the overall analysis.
        symbol: Asset the refined topic should focus on.
        market: Market label included in the prompt.

    Returns:
        The model's reply text with surrounding whitespace removed.
    """
    prompt = f"""
        You are refining a financial research topic.

        Global topic:
        "{global_topic}"

        Asset:
        {symbol}

        Market:
        {market}

        Produce a single, focused research topic specific to this asset.
        Do not introduce new themes.
        One sentence only.
        """

    reply = llm.invoke(prompt)
    return reply.content.strip()


def research_market_context_for_symbol(
    topic: str,
    symbol: str,
    time_phrase: str
) -> str:
    """
    Run a web search for one symbol and have the LLM summarize the hits.

    Args:
        topic: Research topic the summary must relate to.
        symbol: Asset symbol, used as the first search term.
        time_phrase: Time-window wording appended to the search query.

    Returns:
        The summary text, or a fixed fallback sentence when the search
        produced no result with a body.
    """

    search_query = " ".join([symbol, topic, time_phrase])

    # Keep only hits that actually carry body text (at most 5 are requested)
    with DDGS() as search:
        snippets = [
            hit["body"]
            for hit in search.text(search_query, max_results=5)
            if hit.get("body")
        ]

    if len(snippets) == 0:
        return "No significant qualitative market context found."

    source_material = "\n".join(snippets)

    prompt = f"""
        You are a financial analyst.

        Summarize the following information strictly in relation to:
        "{topic}"

        Focus on drivers, risks, and regime-level factors.
        Avoid generic macro commentary.

        Source material:
        {source_material}
        """

    reply = llm.invoke(prompt)
    return reply.content.strip()


In [76]:
# # nodes/context_research.py
# from state import MarketAnalysisState
# from tools.context_research import (
#     generate_symbol_topic,
#     research_market_context_for_symbol
# )
# from utils.time import format_time_window_for_context


def context_research_node(state: MarketAnalysisState) -> dict:
    """
    Collects qualitative market context per symbol.

    For every asset in the plan, a symbol-specific topic is generated and a
    search summary is produced.

    Returns:
        A partial state update whose ``external_context`` holds one
        ``{"symbol", "topic", "summary"}`` dict per asset. The list contains
        only the entries produced by this call; entries already present in
        the state are not carried over.

    Raises:
        ValueError: if no analysis plan is available yet.
    """

    # .get() so a state without the key reaches the guard below instead of
    # failing with a KeyError.
    plan = state.get("analysis_plan")

    if plan is None:
        raise ValueError("analysis_plan is required before context research")

    time_phrase = format_time_window_for_context(plan.time_window)

    new_entries = []

    for symbol in plan.universe.assets:
        symbol_topic = generate_symbol_topic(
            global_topic=plan.topic,
            symbol=symbol,
            market=plan.market.value
        )

        summary = research_market_context_for_symbol(
            topic=symbol_topic,
            symbol=symbol,
            time_phrase=time_phrase
        )

        new_entries.append(
            {
                "symbol": symbol,
                "topic": symbol_topic,
                "summary": summary
            }
        )

    return {
        "external_context": new_entries
    }


In [77]:
from langgraph.graph import StateGraph, END

# from state import MarketAnalysisState
# from entrypoint import initialize_state
# from planner import analysis_planner_node
# from nodes.market_data_collection import market_data_collection_node


# Planner fans out to two branches: market data and qualitative context.
graph = StateGraph(MarketAnalysisState)

graph.add_node("planner", analysis_planner_node)
graph.add_node("data", market_data_collection_node)
graph.add_node("context", context_research_node)

graph.set_entry_point("planner")
# Both branches start from the planner and each ends the graph on its own.
graph.add_edge("planner", "data")
graph.add_edge("planner", "context")
graph.add_edge("data", END)
graph.add_edge("context", END)

app = graph.compile()

state = initialize_state(
    "Analyze volatility of Apple and Microsoft over the last 3 years"
)

final_state = app.invoke(state)

print(final_state["raw_market_data"].keys())
# Guarded lookup: the key is absent if the context branch wrote nothing
print(final_state["external_context"] if "external_context" in final_state else "No context collected")

dict_keys(['AAPL', 'MSFT'])
[{'symbol': 'AAPL', 'topic': 'Quantitatively analyze the volatility of Apple Inc. (AAPL) stock over the last three years by applying realized-volatility measures and GARCH-family models to characterize persistence, regime changes, and reaction to major market events.', 'summary': 'Limitations of the provided material\n- The source material does not include actual realized-volatility figures, GARCH-estimated parameters, or any daily price series for Apple (AAPL) over the last three years.\n- It only provides: (i) a 52-week average Apple price of 182.81, and (ii) a Q3 earnings snapshot: revenue up 10% YoY to $94B. There is no explicit volatility data or event-dated volatility signals.\n\nWhat a quantitative analysis would entail (methodology and expected outputs)\n- Realized-volatility (RV)\n  - Compute daily log returns r_t = ln(P_t/P_{t-1}) over roughly the past three years (about 750–780 trading days).\n  - Daily RV: sum of squared returns over a moving win

In [90]:
# nodes/data_cleaning.py
# from typing import Dict
# import pandas as pd

# from state import MarketAnalysisState


def data_cleaning_node(state: MarketAnalysisState) -> dict:
    """
    Clean and preprocess raw market data.

    Produces a canonical dataset per symbol with:
    - price (Adj Close preferred, else Close)
    - price_raw (Close)
    - returns
    - normalized price (base 100 at the first valid price)
    - volume (if available)

    Only rows without a price, and the first row (which has no return), are
    removed; a missing volume does not discard an otherwise valid
    observation. All symbols are finally restricted to their common dates.

    Deterministic, metric-agnostic, loop-safe.

    Raises:
        ValueError: if raw data is missing, a frame lacks 'Close', or a
            symbol has no valid price at all.
        TypeError: if an entry is not a DataFrame.
    """

    raw_data = state.get("raw_market_data")

    if raw_data is None:
        raise ValueError("raw_market_data is required before cleaning")

    cleaned: Dict[str, pd.DataFrame] = {}

    # ---- Per-symbol cleaning ----
    for symbol, df in raw_data.items():
        if not isinstance(df, pd.DataFrame):
            raise TypeError(f"Expected DataFrame for {symbol}")

        df = df.copy()
        df.index = pd.to_datetime(df.index)

        # Validate required columns
        if "Close" not in df.columns:
            raise ValueError(f"'Close' column missing for {symbol}")

        price_column = "Adj Close" if "Adj Close" in df.columns else "Close"

        # Build canonical frame
        out = pd.DataFrame(index=df.index)

        out["price"] = df[price_column]
        out["price_raw"] = df["Close"]

        if "Volume" in df.columns:
            out["volume"] = df["Volume"]

        # Drop rows with missing prices
        out = out.dropna(subset=["price"])

        if out.empty:
            # Without this, .iloc[0] below fails with an opaque IndexError
            raise ValueError(f"No valid prices for {symbol}")

        # Normalized price (base 100)
        out["price_normalized"] = out["price"] / out["price"].iloc[0] * 100

        # Returns
        out["returns"] = out["price"].pct_change()

        # Drop only the first row introduced by pct_change; restricting to
        # "returns" keeps rows whose volume happens to be missing.
        out = out.dropna(subset=["returns"])

        cleaned[symbol] = out

    # ---- Align time index across all symbols ----
    common_index = None
    for df in cleaned.values():
        common_index = df.index if common_index is None else common_index.intersection(df.index)

    for symbol in cleaned:
        cleaned[symbol] = cleaned[symbol].loc[common_index]

    return {
        "cleaned_market_data": cleaned
    }


In [None]:
# tools/market_metrics.py

import numpy as np
import pandas as pd
from typing import Dict, Any
# from analysis_plan import Metric

# Number of trading days assumed per year when annualizing daily figures
TRADING_DAYS = 252


def annualize_return(r: float, periods: int, periods_per_year: int = TRADING_DAYS) -> float:
    """Convert a cumulative return over ``periods`` into an annual rate.

    Args:
        r: Cumulative simple return over the whole span (0.10 == +10%).
        periods: Number of periods the return was earned over; must be > 0.
        periods_per_year: Periods in one year. Defaults to TRADING_DAYS
            (daily bars); pass e.g. 52 or 12 for weekly or monthly data.

    Returns:
        The compounded per-year return.

    Raises:
        ValueError: if ``periods`` is not positive.
    """
    if periods <= 0:
        raise ValueError("periods must be positive")
    return (1 + r) ** (periods_per_year / periods) - 1



def compute_total_return(df: pd.DataFrame) -> Dict[str, Any]:
    """Simple return from the first to the last value of ``df["price"]``.

    Returns a dict with the return as ``value`` and the row count as
    ``sample_size``.
    """
    prices = df["price"]
    first_price, last_price = prices.iloc[0], prices.iloc[-1]
    total = last_price / first_price - 1
    return {"value": float(total), "sample_size": len(df)}


def compute_cagr(df: pd.DataFrame) -> Dict[str, Any]:
    """Compound annual growth rate of ``df["price"]``.

    When the frame has a DatetimeIndex, the annualization uses the calendar
    time actually elapsed between the first and last row, which is correct
    for daily, weekly and monthly bars alike. Otherwise it falls back to
    counting periods: n prices span n - 1 periods.

    Returns:
        ``{"value", "sample_size"}``; ``value`` is None (with a ``note``)
        when fewer than two prices are available.
    """
    n = len(df)
    if n < 2:
        return {
            "value": None,
            "note": "at least two prices are required",
            "sample_size": n
        }

    prices = df["price"]
    r = prices.iloc[-1] / prices.iloc[0] - 1

    elapsed_years = None
    if isinstance(df.index, pd.DatetimeIndex):
        elapsed_years = (df.index[-1] - df.index[0]).days / 365.25

    if elapsed_years is not None and elapsed_years > 0:
        cagr = (1 + r) ** (1 / elapsed_years) - 1
    else:
        # No usable dates: assume evenly spaced daily bars
        cagr = annualize_return(r, n - 1)

    return {
        "value": float(cagr),
        "sample_size": n
    }

def compute_sharpe(df: pd.DataFrame) -> Dict[str, Any]:
    """Annualized mean-to-standard-deviation ratio of ``df["returns"]``.

    No risk-free rate is subtracted; scaling uses sqrt(TRADING_DAYS).
    """
    period_returns = df["returns"]
    mean_return = period_returns.mean()
    dispersion = period_returns.std()
    ratio = np.sqrt(TRADING_DAYS) * mean_return / dispersion
    return {"value": float(ratio), "sample_size": len(period_returns)}

def compute_max_drawdown(df: pd.DataFrame) -> Dict[str, Any]:
    """Largest peak-to-trough decline of ``df["price"]``.

    The value is the minimum of (level - running peak) / running peak, so it
    is zero or negative.
    """
    prices = df["price"]
    growth = prices / prices.iloc[0]
    running_peak = growth.cummax()
    declines = (growth - running_peak) / running_peak
    worst = declines.min()
    return {"value": float(worst), "sample_size": len(df)}

def compute_realized_volatility(df: pd.DataFrame) -> Dict[str, Any]:
    """Standard deviation of ``df["returns"]`` scaled by sqrt(TRADING_DAYS)."""
    scale = np.sqrt(TRADING_DAYS)
    annualized = scale * df["returns"].std()
    return {"value": float(annualized), "sample_size": len(df)}


def compute_rolling_volatility(df: pd.DataFrame, window: int = 30) -> Dict[str, Any]:
    """Rolling standard deviation of returns, scaled by sqrt(TRADING_DAYS).

    Args:
        df: Frame with a ``returns`` column.
        window: Number of rows per rolling window.

    Returns:
        ``value`` maps each index label with a complete window to its
        volatility; ``sample_size`` counts those labels.
    """
    rolling_std = df["returns"].rolling(window).std()
    complete = (np.sqrt(TRADING_DAYS) * rolling_std).dropna()
    return {
        "value": complete.to_dict(),
        "window": window,
        "sample_size": len(complete)
    }

def compute_rolling_correlation(
    data: Dict[str, pd.DataFrame],
    window: int = 30
) -> Dict[str, Any]:
    """Rolling correlation between the returns of the first two assets.

    Only the first two keys of ``data`` are used; any further assets are
    ignored.

    Raises:
        ValueError: if fewer than two assets are supplied.
    """
    names = list(data)
    if len(names) < 2:
        raise ValueError("Rolling correlation requires at least two assets")

    first, second = names[0], names[1]
    left_returns = data[first]["returns"]
    right_returns = data[second]["returns"]

    # Windows that are not yet full yield NaN and are dropped
    complete = left_returns.rolling(window).corr(right_returns).dropna()

    return {
        "pair": [first, second],
        "value": complete.to_dict(),
        "window": window,
        "sample_size": len(complete)
    }


def compute_beta(
    asset_df: pd.DataFrame,
    benchmark_df: pd.DataFrame
) -> Dict[str, Any]:
    """Beta of an asset's returns against a benchmark's returns.

    The two return series are paired by index label (rows present in only one
    of them, or holding NaN, are ignored). Covariance and variance are both
    taken from the same sample covariance matrix, so they share one degrees-
    of-freedom convention.

    Returns:
        ``{"value", "sample_size"}``; ``value`` is None (with a ``note``)
        when beta is undefined.
    """
    paired = pd.concat(
        [
            asset_df["returns"].rename("asset"),
            benchmark_df["returns"].rename("benchmark"),
        ],
        axis=1,
        join="inner",
    ).dropna()

    n = len(paired)
    if n < 2:
        return {
            "value": None,
            "note": "at least two overlapping observations are required",
            "sample_size": n
        }

    # np.cov uses ddof=1 for every entry; reading the benchmark variance from
    # the same matrix avoids mixing it with a ddof=0 variance.
    cov_matrix = np.cov(paired["asset"], paired["benchmark"])
    benchmark_variance = cov_matrix[1, 1]

    if benchmark_variance == 0:
        return {
            "value": None,
            "note": "benchmark returns have zero variance",
            "sample_size": n
        }

    beta = cov_matrix[0, 1] / benchmark_variance

    return {
        "value": float(beta),
        "sample_size": n
    }

def compute_volume(df: pd.DataFrame) -> Dict[str, Any]:
    """Mean of ``df["volume"]``, or a None value with a note when absent."""
    has_volume = "volume" in df.columns
    if has_volume:
        average = df["volume"].mean()
        return {"value": float(average), "sample_size": len(df)}

    return {"value": None, "note": "volume not available"}


def compute_amihud(df: pd.DataFrame) -> Dict[str, Any]:
    """Mean of |return| / volume over the rows of ``df``.

    Infinite ratios (zero volume) are treated as missing; ``sample_size``
    counts the rows that remain. Returns a None value with a note when the
    frame has no ``volume`` column.
    """
    if "volume" not in df.columns:
        return {"value": None, "note": "volume not available"}

    ratio = df["returns"].abs() / df["volume"]
    finite_ratio = ratio.replace([np.inf, -np.inf], np.nan)
    return {
        "value": float(finite_ratio.mean()),
        "sample_size": len(finite_ratio.dropna())
    }

def compute_regime_switching(df: pd.DataFrame, window: int = 30) -> Dict[str, Any]:
    """Label each date as low (0) or high (1) volatility regime.

    Placeholder method: rolling volatility above its own median counts as the
    high regime. Dates whose rolling window is still incomplete have no
    volatility estimate and are left out rather than reported as regime 0.

    Args:
        df: Frame with a ``returns`` column.
        window: Number of rows per rolling window.

    Returns:
        ``regimes`` (index label -> 0/1), the ``window`` used, and
        ``sample_size`` (number of labelled dates).
    """
    # Drop the warm-up NaNs *before* thresholding: NaN > x is False, which
    # would otherwise turn every warm-up row into a spurious regime 0.
    vol = df["returns"].rolling(window).std().dropna()
    regimes = (vol > vol.median()).astype(int)

    return {
        "regimes": regimes.to_dict(),
        "window": window,
        "sample_size": len(regimes)
    }

# Dispatch table for per-asset metrics: each function takes one symbol's
# cleaned DataFrame. Metric.ROLLING_CORRELATION and Metric.BETA are absent on
# purpose — they need more than one frame and are handled separately in
# compute_market_metrics.
METRIC_FUNCTIONS = {
    Metric.TOTAL_RETURN: compute_total_return,
    Metric.CAGR: compute_cagr,
    Metric.SHARPE_RATIO: compute_sharpe,
    Metric.MAX_DRAWDOWN: compute_max_drawdown,
    Metric.REALIZED_VOLATILITY: compute_realized_volatility,
    Metric.ROLLING_VOLATILITY: compute_rolling_volatility,
    Metric.VOLUME: compute_volume,
    Metric.AMIHUD_ILLIQUIDITY: compute_amihud,
    Metric.REGIME_SWITCHING: compute_regime_switching,
}



In [96]:
# tools/compute_market_metrics.py
from typing import Dict, Any, List, Optional
import pandas as pd
# Metric and metric functions are defined elsewhere in the notebook (Metric, METRIC_FUNCTIONS, compute_rolling_correlation, compute_beta)

def compute_market_metrics(
    cleaned_data: Dict[str, pd.DataFrame],
    metrics: List[Metric],
    benchmark: Optional[str] = None
) -> Dict[str, Any]:
    """Compute every requested metric over the cleaned data.

    Args:
        cleaned_data: Symbol -> cleaned DataFrame.
        metrics: Metrics to compute.
        benchmark: Symbol used as the reference for beta; its frame must be
            present in ``cleaned_data``.

    Returns:
        One entry per symbol (metric value -> result dict) plus a top-level
        ``"rolling_correlation"`` entry when that metric is requested.

    Raises:
        ValueError: if beta is requested without a benchmark, or the
            benchmark has no frame in ``cleaned_data``.
    """

    results: Dict[str, Any] = {}

    # Per-asset metrics
    for symbol, df in cleaned_data.items():
        results[symbol] = {
            metric.value: METRIC_FUNCTIONS[metric](df)
            for metric in metrics
            if metric in METRIC_FUNCTIONS
        }

    # Cross-asset metrics
    if Metric.ROLLING_CORRELATION in metrics:
        results["rolling_correlation"] = compute_rolling_correlation(cleaned_data)

    if Metric.BETA in metrics:
        if benchmark is None:
            raise ValueError("Benchmark required for beta")

        if benchmark not in cleaned_data:
            # Explicit message instead of a bare KeyError on the lookup below
            raise ValueError(
                f"Benchmark {benchmark} has no cleaned market data; beta cannot be computed"
            )

        benchmark_df = cleaned_data[benchmark]

        for symbol, df in cleaned_data.items():
            if symbol == benchmark:
                continue

            results[symbol][Metric.BETA.value] = compute_beta(df, benchmark_df)

    return results



In [97]:
# nodes/statistical_analysis.py

# from state import MarketAnalysisState
# from tools.compute_market_metrics import compute_market_metrics


def statistical_analysis_node(state: MarketAnalysisState) -> dict:
    """
    Computes all requested metrics from cleaned market data.

    Returns:
        A partial state update with ``computed_metrics``.

    Raises:
        ValueError: if the plan or the cleaned data is not available yet.
    """

    # .get() so a state without these keys reaches the guard below instead of
    # failing with a KeyError.
    plan = state.get("analysis_plan")
    cleaned_data = state.get("cleaned_market_data")

    if plan is None or cleaned_data is None:
        raise ValueError("analysis_plan and cleaned_market_data are required")

    results = compute_market_metrics(
        cleaned_data=cleaned_data,
        metrics=plan.metrics,
        benchmark=plan.universe.benchmark
    )

    return {
        "computed_metrics": results
    }


In [98]:
from langgraph.graph import StateGraph, END

# from state import MarketAnalysisState
# from entrypoint import initialize_state
# from planner import analysis_planner_node
# from nodes.market_data_collection import market_data_collection_node


# Linear pipeline: planner -> data -> cleaning -> analysis -> END.
graph = StateGraph(MarketAnalysisState)

graph.add_node("planner", analysis_planner_node)
graph.add_node("data", market_data_collection_node)
graph.add_node("cleaning", data_cleaning_node)
graph.add_node("analysis", statistical_analysis_node)

graph.set_entry_point("planner")
graph.add_edge("planner", "data")
graph.add_edge("data", "cleaning")
graph.add_edge("cleaning", "analysis")
graph.add_edge("analysis", END)

app = graph.compile()

state = initialize_state(
    "Analyze volatility of Apple and Microsoft over the last 3 years"
)

final_state = app.invoke(state)

# One key per symbol, plus any cross-asset entries such as rolling_correlation
print(final_state["computed_metrics"].keys())

dict_keys(['AAPL', 'MSFT', 'rolling_correlation'])


In [107]:
# report_schema.py
from pydantic import BaseModel, Field
from typing import List, Dict, Any


class ResearchReport(BaseModel):
    """Structured research report the report LLM is asked to produce."""

    topic: str = Field(description="Research topic")

    executive_summary: str = Field(
        min_length=100,
        description="High-level summary of findings and implications"
    )

    # min_length replaces min_items, which is deprecated in Pydantic V2
    key_findings: List[str] = Field(
        min_length=1,
        description="Bullet-point list of key quantitative and qualitative findings"
    )

    conclusion: str = Field(
        min_length=50,
        description="Concise concluding assessment"
    )




/var/folders/x4/6xjb_tfs6vdg6llm0_l4vjz00000gn/T/ipykernel_28042/1035011798.py:14: PydanticDeprecatedSince20: `min_items` is deprecated and will be removed, use `min_length` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.12/migration/
  key_findings: List[str] = Field(


In [108]:
# nodes/report_generation.py
# from state import MarketAnalysisState
# from report_schema import ResearchReport

from langchain_openai import ChatOpenAI

# Chat model used for report writing.
# NOTE: this rebinds the notebook-level name `llm` once more.
llm = ChatOpenAI(
    model="gpt-5-nano",
    temperature=0
)

# Wrap the model so its output is returned as a ResearchReport
report_llm = llm.with_structured_output(ResearchReport)



def report_generation_node(state: MarketAnalysisState) -> dict:
    """
    Write the final research report from the computed metrics and any
    qualitative context present in the state.

    Returns:
        A partial state update whose ``final_report`` is whatever the
        structured report LLM returns.

    Raises:
        ValueError: if the plan or the computed metrics are missing.
    """

    plan = state.get("analysis_plan")
    metrics = state.get("computed_metrics")

    if plan is None or metrics is None:
        raise ValueError("analysis_plan and computed_metrics are required")

    # Context is optional; an absent key is rendered as an empty list
    context = state.get("external_context", [])

    prompt = f"""
You are a financial research analyst.

Create a comprehensive research report based on the following information.

Topic:
{plan.topic}

Computed quantitative metrics:
{metrics}

Qualitative external context:
{context}

Instructions:
- Base conclusions strictly on the provided data.
- Do not invent metrics or facts.
- Integrate quantitative results with qualitative context where relevant.
- Keep the report concise, analytical, and professional.

Generate a well-structured report with:
1. Executive Summary
2. Key Findings
3. Conclusion
"""

    return {"final_report": report_llm.invoke(prompt)}


In [None]:
from langgraph.graph import StateGraph, END

# from state import MarketAnalysisState
# from entrypoint import initialize_state
# from planner import analysis_planner_node
# from nodes.market_data_collection import market_data_collection_node

# Full pipeline: the planner fans out to data collection and context
# research, both feed a no-op join node, then cleaning -> analysis -> report.
graph = StateGraph(MarketAnalysisState)

graph.add_node("planner", analysis_planner_node)
graph.add_node("data", market_data_collection_node)
graph.add_node("context", context_research_node)
graph.add_node("cleaning", data_cleaning_node)
graph.add_node("analysis", statistical_analysis_node)
graph.add_node("report", report_generation_node)

# Join node as lambda
# It returns an empty update, i.e. it changes nothing in the state.
graph.add_node("join", lambda state: {})

graph.set_entry_point("planner")

graph.add_edge("planner", "data")
graph.add_edge("planner", "context")

# NOTE(review): presumably "join" is meant to run only after both branches
# have finished — confirm against the LangGraph fan-in semantics.
graph.add_edge("data", "join")
graph.add_edge("context", "join")
graph.add_edge("join", "cleaning")

graph.add_edge("cleaning", "analysis")
graph.add_edge("analysis", "report")
graph.add_edge("report", END)

app = graph.compile()

state = initialize_state(
    "Analyze volatility of Apple and Microsoft over the last 3 years"
)

final_state = app.invoke(state)

# final_report holds a ResearchReport object (not a dict); use its pydantic
# methods to serialize it:
# return final_state["final_report"].model_dump_json(indent=2)

AttributeError: 'ResearchReport' object has no attribute 'keys'

In [112]:
print(final_state["final_report"].model_dump_json(indent=2))

{
  "topic": "Volatility analysis for Apple (AAPL) and Microsoft (MSFT) over the last 3 years",
  "executive_summary": "Over the three-year window, Apple (AAPL) exhibits higher realized volatility than Microsoft (MSFT). AAPL’s realized volatility is 0.2550 (25.50%), based on 751 samples, versus MSFT’s 0.2364 (23.64%) over the same sample span. For 30-day rolling volatility (window = 30), AAPL generally travels in a lower-to-moderate band (roughly 0.10–0.32 observed range, with peaks near 0.322) and has a trough around 0.102–0.109 in early 2026. MSFT’s 30-day rolling volatility shows a pronounced high-volatility episode mid-2023, with values approaching 0.70–0.72, followed by a reversion to lower-to-moderate levels. The rolling correlation between AAPL and MSFT remains positive and historically high, centering in the 0.60s to high-0.60s range during the early portion of the window and fluctuating over time. Qualitative context from the external material identifies a regime-based view of