<a href="https://colab.research.google.com/github/Aradhyakapil/Financial-analysis-agent/blob/main/Stock_analysis_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Notebook magic: install/upgrade the third-party packages imported below.
# NOTE(review): numpy is pinned to 1.26.4 — presumably for pandas-ta
# compatibility; confirm before unpinning.
%pip install --upgrade pip numpy==1.26.4 pandas-ta vaderSentiment langchain langchain-community langchain-openai openai requests yfinance

# ── 1. Imports & API Keys ──────────────────────────────────────────────────────
import os
import time
from datetime import datetime, timedelta
import pandas as pd
import pandas_ta as ta
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import requests
import yfinance as yf

from langchain_community.utilities.polygon import PolygonAPIWrapper
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType

# Load keys
# Both keys must already be present in the process environment before this
# cell runs. `os.getenv` returns None for an unset variable, and assigning None
# into `os.environ` raises an opaque TypeError, so check explicitly and fail
# with a message that names what is missing.
_missing_keys = [
    name for name in ("POLYGON_API_KEY", "OPENAI_API_KEY") if not os.getenv(name)
]
if _missing_keys:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_keys)
        + ". Set them before running this notebook."
    )

# ── 2. Rate‑Limiter Setup ──────────────────────────────────────────────────────
RATE_LIMIT = 5                # calls allowed per window
RATE_PERIOD = 60              # window length, in seconds (compared to time.time() deltas)
_call_count = 0               # calls already made in the current window
_window_start = time.time()   # wall-clock time at which the current window opened


def _rate_limit():
    """Record one call, sleeping first if the current window's budget is spent.

    Uses a fixed window of ``RATE_PERIOD`` seconds holding at most
    ``RATE_LIMIT`` calls. State lives in the module-level ``_call_count`` and
    ``_window_start`` variables, which this function updates.
    """
    global _call_count, _window_start

    now = time.time()

    # The previous window has fully elapsed: open a new one starting now.
    if now - _window_start >= RATE_PERIOD:
        _window_start, _call_count = now, 0

    # Budget exhausted: wait out the rest of the window, then start afresh.
    if _call_count >= RATE_LIMIT:
        remaining = RATE_PERIOD - (now - _window_start)
        time.sleep(remaining)
        _window_start, _call_count = time.time(), 0

    _call_count += 1

# ── 3. Helper: Polygon time‑series data ────────────────────────────────────────
def get_polygon_data(ticker, timespan="day", multiplier=1, limit=100):
    """Fetch the most recent aggregate bars for *ticker* from Polygon.

    Args:
        ticker: Symbol to query.
        timespan: Bar size unit for the aggregates endpoint (e.g. "day",
            "minute").
        multiplier: Number of ``timespan`` units per bar.
        limit: Maximum number of bars to return.

    Returns:
        A list of bar dicts in ascending time order (oldest first, newest
        last), or an empty list when the response carries no results.

    Raises:
        ValueError: If the response status is neither "OK" nor "DELAYED".
    """
    _rate_limit()
    end = datetime.now().date()
    # Look back twice the requested bar count (plus a week of slack) so that
    # weekends and holidays do not leave small requests with an empty window.
    start = end - timedelta(days=limit * 2 + 7)
    url = f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{start}/{end}"
    # Request newest-first: with an ascending sort, `limit` would keep the
    # OLDEST bars of the window and silently drop the latest ones.
    params = {"adjusted":"true","sort":"desc","limit":limit,"apikey":os.environ["POLYGON_API_KEY"]}
    resp = requests.get(url, params=params, timeout=30)
    data = resp.json()
    if data.get("status") not in ("OK","DELAYED"):
        raise ValueError(f"Polygon error: {data}")
    # "results" can be absent when there are no bars; treat that as empty.
    bars = data.get("results") or []
    # Hand back oldest-first so callers can keep using [-1] / iloc[-1] for
    # the latest bar and feed the series straight into indicator functions.
    return bars[::-1]

# ── 4. Technical Indicator Tools ──────────────────────────────────────────────
# Shared VADER analyzer instance; news_sentiment_tool uses it to score headlines.
sent_analyzer = SentimentIntensityAnalyzer()

def _df_for(ticker, window, timespan="day"):
    """Return Polygon bars for *ticker* as a DataFrame.

    Twice ``window`` bars are requested so that an indicator of length
    ``window`` has history beyond its first full period.
    """
    bars = get_polygon_data(ticker, timespan=timespan, limit=2 * window)
    return pd.DataFrame(bars)

def current_price_tool(ticker: str):
    """Return the latest available closing price for *ticker*.

    The most recent minute bar is tried first. If that request raises or
    yields no bars, the most recent daily bar is used instead.

    Returns:
        The close as a float, or an ``"Error price: ..."`` string on failure.
    """
    try:
        try:
            bars = get_polygon_data(ticker, timespan="minute", limit=1)
        except Exception:
            # A failed minute request must not block the daily fallback; if
            # the problem is persistent the daily request below reports it.
            bars = None
        if not bars:
            bars = get_polygon_data(ticker, timespan="day", limit=1)
        if not bars:
            return f"Error price: no price data returned for {ticker}"
        # Bars are ordered oldest-first, so the last one is the latest.
        return float(bars[-1]["c"])
    except Exception as e:
        return f"Error price: {e}"

def sma_tool(ticker: str, window: int = 20):
    """Return the latest simple moving average of closing prices.

    Returns a float, or an ``"Error SMA: ..."`` string if anything fails.
    """
    try:
        frame = _df_for(ticker, window)
        frame['sma'] = ta.sma(frame['c'], length=window)
        latest = frame['sma'].iloc[-1]
        value = float(latest)
    except Exception as err:
        return f"Error SMA: {err}"
    return value

def ema_tool(ticker: str, window: int = 20):
    """Return the latest exponential moving average of closing prices.

    Returns a float, or an ``"Error EMA: ..."`` string if anything fails.
    """
    try:
        frame = _df_for(ticker, window)
        frame['ema'] = ta.ema(frame['c'], length=window)
        latest = frame['ema'].iloc[-1]
        value = float(latest)
    except Exception as err:
        return f"Error EMA: {err}"
    return value

def macd_tool(ticker: str):
    """Return the latest MACD and signal-line values for *ticker*.

    Returns a dict with keys ``'macd'`` and ``'signal'``, or an
    ``"Error MACD: ..."`` string if anything fails.
    """
    # Output key -> column name in the frame returned by ta.macd.
    columns = (('macd', 'MACD_12_26_9'), ('signal', 'MACDs_12_26_9'))
    try:
        frame = _df_for(ticker, 100)
        lines = ta.macd(frame['c'])
        latest = {label: float(lines[column].iloc[-1]) for label, column in columns}
    except Exception as err:
        return f"Error MACD: {err}"
    return latest

def bbands_tool(ticker: str, window: int = 20):
    """Return the latest Bollinger Band values for *ticker*.

    Returns a dict with keys ``'upper'``, ``'middle'`` and ``'lower'``, or an
    ``"Error BBANDS: ..."`` string if anything fails.
    """
    # Output key -> column prefix in the frame returned by ta.bbands.
    prefixes = (('upper', 'BBU'), ('middle', 'BBM'), ('lower', 'BBL'))
    try:
        frame = _df_for(ticker, window)
        bands = ta.bbands(frame['c'], length=window)
        latest = {
            label: float(bands[f'{prefix}_{window}_2.0'].iloc[-1])
            for label, prefix in prefixes
        }
    except Exception as err:
        return f"Error BBANDS: {err}"
    return latest

def atr_tool(ticker: str, window: int = 14):
    """Return the latest Average True Range for *ticker*.

    Returns a float, or an ``"Error ATR: ..."`` string if anything fails.
    """
    try:
        frame = _df_for(ticker, window)
        frame['atr'] = ta.atr(
            high=frame['h'],
            low=frame['l'],
            close=frame['c'],
            length=window,
        )
        latest = frame['atr'].iloc[-1]
        value = float(latest)
    except Exception as err:
        return f"Error ATR: {err}"
    return value

# ── 5. SEC EDGAR Fundamentals via Company Facts API ───────────────────────────
# load ticker→CIK mapping once
# Cache of ticker -> zero-padded 10-character CIK string; None until a load succeeds.
_TICKER_CIK = None
def _load_ticker_cik_map():
    """Return the ticker -> CIK mapping, downloading it on first use.

    A successful download is cached in the module-level ``_TICKER_CIK``. A
    failed download is reported on stdout and yields an empty dict for this
    call only; it is NOT cached, so a transient network error does not make
    every later lookup fail and the next call retries.

    Returns:
        dict mapping ticker symbols to CIK strings left-padded with zeros to
        10 characters (empty if the download failed).
    """
    global _TICKER_CIK
    if _TICKER_CIK is not None:
        return _TICKER_CIK
    try:
        url = "https://www.sec.gov/files/company_tickers.json"
        headers = {
            "User-Agent": "Stock Analysis Tool aradhya.kapil2004@gmail.com",
            "Accept": "application/json",
            "Host": "www.sec.gov"
        }
        # Bound the wait so a stalled connection cannot hang the agent.
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        mapping = {item["ticker"]: str(item["cik_str"]).zfill(10)
                   for item in resp.json().values()}
    except Exception as e:
        print(f"Error loading ticker-CIK mapping: {e}")
        return {}
    _TICKER_CIK = mapping
    return _TICKER_CIK

def sec_fundamentals_tool(ticker: str):
    """Return headline fundamentals for *ticker* from SEC EDGAR company facts.

    For each field several us-gaap tags are considered and the USD fact with
    the most recent "end" date wins (ties go to the tag listed first). This is
    done per field, so a company that stopped reporting under one tag does not
    yield a stale value while a current alternative tag exists.

    NOTE(review): facts are not filtered by period length, so income-statement
    values may be quarterly, year-to-date or annual depending on the latest
    entry — confirm whether callers need a specific period.

    Returns:
        dict with keys 'total_revenue', 'net_income', 'assets', 'liabilities'
        and 'equity' (each a number or None), or a descriptive error string.
    """
    try:
        # Add rate limiting for SEC requests
        time.sleep(0.1)  # SEC recommends 10 requests per second max

        cik = _load_ticker_cik_map().get(ticker.strip().upper())
        if not cik:
            return f"CIK not found for {ticker}"

        url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json"

        headers = {
            "User-Agent": "Stock Analysis Tool aradhya.kapil2004@gmail.com",
            "Accept": "application/json, text/plain, */*",
            "Accept-Encoding": "gzip, deflate",
            "Host": "data.sec.gov"
        }

        resp = requests.get(url, headers=headers, timeout=30)

        # Validate the response step by step so the agent gets a precise reason.
        if resp.status_code != 200:
            return f"SEC API returned status {resp.status_code}: {resp.reason}"

        if not resp.content:
            return "SEC API returned empty response"

        content_type = resp.headers.get('content-type', '')
        if 'application/json' not in content_type:
            return f"SEC API returned non-JSON content: {content_type}"

        try:
            data = resp.json()
        except ValueError as json_error:
            return f"Failed to parse SEC response as JSON: {json_error}"

        if "facts" not in data or "us-gaap" not in data["facts"]:
            return "SEC data missing expected structure (facts/us-gaap)"

        facts = data["facts"]["us-gaap"]

        def _end_of(fact):
            """Sort key: the fact's "end" date string ('' when missing)."""
            return fact.get("end") or ""

        def _latest_fact(tag):
            """Return the USD fact with the latest "end" date for *tag*, or None."""
            try:
                vals = facts.get(tag, {}).get("units", {}).get("USD", [])
                if not vals:
                    return None
                # max() keeps the first of equally-dated entries, matching a
                # stable descending sort followed by taking element 0.
                return max(vals, key=_end_of)
            except (AttributeError, TypeError):
                return None

        def _latest(*tags):
            """Return the value of the most recent fact across *tags*, or None."""
            best = None
            for tag in tags:
                fact = _latest_fact(tag)
                if fact is None or "val" not in fact:
                    continue
                # Strictly newer only: on a tie the earlier (preferred) tag stays.
                if best is None or _end_of(fact) > _end_of(best):
                    best = fact
            return None if best is None else best["val"]

        return {
            'total_revenue': _latest(
                "Revenues",
                "RevenueFromContractWithCustomerExcludingAssessedTax",
                "SalesRevenueNet",
            ),
            'net_income': _latest(
                "NetIncomeLoss",
                "ProfitLoss",
                "NetIncomeLossAvailableToCommonStockholdersBasic",
            ),
            # The current/non-current tags are partial proxies kept from the
            # original fallback; they only win when the total is absent or older.
            'assets': _latest("Assets", "AssetsCurrent", "AssetsNoncurrent"),
            'liabilities': _latest("Liabilities", "LiabilitiesCurrent"),
            'equity': _latest(
                "StockholdersEquity",
                "StockholdersEquityIncludingPortionAttributableToNoncontrollingInterest",
            ),
        }

    except requests.exceptions.Timeout:
        return "SEC API request timed out"
    except requests.exceptions.RequestException as req_error:
        return f"SEC API request error: {req_error}"
    except Exception as e:
        return f"Error SEC fundamentals: {e}"

# ── 6. Yahoo Finance Enhanced Fundamentals Tool ───────────────────────────────
def _yoy_quarterly_growth_pct(statement, exact_label, pattern):
    """Return the year-over-year percentage change of one statement row.

    The row labelled exactly *exact_label* is preferred; only if it is absent
    is the first row whose label matches the regex *pattern*
    (case-insensitive) used. Column 0 is taken as the most recent quarter and
    column 4 as the same quarter a year earlier.
    NOTE(review): assumes columns are ordered newest-first — confirm against
    the installed yfinance version.

    Returns None when the statement, the row, or either value is unavailable,
    or when the year-ago value is zero.
    """
    if statement is None or statement.empty:
        return None
    if exact_label in statement.index:
        row = statement.loc[[exact_label]]
    else:
        row = statement.loc[statement.index.str.contains(pattern, case=False, na=False)]
    if row.empty or len(row.columns) < 5:
        return None
    current, year_ago = row.iloc[0, 0], row.iloc[0, 4]
    if pd.isna(current) or pd.isna(year_ago) or year_ago == 0:
        return None
    return float((current - year_ago) / abs(year_ago) * 100)


def yahoo_fundamentals_tool(ticker: str):
    """Return a dict of Yahoo Finance fundamental metrics for *ticker*.

    Units: 'returnOnEquity', 'returnOnAssets', 'grossMargins',
    'operatingMargins', 'profitMargins', 'revenueGrowth' and 'earningsGrowth'
    are percentages (15.0 means 15%). All other values are passed through as
    Yahoo reports them.

    Returns:
        The metrics dict (missing values are None), or an
        ``"Error Yahoo fundamentals: ..."`` string on failure.
    """
    try:
        stock = yf.Ticker(ticker)
        info = stock.info

        def safe_get(key, default=None):
            return info.get(key, default)

        def as_percent(value):
            """Scale a fractional ratio to percent; leave non-numbers untouched."""
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return value
            return value * 100

        # Prefer growth computed from the quarterly statements.
        try:
            revenue_growth = _yoy_quarterly_growth_pct(
                stock.quarterly_financials, 'Total Revenue', 'Total Revenue|Revenue'
            )
            # Dividing both quarters by the same share count cancels out, so
            # EPS growth at constant share count equals net-income growth.
            eps_growth = _yoy_quarterly_growth_pct(
                stock.quarterly_income_stmt, 'Net Income', 'Net Income'
            )
        except Exception:
            # Statement retrieval is best-effort; fall back to `info` below.
            revenue_growth = None
            eps_growth = None

        # Fall back to the summary figures, scaled to percent so both sources
        # share one unit. `is None` (not `or`) keeps a genuine 0.0 growth.
        if revenue_growth is None:
            revenue_growth = as_percent(safe_get('revenueGrowth'))
        if eps_growth is None:
            eps_growth = as_percent(safe_get('earningsGrowth'))

        result = {
            # Valuation Metrics
            'pegRatio': safe_get('pegRatio'),
            'priceToBook': safe_get('priceToBook'),
            'priceToSales': safe_get('priceToSalesTrailing12Months'),
            'enterpriseToRevenue': safe_get('enterpriseToRevenue'),
            'enterpriseToEbitda': safe_get('enterpriseToEbitda'),

            # Profitability Metrics
            'returnOnEquity': safe_get('returnOnEquity'),
            'returnOnAssets': safe_get('returnOnAssets'),
            'grossMargins': safe_get('grossMargins'),
            'operatingMargins': safe_get('operatingMargins'),
            'profitMargins': safe_get('profitMargins'),

            # Growth Metrics
            'revenueGrowth': revenue_growth,
            'earningsGrowth': eps_growth,
            'earningsQuarterlyGrowth': safe_get('earningsQuarterlyGrowth'),

            # Dividend Metrics
            'dividendYield': safe_get('dividendYield'),
            'payoutRatio': safe_get('payoutRatio'),
            'dividendRate': safe_get('dividendRate'),

            # Financial Health
            'debtToEquity': safe_get('debtToEquity'),
            'currentRatio': safe_get('currentRatio'),
            'quickRatio': safe_get('quickRatio'),
            'totalCashPerShare': safe_get('totalCashPerShare'),

            # Per Share Metrics
            'earningsPerShare': safe_get('trailingEps'),
            'forwardEps': safe_get('forwardEps'),
            'bookValue': safe_get('bookValue'),
            'revenuePerShare': safe_get('revenuePerShare'),

            # Market Metrics
            'beta': safe_get('beta'),
            'marketCap': safe_get('marketCap'),
            'enterpriseValue': safe_get('enterpriseValue'),
            'floatShares': safe_get('floatShares'),
            'sharesOutstanding': safe_get('sharesOutstanding'),

            # Additional Key Metrics
            'fiftyTwoWeekHigh': safe_get('fiftyTwoWeekHigh'),
            'fiftyTwoWeekLow': safe_get('fiftyTwoWeekLow'),
            'averageVolume': safe_get('averageVolume'),
            'targetMeanPrice': safe_get('targetMeanPrice'),
            'recommendationKey': safe_get('recommendationKey')
        }

        # These ratios are fractions, so scale them unconditionally: a
        # magnitude test would leave values above 100% (e.g. an ROE of 1.5)
        # unscaled and make them read as 1.5%.
        for field in ('returnOnEquity', 'returnOnAssets', 'grossMargins',
                      'operatingMargins', 'profitMargins'):
            result[field] = as_percent(result[field])

        # NOTE(review): the unit of 'dividendYield' appears to differ between
        # yfinance releases, so the original magnitude heuristic is kept for
        # this one field — confirm against the installed version.
        dividend_yield = result['dividendYield']
        if isinstance(dividend_yield, (int, float)) and abs(dividend_yield) <= 1:
            result['dividendYield'] = dividend_yield * 100

        return result

    except Exception as e:
        return f"Error Yahoo fundamentals: {e}"

# ── 7. Combined Fundamentals Tool ─────────────────────────────────────────────
def combined_fundamentals_tool(ticker: str):
    """Bundle the Yahoo Finance and SEC EDGAR fundamentals for *ticker*.

    Returns a dict with keys ``"yahoo_finance"``, ``"sec_edgar"`` and
    ``"data_source"``, or an ``"Error combined fundamentals: ..."`` string if
    either lookup raises.
    """
    try:
        report = {"yahoo_finance": yahoo_fundamentals_tool(ticker)}
        report["sec_edgar"] = sec_fundamentals_tool(ticker)
        report["data_source"] = "Combined Yahoo Finance + SEC EDGAR data"
    except Exception as err:
        return f"Error combined fundamentals: {err}"
    return report

# ── 8. News Sentiment Tool ────────────────────────────────────────────────────
def news_sentiment_tool(ticker: str, count: int = 5):
    """Return the mean VADER compound score of recent news headlines.

    Args:
        ticker: Symbol whose news is fetched through the Polygon wrapper.
        count: Maximum number of articles to score.

    Returns:
        The average compound score as a float (0.0 when there is nothing to
        score), or an ``"Error news sentiment: ..."`` string on failure.
    """
    try:
        # This is a Polygon request too, so it shares the limiter that
        # get_polygon_data uses.
        _rate_limit()
        # The wrapper may hand back None when there are no results.
        articles = (PolygonAPIWrapper().get_ticker_news(ticker=ticker) or [])[:count]
        scores = [
            sent_analyzer.polarity_scores(article["title"])["compound"]
            for article in articles
            if article.get("title")  # skip entries without a headline
        ]
        return sum(scores)/len(scores) if scores else 0.0
    except Exception as e:
        return f"Error news sentiment: {e}"

# ── 9. Financial Health Score Tool ───────────────────────────────────────────
def financial_health_score_tool(ticker: str):
    """Score *ticker*'s financial health from its Yahoo Finance metrics.

    Up to five components are scored 0-20 each: return on equity,
    debt-to-equity, current ratio, profit margin and revenue growth.
    Components whose metric is missing are left out, and the result is
    normalised by the points actually available.

    The ROE, profit-margin and revenue-growth thresholds are expressed in
    percent; debt-to-equity and current ratio are plain ratios.

    Returns:
        dict with 'health_score' (0-100, two decimals), 'rating' and
        'components_scored'; the Yahoo tool's error string if it failed;
        "Insufficient data for health score calculation" when no component
        could be scored; or an "Error calculating financial health: ..."
        string on any other failure.
    """
    try:
        yahoo_data = yahoo_fundamentals_tool(ticker)
        if isinstance(yahoo_data, str):  # Error case
            return yahoo_data

        # Yahoo reports debtToEquity as a percentage (150.0 means 1.5x), while
        # the thresholds below are ratios; convert before comparing.
        debt_to_equity = yahoo_data.get('debtToEquity')
        if debt_to_equity is not None:
            debt_to_equity = debt_to_equity / 100

        # Points for clearing the 1st..4th threshold of a component.
        awards = (20, 15, 10, 5)
        # (value, higher_is_better, thresholds ordered best -> worst)
        components = (
            (yahoo_data.get('returnOnEquity'), True,  (15, 10, 5, 0)),
            (debt_to_equity,                   False, (0.3, 0.6, 1.0, 2.0)),
            (yahoo_data.get('currentRatio'),   True,  (2.0, 1.5, 1.2, 1.0)),
            (yahoo_data.get('profitMargins'),  True,  (20, 15, 10, 5)),
            (yahoo_data.get('revenueGrowth'),  True,  (20, 15, 10, 5)),
        )

        score = 0
        max_score = 0
        for value, higher_is_better, thresholds in components:
            if value is None:
                continue
            max_score += 20
            for threshold, award in zip(thresholds, awards):
                cleared = value > threshold if higher_is_better else value < threshold
                if cleared:
                    score += award
                    break

        if max_score == 0:
            return "Insufficient data for health score calculation"

        final_score = (score / max_score) * 100
        if final_score >= 80:
            rating = 'Excellent'
        elif final_score >= 60:
            rating = 'Good'
        elif final_score >= 40:
            rating = 'Fair'
        else:
            rating = 'Poor'
        return {
            'health_score': round(final_score, 2),
            'rating': rating,
            'components_scored': max_score // 20
        }

    except Exception as e:
        return f"Error calculating financial health: {e}"

# ── 10. Assemble Tools & Initialize Agent ────────────────────────────────────
# (function, tool name, description shown to the LLM) for every agent tool.
_TOOL_SPECS = (
    (current_price_tool,          "CurrentPrice",          "Get latest stock price"),
    (sma_tool,                    "SMA",                   "Calculate 20-day Simple Moving Average"),
    (ema_tool,                    "EMA",                   "Calculate 20-day Exponential Moving Average"),
    (macd_tool,                   "MACD",                  "Calculate MACD and signal line"),
    (bbands_tool,                 "BBANDS",                "Calculate 20-day Bollinger Bands"),
    (atr_tool,                    "ATR",                   "Calculate 14-day Average True Range"),
    (sec_fundamentals_tool,       "SEC_Fundamentals",      "Get SEC EDGAR fundamental data"),
    (yahoo_fundamentals_tool,     "Yahoo_Fundamentals",    "Get comprehensive Yahoo Finance fundamental metrics"),
    (combined_fundamentals_tool,  "Combined_Fundamentals", "Get both Yahoo Finance and SEC fundamental data"),
    (financial_health_score_tool, "Financial_Health",      "Calculate overall financial health score"),
    (news_sentiment_tool,         "NEWS_SENTIMENT",        "Calculate average news sentiment score"),
)

# Wrap each function as a LangChain Tool, preserving the order above.
tools = [
    Tool.from_function(func, name=tool_name, description=tool_description)
    for func, tool_name, tool_description in _TOOL_SPECS
]

# Chat model driving the agent; temperature 0 is requested for the analysis run.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Structured-chat ReAct agent over the tools above; verbose=True prints the
# intermediate thought/action/observation steps.
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
)

# ── 11. Enhanced Query Agent ─────────────────────────────────────────────────
# The prompt is assembled from its sentences and numbered sections, joined by
# single spaces.
query = " ".join((
    "Analyze this ticker: AAPL.",
    "Provide comprehensive analysis including:",
    "1. Current price and technical indicators (SMA, EMA, MACD, BBANDS, ATR)",
    "2. Detailed fundamental analysis from Yahoo Finance (PEG ratio, ROE, dividend yield, growth metrics, etc.)",
    "3. SEC EDGAR fundamental data for verification",
    "4. Financial health score assessment",
    "5. News sentiment analysis",
    "6. Final Buy/Hold/Sell recommendation with detailed reasoning",
))

# Print a banner, then run the agent and show only its final answer.
banner = "=" * 80
print(banner)
print("ENHANCED STOCK ANALYSIS REPORT")
print(banner)
response = agent.invoke({"input": query})
print(response["output"])

ENHANCED STOCK ANALYSIS REPORT


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: I will start by gathering the current price and technical indicators for AAPL, including SMA, EMA, MACD, BBANDS, and ATR. This will provide a foundation for the analysis.

Action:
```
{
  "action": "CurrentPrice",
  "action_input": "AAPL"
}
```
[0m
Observation: [36;1m[1;3m214.06[0m
Thought:[32;1m[1;3mI will now gather the technical indicators for AAPL, including SMA, EMA, MACD, BBANDS, and ATR, to complete the analysis.

Action:
```
{
  "action": "SMA",
  "action_input": "AAPL"
}
```

[0m
Observation: [33;1m[1;3m202.42399999999998[0m
Thought:[32;1m[1;3mI will now gather the 20-day Exponential Moving Average (EMA) for AAPL to continue the technical analysis.

Action:
```
{
  "action": "EMA",
  "action_input": "AAPL"
}
```

[0m
Observation: [38;5;200m[1;3m204.5214162126727[0m
Thought:[32;1m[1;3mI will now gather the MACD and signal line for AAPL to complete the technical