<a href="https://colab.research.google.com/github/PranavSuresh525/AI-ML-Projects/blob/main/AI-ML-Projects/Analytical_Finance_Chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# All the downloads and imports required
!pip install -q -U --no-warn-conflicts \
    langchain-huggingface \
    langchain-google-genai \
    langgraph \
    yfinance \
    transformers \
    accelerate \
    newsapi-python \
    langchain-text-splitters \
    langchain-chroma
import os
import re
import requests
import operator
from datetime import datetime, timedelta
from typing import TypedDict, Annotated
import yfinance as yf
from newsapi import NewsApiClient
from transformers import pipeline
from langchain_huggingface import HuggingFacePipeline, HuggingFaceEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END, START
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_chroma import Chroma
from google.colab import userdata
import traceback
from datetime import datetime, timedelta

In [None]:
# Model setup for the notebook: a local flan-t5 pipeline plus a hosted chat model.
# The local model lets the notebook avoid unnecessary calls to the hosted API, which is rate-limited.
local_pipe = pipeline("text2text-generation", model="google/flan-t5-large", max_new_tokens=256, device_map="auto")
local_llm = HuggingFacePipeline(pipeline=local_pipe)
# The Google API key is read from the Colab secret named 'GOOGLE_TOKEN'.
os.environ["GOOGLE_API_KEY"] = userdata.get('GOOGLE_TOKEN')
llm = ChatGoogleGenerativeAI(
    model="gemma-3-27b-it", # chosen for its larger daily quota (author's figures: ~14k vs ~20 per day for Gemini models; NOTE(review): units not verifiable from this file)
    temperature=0.2,
    google_api_key=os.environ["GOOGLE_API_KEY"]
)
_ticker_cache={} # intended to store already-extracted tickers; NOTE(review): not referenced by any code visible in this file

In [None]:
# Mainly uses the gemma model but in case it fails( due to rate limits) it switches to local model
def smart_llm_invoke(prompt):
    """Answer *prompt* with the hosted chat model, falling back to the local one.

    The hosted model is tried first. If that call (or reading its reply)
    raises for any reason, the error is printed and the same prompt is sent
    to the local HuggingFace pipeline instead.

    Args:
        prompt: The full prompt text.

    Returns:
        The model's reply as a whitespace-stripped string.
    """
    try:
        hosted_reply = llm.invoke([HumanMessage(content=prompt)])
        return hosted_reply.content.strip()
    except Exception as err:
        print(f"Error from Gemini: {err}")
    # Only reached when the hosted call failed.
    local_reply = local_llm.invoke(prompt)
    return local_reply.strip()

In [None]:
# a basic function to get all the info related to a stock
def get_stock_price(ticker: str):
    """Collect a price/valuation snapshot for *ticker* from yfinance.

    Args:
        ticker: Ticker symbol passed straight to ``yf.Ticker``.

    Returns:
        A dict with quote fields taken from ``Ticker.info``, 50- and 200-day
        averages of the closing price computed from one year of history (0
        when the history is shorter than the window) and the last 30 closing
        prices. On an unknown ticker or any exception, a dict with a single
        ``"error"`` key is returned instead.
    """
    # (output key, key in yfinance's info dict, default when absent)
    info_fields = [
        ('current_price', 'currentPrice', 0),
        ('previous_close', 'previousClose', 0),
        ('day_high', 'dayHigh', 0),
        ('day_low', 'dayLow', 0),
        ('volume', 'volume', 0),
        ('market_cap', 'marketCap', 0),
        ('company_name', 'longName', ticker),
        ('pe_ratio', 'trailingPE', 0),
        ('dividend_yield', 'dividendYield', 0),
        ('target_mean_price', 'targetMeanPrice', 0),
        ('recommendation_key', 'recommendationKey', 'N/A'),
    ]
    try:
        stock = yf.Ticker(ticker)
        info = stock.info
        history = stock.history(period='1y')

        known_symbol = bool(info) and bool(info.get('symbol'))
        if not known_symbol or history.empty:
            return {"error": f"No data found or invalid ticker for {ticker}"}

        closes = history['Close']

        def trailing_average(window):
            # A rolling mean needs at least `window` rows to be meaningful.
            if len(history) < window:
                return 0
            return closes.rolling(window=window).mean().iloc[-1]

        snapshot = {'ticker': ticker}
        for out_key, info_key, fallback in info_fields:
            snapshot[out_key] = info.get(info_key, fallback)
        snapshot['50_day_average'] = trailing_average(50)
        snapshot['200_day_average'] = trailing_average(200)
        snapshot['price_history'] = closes.tail(30).to_list()
        return snapshot
    except Exception as e:
        return {"error": f"Failed to fetch data for {ticker}: {str(e)}"}

In [None]:
# a function which reads a pandas DataFrame of financial-statement rows and collects, for each requested row label in `items`, the value in the first column (expected to be the most recent period)
def get_recent_data(df, items):
    """Return the first-column value of each requested row label in *df*.

    Args:
        df: A pandas DataFrame indexed by row label. The first column is the
            one that is read (presumably the most recent reporting period in
            yfinance statement frames -- TODO confirm ordering).
        items: Iterable of row labels to look up.

    Returns:
        ``{}`` when *df* is empty; otherwise a dict with one entry per label:
        the value converted to ``float``, or ``0`` when the label is missing,
        the value is ``None``, or the value cannot be converted.
    """
    if df.empty:
        return {}
    recent_data = {}
    for item in items:
        try:
            value = df.loc[item].iloc[0] if item in df.index else None
            recent_data[item] = float(value) if value is not None else 0
        except Exception:
            # Best-effort lookup: an unreadable cell is reported as 0.
            recent_data[item] = 0
    # The original fell off the end here and implicitly returned None.
    return recent_data


In [None]:
# a function which fetches selected balance-sheet, income-statement and cash-flow figures from Yahoo Finance; `period` chooses between the quarterly and the annual statements
def fetch_financial_statements(ticker: str, period: str)->dict:
    """Fetch headline balance-sheet, income-statement and cash-flow figures.

    Args:
        ticker: Ticker symbol passed to ``yf.Ticker``.
        period: ``'quarterly'`` selects the quarterly statements; any other
            value selects the annual ones. The historical misspelling
            ``'quaterly'`` is still accepted so existing callers keep working.

    Returns:
        A dict with ``balance_sheet``, ``income_statement`` and ``cash_flow``
        entries (each produced by ``get_recent_data``) plus the ``period``
        value exactly as passed in. For an unknown ticker or on any exception
        a dict with a single ``'error'`` key is returned.
    """
    try:
        stock = yf.Ticker(ticker)
        info = stock.info
        if not info or not info.get('symbol'):
            return {'error': f'Invalid or no data for ticker {ticker}'}

        # Previously only the misspelt 'quaterly' matched, so a correctly
        # spelt 'quarterly' silently returned annual data.
        wants_quarterly = str(period).strip().lower() in ('quarterly', 'quaterly')
        if wants_quarterly:
            balance_sheet = stock.quarterly_balance_sheet
            income_statement = stock.quarterly_income_stmt
            cash_flow = stock.quarterly_cashflow
        else:
            balance_sheet = stock.balance_sheet
            income_statement = stock.income_stmt
            cash_flow = stock.cashflow
        # NOTE(review): these row labels must match the statement index
        # exactly; labels that are absent are reported as 0. Confirm them
        # against the labels yfinance currently returns.
        return {
            'balance_sheet': get_recent_data(balance_sheet, ['Total Cash', 'Total Debt']),
            'income_statement': get_recent_data(income_statement, ['Total Revenue', 'Gross Profit']),
            'cash_flow': get_recent_data(cash_flow, ['Net Cash Flow']),
            'period': period
        }
    except Exception as e:
        return {"error": f"Failed to fetch data for {ticker}: {str(e)}"}

In [None]:
# a simple validation function that cross-checks with yfinance to see if the ticker exists
def validate_ticker(potential_ticker: str):
    """Check *potential_ticker* against yfinance.

    Args:
        potential_ticker: Candidate ticker symbol.

    Returns:
        The ``symbol`` value reported by yfinance when it is present and
        non-empty; ``None`` otherwise, including when the lookup raises.
    """
    try:
        info = yf.Ticker(potential_ticker).info
        if not info:
            return None
        symbol = info.get('symbol')
        return symbol if symbol else None
    except Exception:
        return None

In [None]:
# classifies which currency must be used in the response
def fetch_currency(ticker: str):
    """Return the display symbol and ISO code of the currency *ticker* trades in.

    Args:
        ticker: Ticker symbol passed to ``yf.Ticker``.

    Returns:
        A ``(symbol, currency_code)`` tuple. The code defaults to ``"USD"``
        when the lookup raises or yfinance reports no currency. Codes without
        an entry in the symbol table are rendered as the code followed by a
        space (e.g. ``"SEK "``).
    """
    SYMBOL_MAP = {
        "USD": "$", "INR": "₹", "EUR": "€", "GBP": "£",
        "JPY": "¥", "CNY": "¥", "HKD": "HK$",
        "AUD": "A$", "CAD": "C$", "CHF": "CHF",
        "KRW": "₩", "BRL": "R$", "ZAR": "R"
    }
    try:
        # `or "USD"` also covers a currency key that is present but None/empty;
        # the original then crashed on `None + " "` outside the try block.
        currency_code = yf.Ticker(ticker).info.get("currency") or "USD"
    except Exception:
        currency_code = "USD"
    symbol = SYMBOL_MAP.get(currency_code)
    if symbol is None:
        symbol = f"{currency_code} "
    return symbol, currency_code

In [None]:
# A function that analyses sentiment with the LLM; if no score can be parsed from the LLM's reply, it falls back to a keyword lexicon in which intensifiers amplify a word's weight and negations flip its sign
def analyze_sentiment(text: str):
    """Score the sentiment of *text* on a scale from -1.0 to 1.0.

    The LLM is asked first and the first number in its reply is used. If the
    call fails or the reply contains no number, a keyword lexicon is used
    instead: each positive/negative word contributes +/-1, multiplied by 1.7
    when one of the two preceding words is an intensifier and sign-flipped
    when one of them is a negation.

    Args:
        text: News text to score.

    Returns:
        A float clamped to ``[-1.0, 1.0]``; ``0.0`` for empty/blank text or
        when the lexicon finds nothing.
    """
    if not text or not text.strip():
        return 0.0
    prompt = f"""Analyze the sentiment of this financial news text.
Return ONLY a number between -1 (very negative) and 1 (very positive).
Text:
{text}
sentiment:"""
    try:
        response = smart_llm_invoke(prompt)
        numbers = re.findall(r"-?\d+\.?\d*", response)
        if numbers:
            return max(-1.0, min(1.0, float(numbers[0])))
    except Exception:
        # Deliberate best-effort: any LLM or parsing failure falls through to
        # the lexicon below. `Exception` (not a bare except) so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        pass
    positive_words = {
        'gain', 'gains', 'rise', 'rises', 'up', 'surge', 'rally', 'jump',
        'soar', 'climb', 'boost', 'profit', 'growth', 'strong',
        'outperform', 'beat', 'bullish', 'optimistic', 'upgrade', 'buy'
    }
    negative_words = {
        'loss', 'losses', 'down', 'drop', 'fall', 'decline', 'plunge',
        'crash', 'tumble', 'slump', 'miss', 'weak', 'bearish',
        'pessimistic', 'downgrade', 'sell', 'concern', 'fear'
    }
    intensifiers = {'very', 'extremely', 'highly', 'significantly', 'sharply'}
    negations = {'not', 'no', 'never', "don't", "doesn't", "didn't", "won't"}
    words = re.findall(r"[a-zA-Z']+", text.lower())

    score = 0.0
    for i, word in enumerate(words):
        # Modifiers are looked for in the two words immediately before `word`.
        prev = words[max(0, i-2):i]
        multiplier = 1.7 if any(p in intensifiers for p in prev) else 1.0
        negated = any(p in negations for p in prev)
        if word in positive_words:
            score += (-multiplier if negated else multiplier)
        elif word in negative_words:
            score += (multiplier if negated else -multiplier)
    if score == 0:
        return 0.0
    # Dividing by at least 3 keeps one or two keyword hits from saturating.
    normalized = score / max(3.0, abs(score))
    return max(-1.0, min(1.0, normalized))

In [None]:
# the shared state that is passed between the nodes of the agent workflow
class AgentState(TypedDict):
    """State dictionary handed from node to node in the agent workflow."""
    query: str              # the user's question
    ticker: str             # resolved ticker symbol, "" when none was found
    intent: str             # label assigned by the intent classifier
    price_data: dict
    financial_data: dict
    # Plain lists (last value wins). Every node in this file mutates the state
    # and returns the whole dict, so an `operator.add` reducer here would
    # append each list to itself after every node and duplicate the entries.
    news_articles: list
    news_context: list
    sentiment_score: float
    analysis: str           # final answer text
    recommendation: str
    messages: list          # debug trail appended to by the nodes
    news_sources: list

In [None]:
# The main RAG pipeline
class NewsRAG:
    """Small retrieval pipeline over news articles: split, embed, store, search."""

    def __init__(self):
        # Embedding model used to map text chunks into vector space.
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        self.vectorstore = None

        # Chunks of up to 500 characters, each overlapping the previous by 100.
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100
        )

    def index_news(self, articles: list):
        """Split *articles* into chunks and store them in a fresh Chroma collection.

        Args:
            articles: Dicts with ``title``, ``source``, ``date``, ``content``
                and ``link`` keys.

        When *articles* yields no chunks the current vector store is left
        untouched.
        """
        import uuid  # local import: only needed to name the collection

        documents = []
        for article in articles:
            text = f"""
            Title: {article['title']}
            Source: {article['source']}
            Date: {article['date']}
            Content: {article['content']}
            """
            docs = self.splitter.split_documents([
                Document(
                    page_content=text,
                    metadata={
                        "source": article["source"],
                        "url": article["link"],
                        "date": article["date"]
                    }
                )
            ])
            documents.extend(docs)
        if documents:
            # A unique name per index keeps queries isolated from each other:
            # a fixed name is shared by every in-memory Chroma store created in
            # the same process, so chunks from earlier queries would be
            # returned for later ones.
            self.vectorstore = Chroma.from_documents(
                documents,
                self.embeddings,
                collection_name=f"news_rag_{uuid.uuid4().hex}"
            )

    def retrieve_context(self, query: str, k: int = 5):
        """Return the *k* stored chunks most similar to *query*.

        Returns:
            A list of dicts with ``content``, ``source``, ``url`` and ``date``
            keys; an empty list when nothing has been indexed.
        """
        if not self.vectorstore:
            return []
        results = self.vectorstore.similarity_search(query, k=k)
        return [
            {
                "content": doc.page_content,
                "source": doc.metadata.get("source", "Unknown"),
                "url": doc.metadata.get("url", ""),
                "date": doc.metadata.get("date", "")
            }
            for doc in results
        ]

In [None]:
# a helper function which calls the RAG setup cleanly, here we take k=5 so we call the top 5 chunks which are closest to the query mathematically
def rag_retriever(state: AgentState):
    """Workflow node: index the fetched articles and retrieve the best chunks.

    Reads ``news_articles`` and ``query`` from *state*. Writes the text of up
    to five retrieved chunks to ``news_context`` and the full retrieval
    records to ``news_sources``; both are empty lists when there are no
    articles. Returns the same state object.
    """
    fetched = state.get("news_articles", [])
    if fetched:
        index = NewsRAG()
        index.index_news(fetched)
        hits = index.retrieve_context(state["query"], k=5)
    else:
        hits = []
    state["news_context"] = [hit["content"] for hit in hits]
    state["news_sources"] = hits
    return state

In [None]:
# classifies the query into the following categories to allow a more precise answer
def intent_classifier(state: AgentState):
    """Workflow node: label the query with a coarse intent.

    The lower-cased query is tested against keyword groups in priority order
    and the first group that matches wins; ``'general'`` is the default. The
    label is stored in ``state['intent']`` and logged to ``state['messages']``.

    NOTE(review): matching is by plain substring, so a keyword also matches
    inside longer words (e.g. 'cost' inside 'costco').
    """
    text = state['query'].lower()

    def mentions(*keywords):
        return any(keyword in text for keyword in keywords)

    # Lower-priority rules, tried in order after the milestone check.
    keyword_rules = (
        ('reason_query', ('why', 'reason', 'cause')),
        ('trend_analysis', ('when', 'trend', 'history')),
        ('comparison', ('compare', 'vs', 'versus')),
        ('price_query', ('price', 'cost', 'trading at')),
    )

    if mentions('when did', 'when was') and mentions('hit', 'reach', 'achieve', 'cross'):
        label = 'milestone_query'
    else:
        label = next(
            (name for name, keywords in keyword_rules if mentions(*keywords)),
            'general',
        )

    state['intent'] = label
    state['messages'].append(f"[Intent Classifier] Intent: {label}")
    return state

In [None]:
# a function that uses Yahoo's search API to look through Yahoo's database for the ticker that best matches the company named in the query
def yahoo_lookup_best_match(search_term):
    """Search Yahoo Finance for *search_term* and return the best-scoring symbol.

    Args:
        search_term: Company name (or other text) to search for.

    Returns:
        The ``symbol`` of the highest-scoring quote, or ``None`` when the
        search returns no quotes or anything raises (the error is printed).
    """
    try:
        reply = requests.get(
            "https://query2.finance.yahoo.com/v1/finance/search",
            params={
                'q': search_term,
                'quotesCount': 10,
                'newsCount': 0,
                'enableFuzzyQuery': False,
                'quotesQueryId': 'tss_match_phrase_query'
            },
            headers={'User-Agent': 'Mozilla/5.0'},
            timeout=5,
        )
        candidates = reply.json().get('quotes', [])
        if not candidates:
            return None

        # A bare company name can also match subsidiaries or minor listings
        # with a similar name, so every result is scored and the best one wins.
        def relevance(candidate):
            ticker_symbol = candidate.get('symbol', '')
            display_name = candidate.get('longname', '') or candidate.get('shortname', '')
            venue = candidate.get('exchange', '')
            kind = candidate.get('quoteType', '')
            adjustments = (
                (search_term.lower() in display_name.lower(), 100),   # name contains the term
                (kind == 'EQUITY', 50),                               # ordinary shares
                (venue in ['NSI', 'BSE', 'NMS', 'NYQ'], 30),          # favoured exchanges
                (len(display_name) > len(search_term) + 20, -20),     # much longer name
                (len(ticker_symbol) <= 10, 10),                       # short symbol
                (ticker_symbol.split('.')[0].isalpha(), 10),          # purely alphabetic base
            )
            return sum(delta for applies, delta in adjustments if applies)

        # On ties the earliest result is kept.
        return max(candidates, key=relevance).get('symbol')

    except Exception as e:
        print(f"Yahoo lookup failed: {e}")
        return None

In [None]:
# the bottleneck of this project, the following function has a 3 stage fallback option to try to extract a ticker at all costs
def ticker_extractor(state):
    """Workflow node: resolve the ticker the query is about.

    Three strategies are tried in order and the first validated ticker wins:
      1. an explicit ticker written in the query (``XXX.YY`` or ``$XXX``);
      2. the company name extracted by the LLM, looked up on Yahoo;
      3. the LLM asked directly for the ticker of that company.
    The result is stored in ``state['ticker']`` (``""`` when nothing
    validates) and a note is appended to ``state['messages']``.
    """
    user_query = state["query"].strip()

    def accept(symbol, note):
        # Record a successful resolution and hand the state back.
        state["ticker"] = symbol
        state["messages"].append(note)
        return state

    # Stage 1: regex for an explicit ticker -- fast, no LLM call needed.
    explicit_hits = re.findall(
        r'\b[A-Z&]{2,10}\.[A-Z]{1,4}\b|\$[A-Z]{2,5}\b',
        user_query.upper(),
    )
    for hit in explicit_hits:
        candidate = hit.replace("$", "").strip()
        if validate_ticker(candidate):
            return accept(candidate, f"[Ticker Extractor] Explicit ticker: {candidate}")

    # Stage 2: let the LLM name the company, then search Yahoo for it.
    extract_prompt = f"""Extract ONLY the company name from this query. Return just the company name, nothing else.
Query: {user_query}
Company name:"""
    company_name = smart_llm_invoke(extract_prompt).strip()
    candidate = yahoo_lookup_best_match(company_name)
    if candidate and validate_ticker(candidate):
        return accept(candidate, f"[Ticker Extractor] Found via Yahoo: {candidate}")

    # Stage 3: last resort -- ask the LLM for the ticker itself.
    llm_prompt = f"""What is the PRIMARY/MAIN stock ticker symbol for "{company_name}"?
IMPORTANT: If there are multiple companies with similar names, return the LARGEST/MOST WELL-KNOWN one.
Examples:
- "Mahindra" → M&M.NS (Mahindra & Mahindra, the main automobile company, NOT subsidiaries)
- "Tata" → TATAMOTORS.NS (the main auto company)
- "Adani" → ADANI.NS (Adani Enterprises, the flagship)
- "Apple" → AAPL (the main company)
For Indian companies: use .NS suffix
For US companies: no suffix
Company: {company_name}
Main ticker symbol:"""
    reply = smart_llm_invoke(llm_prompt).strip().upper()
    # Pull the first ticker-shaped token out of the reply; use the whole reply
    # when nothing ticker-shaped is found.
    found = re.search(r'\b[A-Z&]{1,15}\.(?:NS|BO)\b|\b[A-Z]{1,5}\b', reply)
    candidate = reply if found is None else found.group(0)
    if candidate and validate_ticker(candidate):
        return accept(candidate, f"[Ticker Extractor] LLM resolved: {candidate}")

    state["ticker"] = ""
    state["messages"].append(f"[Ticker Extractor] Could not find ticker for {company_name}")
    return state

In [None]:
# uses the get stock info function to update the agent's parameters
def price_fetcher(state: AgentState):
    """Workflow node: store the price snapshot for ``state['ticker']``.

    Writes the result of ``get_stock_price`` to ``state['price_data']``, or an
    empty dict when no ticker is set, and logs the outcome to
    ``state['messages']``.
    """
    symbol = state['ticker']
    if not symbol:
        state['price_data'] = {}
        state['messages'].append("[Price Fetcher] No price data found")
        return state
    quote = get_stock_price(symbol)
    state['price_data'] = quote
    state['messages'].append(f"[Price Fetcher] Price Data: {quote}")
    return state

In [None]:
# fetches financial info if the intent classifier labels the query as general or comparison
def financial_fetcher(state: AgentState):
    """Workflow node: fetch annual financial statements when they are relevant.

    Statements are fetched only when a ticker is set and the intent is
    ``'general'`` or ``'comparison'``; otherwise ``state['financial_data']``
    is set to an empty dict. The outcome is logged to ``state['messages']``.
    """
    symbol = state['ticker']
    wants_financials = state['intent'] in ('general', 'comparison')
    if not (wants_financials and symbol):
        state['financial_data'] = {}
        state['messages'].append("[Financial Fetcher] No financial data found")
        return state
    statements = fetch_financial_statements(symbol, 'annual')
    state['financial_data'] = statements
    state['messages'].append(f"[Financial Fetcher] Financial Data: {statements}")
    return state

In [None]:
# a crucial function in this setup, fetches the news that the RAG will embed, i used a news api to get the source
def _news_item_to_article(item, url):
    """Convert one raw NewsAPI result into the article dict used by the RAG index."""
    source_obj = item.get('source', {})
    source_name = source_obj.get('name', '') if isinstance(source_obj, dict) else str(source_obj)
    # Prefer the description, then the content, then fall back to the title.
    content = item.get('description') or item.get('content') or item.get('title', '')
    published = item.get('publishedAt', '')
    try:
        # Turn a trailing 'Z' into an explicit UTC offset before parsing.
        date_text = datetime.fromisoformat(published.replace('Z', '+00:00')).strftime('%Y-%m-%d')
    except (AttributeError, TypeError, ValueError):
        # Unparseable or missing timestamp: keep the raw text, never None, so
        # downstream consumers always receive a string.
        date_text = published or ''
    return {
        'title': item.get('title', ''),
        'content': content,
        'link': url,
        'date': date_text,
        'source': source_name
    }


def news_fetcher(state: AgentState):
    """Workflow node: collect recent news articles for the resolved ticker.

    Several NewsAPI searches (company name, bare ticker, and two name-based
    variants) are run over the last 30 days, results are de-duplicated by URL
    and searching stops once at least 20 articles have been gathered.

    Writes the article dicts (``title``, ``content``, ``link``, ``date``,
    ``source``) to ``state['news_articles']`` -- an empty list when there is
    no ticker or the fetch fails -- and logs the outcome, including each
    individual search that failed, to ``state['messages']``.
    """
    ticker = state.get('ticker')
    if not ticker:
        state['news_articles'] = []
        state.setdefault('messages', []).append("[News Fetcher] No ticker provided")
        return state
    try:
        try:
            api_key = userdata.get('NEWSAPI_KEY')
            newsapi = NewsApiClient(api_key=api_key)
        except Exception as e:
            state['news_articles'] = []
            state.setdefault('messages', []).append(f"[News Fetcher] API initialization failed: {e}")
            return state
        # Strip the exchange suffix from the ticker; prefer the company name
        # recorded by the price fetcher when it is available.
        clean_ticker = ticker.split('.')[0]
        price_data = state.get('price_data', {})
        company_name = price_data.get('company_name', clean_ticker)
        from_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')

        articles = []
        seen_urls = set()
        queries = [
            company_name,
            f"{clean_ticker}",
            f"{company_name} stock",
            f"{company_name} India",
        ]
        for query in queries:
            try:
                response = newsapi.get_everything(
                    q=query,
                    from_param=from_date,
                    language='en',
                    sort_by='publishedAt',
                    page_size=20
                )
                if response.get('status') != 'ok':
                    continue
                for item in response.get('articles', []):
                    url = item.get('url', '')
                    if not url or url in seen_urls:
                        continue
                    seen_urls.add(url)
                    articles.append(_news_item_to_article(item, url))
                if len(articles) >= 20:
                    break
            except Exception as e:
                # Best-effort: one failed search must not abort the others,
                # but leave a trace instead of swallowing it silently.
                state.setdefault('messages', []).append(
                    f"[News Fetcher] Query '{query}' failed: {e}"
                )
                continue
        state['news_articles'] = articles
        state.setdefault('messages', [])
        state['messages'].append(f"[News Fetcher] Found {len(articles)} articles for {ticker}")
    except Exception as e:
        state['news_articles'] = []
        state.setdefault('messages', []).append(f"[News Fetcher] Error: {str(e)}")
    return state

In [None]:
# a function that updates the state class after the news fetcher is done
def news_analyzer(state: AgentState):
    """Workflow node: score the sentiment of the retrieved news context.

    Joins the chunks in ``state['news_context']`` with newlines and stores the
    result of ``analyze_sentiment`` in ``state['sentiment_score']``; the score
    is ``0.0`` when there is no context. The outcome is logged to
    ``state['messages']``.
    """
    snippets = state.get("news_context", [])
    score = analyze_sentiment("\n".join(snippets)) if snippets else 0.0
    state["sentiment_score"] = score
    if snippets:
        note = f"[News Analyzer] Sentiment Score: {score:.2f}"
    else:
        note = "[News Analyzer] No context"
    state["messages"].append(note)
    return state

In [None]:
# the response generation node: takes the data gathered in the agent state and asks the LLM to stitch together a coherent answer using the retrieved news context
def response_generator(state: AgentState):
  """Workflow node: compose the final answer from the gathered state.

  Builds a prompt from the ticker, current/previous price, retrieved news
  context and sentiment score, asks the LLM to answer the user's question,
  and appends a numbered list of the distinct news source links. The result
  is stored in ``state['analysis']``.
  """
  query = state["query"]
  ticker = state["ticker"]
  price_data = state.get("price_data", {})
  sentiment = state.get("sentiment_score", 0.0)
  news_context = state.get("news_context") or []
  sources = state.get("news_sources") or []
  symbol = fetch_currency(ticker)[0]
  news_str = "\n".join(news_context) if news_context else "No specific news found."
  prompt = f"""
  You are a professional financial analyst.
  STEP 1: ANALYZE PRICE DATA
  Ticker: {ticker}
  Current Price: {symbol}{price_data.get('current_price')}
  Change: From {symbol}{price_data.get('previous_close')}
  STEP 2: INCORPORATE RAG CONTEXT (News)
  Recent News Context: {news_str}
  Sentiment Score: {sentiment:.2f}
  USER QUESTION: {query}
  INSTRUCTIONS:
  1. Provide a "Market Snapshot" based on the price data.
  2. Provide a "News & Catalyst" section using the RAG context.
  3. Be factual and grounded in the provided snippets.
  """
  answer = smart_llm_invoke(prompt)

  # Several retrieved chunks can come from the same article, so de-duplicate
  # by URL first and number afterwards; numbering the raw list left gaps
  # (1, 3, 4, ...) whenever a duplicate was skipped.
  links = []
  seen_urls = set()
  for s in sources:
      url = s.get('url')
      if url and url not in seen_urls:
          seen_urls.add(url)
          links.append((s.get('source', 'News'), url))
  # The section is only added when there is at least one link to show.
  if links:
      parts = ["\n\n---", "\n### Latest News & Sources:"]
      parts.extend(
          f"\n{i}. {name} – [Read Article]({url})"
          for i, (name, url) in enumerate(links, 1)
      )
      answer += "".join(parts)
  state["analysis"] = answer
  return state

In [None]:
# the function which puts the entire flow together: it registers the nodes and defines the order in which the pipeline calls them
def build_workflow():
  """Build and compile the linear agent graph.

  Execution order: intent_classifier -> ticker_extractor -> price_fetcher ->
  financial_fetcher -> news_fetcher -> rag_retriever -> news_analyzer ->
  response_generator.

  Returns:
      The compiled graph, ready for ``.invoke(initial_state)``.
  """
  workflow = StateGraph(AgentState)
  # Nodes
  workflow.add_node("intent_classifier", intent_classifier)
  workflow.add_node("ticker_extractor", ticker_extractor)
  workflow.add_node("price_fetcher", price_fetcher)
  workflow.add_node("financial_fetcher", financial_fetcher)
  workflow.add_node("news_fetcher", news_fetcher)
  workflow.add_node("news_analyzer", news_analyzer)
  workflow.add_node("rag_retriever", rag_retriever)
  workflow.add_node("response_generator", response_generator)
  # Linear execution: news is always gathered so the RAG sources can be shown.
  workflow.add_edge(START, "intent_classifier")
  workflow.add_edge("intent_classifier", "ticker_extractor")
  workflow.add_edge("ticker_extractor", "price_fetcher")
  workflow.add_edge("price_fetcher", "financial_fetcher")
  workflow.add_edge("financial_fetcher", "news_fetcher")
  # The retriever must run before the analyzer: news_analyzer scores
  # state['news_context'], which only rag_retriever fills in. With the two
  # the other way round the context was always empty and the sentiment
  # score was always 0.0.
  workflow.add_edge("news_fetcher", "rag_retriever")
  workflow.add_edge("rag_retriever", "news_analyzer")
  workflow.add_edge("news_analyzer", "response_generator")
  workflow.add_edge("response_generator", END)

  return workflow.compile()

In [None]:
# A helper function that helps run the chatbot, it initializes all values of the agent, calls the agent and feeds it the user's query
def query_stocks(user_query: str):
    """Run the full agent workflow for one user question.

    Args:
        user_query: The question to answer.

    Returns:
        A dict with the generated ``answer``, the resolved ``ticker``, the
        ``sentiment`` score and the nodes' ``debug_messages``.
    """
    # Every key declared in AgentState gets an explicit starting value;
    # 'news_sources' was previously the only one left out.
    initial_state = {
        'query': user_query,
        'ticker': '',
        'intent': '',
        'price_data': {},
        'financial_data': {},
        'news_articles': [],
        'news_context': [],
        'sentiment_score': 0.0,
        'analysis': '',
        'recommendation': '',
        'messages': [],
        'news_sources': []
    }
    agent = build_workflow()
    result = agent.invoke(initial_state)
    return {
        'answer': result['analysis'],
        'ticker': result['ticker'],
        'sentiment': result['sentiment_score'],
        'debug_messages': result['messages']
    }

In [None]:
# Example run: send one question through the workflow and print the generated answer.
query="Why did Apple stock drop?"
result2 = query_stocks(query)
print(f"Query: {query}")
print(f"Answer: {result2['answer']}\n")

Query: Why did Apple stock drop?
Answer: ## Apple (AAPL) Stock Analysis - January 26, 2026

Here's an analysis of Apple's stock performance, incorporating available price data and recent news:

**1. Market Snapshot**

* **Ticker:** AAPL
* **Current Price:** $254.8899
* **Change:** +$6.8499 (from $248.04)
* **Direction:** Positive - Apple stock *increased* in price, it did *not* drop. The provided data shows a gain, not a loss.

**2. News & Catalyst**

The recent news context primarily focuses on historical documentation related to Apple's founding. Specifically, the original partnership agreement between Steve Jobs, Steve Wozniak, and Ronald Wayne sold for $2.51 million on January 23, 2026. This news, while interesting from a historical perspective, **does not provide any explanation for stock movement.** It's unlikely to be a catalyst for either a price increase or decrease. 

There is also a news item regarding Alphabet (GOOGL) and its advancements in AI and cloud, highlighted by Bil