# Financial Market Insight Assistant #

<p5> Objective: Build a local, automated system that ingests financial news and stock data daily, embeds it using HuggingFace embeddings, stores it in Chroma vector DB, and answers domain-specific queries via a LangChain RAG pipeline. Showcase DE orchestration, logging, CI/CD, and model drift detection.</p5>

<h3>Create an API Key</h3>
<ul>
    <li>Go to NEWSAPI.org and signup <a>https://newsapi.org/</a></li>
    <li>Create a file with .env extention and save the NEWSAPI_KEY</li>
    <li>NewsAPI is a RESTful API that provides real-time and historical news articles from thousands of sources worldwide. You can query news based on</li>
    <li>It’s not a scraping tool—it’s a structured API with ready-to-use JSON responses.</li>
    <li>JSON Responses include [Source, Author, Title, Desciption, URL]</li>
</ul>

In [1]:
import os
from dotenv import load_dotenv

# Loading the environment variables
load_dotenv()

API_KEY = os.getenv("NEWSAPI_KEY")
if API_KEY:
    print("The key has been successfully loaded")

The key has been successfully loaded


In [2]:
from pathlib import Path
import sqlite3

#--------- Paths ---------
BASE_DIR = Path().resolve().parent
RAW_DIR = BASE_DIR / "data" / "raw"
STOCK_DIR = BASE_DIR / "data" / "stock"
DB_PATH = BASE_DIR / "chroma_db.sqlite"

# Make sure directories exist
RAW_DIR.mkdir(parents=True, exist_ok=True)
STOCK_DIR.mkdir(parents=True, exist_ok=True)

#--------- Connect to DB ---------
connection_object = sqlite3.connect(DB_PATH)
cursor = connection_object.cursor()

#--------- Create stock_prices table if not exists ---------
create_table_query = """
CREATE TABLE IF NOT EXISTS stock_prices (
    Datetime DATETIME,
    Ticker TEXT,
    Open REAL,
    High REAL,
    Low REAL,
    Close REAL,
    Volume INTEGER
);
"""
# INTEGER --> Whole Numbers
# REAL --> Floating Point Numbers
# TEXT --> String
# DATETIME --> Date&Time    {These datatypes are sqlite3 specific}
cursor.execute(create_table_query)
connection_object.commit()

In [3]:
#-------- fetching news articles --------------

import datetime
from datetime import timedelta, timezone
import requests
import json
import os

def fetch_news():
    url = "https://newsapi.org/v2/everything"
    params = {
        "q": "NSE OR BASE OR FinTech", # Query --> Free-text search, supports keywords, phrases, and Boolean operators (OR, AND).
        "language": "en",
        "pageSize": 20, # Maximum number of articles returned per request (1–100).
        "sortBy": "publishedAt", # Detetermines order of articles. Other Options include
        # ['relevancy', 'popularity', 'publishedAt'---> Newest First]
        "apiKey": API_KEY, 
    }
    response = requests.get(url, params=params, timeout=30)
    data = response.json() # The data is extracted from the API
    
    
    timestamp = datetime.datetime.now(timezone.utc).strftime("%Y%m%d%H") # Year-Month-Day
    filename = os.path.join(RAW_DIR, f"news_{timestamp}.json")
    
    with open(filename, "w", encoding="utf8") as fp:
        json.dump(data, fp) # Extracted Data is dumped inside the json file
    
    return filename

<h4>Fetching Stock data from yfinance</h4>
<ul>
    <li>yfinance is a Python library that allows you to fetch historical and real-time stock market data from Yahoo Finance.</li>
    <li>It is a library not an API</li>
    <li>Covers global exchanges and stock prices</li>
    <li>Returns data as pandas DataFrame</li>
</ul>
<h5>Key Concepts</h5>
<ul>
    <li>ticker : Unique identifier of each stock</li>
    <li>Info : Metadata about the company (sector, market cap etc...).</li>
</ul>

In [12]:
# def fetch_stocks(tickers=["HDFCBANK.NS","ICICIBANK.NS","SBIN.NS"]):
#     for t in tickers:
#         df = yf.download(t, period="7d", interval="1h", group_by='ticker')
        
#         # --- Force tidy format ---
#         if isinstance(df.columns, pd.MultiIndex):
#             df = df.stack(level=1).rename_axis(['Datetime','Ticker']).reset_index()
#         else:
#             df.reset_index(inplace=True)
#             df["Ticker"] = t

#         # --- Ensure column order ---
#         df = df[['Datetime','Ticker','Open','High','Low','Close','Volume']]
        
#         df.to_sql("stock_prices", connection_object, if_exists="append", index=False)


In [19]:
# #---- fetching stock data using yfinance -----------------
# import pandas as pd
# import yfinance as yf

# def fetch_stocks(tickers=["HDFCBANK.NS","ICICIBANK.NS","SBIN.NS"]):
#     for x in tickers:
#         df = yf.download(x, period="7d", interval="1h")
#         df.columns = [f"{c[0]}_{c[1]}" if isinstance(c, tuple) else c for c in df.columns]
#         df.reset_index(inplace=True)
#         df["ticker"] = x # Since df return [Open High...Volume], No description about company name
        
#         # The above step ensures the name of the company in each row.
#         # Datetime     Open   High    Low   Close   Volume     ticker
#         # 2025-09-18   09:15:00  1570.5  1581.0  1565.0  1578.2   1200000  HDFCBANK.NS
#         # 2025-09-18   10:15:00  1578.3  1585.5  1572.0  1581.3    980000  HDFCBANK.NS
#         df.to_sql("stock_prices", connection_object, if_exists="append", index=False)

In [4]:
# import yfinance as yf
# import pandas as pd

# def fetch_stock_single(ticker_symbol="AAPL", period="7d", interval="1h"):
#     ticker = yf.Ticker(ticker_symbol)
#     df = ticker.history(period=period, interval=interval)
    
#     df.reset_index(inplace=True)          
#     df["Ticker"] = ticker_symbol          
    
#     return df

In [8]:
import yfinance as yf
import pandas as pd

def fetch_stocks(tickers=["HDFCBANK.NS","ICICIBANK.NS","SBIN.NS"]):
    full_df = pd.DataFrame()
    
    for t in tickers:
        ticker = yf.Ticker(t)
        df = ticker.history(period="7d", interval="1h")
        df.reset_index(inplace=True)
        df["Ticker"] = t                 # Standard column name for all tickers
        full_df = pd.concat([full_df, df], ignore_index=True)
        
    
    return full_df

<ul>
    <li>fetch_news() pulls all articles and dumps the raw API response into a JSON file inside data/raw/</li>
    <li>That’s the archive or backup form of data</li>
    <li>The below function reads back that raw JSON file. Extracts just a lightweight subset of fields (source, title, published date, path to raw file) and stores those into SQL.</li>
    <li>Now we can query it like a database</li>
    <li>e.g: SELECT title, publishedAt FROM news_meta</li>
    <li>This helps in Exploration and Monioring</li>
    <li>Debugging/Auditing</li>
</ul>


In [6]:
def store_news_metadata(news_json_path):
    with open(news_json_path, "r", encoding="utf8") as f:
        data = json.load(f)
    articles = data.get("articles", [])
    meta = []
    for a in articles:
        meta.append((
            a.get("source", {}).get("name"),
            a.get("title"),
            a.get("publishedAt"),
            news_json_path
        ))
    df = pd.DataFrame(meta, columns=["source", "title", "publishedAt", "raw_file"])
    df.to_sql("news_meta", connection_object, if_exists="append", index=False)

<p>
json_response = {
    "articles": [
        {"source": {"name": "CNN"}, "title": "Stocks Up", "publishedAt": "2025-09-25"},
        {"source": {"name": "BBC"}, "title": "Market Crash", "publishedAt": "2025-09-24"}
    ]
}
</
    p>

In [9]:
if __name__ == "__main__":
    news_file = fetch_news() # returns a file 
    store_news_metadata(news_file) 
    fetch_stocks() 
    connection_object.close() 
    print("Data ingestion completed.")

Data ingestion completed.


<h5>Preprocessing & Chunking News Articles</h5>

In [7]:
raw_json_files = [x for x in os.listdir(RAW_DIR) if x.endswith(".json")]

for file in raw_json_files:
    filepath = os.path.join(RAW_DIR, file)
    with open(filepath, "r", encoding="utf8") as fp:
        data = json.load(fp)   

In [8]:
type(data)

dict

In [9]:
data.keys()

dict_keys(['status', 'totalResults', 'articles'])

In [10]:
data["totalResults"]

29884

In [12]:
articles = data.get("articles", [])
articles[0]

{'source': {'id': 'the-times-of-india', 'name': 'The Times of India'},
 'author': 'Nandini Sanyal',
 'title': 'Indian equities still expensive despite correction: Raunak Onkar, PPFAS',
 'description': 'PPFAS Mutual Fund is adopting a cautious stance amid stretched valuations in Indian markets, favoring selective investments in private banks, auto stocks, and international equities. The fund house emphasizes holding higher cash levels, awaiting better opport…',
 'url': 'https://economictimes.indiatimes.com/markets/expert-view/indian-equities-still-expensive-despite-correction-raunak-onkar-ppfas/articleshow/124114852.cms',
 'urlToImage': 'https://img.etimg.com/thumb/msid-124114897,width-1200,height-630,imgsize-41888,overlay-etmarkets/articleshow.jpg',
 'publishedAt': '2025-09-25T10:36:39Z',
 'content': 'Indian markets may be going through a correction phase, but valuations remain stretched, making fresh investments tricky, according to Raunak Onkar, Research Head &amp; Fund Manager at PP

In [13]:
import re

CHUNK_DIR = BASE_DIR / "data" / "chunks"
CHUNK_DIR.mkdir(parents=True, exist_ok=True)

def clean_text(text:str):
    if not text:
        return ""
    # removing HTML tags
    text = re.sub(r"<.*?>", "", text)
    # removing multiple spaces
    text = re.sub(r"\s+", " ", text)
    return text.strip()

In [14]:
def chunk_text(text: str, chunk_size=300, overlap=50):
    words = text.split()
    chunks = []
    for x in range(0, len(words), chunk_size-overlap):
        chunk = words[x:x+chunk_size]
        if len(chunk)<30:
            continue
        chunk_text = " ".join(chunk)
        chunks.append(chunk_text)
    return chunks

In [15]:
def process_news_file(news_json_path):
    with open(news_json_path, "r", encoding="utf8") as f:
        data = json.load(f)

    articles = data.get("articles", [])
    chunk_records = []

    for art in articles:
        source = art.get("source", {}).get("name")
        title = clean_text(art.get("title", ""))
        content = clean_text(art.get("content", "")) or clean_text(art.get("description", ""))

        if not content:
            continue

        # chunk the article
        chunks = chunk_text(content)

        for i, ch in enumerate(chunks):
            chunk_records.append({
                "source": source,
                "title": title,
                "publishedAt": art.get("publishedAt"),
                "chunk_id": f"{title[:30]}_{i}",
                "text": ch,
                "raw_file": str(news_json_path)
            })

    # save chunked output
    if chunk_records:
        df = pd.DataFrame(chunk_records)
        ts = os.path.basename(news_json_path).replace(".json", "")
        out_path = CHUNK_DIR / f"chunks_{ts}.jsonl"
        df.to_json(out_path, orient="records", lines=True, force_ascii=False)
        print(f"Saved chunks → {out_path}")

In [16]:
if __name__ == "__main__":
    news_files = [x for x in os.listdir(RAW_DIR) if x.endswith(".json")]
    for nf in news_files:
        full_path = RAW_DIR / nf
        process_news_file(full_path)

Saved chunks → C:\Users\DELL\Data Science\Deep Learning\NLP\Projects\Financial_Insights\data\chunks\chunks_news_2025092610.jsonl


<h3>Generate Embeddings & Store in Chroma</h3>

In [17]:
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

CHROMA_DIR = BASE_DIR / "chroma_db"
CHROMA_DIR.mkdir(parents=True, exist_ok=True)

def load_chunks(jsonl_path):
    df = pd.read_json(jsonl_path, lines=True)
    return df

In [19]:
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

def create_chroma_vectorstore(df, persist_dir):
    texts = df["text"].tolist()
    metadatas = df.drop(columns=["text"]).to_dict(orient="records")

    vectordb = Chroma.from_texts(
        texts=texts,
        embedding=embedding_model,
        metadatas=metadatas,
        persist_directory=str(persist_dir)
    )
    vectordb.persist()
    print(f"Saved vectorstore at {persist_dir}")

In [20]:
if __name__ == "__main__":
    chunk_files = sorted(CHUNK_DIR.glob("*.jsonl"))
    for cf in chunk_files:
        df = load_chunks(cf)
        create_chroma_vectorstore(df, CHROMA_DIR)

Saved vectorstore at C:\Users\DELL\Data Science\Deep Learning\NLP\Projects\Financial_Insights\chroma_db


  vectordb.persist()


<h3>Retrieval Augmented Generation (RAG) with Ollama</h3>

In [22]:
from langchain_community.llms import Ollama
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
# RetrievalQA = auto handles retrieval + context injection (less boilerplate than manual LLMChain(context))


# ---- Load vector store ----
vectordb = Chroma(
    persist_directory="./chroma_db", 
    embedding_function=embedding_model
)

# ---- Set up retriever ----
retriever = vectordb.as_retriever(
    search_kwargs={"k": 3}   # fetch top 3 relevant chunks
)

# ---- Load Ollama LLM ----
llm = Ollama(model="mistral")

# ---- Build RetrievalQA chain ----
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    return_source_documents=True # return_source_documents=True means,
    #along with the answer, also return the documents that were retrieved and used as context.
)

# ---- Example query ----
query = "What are the recent developments in the Indian banking sector?"
response = qa_chain.invoke(query)

In [23]:
print("Answer:\n", response["result"])
print("\nSources:")
for doc in response["source_documents"]:
    print("-", doc.metadata)

Answer:
  As of my last update, there have been several significant developments in the Indian banking sector. Here are a few key points:

1. Digital Transformation: Banks like HDFC Bank, ICICI Bank, and Axis Bank have been investing heavily in digital technology to enhance customer experience, increase efficiency, and reduce costs. This includes the adoption of AI, machine learning, blockchain, and other advanced technologies.

2. Consolidation: The Indian government is pushing for consolidation among public sector banks to create fewer but stronger entities. In October 2020, five state-owned banks merged into one, creating the country's third-largest lender, Punjab National Bank (PNB).

3. Regulatory Measures: The Reserve Bank of India (RBI) has implemented several regulatory measures aimed at strengthening the banking sector. These include stricter capital adequacy norms, stress tests for banks, and measures to combat fraud and non-performing assets.

4. Fintech Collaborations: Indi

In [10]:
BASE_DIR

WindowsPath('C:/Users/DELL/Data Science/Deep Learning/NLP/Projects/Financial_Insights')

In [12]:
import os
import json
import datetime
import sqlite3
import requests
import yfinance
import pandas
from dotenv import load_dotenv
import langchain
import langchain_community
import chromadb
import transformers

from pathlib import Path

# Print versions
print("os - built-in, no version")
print("json - built-in, no version")
print("datetime - built-in, no version")
print("sqlite3 - built-in, no version")
print("pathlib - built-in, no version")

print("requests", requests.__version__)
print("yfinance", yfinance.__version__)
print("pandas", pandas.__version__)
#print("python-dotenv", dotenv.__version__)
print("langchain", langchain.__version__)
print("langchain_community", langchain_community.__version__)
print("chromadb", chromadb.__version__)
print("transformers", transformers.__version__)

import importlib.metadata

print("python-dotenv", importlib.metadata.version("python-dotenv"))


os - built-in, no version
json - built-in, no version
datetime - built-in, no version
sqlite3 - built-in, no version
pathlib - built-in, no version
requests 2.32.5
yfinance 0.2.66
pandas 2.2.2
langchain 0.3.27
langchain_community 0.3.29
chromadb 1.1.0
transformers 4.53.2
python-dotenv 1.1.1
