# Real-Time Financial Sentiment Classification using GenAI + RAG + Agent

This capstone demonstrates an end-to-end **Generative AI pipeline** using **Gemini 2.0 Flash**, **retrieval-augmented generation (RAG)**, and a **LangChain Python agent** for financial sentiment analysis.

The steps are:
- Fetch **up to 100 real-time finance news articles** per query
- Use **Gemini embeddings** to retrieve similar examples from a labeled dataset via **FAISS**
- Build **few-shot prompts** dynamically and classify sentiment using Gemini Flash
- Return a structured **JSON output** per article, showing the prediction and the supporting example
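
As a rough illustration of the last step, one record per article might look like the sketch below. The field names mirror the keys documented in the classifier's docstring later in this notebook, and the values are borrowed from that docstring's example rather than from a real run.

```python
import json

# Illustrative record for one article; values come from the docstring example,
# not from an actual API call.
record = {
    "sentiment": "negative",
    "news": "Tesla shares drop after recall announcement.",
    "example_text": "Tesla faces scrutiny after software glitch, stock falls.",
    "example_label": "negative",
}

# Serialize with the same indent used when the results are saved to disk.
serialized = json.dumps(record, indent=2)
print(serialized)
```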

**Impact**: This solution supports analysts and decision-makers by transforming unstructured market news into structured, explainable sentiment summaries in real time.


### Install Dependencies
Install the required libraries: LangChain, the Gemini SDK (`google-genai`), FAISS, and Google ADK.

In [21]:
import numpy as np 
import pandas as pd 

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
    # for filename in filenames:
        # print(os.path.join(dirname, filename))

## Install dependencies in Kaggle

### Set Up Gemini & Import Packages
Configure the Gemini SDK and import all required Python packages.

In [3]:
!pip install -q langchain faiss-cpu google-genai google-adk

In [2]:
!pip install -U langchain-google-genai

## LangChain + Gemini Setup

In [22]:
from google import genai
from langchain_core.tools import tool
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI

import os
import time
import numpy as np
import pandas as pd
import faiss
import random
from tqdm import tqdm
from typing import Any, Dict, List
import requests


In [23]:
from dotenv import load_dotenv
load_dotenv()

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

In [24]:
client = genai.Client(api_key=GOOGLE_API_KEY)

# model = genai.GenerativeModel('gemini-2.0-flash')
# def classify_with_gemini(prompt: str):
#     response = genai.generate_content(prompt, model="gemini-2.0-flash")
#     return response.text.strip()


### Load & Clean Labeled Data
Load the labeled financial sentiment dataset and prepare it for embedding.

In [48]:
df = pd.read_csv("datasets/sentiment_data.csv", encoding="ISO-8859-1", header=None)
df.columns = ['label', 'text']
df['label'] = df['label'].str.lower().str.strip()
df = df.iloc[:100,:]

### Embed Labeled Data with Gemini
Use Gemini's embedding API to encode each labeled example for similarity search.

In [29]:
# from google.generativeai import embed_content

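def get_gemini_embedding(text):
    """Embed a single text with Gemini; return None if the API call fails."""
    try:
        response = client.models.embed_content(
            model="models/text-embedding-004",
            contents=text,
            config={"task_type": "RETRIEVAL_DOCUMENT"},
        )
        time.sleep(0.7)  # crude client-side throttle between API calls
        return response.embeddings  # list of embeddings, one per input text
    except Exception as e:
        print("Embedding failed:", e)
        return None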

# Step 5: Apply to the labeled dataset
labeled_data = []
for i, row in tqdm(df.iterrows(), total=len(df)):
    embedding = get_gemini_embedding(row['text'])
    if embedding:
        labeled_data.append({
            'text': row['text'],
            'label': row['label'],
            'embedding': embedding
        })

100%|███████████████████████████████████████████| 10/10 [00:09<00:00,  1.09it/s]
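
The embedding cell above throttles with a fixed `time.sleep(0.7)` and gives up on the first exception. A minimal sketch of an alternative, retrying with exponential backoff, is shown here. The helper name `call_with_backoff`, its parameters, and the `flaky` stand-in function are all hypothetical and not part of the original notebook.

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.7,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call fn until it succeeds, doubling the wait after each failure."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:  # broad on purpose, like the cell above
            print(f"Attempt {attempt + 1} failed: {exc}")
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))
    return None


# Stand-in for an API call that fails twice before succeeding (hypothetical).
calls = {"n": 0}


def flaky() -> List[float]:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return [0.1, 0.2]


waits: List[float] = []
# Record the waits instead of actually sleeping, so the demo runs instantly.
result = call_with_backoff(flaky, sleep=waits.append)
```

Passing `sleep` as a parameter keeps the helper easy to test; in the notebook it would presumably be left at its default.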


### Build FAISS Index on Labeled Embeddings
Use FAISS to index all labeled data vectors for fast nearest-neighbor retrieval.

In [30]:
# Stack the embedding vectors, L2-normalize them, and index them by inner product
label_embeddings = np.array([item['embedding'][0].values for item in labeled_data]).astype('float32')
faiss.normalize_L2(label_embeddings)
faiss_index = faiss.IndexFlatIP(label_embeddings.shape[1])
faiss_index.add(label_embeddings)
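
The cell above L2-normalizes the vectors before adding them to an inner-product index. The reason this works as cosine-similarity search can be checked with a small pure-Python sketch: the inner product of two unit-length vectors equals their cosine similarity. The helper names and toy vectors below are illustrative, and the sketch assumes non-zero vectors.

```python
import math
from typing import List


def inner_product(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(vec: List[float]) -> List[float]:
    """Scale a non-zero vector to unit length."""
    norm = math.sqrt(inner_product(vec, vec))
    return [x / norm for x in vec]


def cosine_similarity(a: List[float], b: List[float]) -> float:
    return inner_product(a, b) / (
        math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    )


a = [3.0, 4.0]
b = [4.0, 3.0]

# Inner product after normalization versus cosine similarity of the raw vectors.
ip_of_normalized = inner_product(l2_normalize(a), l2_normalize(b))
cosine = cosine_similarity(a, b)
print(ip_of_normalized, cosine)
```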


### Fetch Real-Time Finance News
Use NewsAPI to retrieve the top 100 recent finance-related news articles.

In [31]:
NEWS_API_KEY = os.getenv("NEWS_API_KEY")

In [32]:
def fetch_news(query: str, max_results: int) -> List[str]:
    """
    Fetches recent English-language news articles matching the provided search query using the NewsAPI.

    Args:
        query (str): The search term or keywords to look for in news articles.
        max_results (int): The maximum number of articles to retrieve (NewsAPI caps pageSize at 100).

    Returns:
        List[str]: A list of strings, where each string contains the title and description of a news article.
                   Returns an empty list if the request fails or no articles are found.

    Example:
        articles = fetch_news("artificial intelligence", max_results=10)
        # Returns a list of up to 10 news articles about artificial intelligence.
    """
    url = "https://newsapi.org/v2/everything"
    params = {
        "q": query,
        "language": "en",
        "pageSize": max_results,
        "sortBy": "relevance",
        "apiKey": NEWS_API_KEY
    }
    response = requests.get(url, params=params)
    if response.status_code != 200:
        print("Failed to fetch news:", response.text)
        return []
    return [f"{a['title']}. {a['description']}" for a in response.json().get("articles", []) if a['description']]
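
The list comprehension at the end of `fetch_news` indexes `a['title']` directly, which assumes every article carries a non-empty title. A slightly more defensive variant is sketched below as a standalone function. `format_articles` and `sample_payload` are hypothetical names, and the payload only imitates the shape used above (an `articles` list of dicts with `title` and `description`).

```python
from typing import Any, Dict, List


def format_articles(payload: Dict[str, Any]) -> List[str]:
    """Turn a NewsAPI-style response into 'title. description' strings."""
    formatted = []
    for article in payload.get("articles") or []:
        title = (article.get("title") or "").strip()
        description = (article.get("description") or "").strip()
        # Keep only articles that have both fields filled in.
        if title and description:
            formatted.append(f"{title}. {description}")
    return formatted


# Made-up payload for illustration only.
sample_payload = {
    "articles": [
        {"title": "Stocks rally", "description": "Indexes close higher."},
        {"title": "No description here", "description": None},
        {"description": "Missing title key."},
    ]
}
print(format_articles(sample_payload))
```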


In [69]:
# fetch_news.args_schema.model_json_schema()

### Define LangChain Tool: RAG Retriever
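Define the retrieval function that the agent will call as a tool. The LangChain `@tool` decorator is commented out here; the plain function is wrapped with Google ADK's `FunctionTool` later in the notebook.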

In [33]:
# @tool(parse_docstring=True)
def retrieve_similar_labeled_example(news_text: str) -> Dict[str, str]:
    """
    Finds and returns the most similar labeled financial example to the given news article using FAISS-based semantic search.

    Args:
        news_text (str): The news article or headline text to search for similar labeled examples.

    Returns:
        Dict[str, str]: A dictionary containing the following keys:
            - 'example_text': The text of the most similar labeled finance example from the database.
            - 'example_label': The label or category associated with the matched example.
            - If embedding fails, returns {'error': 'Failed to embed'}.

    Example:
        result = retrieve_similar_labeled_example("Apple stock surges after earnings report.")
        # Returns: {'example_text': 'Apple posts record quarterly revenue...', 'example_label': 'positive'}
    """
    query_vec = get_gemini_embedding(news_text)
   
    if query_vec is None:
        return {"error": "Failed to embed"}
    query_array = np.array(query_vec[0].values, dtype='float32').reshape(1, -1)
    faiss.normalize_L2(query_array)
    # Search for the single nearest labeled example (k=1)
    _, indices = faiss_index.search(query_array, 1)
    matched = labeled_data[indices[0][0]]
    
    return {"example_text": matched['text'], "example_label": matched['label']}


### Define LangChain Tool: Sentiment Classifier
Define the few-shot Gemini classifier as a second tool function for the agent.

### Tool: Classify with Gemini (Few-Shot Prompt)

In [34]:
# @tool(parse_docstring=True)
def classify_sentiment_with_few_shot(news: str, example_text: str, example_label: str) -> Dict[str, str]:
    """
    Classifies the sentiment of a financial news article (positive, negative, or neutral) using a few-shot prompt with Gemini.
    The function leverages a matched labeled example as in-context reference to improve classification accuracy.

    Args:
        news (str): The text of the financial news article to be classified.
        example_text (str): A labeled example news text that is semantically similar to the input.
        example_label (str): The sentiment label ('positive', 'negative', or 'neutral') of the example_text.

    Returns:
        Dict[str, str]: A dictionary containing:
            - 'sentiment': The predicted sentiment for the input news article ('positive', 'negative', 'neutral', or 'unknown').
            - 'news': The input news article text.
            - 'example_text': The matched example news text used for few-shot prompting.
            - 'example_label': The sentiment label of the matched example.
            - If an error occurs, returns {'error': <error_message>}.

    Example:
        result = classify_sentiment_with_few_shot(
            news="Tesla shares drop after recall announcement.",
            example_text="Tesla faces scrutiny after software glitch, stock falls.",
            example_label="negative"
        )
        # Returns: {
        #   'sentiment': 'negative',
        #   'news': "...",
        #   'example_text': "...",
        #   'example_label': "negative"
        # }
    """
    prompt = f"""You are a financial sentiment classifier.
    Here is an example:
    Text: {example_text}
    Sentiment: {example_label.capitalize()}
    
    Now classify the following:
    Text: {news}
    Sentiment:
    """

    try:
        response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=prompt
        )
        sentiment = response.text.strip().lower()

        if sentiment.startswith("positive"):
            sentiment = "positive"
        elif sentiment.startswith("negative"):
            sentiment = "negative"
        elif sentiment.startswith("neutral"):
            sentiment = "neutral"
        else:
            sentiment = "unknown"

        return {
            "sentiment": sentiment,
            "news": news,
            "example_text": example_text,
            "example_label": example_label
        }

    except Exception as e:
        return {"error": str(e)}
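
The prompt construction and label parsing inside the classifier can also be factored into two small pure functions, which makes them testable without calling Gemini. The sketch below is one possible arrangement, not the notebook's own code. `build_few_shot_prompt`, `parse_sentiment`, and `VALID_LABELS` are hypothetical names, the support for several examples is an assumed extension, and the "Answer with one word" line is an addition that the original prompt does not contain.

```python
from typing import List, Tuple

VALID_LABELS = ("positive", "negative", "neutral")


def build_few_shot_prompt(news: str, examples: List[Tuple[str, str]]) -> str:
    """Build a few-shot prompt from (text, label) pairs plus the article to classify."""
    shots = "\n\n".join(
        f"Text: {text}\nSentiment: {label.capitalize()}" for text, label in examples
    )
    return (
        "You are a financial sentiment classifier.\n"
        "Answer with one word: Positive, Negative, or Neutral.\n\n"  # assumed addition
        f"{shots}\n\n"
        "Now classify the following:\n"
        f"Text: {news}\n"
        "Sentiment:"
    )


def parse_sentiment(raw: str) -> str:
    """Map a raw model reply to one of the valid labels, or 'unknown'."""
    cleaned = raw.strip().lower()
    for label in VALID_LABELS:
        if cleaned.startswith(label):
            return label
    return "unknown"


prompt = build_few_shot_prompt(
    "Tesla shares drop after recall announcement.",
    [("Tesla faces scrutiny after software glitch, stock falls.", "negative")],
)
print(prompt)
```

Building the prompt from joined lines also avoids the leading spaces that an indented triple-quoted f-string carries into the text sent to the model.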


## Create Agent and Run Over News Batch

In [18]:
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage
from langchain_core.outputs import ChatResult, ChatGeneration
from langchain.agents import initialize_agent, AgentType


In [19]:
from typing import List, Optional, Union, Any

### 🔁 Run Agent in Batch Mode
Iterate through the fetched articles (the first 25 as a test run), retrieve a similar labeled example for each, classify it, and store the results.

In [26]:
# The tool functions are plain Python functions (the @tool decorators are
# commented out), so they are called directly rather than via .invoke()/.run().
real_time_news = fetch_news(query='S&P 500 trend today', max_results=100)

results = []

for article in tqdm(real_time_news[:25]):  # Test with the first 25 articles
    retrieved = retrieve_similar_labeled_example(article)
    time.sleep(5)  # throttle requests between articles
    if 'example_text' not in retrieved:
        continue  # embedding failed; skip this article

    # Classify using the retrieved example as the few-shot reference
    classification = classify_sentiment_with_few_shot(
        news=article,
        example_text=retrieved['example_text'],
        example_label=retrieved['example_label'],
    )

    results.append(classification)


100%|███████████████████████████████████████████| 25/25 [02:43<00:00,  6.52s/it]


In [37]:
# real_time_news

In [36]:
# results

### Save Structured Results

In [None]:
import json
from pprint import pprint

In [None]:
with open("financial_sentiment_results.json", "w") as f:
    json.dump(results, f, indent=2)

In [None]:
with open("financial_sentiment_results.json", "r") as f:
    data = json.load(f)

pprint(data[:5])  
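
Once the records are saved, an overall sentiment picture can be derived by counting labels. The sketch below assumes each record carries a `sentiment` key and that failed classifications carry an `error` key instead, as in the classifier's return values. `summarize_sentiment` and `sample_results` are hypothetical and the sample data is made up.

```python
from collections import Counter
from typing import Dict, List


def summarize_sentiment(results: List[Dict[str, str]]) -> Dict[str, int]:
    """Count predictions per label, skipping records without a 'sentiment' key."""
    return dict(Counter(r["sentiment"] for r in results if "sentiment" in r))


# Made-up records for illustration; real ones come from the batch loop.
sample_results = [
    {"sentiment": "positive", "news": "..."},
    {"sentiment": "negative", "news": "..."},
    {"sentiment": "positive", "news": "..."},
    {"error": "request failed"},
]

summary = summarize_sentiment(sample_results)
majority = max(summary, key=summary.get)  # label with the highest count
print(summary, majority)
```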

# Creating Agent using Google ADK

In [35]:
from google.adk.agents import Agent
from google.adk.tools import FunctionTool
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

import logging

class _NoFunctionCallWarning(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        if "there are non-text parts in the response:" in message:
            return False
        else:
            return True

logging.getLogger("google_genai.types").addFilter(_NoFunctionCallWarning())

In [43]:
APP_NAME="stock_market_agent"
USER_ID="user1234"
SESSION_ID="1234"
MODEL_ID="gemini-2.0-flash"

In [37]:
fetch_news_tool = FunctionTool(func=fetch_news)
retrieve_similar_labeled_tool = FunctionTool(func=retrieve_similar_labeled_example)
sentiment_classification_tool = FunctionTool(func=classify_sentiment_with_few_shot)

In [38]:
agent_instructions = """
You are a financial assistant that helps users stay updated on stock market trends by providing news highlights and summarizing the overall market sentiment.

**Workflow:**
- When the user asks about a stock, company, or market trend, use the 'fetch_news' tool to retrieve recent English news articles relevant to their query.
- For each fetched news article, use the 'retrieve_similar_labeled_example' tool to find the most semantically similar labeled finance news from the historical dataset.
- Use the 'classify_sentiment_with_few_shot' tool, providing both the new article and the matched labeled example, to classify the sentiment of the news as positive, negative, or neutral.
- Present a brief summary of the main news points to the user, including the sentiment for each article.
- At the end, provide a high-level summary of the overall sentiment trend for the user's query (e.g., "Most news about [topic] this week is positive/negative/neutral").
- If you are unable to find news or classify sentiment, let the user know and suggest they try a different query.

**General Guidance:**
- Always use the tools in the following order: 1) fetch_news, 2) retrieve_similar_labeled_example, 3) classify_sentiment_with_few_shot.
- If any tool fails or returns no results, inform the user politely and suggest next steps.
- Summarize results clearly, using easy-to-understand language and bullet points when sharing multiple news items or sentiments.
"""

In [44]:
# Agent
stock_market_agent = Agent(
    model=MODEL_ID,
    name='stock_market_agent',
    instruction=agent_instructions,
    tools=[fetch_news_tool, retrieve_similar_labeled_tool, sentiment_classification_tool]
)

In [45]:
# --- Session and Runner Setup ---
session_service = InMemorySessionService()

# Create the session ONCE, before any agent calls
await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)

runner = Runner(agent=stock_market_agent, app_name=APP_NAME, session_service=session_service)

In [46]:
def call_agent(query):
    """Send a query to the agent, printing each intermediate step and the final answer."""
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    print("\nUser:", query)
    print("\n========== Agent Reasoning Steps ==========\n")
    for event in events:
        step_lines = []
        # Events may arrive without content or parts, so fall back to empty values
        event_content = getattr(event, 'content', None)
        role = getattr(event_content, 'role', None)
        parts = getattr(event_content, 'parts', None) or []

        for part in parts:
            # Agent's reasoning, narration, or explanation
            if getattr(part, "text", None) and part.text.strip():
                step_lines.append(f"[Agent Thought]\n{part.text.strip()}\n")
            # Tool call requested by the agent
            if getattr(part, "function_call", None) is not None:
                args = part.function_call.args or {}
                args_text = "\n    ".join(f"{k}: {v}" for k, v in args.items())
                step_lines.append(
                    f"[Tool Call]\n"
                    f"Tool: {part.function_call.name}\n"
                    f"Arguments:\n    {args_text}\n"
                )
            # Tool output returned to the agent as a function response
            if getattr(part, "function_response", None) is not None:
                step_lines.append(f"[Tool Output]\n{part.function_response.response}\n")
        # Sometimes tool output arrives as plain text with role 'tool'
        if role == "tool":
            for part in parts:
                if getattr(part, "text", None) and part.text.strip():
                    step_lines.append(f"[Tool Output]\n{part.text.strip()}\n")
        if step_lines:
            print("----- Step -----")
            for line in step_lines:
                print(line)
        # Detect and print the final response clearly
        if event.is_final_response():
            print("\n========== Final Agent Response ==========\n")
            for part in parts:
                if getattr(part, "text", None) and part.text.strip():
                    print(part.text.strip())
            print("\n==========================================\n")


In [47]:
call_agent("What is S&P trends today?")


User: What is S&P trends today?


----- Step -----
[Tool Call]
Tool: fetch_news
Arguments:
    max_results: 5
    query: S&P 500

----- Step -----
[Tool Output]
{'result': ['Morgan Stanley shares a chart that fuels the argument for new stock-market highs. The outlook for earnings is brightening, Morgan Stanley CIO Mike Wilson said. More upward revisions has historically led to 13% boost for the S&P 500.', 'Bath & Body Works is ready to go international after a bruising year saw its stock fall 40%. The home fragrance retailer reported a strong quarter after a troubling year, with its stock down 40% year-on-year and removal from the S&P 500.', "Dow, Nasdaq, S&P 500 falling after Israel airstrikes on Iran. All three of the US market indexes (^DJI, ^IXIC, ^GSPC) start Friday's trading session in negative territory, falling after Israel coordinated airstrikes to ..."]}

----- Step -----
[Agent Thought]
Here's what's going on with the S&P 500:

*   **Positive Outlook:** Morgan Stanley sugge

In [109]:
call_agent("What are trends around QQQM ETF today?")


User: What are trends around QQQM ETF today?


----- Step -----
[Agent Thought]
Here's a summary of the trends surrounding QQQM ETF today:

*   **QQQ Attracts $2.4B in Assets:** (Sentiment: Positive) QQQ led ETF inflows as the strong jobs report lifted markets.
*   **Institutional Investor Activity:**
    *   NorthCrest Asset Management LLC increased its position: (Sentiment: Positive)
    *   Two Sigma Investments LP grew its position: (Sentiment: Positive)
    *   Dynamic Advisor Solutions LLC decreased its holdings: (Sentiment: Negative)
*   **Short Interest Decrease:** (Sentiment: Neutral) Short interest in QQQM dropped by 24.4% in May.

Overall sentiment for QQQM ETF today is slightly positive, driven by significant asset inflows and increased positions from some institutional investors, though there was also a decrease in holdings by another institution. The decrease in short interest is a neutral signal.



Here's a summary of the trends surrounding QQQM ETF today:

*   **QQQ A

In [12]:
import uuid
uuid.uuid4()

UUID('919f00ae-cf36-4045-8f41-4f9ce473c49b')
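
The notebook does not say what this UUID is for. One plausible use, assumed here, is generating a fresh session identifier instead of the hard-coded `SESSION_ID`, so that separate conversations do not share history. The helper `new_session_id` is hypothetical.

```python
import uuid


def new_session_id() -> str:
    """Return a random UUID4 string to use as a unique session identifier."""
    return str(uuid.uuid4())


session_id = new_session_id()
print(session_id)
```

A session created with such an ID would still need to be registered through the session service before the runner uses it, as in the setup cell earlier.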