In [1]:
# Force reinstall all langchain packages to the latest matching versions
#%pip install -U --force-reinstall langchain langchain-community langchain-core langchain-google-genai valyu prophet yfinance matplotlib pandas

In [2]:
import os
import operator
import datetime
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
from typing import Annotated, Literal, TypedDict, List
from prophet import Prophet


# --- LIBRARIES ---
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langgraph.prebuilt import create_react_agent 
from pydantic import BaseModel, Field
from valyu import Valyu 
from langchain.messages import SystemMessage, HumanMessage
from langchain.chat_models import init_chat_model



# Shared chat model instance; later cells pass it to both agents and call it
# directly for routing and synthesis.
# NOTE: the recorded output of this cell is an OpenAIError stating that the
# api_key must be passed to the client or provided via the OPENAI_API_KEY
# environment variable, so that variable has to be set before this cell runs.
model = init_chat_model("gpt-4.1")


  from .autonotebook import tqdm as notebook_tqdm
Importing plotly failed. Interactive plots will not work.


OpenAIError: The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable

In [None]:
from typing import TypeVar, Callable, Generic, Any
from dataclasses import dataclass

T = TypeVar("T")
U = TypeVar("U")

@dataclass
class IO(Generic[T]):
    """
    Deferred, composable wrapper around a zero-argument callable.

    Constructing or combining IO values never invokes the wrapped callable;
    it is only invoked when .unsafe_run() is called.
    """
    effect: Callable[[], T]

    @staticmethod
    def pure(value: T) -> "IO[T]":
        """Wrap an existing value in an IO that just hands it back when run."""
        def _constant():
            return value
        return IO(_constant)

    @staticmethod
    def fail(error: Exception) -> "IO[Any]":
        """Build an IO that raises `error` when run."""
        def _thrower():
            raise error
        return IO(_thrower)

    def map(self, f: Callable[[T], U]) -> "IO[U]":
        """Return a new IO whose result is `f` applied to this IO's result."""
        def _mapped():
            produced = self.effect()
            return f(produced)
        return IO(_mapped)

    def flat_map(self, f: Callable[[T], "IO[U]"]) -> "IO[U]":
        """Return a new IO that runs this effect, feeds the result to `f`, and runs the IO `f` returns."""
        def _chained():
            follow_up = f(self.effect())
            return follow_up.unsafe_run()
        return IO(_chained)

    def attempt(self) -> "IO[T | Exception]":
        """Return a new IO that yields the raised Exception as a value instead of propagating it."""
        def _guarded():
            try:
                outcome = self.effect()
            except Exception as caught:
                # Only Exception subclasses are captured; the error object
                # itself becomes the result.
                return caught
            return outcome
        return IO(_guarded)

    def unsafe_run(self) -> T:
        """Invoke the wrapped callable now and return whatever it returns."""
        return self.effect()

# Helper for composing multiple IOs
def sequence(ios: list[IO[T]]) -> IO[list[T]]:
    """Combine several IOs into one IO that runs them in list order and returns their results as a list.

    Nothing is executed until the returned IO is run. An exception raised by
    any element propagates out of that run.
    """
    def _collect():
        outputs = []
        for pending in ios:
            outputs.append(pending.unsafe_run())
        return outputs
    return IO(_collect)



In [None]:
from typing import Annotated, Literal, TypedDict
import operator


class AgentInput(TypedDict):
    """Simple input state for each subagent."""
    # Text handed to a sub-agent. `route_to_agents` builds payloads of exactly
    # this shape (`{"query": ...}`) for each `Send`.
    # NOTE(review): this class is not referenced by name in the visible cells;
    # the agent node functions are annotated with RouterState instead.
    query: str


class AgentOutput(TypedDict):
    """Output from each subagent."""
    # Label of the agent that produced the result; the agent nodes in this
    # notebook write "quant" or "research" here.
    source: str
    # Content of the agent's last message.
    result: str


class Classification(TypedDict):
    """A single routing decision: which agent to call with what query."""
    # Target node name; `route_to_agents` passes this value straight to `Send`,
    # so the literals must match the node names registered on the graph.
    source: Literal["quant", "research"]
    # Sub-question forwarded to that agent.
    query: str


class RouterState(TypedDict):
    # Graph state schema passed to StateGraph in the workflow cell.
    # Original user question.
    query: str
    # Routing decisions written by `classify_query`.
    classifications: list[Classification]
    # Annotated with `operator.add`: each agent node returns a one-element
    # list under this key, and the annotation supplies the function used to
    # combine those lists.
    results: Annotated[list[AgentOutput], operator.add]  
    # Final report text written by `synthesize_results`.
    final_answer: str

class BrownianParams(TypedDict):
    # Parameter bundle produced by `calculate_brownian_params_pure` and
    # consumed by `run_monte_carlo_io` / `format_brownian_output_pure`.
    # NOTE(review): an identical class with the same name is defined again in
    # the "PURE DOMAIN TYPES & LOGIC" cell; keep only one definition.
    mu: float            # mean of the daily returns
    sigma: float         # standard deviation of the daily returns
    last_price: float    # last element of the price series
    annual_vol: float    # sigma * sqrt(252)
    annual_drift: float  # mu * 252

In [None]:
# --- EFFECT DEFINITIONS (I/O Boundary) ---

def fetch_stock_history_io(ticker: str, years: int = 2) -> IO[pd.Series]:
    """Effect: download closing prices for `ticker` from Yahoo Finance.

    Args:
        ticker: Symbol passed to `yf.download`.
        years: Length of the look-back window ending today (midnight).

    Returns:
        An IO that, when run, yields a one-dimensional Series of closing
        prices (the cleanup below always reduces the download to one column).

    Raises (when the IO is run):
        ValueError: if the download comes back empty, e.g. for an unknown
            symbol. Without this guard the failure would only surface further
            down as an out-of-bounds indexing error or a generic
            "Not enough data" message.
    """
    def _fetch():
        end_date = pd.Timestamp.today().normalize()
        start_date = end_date - pd.DateOffset(years=years)
        data = yf.download(ticker, start=start_date, end=end_date, progress=False)

        # Fail early with an actionable message instead of letting an empty
        # frame trip over the positional indexing below.
        if data is None or len(data) == 0:
            raise ValueError(f"No price data returned for ticker '{ticker}'")

        # Cleanup logic (part of the fetch IO boundary): reduce whatever column
        # layout came back to a single 'Close' column.
        if isinstance(data.columns, pd.MultiIndex):
            data = data['Close']
            if isinstance(data, pd.DataFrame) and ticker in data.columns:
                data = data[ticker]
        elif 'Close' in data.columns:
            data = data['Close']
        # Still two-dimensional (e.g. ticker label did not match): take the
        # first column positionally.
        if isinstance(data, pd.DataFrame):
            data = data.iloc[:, 0]
        return data
    return IO(_fetch)

def run_monte_carlo_io(
    params: BrownianParams,
    days: int = 30,
    scenarios: int = 1000,
    seed: "int | None" = None,
) -> IO[pd.DataFrame]:
    """Effect: simulate price paths by compounding normally distributed step returns.

    Args:
        params: Must provide 'mu', 'sigma' and 'last_price'.
        days: Number of simulated steps per path.
        scenarios: Number of independent paths (columns).
        seed: Optional seed for reproducible runs. When None (the default) the
            global `np.random` state is used, exactly as before.

    Returns:
        An IO yielding a DataFrame with `days + 1` rows and `scenarios`
        columns; row 0 holds the starting price for every path.
    """
    def _sim():
        mu, sigma, S0 = params['mu'], params['sigma'], params['last_price']
        dt = 1
        # A dedicated Generator is used only when a seed is requested, so the
        # default call path is unchanged.
        rng = np.random if seed is None else np.random.default_rng(seed)
        # NOTE(review): mu is used directly as the mean of the exponent. If mu
        # is an arithmetic mean of simple returns (as computed by
        # calculate_brownian_params_pure), a GBM would normally use
        # mu - sigma**2 / 2 here. Confirm which convention is intended.
        returns = rng.normal(loc=mu * dt, scale=sigma * np.sqrt(dt), size=(days, scenarios))
        price_paths = np.vstack([np.full((1, scenarios), S0), S0 * np.exp(np.cumsum(returns, axis=0))])
        return pd.DataFrame(price_paths)
    return IO(_sim)

def valyu_search_io(query: str) -> IO[dict]:
    """
    Effect: External API Search with Strict Relevance Filters.
    Returns a Dictionary (JSON), not a string, to avoid premature formatting.

    The result is always a plain dict: a dict response is passed through
    unchanged, a response object exposing `model_dump()` or `dict()` is
    converted, and any failure is reported as
    `{"error": <message>, "results": []}` so the formatting step can handle it.
    """
    def _as_dict(response):
        # Downstream formatting uses `.get(...)`, so it needs a real dict.
        if isinstance(response, dict):
            return response
        # NOTE(review): the SDK may return a model object rather than a dict.
        # Confirm against the installed valyu version.
        for dumper_name in ("model_dump", "dict"):
            dumper = getattr(response, dumper_name, None)
            if callable(dumper):
                return dumper()
        return {
            "error": f"Unexpected response type: {type(response).__name__}",
            "results": [],
        }

    def _search():
        try:
            client = Valyu(api_key=os.environ.get("VALYU_API_KEY"))

            # API-level filtering:
            # max_num_results=3 keeps only the top 3 hits,
            # response_length="short" asks the API for concise content.
            response = client.search(
                query=query,
                max_num_results=3,
                response_length="short"
            )
            return _as_dict(response)

        except Exception as e:
            # Deliberate best-effort: return an error dict so the pure logic
            # can handle it gracefully.
            return {"error": str(e), "results": []}

    return IO(_search)

def prophet_predict_io(df: pd.DataFrame, days: int = 30) -> IO[pd.DataFrame]:
    """Effect: fit a Prophet model on `df` and predict over a frame extended by `days` periods.

    The returned IO does the fitting and prediction only when it is run, and
    yields the frame returned by `Prophet.predict`.
    """
    def _fit_then_forecast():
        forecaster = Prophet(daily_seasonality=True)
        forecaster.fit(df)
        horizon = forecaster.make_future_dataframe(periods=days)
        return forecaster.predict(horizon)
    return IO(_fit_then_forecast)

In [None]:
# --- PURE DOMAIN TYPES & LOGIC ---

# NOTE(review): duplicate definition. A class with the same name and fields is
# already declared in the earlier state-types cell, and this one rebinds the
# name when the cell runs. Consider keeping a single definition.
class BrownianParams(TypedDict):
    mu: float            # mean of the daily returns
    sigma: float         # standard deviation of the daily returns
    last_price: float    # last element of the price series
    annual_vol: float    # sigma * sqrt(252)
    annual_drift: float  # mu * 252

def calculate_brownian_params_pure(prices: pd.Series) -> "BrownianParams":
    """Pure: derive drift and volatility parameters from a price series.

    Missing prices are dropped before anything else, so a NaN in the input can
    no longer slip past the length check and produce NaN parameters or a NaN
    `last_price`.

    Args:
        prices: Chronologically ordered prices.

    Returns:
        Dict with plain Python floats:
        mu (mean simple daily return), sigma (population standard deviation of
        the daily returns), last_price (last valid price), annual_vol
        (sigma * sqrt(252)) and annual_drift (mu * 252).

    Raises:
        ValueError: if fewer than two valid prices are available.
    """
    # The return annotation is a string so the function can be defined before
    # (or without) the cell that declares BrownianParams.
    valid_prices = prices.dropna()
    if len(valid_prices) < 2:
        raise ValueError("Not enough data")

    daily_returns = ((valid_prices / valid_prices.shift(1)) - 1).dropna().to_numpy()
    mu = float(np.mean(daily_returns))
    sigma = float(np.std(daily_returns))  # numpy default ddof=0
    last_price = float(valid_prices.iloc[-1])

    return {
        "mu": mu,
        "sigma": sigma,
        "last_price": last_price,
        "annual_vol": float(sigma * np.sqrt(252)),
        "annual_drift": float(mu * 252),
    }

def format_brownian_output_pure(
    sim_df: pd.DataFrame,
    ticker: str,
    params: "BrownianParams",
    start_date: "pd.Timestamp | str | None" = None,
) -> str:
    """Format the simulation results into a detailed table.

    Args:
        sim_df: Simulated paths, one row per step and one column per scenario.
        ticker: Symbol shown in the heading.
        params: Must provide 'annual_vol' and 'annual_drift'.
        start_date: Date assigned to the first row. Defaults to today, which is
            the previous behaviour; pass an explicit value to make the output
            deterministic (the function is only pure in that case).

    Returns:
        A text report: heading, annualized parameters, and a fenced table with
        every fifth row's mean and 0.1% / 99.9% quantiles across scenarios.
    """
    n_rows = sim_df.shape[0]
    anchor = pd.Timestamp.today() if start_date is None else pd.Timestamp(start_date)
    # Rows are labelled with consecutive business days starting at the anchor.
    future_dates = pd.date_range(start=anchor, periods=n_rows, freq='B')

    stats_df = pd.DataFrame({
        'Date': future_dates,
        'Mean': sim_df.mean(axis=1),
        'Low (0.1%)': sim_df.quantile(0.001, axis=1),
        'High (99.9%)': sim_df.quantile(0.999, axis=1)
    })

    # Weekly snapshots: every fifth business-day row.
    display_df = stats_df.iloc[::5].copy()

    # Formatting dates to be shorter
    display_df['Date'] = display_df['Date'].dt.strftime('%Y-%m-%d')

    # Create ASCII table
    table_str = display_df.to_string(index=False, float_format="%.2f")

    # The table is wrapped in a fenced code block.
    return (f"Brownian Motion Analysis for {ticker}:\n"
            f"--- TECHNICAL PARAMETERS ---\n"
            f"Annualized Volatility: {params['annual_vol']:.2%}\n"
            f"Annualized Drift: {params['annual_drift']:.2%}\n"
            f"--- FORECAST TABLE (Weekly Snapshots) ---\n"
            f"```text\n{table_str}\n```")

def prepare_prophet_data_pure(data: pd.DataFrame) -> pd.DataFrame:
    """Pure Logic: build the two-column ('ds', 'y') frame Prophet is fitted on.

    Fixes two defects of the previous version:
    * the positional fallback for 'y' ran after `reset_index()`, so column 0
      was the date column and the model was fitted on timestamps whenever the
      price column was not literally named 'Close' (e.g. a Series named after
      the ticker);
    * the fallback for 'ds' called `tz_localize` on the post-reset integer
      index, which has no such method.

    Args:
        data: Price Series or DataFrame. Timestamps come from a 'Date' column
            if present, otherwise from a DatetimeIndex. Prices come from a
            'Close' column if present, otherwise from the first non-date
            column.

    Returns:
        DataFrame with a default integer index, timezone-naive 'ds' and 'y'.

    Raises:
        ValueError: if no timestamps or no price column can be found.
    """
    frame = data.to_frame() if isinstance(data, pd.Series) else data

    # Separate the timestamps from the value columns so the positional
    # fallback below can never pick up the dates.
    if 'Date' in frame.columns:
        raw_stamps = frame['Date']
        value_frame = frame.drop(columns=['Date'])
    elif isinstance(frame.index, pd.DatetimeIndex):
        raw_stamps = frame.index
        value_frame = frame
    else:
        raise ValueError("Expected a 'Date' column or a DatetimeIndex")

    # Prophet input is kept timezone-naive, as in the original code.
    ds = pd.DatetimeIndex(raw_stamps).tz_localize(None)

    if 'Close' in value_frame.columns:
        y = value_frame['Close']
        if isinstance(y, pd.DataFrame):
            # Multi-level columns: take the first 'Close' sub-column.
            y = y.iloc[:, 0]
    elif value_frame.shape[1] > 0:
        y = value_frame.iloc[:, 0]
    else:
        raise ValueError("No price column available for Prophet")

    return pd.DataFrame({'ds': ds, 'y': y.to_numpy()})

def format_prophet_output(forecast: pd.DataFrame, ticker: str, days: int = 30) -> str:
    """Pure transformation of Prophet results to text with a table.

    Args:
        forecast: Prediction frame with 'ds', 'yhat', 'yhat_lower' and
            'yhat_upper' columns; the last `days` rows are reported.
        ticker: Symbol shown in the heading.
        days: Size of the reported window (default 30, the previous
            hard-coded value).

    Returns:
        Report text: heading, trend label, and a fenced table with every fifth
        row of the window.

    Raises:
        ValueError: if `forecast` is empty or `days` is smaller than 1.

    The trend is now "UP" when the last value of the reported window is above
    the window's first value, otherwise "DOWN". Previously the last forecast
    was compared with the very first row of the whole frame, so the label
    reflected the move since the start of the history rather than the
    forecast direction.
    """
    if forecast.empty or days < 1:
        raise ValueError("Forecast is empty")

    # Copy so renaming the columns never touches the caller's frame.
    horizon = forecast.tail(days)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].copy()
    window_start = horizon['yhat'].iloc[0]
    latest_pred = horizon['yhat'].iloc[-1]
    trend = "UP" if latest_pred > window_start else "DOWN"

    horizon.columns = ['Date', 'Target', 'Low', 'High']

    # Weekly snapshots: every fifth row of the window.
    display_df = horizon.iloc[::5].copy()
    display_df['Date'] = display_df['Date'].dt.strftime('%Y-%m-%d')

    table_str = display_df.to_string(index=False, float_format="%.2f")

    # The table is wrapped in a fenced code block.
    return (f"ML Analysis for {ticker}\n"
            f"Trend: {trend}\n"
            f"--- FORECAST TABLE (Next {days} Days) ---\n"
            f"```text\n{table_str}\n```")

def format_search_results_pure(response: dict) -> str:
    """Pure Logic: Convert structured API JSON into report.

    Args:
        response: Dict with an optional 'error' message and a 'results' list
            of dicts (keys read: 'title', 'source_domain', 'url', 'content').

    Returns:
        "Search Error: ..." when an error is present, "No relevant news
        found." when there are no results, otherwise a Markdown summary with
        one bullet per result (content truncated to 500 characters).

    Fields that are missing or explicitly None fall back to their
    placeholders, and non-string content is converted with `str()` before
    truncation, so one incomplete result can no longer raise a TypeError.
    """
    error = response.get("error")
    if error:
        return f"Search Error: {error}"

    results = response.get("results") or []
    if not results:
        return "No relevant news found."

    formatted = ["### Market Research Summary"]
    for item in results:
        title = item.get("title") or "Untitled"
        source = item.get("source_domain") or "Unknown Source"
        url = item.get("url") or "#"
        raw_content = item.get("content")
        content = ("" if raw_content is None else str(raw_content))[:500]
        formatted.append(f"- **{title}** ({source})\n  *\"{content}...\"*\n  [Link]({url})")

    return "\n\n".join(formatted)

In [None]:


@tool
def brownianModel(TICKER: str):
    """
    Uses an Effect System to model stock prediction.
    """
    # The docstring above is left verbatim: langchain's @tool uses it as the
    # tool description shown to the model.
    #
    # Pipeline: fetch prices (IO) -> derive parameters (pure)
    #           -> simulate paths (IO) -> render report (pure)

    def _simulate_and_render(params):
        # Closure keeps `params` available for the final formatting step.
        def _render(sim_df):
            return format_brownian_output_pure(sim_df, TICKER, params)
        return run_monte_carlo_io(params).map(_render)

    history_io = fetch_stock_history_io(TICKER)
    params_io = history_io.map(calculate_brownian_params_pure)
    pipeline = params_io.flat_map(_simulate_and_render)

    # Single execution point; errors come back as values via attempt().
    outcome = pipeline.attempt().unsafe_run()

    if not isinstance(outcome, Exception):
        return outcome
    return f"Brownian Model Failed: {str(outcome)}"

@tool
def mlModel(ticker: str):
    """ 
    Uses an Effect System to model Facebook Prophet predictions.
    """
    # The docstring above is left verbatim: langchain's @tool uses it as the
    # tool description shown to the model.
    #
    # Pipeline: fetch prices (IO) -> reshape for Prophet (pure)
    #           -> fit and predict (IO) -> render report (pure)

    def _render(forecast):
        return format_prophet_output(forecast, ticker)

    history_io = fetch_stock_history_io(ticker)
    prepared_io = history_io.map(prepare_prophet_data_pure)
    forecast_io = prepared_io.flat_map(prophet_predict_io)
    report_io = forecast_io.map(_render)

    # Single execution point; errors come back as values via attempt().
    outcome = report_io.attempt().unsafe_run()

    if not isinstance(outcome, Exception):
        return outcome
    return f"ML Model Failed: {str(outcome)}"

@tool
def valyu_search_tool(query: str):
    """
    Effectful search wrapper with Relevance Filtering.
    """
    # The docstring above is left verbatim: langchain's @tool uses it as the
    # tool description shown to the model.
    #
    # Pipeline: Fetch Dict (IO) -> Format String (Pure)
    program = (
        valyu_search_io(query)
        .map(format_search_results_pure)
    )

    # Execution. attempt() turns a raised error into a returned Exception
    # object; convert it to text like the other tools do, so the agent always
    # receives a string instead of a raw exception instance.
    result = program.attempt().unsafe_run()

    if isinstance(result, Exception):
        return f"Search Failed: {str(result)}"
    return result
    







In [None]:
# System prompt for the quantitative ("trend") agent, which is given the
# mlModel and brownianModel tools.
# NOTE(review): item 2 asks for daily price targets and item 3 for a median,
# while the tool formatters in this notebook emit every fifth row
# (`iloc[::5]`) and a 'Mean' column. Confirm whether the prompt or the tools
# should change.
trend_prompt = (
    "You are a Quantitative Analyst. Use the provided ML and Statistical tools to analyze the stock ticker provided. "
    "ONLY ENTER THE ABBREVIATION OF THE STOCK TO THE TOOLS. "
    "Your report must be detailed and data-heavy. You MUST include:\n"
    "1. The exact current price of the stock.\n"
    "2. The specific daily price targets for the next 30 days from the models.\n"
    "3. The median prediction and confidence intervals from the Brownian motion model.\n"
    "4. A clear statement of the trend direction (UP/DOWN/FLAT) based on the math.\n"
    "5. If a tool fails, explicitly state why (e.g., 'Not enough data')."
)
# The system message carries two text parts: the prompt and the literal
# string "stock markets".
trend_agent = create_agent(model, system_prompt=SystemMessage(content=[{"type": "text", "text": trend_prompt}, {"type": "text", "text": "stock markets"}], ), tools=[mlModel, brownianModel])

# System prompt for the market-research ("noise") agent, which is given only
# the Valyu search tool.
noise_prompt = (
    "You are a Market Researcher. Use the search tool to find recent news, sentiment, and macro factors affecting the stock. "
    "Do not just summarize; provide a detailed list of findings. You MUST include:\n"
    "1. Specific headlines, dates, and sources of the news you found.\n"
    "2. Direct quotes or key statistics from the search results.\n"
    "3. Any upcoming events (earnings dates, product launches).\n"
    "4. The overall market sentiment supported by specific evidence."
)
# Same construction as trend_agent, but the tools list is passed positionally
# here.
noise_agent = create_agent(model, [valyu_search_tool], system_prompt=SystemMessage(content=[{"type": "text", "text": noise_prompt}, {"type": "text", "text": "stock markets"}], ))


In [None]:
#result = trend_agent.invoke({"messages": [HumanMessage("analyze AMZN stock")]})

#ai_message = result["messages"][-1]


In [None]:
from pydantic import BaseModel, Field
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END

# Schema handed to `model.with_structured_output(...)` in classify_query.
# NOTE(review): the item type `Classification` is a `typing.TypedDict`;
# confirm that the installed pydantic / Python combination accepts it (some
# combinations require `typing_extensions.TypedDict`).
class ClassificationResult(BaseModel):  
    """Result of classifying a user query into agent-specific sub-questions."""
    # One entry per agent to run; consumed by route_to_agents.
    classifications: list[Classification] = Field(
        description="List of agents to invoke with their targeted sub-questions"
    )

def classify_query(state: RouterState) -> dict:
    """Ask the model for routing decisions and return them under 'classifications'.

    The model is bound to the ClassificationResult schema; the system prompt
    instructs it to produce one 'quant' and one 'research' entry for the
    user's query.
    """
    # The system prompt explicitly instructs the model to create TWO tasks.
    system_prompt = """You are a Supervisor Agent. 
    When the user asks for a stock prediction, you MUST generate TWO separate instructions:
    
    1. One for the 'quant' agent to run the mathematical models (Brownian & Prophet).
    2. One for the 'research' agent to find news and sentiment.
    
    OUTPUT format:
    Return a list of TWO classifications.
    - Classification 1: source='quant', query='[Ticker Symbol]' (e.g., 'AMZN')
    - Classification 2: source='research', query='[Ticker Symbol] news and sentiment'
    """

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": state["query"]},
    ]

    router_llm = model.with_structured_output(ClassificationResult)
    decision = router_llm.invoke(conversation)

    return {"classifications": decision.classifications}

def route_to_agents(state: RouterState) -> list[Send]:
    """Build one Send per classification, targeting the node named by its 'source'.

    Each Send carries a payload containing only that classification's query.
    """
    dispatches = []
    for decision in state["classifications"]:
        payload = {"query": decision["query"]}
        dispatches.append(Send(decision["source"], payload))
    return dispatches


def run_trend_agent(state: RouterState):
    """Run the quant agent on state['query'] and report its last message under 'results'."""
    # NOTE(review): route_to_agents sends a payload holding only 'query'; the
    # RouterState annotation is kept as in the original.
    print("Executing Trend Agent")
    user_turn = {"role": "user", "content": state["query"]}
    reply = trend_agent.invoke({"messages": [user_turn]})
    final_text = reply["messages"][-1].content

    return {"results": [{"source": "quant", "result": final_text}]}

def run_noise_agent(state: RouterState):
    """Run the research agent on state['query'] and report its last message under 'results'."""
    # NOTE(review): route_to_agents sends a payload holding only 'query'; the
    # RouterState annotation is kept as in the original.
    print("Executing Noise Agent")
    user_turn = {"role": "user", "content": state["query"]}
    reply = noise_agent.invoke({"messages": [user_turn]})
    final_text = reply["messages"][-1].content

    return {"results": [{"source": "research", "result": final_text}]}

def synthesize_results(state: RouterState) -> dict:
    """Merge every agent's output into one report and return it under 'final_answer'.

    Returns a fixed fallback message when no agent produced a result;
    otherwise the labelled agent reports are sent to the model together with
    a system prompt that fixes the report structure.
    """
    agent_outputs = state["results"]
    if not agent_outputs:
        return {"final_answer": "No results found from any knowledge source."}

    # One labelled section per agent result.
    sections = []
    for entry in agent_outputs:
        sections.append(
            f"--- REPORT FROM {entry['source'].upper()} DEPARTMENT ---\n{entry['result']}\n------------------------------------------------"
        )

    # The prompt specifically demands that tables are preserved.
    synthesis_prompt = f"""You are a Senior Investment Analyst compiling a comprehensive Due Diligence Report.
    The user asked: "{state['query']}"

    Your goal is to provide a "White Box" analysis—explaining NOT just the prediction, but HOW the math worked.

    STRICTLY FOLLOW THIS REPORT STRUCTURE:

    1. **Executive Summary**
       - A high-level verdict (Buy/Sell/Hold/Wait).

    2. **Methodology & Technical Deep-Dive**
       - Explain the logic behind the models.
       - **Brownian Motion:** State the "Annualized Volatility" and "Drift".
    
    3. **Quantitative Analysis (The Numbers)**
       - **CRITICAL:** The tools provided DATA TABLES (text spreadsheets) wrapped in code blocks.
       - You **MUST COPY THESE TABLES EXACTLY** into your report. 
       - **DO NOT** convert the tables into bullet points. 
       - **DO NOT** summarize the table data.
       - Just copy the Markdown code blocks containing the tables.

    4. **Market Context (The News)**
       - Summarize the news headlines and sentiment.
       - CITE SOURCES.

    5. **Risk Factors & Conclusion**
       - Specific risks (e.g., "High volatility of X% increases downside risk").

    Do not shorten the content. USE THE TECHNICAL PARAMETERS (Sigma, Mu, CI) PROVIDED IN THE TEXT."""

    conversation = [
        {"role": "system", "content": synthesis_prompt},
        {"role": "user", "content": "\n\n".join(sections)},
    ]
    reply = model.invoke(conversation)

    return {"final_answer": reply.content}

# Graph wiring: classify -> (quant, research) -> synthesize -> END.
# Node names "quant" and "research" must match the `source` literals of
# Classification, because route_to_agents uses `source` as the Send target.
workflow = (
    StateGraph(RouterState)
    .add_node("classify", classify_query)
    .add_node("quant", run_trend_agent)
    .add_node("research", run_noise_agent)
    .add_node("synthesize", synthesize_results)
    
    # Start at classify
    .add_edge(START, "classify")
    
    # Fan out to the agents named in the list returned by classify_query;
    # the third argument lists the possible destination nodes.
    .add_conditional_edges("classify", route_to_agents, ["quant", "research"])
    
    # Both agents pipe their output to synthesize
    .add_edge("quant", "synthesize")
    .add_edge("research", "synthesize")
    
    # End after synthesis
    .add_edge("synthesize", END)
    .compile()
)


# Run the compiled graph once. Only 'query' is supplied; the other keys read
# below ('classifications', 'final_answer') are written by the graph nodes.
result = workflow.invoke({
    "query": "can you make predictions on Amazon stock?"
})

# Echo the input, the router's decisions, and the synthesized report.
print("Original query:", result["query"])
print("\nClassifications:")
for c in result["classifications"]:
    print(f"  {c['source']}: {c['query']}")
print("\n" + "=" * 60 + "\n")
print("Final Answer:")
print(result["final_answer"])








Executing Trend AgentExecuting Noise Agent



14:48:32 - cmdstanpy - INFO - Chain [1] start processing
14:48:32 - cmdstanpy - INFO - Chain [1] done processing


Original query: can you make predictions on Amazon stock?

Classifications:
  quant: AMZN
  research: AMZN news and sentiment


Final Answer:
Due Diligence Report: Amazon (AMZN) Stock Prediction Analysis  
Date: 2026-02-07

1. Executive Summary

Based on current quantitative forecasts and technical trend analysis, I recommend a **Hold** stance on Amazon (AMZN). While our models point to a continued upward trajectory in the stock price—with a robust positive drift and credible volatility—widening confidence intervals and some recent data anomalies warrant caution for new deployments of significant capital. For long-term investors, maintaining or moderately accumulating existing positions is justified, but “buy on the dip” strategies may be prudent for new entrants given forecast uncertainty and sector-wide volatility.

2. Methodology & Technical Deep-Dive

**Stochastic Process Used:**  
The primary engine of our predictive framework is a Geometric Brownian Motion (GBM) model. This is a 