In [2]:
!pip install google-generativeai



In [118]:
from dotenv import load_dotenv
import google.generativeai as genai
from google.genai import types
import os
import requests
import json
from google import genai as gai
from collections import defaultdict
from datetime import datetime, timedelta
from collections import defaultdict, Counter
from transformers import BertTokenizer, BertForSequenceClassification, pipeline
import yfinance as yf
import pandas as pd

In [119]:
class SentimentScraper:

    def __init__(self):
        load_dotenv()
        API_KEY = os.getenv("GEMINI_API_KEY")

        if not API_KEY:
            raise ValueError("GEMINI_API_KEY not found in environment variables")

        try:
            self.client = gai.Client(api_key=API_KEY)
            print("Gemini Agent initialized successfully!!")
        except Exception as model_error:
            raise ValueError(f"Failed to initialize Gemini Model: {str(model_error)}")

        tokenizer = BertTokenizer.from_pretrained("ProsusAI/finbert")
        model = BertForSequenceClassification.from_pretrained("ProsusAI/finbert")
        self.finbert = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer, truncation=True)
        

    def get_data(self, ticker, time_period):
        prompt = f"""
            You are a financial research assistant.
            
            Given a stock ticker and a time period, gather the most relevant and recent mentions of the company across multiple sources such as news articles, social media (Twitter, Reddit), financial blogs, and press releases.

            Gather atleast 50 different articles
            
            Return only structured data in **strictly valid JSON** format with the following structure:
            
            {{
              "ticker": "<ticker_symbol>",
              "company_name": "<resolved_company_name>",
              "time_period": "<time_period>",
              "mentions": [
                {{
                  "source_type": "<news | twitter | reddit | blog | forum | press_release>",
                  "title_or_excerpt": "<title or post excerpt (if available)>",
                  "full_text": "<detailed content of the article or post. If it is short, make it long>",
                  "published_date": "<YYYY-MM-DD>",
                  "source_name": "<source or platform>",
                }}
              ]
            }}

            Source type must be one of these6 sources only:-
            1) news
            2) twitter
            3) reddit
            4) blog
            5) forum
            6) press_release
            
            Only include information that is relevant to the specified stock ticker within the given time period.  
            If the ticker cannot be resolved to a known public company, return an empty `mentions` list but still provide the `ticker` and `time_period`.
            Ensure that the `full_text` field contains enough content to be used as input for a sentiment analysis model. This can include the entire article or post, or a long excerpt with full context. Even if the actual content is less, process it more so as to get more context
            
            
            **Do not include any commentary or text outside of the JSON.**
            
            Ticker: {ticker}  
            Time Period: {time_period}
            
            Respond strictly with valid JSON.

        """

        try:
            # Generate content using Gemini model
            response = self.client.models.generate_content(
                model='gemini-2.0-flash',
                contents=prompt,
                config=types.GenerateContentConfig(
                    tools=[types.Tool(
                        google_search=types.GoogleSearchRetrieval()
                    )]
                )
            )
            
            # Get the raw text from the response
            response_text = response.text
            
            # Try to clean up the response to extract just the JSON part
            try:
                # First attempt: direct parsing
                data = json.loads(response_text)
            except json.JSONDecodeError:
                # Second attempt: Try to extract JSON from potential markdown or text
                # Look for JSON array pattern starting with [ and ending with ]
                import re
                json_pattern = r'\[[\s\S]*\]'
                json_match = re.search(json_pattern, response_text)
                
                if json_match:
                    try:
                        data = json.loads(json_match.group(0))
                    except json.JSONDecodeError as e:
                        print(f"Failed to parse extracted JSON pattern: {str(e)}")
                        print(f"Extracted pattern: {json_match.group(0)[:100]}...")
                        return results
                else:
                    print("No JSON array pattern found in the response")
                    print(f"Raw response: {response_text[:300]}...")
                    return
            
            # Filter out incomplete entries if any
            # valid_papers = [
            #     paper for paper in scholar_data 
            #     if "title" in paper and "summary" in paper
            # ]
            
            # Add source identifier
            # for paper in valid_papers:
            #     paper["source"] = "google_scholar"
            
            
        except Exception as e:
            print(f"Error generating results for '{ticker}': {str(e)}")
            print(f"Full error: {traceback.format_exc()}")

        # for i in range(len(results)):
        #     results[i]['url'] = self.get_actual_url(results[i]['url'])
        
        return data

    def store_mentions_by_source(self, ticker, time_period, output_path):

        mentions = []

        for i in range(10):
            data = self.get_data(ticker,time_period)
            mentions.extend(data)
        
        grouped = defaultdict(list)
        for mention in mentions:
            source = mention.get("source_type", "unknown").lower()
            grouped[source].append(mention)
    
        output = {
            "ticker": ticker,
            "time_period": time_period,
            "collected_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
            "grouped_mentions": grouped
        }
    
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)


    def analyze_sentiment(self,filename):
        
        with open(filename, "r") as f:
            data = json.load(f)
        
        source_sentiments = defaultdict(list)
        
        for source, entries in data["grouped_mentions"].items():
            for entry in entries:
                text = entry['title_or_excerpt'] + entry["full_text"]
                result = self.finbert(text[:512])[0]
                source_sentiments[source].append(result["label"].lower())
        
        aggregated_results = {}
        total_counter = Counter()
        
        for source, sentiments in source_sentiments.items():
            count = Counter(sentiments)
            total = sum(count.values())
            aggregated_results[source] = {
                "positive": count["positive"] / total if total else 0,
                "neutral": count["neutral"] / total if total else 0,
                "negative": count["negative"] / total if total else 0,
                "total_mentions": total
            }
            total_counter.update(count)
        
        overall_total = sum(total_counter.values())
        overall_sentiment = {
            "positive": total_counter["positive"] / overall_total if overall_total else 0,
            "neutral": total_counter["neutral"] / overall_total if overall_total else 0,
            "negative": total_counter["negative"] / overall_total if overall_total else 0,
            "total_mentions": overall_total
        }
        
        output = {
            "ticker": data["ticker"],
            "time_period": data["time_period"],
            "source_wise_sentiment": aggregated_results,
            "overall_sentiment": overall_sentiment
        }
        
        with open("sentiment_results.json", "w") as f:
            json.dump(output, f, indent=2)

    def fetch_and_store_historical(self, ticker: str, period: str):

        period_map = {
            "1w": 7,
            "1mo": 30,
            "6mo": 182,
            "1yr": 365,
            "2yr": 730,
        }
        
        if period not in period_map:
            raise ValueError(f"Invalid period '{period}'. Choose from {list(period_map.keys())}")

        end_date = datetime.today()
        start_date = end_date - timedelta(days=period_map[period])
        end_str = end_date.strftime("%Y-%m-%d")
        start_str = start_date.strftime("%Y-%m-%d")

        stock = yf.Ticker(ticker)
        hist = stock.history(start=start_str, end=end_str)
        hist.reset_index(inplace=True)

        # Convert dataframe to list of dicts for JSON serialization
        data = hist.to_dict(orient="records")

        filename = f"{ticker}_{period}_historical.json"
        with open(filename, "w") as f:
            json.dump(data, f, indent=4, default=str)

        return filename

In [120]:
ss = SentimentScraper()

Gemini Agent initialized successfully!!


  return torch.load(checkpoint_file, map_location=map_location)


In [121]:
ss.fetch_and_store_historical("AAPL","6mo")

'AAPL_6mo_historical.json'

In [88]:
data

[{'source_type': 'press_release',
  'title_or_excerpt': 'Apple unveils powerful accessibility features coming later this year',
  'full_text': 'Apple has announced new accessibility features coming later this year, including eye tracking, which lets users control iPad and iPhone with their eyes; Vocal Shortcuts, which allows users to create custom voice commands; and Vehicle Motion Cues, which can help reduce motion sickness for passengers [1, 5, 7, 19]. These features are designed to make Apple devices more accessible and user-friendly for people with disabilities [18].',
  'published_date': '2025-05-13',
  'source_name': 'Business Wire'},
 {'source_type': 'news',
  'title_or_excerpt': 'Chinese Retailers Offer Deep Discounts on Apple iPhones',
  'full_text': "Major Chinese e-commerce sites are offering discounts of up to 2,530 yuan ($351) on iPhone 16 models ahead of the '618' shopping festival [5, 8]. This move comes amid a 9% drop in shipments [8]. Apple is also experiencing increas

In [89]:
len(data)

23

In [90]:
ss.store_mentions_by_source("AAPL","Last 1 week","data.json")

  "collected_at": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),


In [107]:
ss.analyze_sentiment("data.json")