# LangGraph Project: Crypto/Financial Signal Generator (Notebook)

This notebook implements a cyclical LangGraph agent (think-act loop) for live crypto data using free APIs (CoinGecko).

Core flow: router → tool_caller → analysis_loop_check → executor (loop) or final_report (end).

Features:
- Live price data via CoinGecko (no API key).
- Technical indicators using `ta` (RSI, SMA, MACD).
- Stateful loop with `past_steps` memory.
- Deterministic planner and heuristic final report (runs without LLM).

Optional: If you wire in an LLM later (e.g., OpenAI through LangChain), you can replace the deterministic planner and report with model-driven versions.

In [None]:
# Imports
from __future__ import annotations
import json
from typing import Any, Dict, List, Literal, Optional, Tuple, TypedDict

import pandas as pd
import numpy as np
import requests
from datetime import datetime, timezone

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# Technical analysis library
import ta

# Console display settings for pandas: 140-character lines, up to 50 columns.
pd.set_option('display.width', 140)
pd.set_option('display.max_columns', 50)


## 1. State Definition (Shared Memory)
The state captures goal, ticker, raw data, current action, and a log of past steps.

In [None]:
# Define the State schema as a TypedDict for LangGraph
class AnalyzerState(TypedDict, total=False):
    """Shared state dictionary passed between the graph nodes.

    Declared with ``total=False``, so every key is optional.
    """
    goal: str  # text describing what the run should analyze
    ticker: str  # e.g., 'BTC-USD'
    data_points: List[Dict[str, Any]]  # raw OHLCV points or indicator outputs
    current_action: str  # e.g., 'FETCH_DATA', 'CALCULATE_RSI', 'FINISH'
    past_steps: List[Dict[str, Any]]  # log of actions and results
    final_report: str  # report text stored by the final_report node

def empty_state(goal: str, ticker: str) -> AnalyzerState:
    """Build the starting state for a run.

    ``goal`` and ``ticker`` are stored as given, ``current_action`` is blank,
    and ``data_points`` / ``past_steps`` are fresh empty lists on every call.
    ``final_report`` is left unset.
    """
    initial: AnalyzerState = dict(
        goal=goal,
        ticker=ticker,
        data_points=[],
        current_action='',
        past_steps=[],
    )
    return initial


## 2. Tools (External Capabilities)
- fetch_data: Pull price/volume history from CoinGecko and resample it into approximate OHLC bars.
- calculate_indicator: Compute RSI/SMA/MACD.
- analyze_and_generate_report: Summarize into a signal.

These are standard Python functions. In a more advanced setup, you can wrap them as LangChain tools.

In [None]:
# --- Helper: map generic ticker to CoinGecko coin id ---
# Base-asset symbol (upper case) -> coin id used in CoinGecko URLs.
COINGECKO_IDS = dict(
    BTC='bitcoin',
    ETH='ethereum',
    SOL='solana',
    ADA='cardano',
)

def parse_symbol(symbol: str) -> Tuple[str, str]:
    """
    Parse tickers like 'BTC-USD', 'btc/usd' or 'BTC' into (base, quote).

    The input is upper-cased, '/' is treated like '-', and whitespace around
    the whole symbol and around each part is ignored. When the quote part is
    missing or empty (e.g. 'BTC' or 'BTC-'), 'USD' is assumed. Parts after the
    second one are ignored.
    """
    parts = [part.strip() for part in symbol.strip().upper().replace('/', '-').split('-')]
    base = parts[0]
    # An empty second part ('BTC-') would otherwise yield a blank quote currency.
    quote = parts[1] if len(parts) > 1 and parts[1] else 'USD'
    return base, quote

def coingecko_id_for(symbol: str) -> Optional[str]:
    """Return the CoinGecko coin id for a ticker's base asset, or None if unmapped."""
    base_asset = parse_symbol(symbol)[0]
    if base_asset in COINGECKO_IDS:
        return COINGECKO_IDS[base_asset]
    return None

def fetch_data(symbol: str, days: int = 1, interval: str = 'hourly') -> pd.DataFrame:
    """
    Fetch price/volume history from CoinGecko's market_chart endpoint and
    shape it into OHLCV bars.

    Parameters
    - symbol: ticker such as 'BTC-USD'; only the base asset is used to pick
      the coin, prices are always requested in USD.
    - days: look-back window sent to the API (e.g. 1, 7, 30, 90, 180, 365 or
      'max'); it is converted with ``str()`` before sending.
    - interval: bar size produced locally. 'hourly' and 'daily' resample the
      returned price points into OHLC bars; any other value (e.g. 'minutely')
      keeps one bar per returned point with open = high = low = close.

    Returns a DataFrame with columns
    ['timestamp','open','high','low','close','volume'] (timestamps are
    timezone-naive UTC), with rows lacking a close price dropped.

    Raises
    - ValueError: the symbol is not mapped, or the response has no prices.
    - requests.HTTPError: the API answered with an error status.
    """
    coin_id = coingecko_id_for(symbol)
    if not coin_id:
        raise ValueError(f"Unsupported or unmapped symbol for CoinGecko: {symbol}")

    url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart"
    params = {
        'vs_currency': 'usd',
        'days': str(days),
    }
    # NOTE(review): per the CoinGecko docs the keyless API only accepts
    # interval=daily (hourly/5-minute are plan-restricted), so other values
    # are not forwarded; the API then picks its own granularity and the bar
    # size is enforced by the local resampling below. Confirm against the
    # current API reference.
    if interval == 'daily':
        params['interval'] = 'daily'
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    data = r.json()

    # The endpoint returns [timestamp_ms, value] pairs, not real OHLC candles;
    # OHLC is approximated from the price points.
    prices = data.get('prices') or []
    volumes = data.get('total_volumes') or []
    if not prices:
        raise ValueError('No price data returned by CoinGecko')

    df = pd.DataFrame(prices, columns=['ts_ms', 'price'])
    df['timestamp'] = pd.to_datetime(df['ts_ms'], unit='ms', utc=True).dt.tz_convert(None)
    df = df.drop(columns='ts_ms').sort_values('timestamp')

    if volumes:
        df_v = pd.DataFrame(volumes, columns=['ts_ms', 'volume'])
        df_v['timestamp'] = pd.to_datetime(df_v['ts_ms'], unit='ms', utc=True).dt.tz_convert(None)
        df_v = df_v.drop(columns='ts_ms').sort_values('timestamp')
        # Attach to each price point the most recent volume at or before it.
        df = pd.merge_asof(df, df_v, on='timestamp')
    else:
        df['volume'] = np.nan

    if interval in ('hourly', 'daily'):
        # Offset objects instead of the 'H'/'D' strings: the upper-case 'H'
        # alias is deprecated in recent pandas releases.
        rule = pd.offsets.Hour() if interval == 'hourly' else pd.offsets.Day()
        indexed = df.set_index('timestamp')
        out = indexed['price'].resample(rule).ohlc()
        # NOTE(review): total_volumes looks like a rolling snapshot per point
        # rather than per-interval traded volume, so the last value in each
        # bar is kept instead of summing several snapshots — confirm with the
        # CoinGecko docs. With one point per bar this equals the sum.
        out['volume'] = indexed['volume'].resample(rule).last()
        out = out.reset_index()
    else:
        # No resampling: treat each returned point as a flat bar.
        out = df[['timestamp', 'volume']].copy()
        for column in ('open', 'high', 'low', 'close'):
            out[column] = df['price']

    out = out.dropna(subset=['close']).reset_index(drop=True)
    return out[['timestamp', 'open', 'high', 'low', 'close', 'volume']]

def calculate_indicator(df: pd.DataFrame, indicator_name: str, **kwargs) -> pd.DataFrame:
    """
    Compute a technical indicator on the 'close' column and return a copy of
    ``df`` with the result added as new column(s); ``df`` itself is not modified.

    Supported indicators (name is case-insensitive, surrounding spaces ignored):
    - RSI:  adds 'rsi'. Keyword ``window`` (default 14).
    - SMA:  adds 'sma_<window>'. Keyword ``window`` (default 20).
    - MACD: adds 'macd', 'macd_signal', 'macd_hist'. Keywords ``window_slow``
      (default 26), ``window_fast`` (default 12), ``window_sign`` (default 9).

    Raises ValueError for any other indicator name.
    """
    out = df.copy()
    ind = indicator_name.strip().upper()
    if ind == 'RSI':
        window = int(kwargs.get('window', 14))
        out['rsi'] = ta.momentum.RSIIndicator(close=out['close'], window=window).rsi()
    elif ind == 'SMA':
        window = int(kwargs.get('window', 20))
        out[f'sma_{window}'] = out['close'].rolling(window).mean()
    elif ind == 'MACD':
        # Windows are configurable like the other indicators; the defaults
        # are the previously hard-coded 26/12/9.
        macd = ta.trend.MACD(
            close=out['close'],
            window_slow=int(kwargs.get('window_slow', 26)),
            window_fast=int(kwargs.get('window_fast', 12)),
            window_sign=int(kwargs.get('window_sign', 9)),
        )
        out['macd'] = macd.macd()
        out['macd_signal'] = macd.macd_signal()
        out['macd_hist'] = macd.macd_diff()
    else:
        raise ValueError(f"Unsupported indicator: {indicator_name}")
    return out

def analyze_and_generate_report(past_steps: List[Dict[str, Any]], goal: str, ticker: str) -> str:
    """
    Heuristic summary that converts the last known indicators into a
    BUY/SELL/HOLD view. This is deterministic and runs without an LLM.

    The most recent 'CALCULATE_INDICATOR' step whose result dict carries a
    'dataframe_json' string is decoded, and its last row drives the decision:
    - RSI below 30 -> BUY, above 70 -> SELL, otherwise neutral.
    - MACD vs. signal line only sets the decision if RSI left it at HOLD.
    - The direction of the first 'sma_*' column is reported as a reason only.

    Returns a multi-line text report (one 'Label: value' line each for goal,
    ticker, local timestamp, decision, reasons and number of past steps),
    every line terminated by a newline. With no usable indicator data the
    decision defaults to HOLD.
    """
    # Function-scope import keeps this block self-contained.
    from io import StringIO

    # Extract latest indicator frame if present
    df_ind = None
    for step in reversed(past_steps):
        if step.get('action') != 'CALCULATE_INDICATOR':
            continue
        payload = step.get('result')
        if isinstance(payload, dict) and isinstance(payload.get('dataframe_json'), str):
            # Wrap in StringIO: passing a literal JSON string to read_json is
            # deprecated in recent pandas releases.
            df_ind = pd.read_json(StringIO(payload['dataframe_json']))
            break

    decision = 'HOLD'
    reasons = []
    if df_ind is not None and not df_ind.empty:
        last = df_ind.iloc[-1]
        # RSI heuristics
        if 'rsi' in df_ind.columns and pd.notna(last.get('rsi')):
            rsi = float(last['rsi'])
            if rsi < 30:
                decision = 'BUY'
                reasons.append(f'RSI={rsi:.1f} (oversold)')
            elif rsi > 70:
                decision = 'SELL'
                reasons.append(f'RSI={rsi:.1f} (overbought)')
            else:
                reasons.append(f'RSI={rsi:.1f} (neutral)')
        # MACD heuristics: only override a decision that RSI left at HOLD
        has_macd = all(c in df_ind.columns for c in ('macd', 'macd_signal'))
        if has_macd and pd.notna(last.get('macd')) and pd.notna(last.get('macd_signal')):
            if last['macd'] > last['macd_signal']:
                reasons.append('MACD>Signal (bullish)')
                if decision == 'HOLD':
                    decision = 'BUY'
            elif last['macd'] < last['macd_signal']:
                reasons.append('MACD<Signal (bearish)')
                if decision == 'HOLD':
                    decision = 'SELL'
        # SMA slope (informational; does not change the decision)
        sma_cols = [c for c in df_ind.columns if c.startswith('sma_')]
        if sma_cols:
            s = df_ind[sma_cols[0]].dropna()
            if len(s) >= 2:
                if s.iloc[-1] > s.iloc[-2]:
                    reasons.append('SMA rising')
                elif s.iloc[-1] < s.iloc[-2]:
                    reasons.append('SMA falling')

    ts = datetime.now(timezone.utc).astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')
    reason_str = '; '.join(reasons) if reasons else 'Insufficient indicators; default HOLD'
    # Each piece ends in an escaped newline; a raw line break inside a
    # single-quoted literal is a syntax error.
    return (
        f'Goal: {goal}\n'
        f'Ticker: {ticker}\n'
        f'Timestamp: {ts}\n'
        f'Decision: {decision}\n'
        f'Reasons: {reason_str}\n'
        f'Past steps: {len(past_steps)}\n'
    )


## 3. Nodes (Router, Executor, Tool Caller, Loop Check, Final Report)
Nodes operate over the shared AnalyzerState and implement the loop logic.

In [None]:
# Utility to append a step to memory
def log_step(state: AnalyzerState, action: str, result: Any) -> AnalyzerState:
    """Record one action/result pair at the end of the state's step log.

    A new list is built from the existing ``past_steps`` (a missing or empty
    value counts as no history) and stored back on the same ``state`` object,
    which is then returned.
    """
    history = state.get('past_steps') or []
    state['past_steps'] = [*history, {'action': action, 'result': result}]
    return state

# Router: initialize ticker/current_action based on goal
def router(state: AnalyzerState) -> AnalyzerState:
    """Entry node: make sure a ticker is set, then schedule the first fetch.

    When the state has no (non-blank) ticker, the goal text is scanned for a
    symbol from COINGECKO_IDS and '<SYM>-USD' is stored; if none is found the
    ticker defaults to 'BTC-USD'. Symbols are tried in mapping order and must
    appear as a standalone token (optionally followed by USD/USDT, e.g.
    'ETHUSD'), so ordinary words that merely contain a symbol ('CANADA',
    'METHOD') do not match. ``current_action`` is always set to 'FETCH_DATA'.

    The state is updated in place and returned.
    """
    # Function-scope import keeps this block self-contained.
    import re

    # `or ''` also covers keys that are present but None.
    goal = (state.get('goal') or '').strip()
    ticker = (state.get('ticker') or '').strip()
    if not ticker:
        goal_upper = goal.upper()
        for sym in COINGECKO_IDS:
            pattern = rf'(?<![A-Z0-9]){re.escape(sym)}(?:USDT?)?(?![A-Z0-9])'
            if re.search(pattern, goal_upper):
                ticker = f'{sym}-USD'
                break
        else:
            ticker = 'BTC-USD'
        state['ticker'] = ticker
    # Start by fetching data
    state['current_action'] = 'FETCH_DATA'
    return state

# Executor: decide next action (deterministic planner)
# Number of logged FETCH_DATA attempts after which, with none successful, the
# planner stops retrying and finishes instead of looping indefinitely.
_MAX_FETCH_ATTEMPTS = 3


def executor(state: AnalyzerState) -> AnalyzerState:
    """Choose the next action from the step log and store it in the state.

    - No successful FETCH_DATA step yet: retry 'FETCH_DATA', unless
      ``_MAX_FETCH_ATTEMPTS`` fetch steps have already been logged, in which
      case the run is ended with 'FINISH'.
    - Data fetched but no CALCULATE_INDICATOR step logged: 'CALCULATE_RSI'.
    - Otherwise: 'FINISH'. Any logged indicator step counts, including a
      failed one, so a failing calculation is not retried.

    The state is updated in place and returned.
    """
    past = state.get('past_steps') or []
    # Status of every fetch attempt; a result that is not a dict counts as no status.
    fetch_statuses = [
        s['result'].get('status') if isinstance(s.get('result'), dict) else None
        for s in past
        if s.get('action') == 'FETCH_DATA'
    ]
    has_data = 'ok' in fetch_statuses
    has_indicator = any(s.get('action') == 'CALCULATE_INDICATOR' for s in past)

    if not has_data:
        if len(fetch_statuses) >= _MAX_FETCH_ATTEMPTS:
            # Give up; the final report then falls back to its default view.
            state['current_action'] = 'FINISH'
        else:
            state['current_action'] = 'FETCH_DATA'
    elif not has_indicator:
        # In a more advanced flow, we could stack actions or choose which indicator next
        state['current_action'] = 'CALCULATE_RSI'
    else:
        state['current_action'] = 'FINISH'
    return state

# Tool caller: perform the action and log results
# Days of hourly history fetched for indicator calculation. One day gives only
# about 24 hourly bars, fewer than the 26-bar slow EMA (plus 9-bar signal
# line) of the default MACD needs, and barely more than the 20-bar SMA window.
_INDICATOR_LOOKBACK_DAYS = 7


def tool_caller(state: AnalyzerState) -> AnalyzerState:
    """Run the tool selected by ``current_action`` and log the outcome.

    - 'FETCH_DATA': fetch one day of hourly bars, keep the last 5 rows in
      ``data_points`` and log a 'FETCH_DATA' step with row count and preview.
    - 'CALCULATE_RSI': fetch ``_INDICATOR_LOOKBACK_DAYS`` days of hourly bars,
      add RSI(14), SMA(20) and MACD, and log a 'CALCULATE_INDICATOR' step
      holding the last row and the full frame as JSON.
    - 'FINISH': log a 'NOOP' step.
    - anything else: log an 'UNKNOWN_ACTION' error step.

    Tool failures never propagate: the exception text is logged as a step
    with status 'error'. The state is updated in place and returned.
    """
    action = (state.get('current_action') or '').upper()
    ticker = state['ticker']

    if action == 'FETCH_DATA':
        try:
            df = fetch_data(ticker, days=1, interval='hourly')
            state['data_points'] = df.tail(5).to_dict(orient='records')
            result = {
                'status': 'ok',
                'rows': len(df),
                'preview': df.tail(3).to_dict(orient='records'),
            }
        except Exception as e:
            # Deliberate catch-all at the tool boundary: the failure is
            # recorded in the step log for the planner to act on.
            result = {'status': 'error', 'message': str(e)}
        state = log_step(state, 'FETCH_DATA', result)
        return state

    if action == 'CALCULATE_RSI':
        try:
            # The full frame is not cached in state, so fetch again, with a
            # longer window so the indicators have enough bars to warm up.
            df = fetch_data(ticker, days=_INDICATOR_LOOKBACK_DAYS, interval='hourly')
            df_ind = calculate_indicator(df, 'RSI', window=14)
            # Also add a simple SMA to enrich the decision
            df_ind = calculate_indicator(df_ind, 'SMA', window=20)
            # And MACD
            df_ind = calculate_indicator(df_ind, 'MACD')
            payload = {
                'status': 'ok',
                'last_row': df_ind.tail(1).to_dict(orient='records')[0],
                'dataframe_json': df_ind.to_json(date_format='iso'),
            }
        except Exception as e:
            # Same boundary handling as above: log, don't raise.
            payload = {'status': 'error', 'message': str(e)}
        state = log_step(state, 'CALCULATE_INDICATOR', payload)
        return state

    if action == 'FINISH':
        # No tool call, proceed to final report
        state = log_step(state, 'NOOP', {'status': 'ok', 'message': 'Finishing'})
        return state

    # Unknown action fallback
    state = log_step(state, 'UNKNOWN_ACTION', {'status': 'error', 'message': action})
    return state

# Analysis loop check: decide next node
def analysis_loop_check(state: AnalyzerState) -> Literal['executor', 'final_report']:
    """Name the node to run next.

    Returns 'final_report' when ``current_action`` is 'FINISH' (compared
    case-insensitively); for any other, missing or empty action returns
    'executor'.
    """
    pending = state.get('current_action') or ''
    return 'final_report' if pending.upper() == 'FINISH' else 'executor'

# Final report node
def final_report(state: AnalyzerState) -> AnalyzerState:
    """Generate the text report from the logged steps and store it in the state.

    Missing ``past_steps`` is treated as an empty log; missing ``goal`` or
    ``ticker`` as an empty string. The state is updated in place and returned.
    """
    steps = state.get('past_steps') or []
    goal_text = state.get('goal', '')
    symbol = state.get('ticker', '')
    state['final_report'] = analyze_and_generate_report(steps, goal_text, symbol)
    return state


## 4. Build the Graph (Cyclical)
We wire nodes and conditional edges to achieve the iterative think-act loop.

In [None]:
# Construct the cyclic graph
builder = StateGraph(AnalyzerState)

# Register each node function under its graph name.
for _node_name, _node_fn in (
    ('router', router),
    ('executor', executor),
    ('tool_caller', tool_caller),
    ('final_report', final_report),
):
    builder.add_node(_node_name, _node_fn)

# Execution starts at the router, which always hands over to the tool caller.
builder.set_entry_point('router')
builder.add_edge('router', 'tool_caller')

# After every tool call, analysis_loop_check picks the branch: back to the
# planner for another round, or on to the report.
builder.add_conditional_edges(
    'tool_caller',
    analysis_loop_check,
    {'executor': 'executor', 'final_report': 'final_report'},
)

# The planner feeds the tool caller again (this closes the loop); the report
# node ends the run.
builder.add_edge('executor', 'tool_caller')
builder.add_edge('final_report', END)

# Optional in-memory checkpointing
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)


## 5. Run an Example
Goal: Analyze BTC price for the last 24h and produce a signal.

In [None]:
# Prepare initial state
# Starting state: explicit ticker plus a free-text goal.
initial = empty_state(
    ticker="BTC-USD",
    goal="Analyze BTC price for the last 24h and generate a buy/sell signal",
)

# The thread id keys the checkpointer's stored state for this run.
config = {'configurable': {'thread_id': 'signal_run_btc'}}

# Run the graph to completion and show the report (or a placeholder if none was produced).
final_state = graph.invoke(initial, config=config)
print(final_state.get('final_report', '<no report>'))


## 6. Inspect the Past Steps (Memory)

In [None]:
import pprint
# depth=2 caps the nesting levels printed; anything deeper is shown as '...'.
pp = pprint.PrettyPrinter(depth=2)
# Show the step log accumulated during the run above.
pp.pprint(final_state.get('past_steps'))


## 7. Notes
- For other crypto tickers, extend the COINGECKO_IDS mapping or fetch the coin list dynamically.
- To add stock data (e.g., TSLA), integrate `yfinance` or a REST source, then branch in `fetch_data`.
- To use an LLM for planning/reporting, replace `executor` and `analyze_and_generate_report` with LLM calls (LangChain + provider).
- This is for educational purposes and not financial advice.