# Notebook 1: Data Ingestion & LLM Stock Extraction

**Goal**: Validate the news scraping and LLM-based stock recommendation extraction pipeline

**Workflow**:
1. Fetch financial news from Indian sources via the World News API
2. Extract stock recommendations using Google Gemini API
3. Validate stock symbols against NSE list
4. Save results for next phase

## 1. Setup & Imports

In [1]:
import os
import json
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from pathlib import Path
import re

# LLM
import google.generativeai as genai

# Utilities
import warnings
warnings.filterwarnings('ignore')

# Create necessary directories
Path("data/raw/news").mkdir(parents=True, exist_ok=True)
Path("data/processed/recommendations").mkdir(parents=True, exist_ok=True)

print("Setup complete!")

Setup complete!


## 2. Configuration

In [2]:
# API Configuration
WORLD_NEWS_API_KEY = os.getenv("WORLD_NEWS_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Configure Gemini
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel('gemini-2.5-flash')

# Date for this run
TODAY = datetime.now().strftime("%Y-%m-%d")
print(f"Running for date: {TODAY}")

Running for date: 2025-10-06
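
Both keys are read with `os.getenv`, which returns `None` when a variable is unset, so a missing key would only surface later as a failed request. A minimal sketch of an early check follows; `missing_env_vars` is a hypothetical helper, not part of the notebook.

```python
import os
from typing import Iterable, List, Mapping, Optional


def missing_env_vars(names: Iterable[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# An explicit mapping keeps the example independent of the real environment
example_env = {"WORLD_NEWS_API_KEY": "abc123", "GEMINI_API_KEY": ""}
missing = missing_env_vars(["WORLD_NEWS_API_KEY", "GEMINI_API_KEY"], env=example_env)
print(missing)
```

In the notebook this could run right after the `os.getenv` calls, raising an error if the returned list is non-empty.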


## 3. News Scraping Functions

In [3]:
# worldnewsapi.com search-news API : https://worldnewsapi.com/docs/search-news/
def world_news_api_function():
    # Earliest publish date: the calendar date 24 hours ago (the time of day is dropped)
    published_after = (datetime.now() - timedelta(hours=24)).strftime('%Y-%m-%d')
    
    params = {
        'text': 'nifty OR stock OR share OR sensex OR market OR equity OR buy OR sell',
        'source-country': 'in',
        'language': 'en',
        'earliest-publish-date': published_after,
        'number': 20
    }
    
    headers = {
        'x-api-key': WORLD_NEWS_API_KEY
    }
    
    response = requests.get(
        'https://api.worldnewsapi.com/search-news',
        params=params,
        headers=headers,
        timeout=10
    )
    
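    if response.status_code == 200:
        return response.json()
    # Fail loudly: returning an error string would break the later articles.get('news') call
    raise RuntimeError(f"World News API request failed with status {response.status_code}")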
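
Because the cutoff is formatted with `'%Y-%m-%d'`, the window starts at midnight of the previous day rather than exactly 24 hours back. The sketch below contrasts the two formats. `publish_cutoff` is a hypothetical helper, and the `'YYYY-MM-DD HH:MM:SS'` form is an assumption about what the API accepts that should be checked against its documentation.

```python
from datetime import datetime, timedelta


def publish_cutoff(now: datetime, hours: int = 24, with_time: bool = False) -> str:
    """Format the earliest publish date for the request (hypothetical helper).

    with_time=True assumes the API accepts 'YYYY-MM-DD HH:MM:SS'; verify before use.
    """
    cutoff = now - timedelta(hours=hours)
    fmt = "%Y-%m-%d %H:%M:%S" if with_time else "%Y-%m-%d"
    return cutoff.strftime(fmt)


# A fixed "now" keeps the example reproducible
fixed_now = datetime(2025, 10, 6, 15, 30, 0)
date_only = publish_cutoff(fixed_now)
full_stamp = publish_cutoff(fixed_now, with_time=True)
print(date_only)
print(full_stamp)
```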

In [4]:
# Get articles
articles = world_news_api_function()

In [5]:
# articles.get('news')[2]

In [6]:
# Refine the 'text' field of each article:
# cut everything after a 'Read more', 'About', or 'Also Read' trailer, if present.
# The match is case-sensitive so an ordinary lowercase "about" in the body does not truncate the article.
for article in articles.get('news', []):
    article['text'] = re.split(r'\b(?:Read more|About|Also Read)\b', article.get('text') or '')[0].strip()

# Save to json file
TODAY = datetime.now().strftime("%Y-%m-%d")
with open(f"data/raw/news/worldnewsapi_{TODAY}.json", "w") as f:
    json.dump(articles, f, indent=4)

print(f"Data saved to data/raw/news/worldnewsapi_{TODAY}.json")

Data saved to data/raw/news/worldnewsapi_2025-10-06.json
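
How the trailer markers are matched matters: a case-insensitive split on `About` also cuts at any ordinary use of the word "about". The sketch below contrasts a case-insensitive split with a case-sensitive, word-bounded one on a made-up sentence. `strip_trailer` and the sample text are illustrative assumptions.

```python
import re

# Case-sensitive markers, assumed to appear capitalised in publisher boilerplate
TRAILER_PATTERN = re.compile(r"\b(?:Read more|Also Read|About)\b")


def strip_trailer(text: str) -> str:
    """Cut the text at the first trailer marker, if any (illustrative helper)."""
    return TRAILER_PATTERN.split(text, maxsplit=1)[0].strip()


sample = "Shares rose about 3% on Monday. Also Read: Top gainers today"

# Case-insensitive variant: stops at the lowercase "about" inside the sentence
naive = re.split(r"Read more|About|Also Read", sample, flags=re.IGNORECASE)[0].strip()
cleaned = strip_trailer(sample)

print(repr(naive))
print(repr(cleaned))
```

A capitalised "About" at the start of a normal sentence would still be cut, so this narrows the problem rather than removing it.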


## 4. LLM Stock Extraction Functions

In [7]:
# Load filtered stock market listed equity data
with open("data/processed/filtered_stock_data.json", 'r') as f:
    stocks_db = json.load(f)

In [8]:
def create_lookup_dict(stocks_db) -> Dict:
    """Create lookup dictionaries for efficient stock validation"""
    lookup = {
        'by_name': {},
        'by_symbol': {},
    }

    for stock in stocks_db:
        # Normalize names for matching
        normalized_name = stock['name'].lower().strip()
        lookup['by_name'][normalized_name] = stock
        lookup['by_symbol'][stock['trading_symbol'].upper()] = stock
        
        # Add common variations
        # Remove "LIMITED", "LTD", "PVT" etc.
        short_name = normalized_name.replace(' limited', '').replace(' ltd', '')
        short_name = short_name.replace(' pvt', '').strip()
        lookup['by_name'][short_name] = stock
        
    return lookup
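
To see what the lookup holds, here is a compact, self-contained variant run on two made-up records. The records and the regex-based suffix stripping are illustrative assumptions, not the real NSE data or the exact implementation above.

```python
import re
from typing import Dict, List

# Hypothetical records using the same keys the notebook reads
toy_stocks = [
    {"name": "Example Steel Limited", "trading_symbol": "EXSTEEL"},
    {"name": "Sample Finance Pvt Ltd", "trading_symbol": "SAMPLEFIN"},
]


def build_lookup(stocks: List[Dict]) -> Dict[str, Dict]:
    """Index stocks by full name, suffix-stripped name, and symbol."""
    lookup: Dict[str, Dict] = {"by_name": {}, "by_symbol": {}}
    for stock in stocks:
        full = stock["name"].lower().strip()
        # Strip one or more trailing legal suffixes, e.g. "pvt ltd"
        short = re.sub(r"(\s+(limited|ltd|pvt))+$", "", full).strip()
        lookup["by_name"][full] = stock
        lookup["by_name"][short] = stock
        lookup["by_symbol"][stock["trading_symbol"].upper()] = stock
    return lookup


toy_lookup = build_lookup(toy_stocks)
print(sorted(toy_lookup["by_name"]))
print(toy_lookup["by_name"]["sample finance"]["trading_symbol"])
```

Each stock is reachable under both its full and shortened name, which is why the serialized lookup contains the same record more than once.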

In [9]:
stock_lookup = create_lookup_dict(stocks_db)

In [10]:
# store the lookup dictionary for later use
with open("data/processed/stock_lookup.json", "w") as f:
    json.dump(stock_lookup, f, indent=4)

In [11]:
# Create extraction prompt for news articles

def create_extraction_prompt(news_items: List[Dict]) -> str:
    """Create prompt for Gemini to extract stock recommendations"""
    
    # Prepare news text
    news_text = ""
    for item in news_items:
        news_text += f"\n\n--- NEWS ID: {item['id']} ---\n"
        news_text += f"Title: {item['title']}\n"
        news_text += f"Text: {item['text']}\n"
        news_text += f"Summary: {item['summary']}\n"
    
    prompt = f"""You are a financial analyst expert in Indian stock markets (NSE/BSE).
Extract stock information from financial news articles based on market-relevant events.

{news_text}

TASK:
Extract stocks mentioned in these contexts:

1. **EXPLICIT RECOMMENDATIONS**: Buy/Sell/Hold ratings with target prices
2. **IPO ANNOUNCEMENTS**: Upcoming listings, IPO launches
3. **EARNINGS/RESULTS**: Strong/weak quarterly results, earnings beats/misses
4. **CORPORATE ACTIONS**: Stock splits, dividends, buybacks
5. **CONTRACT WINS**: Major order announcements, government contracts
6. **ANALYST COVERAGE**: Stocks added to watchlists, coverage initiations
7. **SIGNIFICANT NEWS**: Strategic deals, expansions, regulatory approvals

EXTRACTION RULES:
- Extract ANY stock with market-relevant news (not just formal recommendations)
- Use exact company names as mentioned
- Infer sentiment from context (positive news → "BUY_SIGNAL", negative → "SELL_SIGNAL")
- Set appropriate action_to_take based on news type
- Extract IPOs even if just announced (set is_ipo=true, action="IPO_WATCH")

ACTION MAPPING:
- Explicit "Buy" recommendation → "BUY"
- Explicit "Sell" recommendation → "SELL"  
- Explicit "Hold" recommendation → "HOLD"
- Positive news (earnings beat, contract win, etc.) → "BUY_SIGNAL"
- Negative news (earnings miss, loss, etc.) → "SELL_SIGNAL"
- Neutral/watchlist mention → "WATCH"
- IPO announcement → "IPO_WATCH"

**DEDUPLICATION:**
If same stock appears multiple times in one article:
- Conflicting signals: Create separate entries
- Similar signals: Merge with combined reasoning, average prices, lowest confidence

OUTPUT FORMAT (JSON):
[
{{
    "news_id": <news_id>,
    "stock_name": "<company name>",
    "is_ipo": true|false,
    "ipo_details": {{
        "expected_listing_date": "<date or null>",
        "price_range": "<range or null>",
        "issue_size": "<size or null>"
    }} or null,
    "news_type": "recommendation|ipo|earnings|contract|corporate_action|analyst_coverage|strategic",
    "reason_for_recommendation": "<specific catalyst/reason>",
    "action_to_take": "BUY|SELL|HOLD|BUY_SIGNAL|SELL_SIGNAL|WATCH|IPO_WATCH|null",
    "buy_price": <float or null>,
    "target_price": <float or null>,
    "target_price_range": {{"min": <float>, "max": <float>}} or null,
    "timeframe": "<timeframe or null>",
    "confidence": <0.0 to 1.0>,
    "sentiment": "positive|negative|neutral",
    "analyst_consensus": "unanimous|mixed|conflicting|null"
}}
]

EXAMPLES:

Example 1 - Earnings Beat:
"Reliance Industries reported strong Q2 earnings with 25% YoY profit growth"
{{
    "stock_name": "Reliance Industries",
    "is_ipo": false,
    "news_type": "earnings",
    "reason_for_recommendation": "Strong Q2 earnings with 25% YoY profit growth",
    "action_to_take": "BUY_SIGNAL",
    "sentiment": "positive",
    "confidence": 0.75
}}

Example 2 - Contract Win:
"L&T awarded Rs 5000 crore highway construction contract by government"
{{
    "stock_name": "L&T",
    "is_ipo": false,
    "news_type": "contract",
    "reason_for_recommendation": "Awarded Rs 5000 crore government highway contract",
    "action_to_take": "BUY_SIGNAL",
    "sentiment": "positive",
    "confidence": 0.8
}}

Example 3 - Stock Split:
"Tata Motors announces 1:2 stock split to improve liquidity"
{{
    "stock_name": "Tata Motors",
    "is_ipo": false,
    "news_type": "corporate_action",
    "reason_for_recommendation": "1:2 stock split announced to improve liquidity",
    "action_to_take": "WATCH",
    "sentiment": "neutral",
    "confidence": 0.65
}}

Example 4 - Watchlist:
"Add these 5 stocks to your radar: HDFC Bank, Infosys..."
{{
    "stock_name": "HDFC Bank",
    "is_ipo": false,
    "news_type": "analyst_coverage",
    "reason_for_recommendation": "Added to analyst watchlist",
    "action_to_take": "WATCH",
    "sentiment": "positive",
    "confidence": 0.6
}}

Example 5 - IPO:
"Tata Capital's $1.7B IPO opens next week"
{{
    "stock_name": "Tata Capital Ltd.",
    "is_ipo": true,
    "ipo_details": {{"issue_size": "$1.7 billion"}},
    "news_type": "ipo",
    "reason_for_recommendation": "Upcoming $1.7B IPO from Tata Group",
    "action_to_take": "IPO_WATCH",
    "sentiment": "neutral",
    "confidence": 0.7
}}

Example 6 - Explicit Rating:
"Analysts recommend buying HDFC Bank with target ₹1800"
{{
    "stock_name": "HDFC Bank",
    "is_ipo": false,
    "news_type": "recommendation",
    "reason_for_recommendation": "Analyst buy recommendation",
    "action_to_take": "BUY",
    "target_price": 1800,
    "sentiment": "positive",
    "confidence": 0.9
}}

Example 7 - Negative News:
"Zomato reports quarterly loss, misses revenue estimates"
{{
    "stock_name": "Zomato",
    "is_ipo": false,
    "news_type": "earnings",
    "reason_for_recommendation": "Quarterly loss, missed revenue estimates",
    "action_to_take": "SELL_SIGNAL",
    "sentiment": "negative",
    "confidence": 0.75
}}

If NO market-relevant stocks found:
{{
    "news_id": <news_id>,
    "stock_name": null,
    "is_ipo": null,
    "ipo_details": null,
    "news_type": null,
    "reason_for_recommendation": null,
    "action_to_take": null,
    "buy_price": null,
    "target_price": null,
    "target_price_range": null,
    "timeframe": null,
    "confidence": 0.0,
    "sentiment": null,
    "analyst_consensus": null
}}

Return ONLY valid JSON, no additional text."""
    
    return prompt
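
The prompt is an f-string, so every literal brace in the JSON template is doubled (`{{` and `}}`) while `{news_text}` is interpolated. A shortened sketch with a made-up news item shows the effect:

```python
# Made-up stand-in for the concatenated news text
news_text = "--- NEWS ID: 1 ---\nTitle: Example headline"

# Doubled braces render as single literal braces; {news_text} is substituted
prompt = f"""{news_text}

OUTPUT FORMAT (JSON):
{{
    "news_id": <news_id>,
    "ipo_details": {{"issue_size": "<size or null>"}}
}}"""

print(prompt)
```

Forgetting to double a brace in the template would make Python treat the enclosed text as an expression to evaluate.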

In [12]:
# Load news data
TODAY = datetime.now().strftime("%Y-%m-%d")
news_file = f"data/raw/news/worldnewsapi_{TODAY}.json"

with open(news_file, 'r') as f:
    news_data = json.load(f)

# Get news items
news_items = news_data.get('news', [])

print(f"Processing {len(news_items)} news items...")

Processing 20 news items...


In [13]:
prompt = create_extraction_prompt(news_items)

In [14]:
# save the prompt to a text file for reference
with open(f"data/processed/recommendations/extraction_prompt_{TODAY}.txt", "w") as f:
    f.write(prompt)

In [15]:
def extract_recommendations(prompt):
    """Send the prompt to Gemini and return the parsed recommendations ([] on failure)"""

    try:
        response = model.generate_content(prompt)
        
        # Extract JSON from response
        response_text = response.text.strip()
        
        # Remove markdown code blocks if present
        if response_text.startswith('```'):
            response_text = response_text.split('```')[1]
            if response_text.startswith('json'):
                response_text = response_text[4:]
        
        recommendations = json.loads(response_text.strip())
        return recommendations
        
    except json.JSONDecodeError as e:
        print(f"JSON parsing error: {e}")
        print(f"Response: {response.text}")
        return []
    except Exception as e:
        print(f"Error calling Gemini API: {e}")
        return []
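
The fence handling above only triggers when the reply starts with a code fence. As a hedged alternative, the sketch below searches for a fenced block anywhere in the reply and falls back to parsing the whole text. `parse_json_reply` is a hypothetical helper, and the sample replies are made up.

```python
import json
import re
from typing import Any, List

# Build the Markdown fence marker programmatically to keep this block readable
FENCE = "`" * 3
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, flags=re.DOTALL)


def parse_json_reply(reply: str) -> List[Any]:
    """Parse a model reply that may be wrapped in a Markdown code fence."""
    text = reply.strip()
    fenced = FENCED_BLOCK.search(text)
    if fenced:
        # Keep only the body of the first fenced block
        text = fenced.group(1)
    data = json.loads(text.strip())
    # Normalize a single object into a one-element list
    return data if isinstance(data, list) else [data]


wrapped = FENCE + 'json\n[{"news_id": 1, "stock_name": "Example Co"}]\n' + FENCE
parsed = parse_json_reply(wrapped)
print(parsed)
```

`json.JSONDecodeError` still propagates for malformed replies, so the caller's existing `except` branch would keep working.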

In [16]:
LLM_recommendations = extract_recommendations(prompt)



In [17]:
# print(f"LLM Recommendations: {LLM_recommendations}")

In [18]:
# save recommendations to json file
with open(f"data/processed/recommendations/llm_recommendations_{TODAY}.json", "w") as f:
    json.dump(LLM_recommendations, f, indent=4)
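
The prompt restricts `action_to_take` to a fixed set and `confidence` to the range 0.0 to 1.0, but nothing guarantees the model honors that. A minimal sketch of a post-hoc check follows; `split_by_schema` and the sample records are illustrative assumptions.

```python
from typing import Dict, List, Tuple

# Values copied from the prompt's OUTPUT FORMAT section
ALLOWED_ACTIONS = {"BUY", "SELL", "HOLD", "BUY_SIGNAL", "SELL_SIGNAL", "WATCH", "IPO_WATCH"}


def split_by_schema(records: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Separate records that pass basic schema checks from those that do not."""
    ok: List[Dict] = []
    rejected: List[Dict] = []
    for rec in records:
        action = rec.get("action_to_take")
        confidence = rec.get("confidence")
        # A null action is allowed by the prompt; anything else must be in the set
        action_ok = action is None or action in ALLOWED_ACTIONS
        conf_ok = isinstance(confidence, (int, float)) and 0.0 <= confidence <= 1.0
        (ok if action_ok and conf_ok else rejected).append(rec)
    return ok, rejected


sample_records = [
    {"stock_name": "Example Co", "action_to_take": "BUY_SIGNAL", "confidence": 0.75},
    {"stock_name": "Sample Ltd", "action_to_take": "STRONG_BUY", "confidence": 0.9},
    {"stock_name": "Demo Corp", "action_to_take": "WATCH", "confidence": 1.5},
]
ok_records, rejected_records = split_by_schema(sample_records)
print(len(ok_records), len(rejected_records))
```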

## 5. Stock Symbol Validation

In [19]:
def validate_and_enrich_stock(stock_name: str, is_ipo: bool = False) -> Tuple[Optional[Dict], Optional[str]]:
    """Validate stock name and return enriched data"""
    if not stock_name:
        return None, None
    
    if is_ipo:
        # Create a placeholder stock entry for IPO
        ipo_stock = {
            'name': stock_name,
            'trading_symbol': 'IPO_PENDING',
            'instrument_key': None,
            'isin': None,
            'is_ipo': True
        }
        return ipo_stock, "ipo_stock"
    
    stock_name = stock_name.strip()
    # print(f"Validating stock: {stock_name}")
    
    # Try exact symbol match first
    if stock_name.upper() in stock_lookup['by_symbol']:
        return stock_lookup['by_symbol'][stock_name.upper()], "symbol match"
    
    # Try exact name match
    normalized = stock_name.lower()
    if normalized in stock_lookup['by_name']:
        return stock_lookup['by_name'][normalized], "name match"
    
    # Normalize the input for better matching
    def normalize_for_matching(text):
        """Remove common suffixes and standardize text"""
        text = text.lower()
        # Remove common suffixes
        text = re.sub(r'\s+(ltd\.?|limited|pvt\.?|private|inc\.?|incorporated|corp\.?|corporation)$', '', text)
        # Remove extra whitespace and special characters
        text = re.sub(r'[&\-\.\,]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return text
    
    def extract_key_words(text):
        """Extract significant words, handling abbreviations"""
        text = normalize_for_matching(text)
        words = text.split()
        # Remove very common filler words
        stop_words = {'and', 'the', 'of', 'a', 'an', 'in', 'on', 'at', 'to', 'for'}
        return [w for w in words if w not in stop_words and len(w) > 1]
    
    def is_abbreviation_match(word, abbreviated):
        """Check if abbreviated form matches word (e.g., 'distillers' matches 'distils')"""
        if word.startswith(abbreviated) or abbreviated.startswith(word):
            return True
        # Check if it could be an abbreviation (first few letters match)
        if len(abbreviated) >= 4 and len(word) >= 4:
            return abbreviated[:4] == word[:4]
        return False
    
    def matches_acronym(acronym, full_name):
        """Check if acronym matches the first letters of words in full name"""
        acronym = acronym.upper()
        words = extract_key_words(full_name)
        
        if len(acronym) != len(words):
            return False
        
        # Lengths are equal here, so every index i is valid for the acronym
        for i, word in enumerate(words):
            if word[0].upper() != acronym[i]:
                return False
        
        return True
    
    normalized_input = normalize_for_matching(stock_name)
    input_words = extract_key_words(stock_name)
    
    # Special handling for bank stocks
    bank_keywords = ['hdfc', 'icici', 'sbi', 'axis', 'kotak', 'bank']
    is_bank_query = any(keyword in normalized_input for keyword in bank_keywords)
    
    if is_bank_query:
        # For bank queries, we want to match actual banks, not ETFs or AMCs
        best_match = None
        best_score = 0
        
        for key, stock in stock_lookup['by_name'].items():
            key_lower = key.lower()
            normalized_key = normalize_for_matching(key)
            
            # Skip ETFs and AMCs when looking for banks
            if 'etf' in key_lower or 'amc' in key_lower or 'pramc' in key_lower:
                continue
            
            # For insurance companies, only match if "insurance" or "life" is in the query
            if 'insurance' in key_lower or 'life' in key_lower:
                if 'insurance' not in normalized_input and 'life' not in normalized_input:
                    continue
            
            # On this path a candidate is considered only if its name contains "bank"
            if 'bank' not in key_lower:
                continue
            
            # Check short_name field if available
            short_name_match = False
            if 'short_name' in stock:
                short_name_lower = stock['short_name'].lower()
                if normalized_input == short_name_lower or stock_name.upper() == stock['short_name'].upper():
                    return stock, "short name match"
                if normalized_input in short_name_lower or short_name_lower in normalized_input:
                    short_name_match = True
            
            # Check for acronym match (e.g., SBI -> State Bank India)
            acronym_match = False
            if len(stock_name) <= 5 and stock_name.isalpha() and stock_name.isupper():
                if matches_acronym(stock_name, key):
                    acronym_match = True
            
            key_words = extract_key_words(key)
            
            # Calculate match score
            matches = 0
            for input_word in input_words:
                for key_word in key_words:
                    if input_word == key_word or is_abbreviation_match(input_word, key_word):
                        matches += 1
                        break
            
            # For short queries like "HDFC", "ICICI", "SBI", be more lenient
            if len(input_words) <= 2:
                # Check if any input word is in the key
                for input_word in input_words:
                    if input_word in normalized_key:
                        matches += 2  # Boost score for direct substring match
            
            score = matches / max(len(input_words), 1)
            
            # Boost for acronym match
            if acronym_match:
                score += 1.0
            
            # Boost for short name match
            if short_name_match:
                score += 0.8
            
            # Prioritize stocks with "bank ltd" or "bank limited" (actual banks)
            if 'bank ltd' in key_lower or 'bank limited' in key_lower:
                score += 0.5
            
            if score > best_score:
                best_score = score
                best_match = stock
        
        if best_match and best_score > 0.5:
            return best_match, "bank priority match"
    
    # General fuzzy matching for all stocks
    best_match = None
    best_score = 0
    
    for key, stock in stock_lookup['by_name'].items():
        normalized_key = normalize_for_matching(key)
        key_words = extract_key_words(key)
        
        # Check short_name field if available
        if 'short_name' in stock:
            short_name_lower = stock['short_name'].lower()
            if normalized_input == short_name_lower or stock_name.upper() == stock['short_name'].upper():
                return stock, "short name match"
        
        # Calculate matching score based on word overlap with abbreviation support
        matches = 0
        for input_word in input_words:
            for key_word in key_words:
                if input_word == key_word or is_abbreviation_match(input_word, key_word):
                    matches += 1
                    break
        
        # Skip scoring when the input has no significant words (avoids division by zero)
        if len(input_words) == 0:
            continue
            
        score = matches / len(input_words)
        
        # Boost score if there's also a substring match
        if normalized_input in normalized_key or normalized_key in normalized_input:
            score += 0.3
        
        # Also check if most of the key words are matched (for abbreviated cases)
        if len(key_words) > 0:
            reverse_score = matches / len(key_words)
            score = max(score, reverse_score)
        
        if score > best_score:
            best_score = score
            best_match = stock
    
    # Lower threshold for better recall
    if best_match and best_score > 0.4:
        return best_match, "fuzzy match"
    
    return None, None
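
The general fuzzy path scores each candidate by word overlap: matched input words divided by input words, plus 0.3 for a substring match, then the maximum of that and matched words divided by candidate words. The sketch below isolates that arithmetic with exact word matches only. Abbreviation matching is deliberately left out, and `overlap_score` is a hypothetical helper.

```python
from typing import List


def overlap_score(input_words: List[str], key_words: List[str]) -> float:
    """Word-overlap score mirroring the general fuzzy path, exact matches only."""
    if not input_words:
        return 0.0
    matches = sum(1 for word in input_words if word in key_words)
    score = matches / len(input_words)
    # Substring bonus, computed on the space-joined word lists
    joined_input, joined_key = " ".join(input_words), " ".join(key_words)
    if joined_input in joined_key or joined_key in joined_input:
        score += 0.3
    # Reverse score: share of the candidate's words that were matched
    if key_words:
        score = max(score, matches / len(key_words))
    return score


full = overlap_score(["tata", "motors"], ["tata", "motors"])
partial = overlap_score(["tata", "power"], ["tata", "motors"])
unrelated = overlap_score(["infosys"], ["tata", "motors"])
print(full, partial, unrelated)
```

In this sketch a candidate sharing one of two words scores 0.5, which is above the 0.4 threshold used in the function, so a partially matching name from the same group can be accepted as a fuzzy match.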

In [25]:
def process_news(news_items: List[Dict]) -> List[Dict]:
    """Match LLM recommendations to their news items and return them, each flagged as validated or not"""
    
    # Create news lookup for quick access
    news_lookup = {item['id']: item for item in news_items}

    final_recommendations = []

    # Validate and enrich recommendations
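    for rec in LLM_recommendations:
        # Skip placeholder rows that carry no stock
        if not rec.get('stock_name'):
            continue

        news_id = int(rec['news_id'])
        news_item = news_lookup.get(news_id)
        if news_item is None:
            # The LLM returned an ID that is not in this batch; avoid a TypeError below
            print(f"Skipping recommendation with unknown news_id: {news_id}")
            continue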

        # Validate stock
        is_ipo = rec.get('is_ipo', False)
        stock_data, method_used = validate_and_enrich_stock(rec.get('stock_name'), is_ipo=is_ipo)
        
        if stock_data:
            # Build final recommendation object
            recommendation = {
                # News information
                "news_id": news_id,
                "news_url": news_item['url'],
                "publish_date": news_item['publish_date'],
                
                # Equity information
                "equity_name": stock_data['name'],
                "instrument_key": stock_data['instrument_key'],
                "trading_symbol": stock_data['trading_symbol'],
                "isin": stock_data['isin'],
                
                # LLM extracted information
                "is_ipo": is_ipo,
                "ipo_details": rec.get('ipo_details') if is_ipo else None,
                "news_type": rec.get('news_type'),
                "reason_for_recommendation": rec.get('reason_for_recommendation'),
                "action_to_take": rec.get('action_to_take'),
                "buy_price": rec.get('buy_price'),
                "target_price": rec.get('target_price'),
                "target_price_range": rec.get('target_price_range'),
                "timeframe": rec.get('timeframe'),
                "confidence": rec.get('confidence', 0.0),
                "analyst_consensus": rec.get('analyst_consensus'),
                
                # Metadata
                "extraction_timestamp": datetime.now().isoformat(),
                "validated": True,
                'validation_method_used': method_used,
            }
        else:
            # Stock not found in database
            recommendation = {
                "news_id": news_id,
                "news_url": news_item['url'],
                "publish_date": news_item['publish_date'],
                
                "equity_name": rec.get('stock_name'),
                "instrument_key": None,
                "trading_symbol": None,
                "isin": None,
                
                "is_ipo": is_ipo,
                "ipo_details": rec.get('ipo_details') if is_ipo else None,
                "news_type": rec.get('news_type'),
                "reason_for_recommendation": rec.get('reason_for_recommendation'),
                "action_to_take": rec.get('action_to_take'),
                "buy_price": rec.get('buy_price'),
                "target_price": rec.get('target_price'),
                "target_price_range": rec.get('target_price_range'), 
                "timeframe": rec.get('timeframe'),
                "confidence": rec.get('confidence', 0.0),
                "analyst_consensus": rec.get('analyst_consensus'), 
                
                "extraction_timestamp": datetime.now().isoformat(),
                "validated": False,
                "validation_error": "Stock not found in master database"
            }
        
        final_recommendations.append(recommendation)
    
    return final_recommendations

In [26]:
all_recommendations = process_news(news_items)

In [27]:
# print(f"all_recommendations: {all_recommendations}")

## 6. Save the Final Results

In [28]:
# Filter only validated recommendations
validated_recommendations = [rec for rec in all_recommendations if rec.get('validated')]

# Save results
output_dir = Path("data/processed/recommendations")
output_dir.mkdir(parents=True, exist_ok=True)

# Save all recommendations
all_output_file = output_dir / f"all_recommendations_{TODAY}.json"
with open(all_output_file, 'w') as f:
    json.dump(all_recommendations, f, indent=2)

# Save only validated recommendations
validated_output_file = output_dir / f"validated_recommendations_{TODAY}.json"
with open(validated_output_file, 'w') as f:
    json.dump(validated_recommendations, f, indent=2)

print("\nResults:")
print(f"Total recommendations extracted: {len(all_recommendations)}")
print(f"Validated recommendations: {len(validated_recommendations)}")
print("\nSaved to:")
print(f"  All: {all_output_file}")
print(f"  Validated: {validated_output_file}")


Results:
Total recommendations extracted: 102
Validated recommendations: 101

Saved to:
  All: data/processed/recommendations/all_recommendations_2025-10-06.json
  Validated: data/processed/recommendations/validated_recommendations_2025-10-06.json
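
A validated count close to the total says little about how each name was matched. One way to inspect that is to tally the `validation_method_used` field, sketched here on made-up records rather than the actual results:

```python
from collections import Counter
from typing import Dict, List

# Made-up records carrying only the field being counted
sample_recs: List[Dict] = [
    {"validation_method_used": "symbol match"},
    {"validation_method_used": "fuzzy match"},
    {"validation_method_used": "fuzzy match"},
    {"validated": False},  # unvalidated records have no method field
]

method_counts = Counter(
    rec.get("validation_method_used", "not validated") for rec in sample_recs
)
print(method_counts.most_common())
```

A large share of fuzzy matches would be a cue to spot-check those records by hand.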


In [29]:
# Print sample recommendations
if validated_recommendations:
    print("\nSample validated recommendations:")
    for rec in validated_recommendations[:3]:
        print(f"\n- {rec['equity_name']} ({rec['trading_symbol']})")
        print(f"  Action: {rec['action_to_take']}")
        print(f"  Reason: {(rec['reason_for_recommendation'] or '')[:100]}...")
        print(f"  Confidence: {rec['confidence']}")


Sample validated recommendations:

- TATA CONSULTANCY SERV LT (TCS)
  Action: WATCH
  Reason: Scheduled to announce Q2 results on October 9...
  Confidence: 0.6

- TATA ELXSI LIMITED (TATAELXSI)
  Action: WATCH
  Reason: Scheduled to announce Q2 results on October 9...
  Confidence: 0.6

- Tata Capital (IPO_PENDING)
  Action: IPO_WATCH
  Reason: IPO slated to open next week...
  Confidence: 0.8
