# NVDA_Daily_Sentiment_Pipeline_and_Analysis

- **Perform sentiment analysis on NVDA-related tweets and output daily sentiment scores.** This pipeline implements the full workflow—data loading, preprocessing, sentiment scoring, aggregation, and saving of results.

  

### 1. Data Loading & Preprocessing

- Read the raw tweet dataset (e.g. `cleaned_nvda.csv`), filter to the target years (e.g. 2019–2021), and keep only the relevant columns (date, cleaned text, tokenized text, emojis, etc.).
    
- Conduct basic integrity checks and descriptive statistics (total tweets, unique tweets, average length, most common words, emoji usage, etc.).
    

### 2. Financial vs. Non-Financial Classification

- Use regular expressions to detect finance-related terms in each tweet, splitting the dataset into “financial” and “non-financial” subsets.
    

### 3. Sentiment Analysis

- **Financial tweets:** Apply FinBERT (a BERT variant fine-tuned on finance text) to produce sentiment labels (positive/neutral/negative) and confidence scores.
    
- **Non-financial tweets:** Apply a general-purpose BERT model to each tweet, yielding both labels and scores.
    

### 4. Aggregation & Export

- Aggregate each tweet’s sentiment scores by date to compute a daily average sentiment.
    
- Log summary statistics and score ranges to facilitate downstream plotting and comparison with financial indicators.

In [1]:
import pandas as pd
import numpy as np
import ast
import re
import torch
import gc
import sys
import matplotlib.pyplot as plt
import seaborn as sns
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
from datetime import datetime
from collections import Counter
from tqdm import tqdm

In [2]:
def load_and_filter_data(filepath, start_year=2019, end_year=2022):
    """
    Read the CSV file and filter the data to the target years and rows.
    """
    try:
        df = pd.read_csv(filepath)

        # Only keep the relevant columns
        df = df[['Date', 'Cleaned_Tweet', 'Processed_Tweet', 'Emoji_Texts']]

        # Convert date type
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')

        # Remove invalid dates
        df = df.dropna(subset=['Date'])

        # Filter time range
        df = df[(df['Date'].dt.year >= start_year) & (df['Date'].dt.year <= end_year)]

        # Limit rows
        # df = df.head(n_rows)

        return df
        
    except Exception as e:
        print(f"Error loading data: {e}")
        return None

def validate_data(df):
    """Verify data integrity"""
    required_columns = ['Date', 'Cleaned_Tweet', 'Processed_Tweet', 'Emoji_Texts']
    if not all(col in df.columns for col in required_columns):
        print("Missing required columns")
        return False
    return True

In [3]:
def get_descriptive_stats(df):
    """
    Calculate descriptive statistics
    
    Args:
        df (pd.DataFrame): Input Dataframe
        
    Returns:
        pd.DataFrame: DataFrame with statistics (same structure as simple version)
    """
    # Keep the same return structure as simple version
    stats = {
        'total_tweets': len(df),
        'unique_tweets': df['Processed_Tweet'].nunique(),
        'avg_length': df['Processed_Tweet'].apply(lambda x: len(str(x).split())).mean(),
        'emoji_count': df['Emoji_Texts'].notna().sum(),
        'most_common_words': []
    }

    try:
        # Input validation
        required_columns = ['Processed_Tweet', 'Emoji_Texts']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        # Counting the most common words
        all_words = ' '.join(df['Processed_Tweet'].fillna('')).split()
        all_words = [w for w in all_words if w.isalnum()]
        word_counts = Counter(all_words)
        stats['most_common_words'] = word_counts.most_common(20)
        stats['avg_tweet_length'] = df['Processed_Tweet'].str.len().mean()
        
        # Emoji statistics
        if 'Emoji_Texts' in df.columns:
            try:
                emoji_lists = df['Emoji_Texts'].apply(
                    lambda x: ast.literal_eval(x) if isinstance(x, str) else x if isinstance(x, list) else []
                )
                stats['emoji_count'] = emoji_lists.apply(len).sum()
            except:
                pass

    except Exception as e:
        print(f"Warning in get_descriptive_stats: {str(e)}")
        # Keep the data structure consistent even if there is an error
        stats['most_common_words'] = [('Error', 0)]

    return pd.DataFrame.from_dict(stats, orient='index', columns=['Value'])

In [4]:
# Emoji Distribution
def plot_top_emojis(df, emoji_column='Emoji_Texts', top_n=10, color='skyblue'):
    """
    Count and plot the most common emojis from a DataFrame
    
    Args:
        df (pd.DataFrame): DataFrame containing emoji data
        emoji_column (str): Column name containing emoji lists, default is 'Emoji_Texts'
        top_n (int): Number of emojis to display, default is 10
        color (str): Bar color, default is 'skyblue'
    """
    # Ensure the strings in the emoji column are converted to lists
    df[emoji_column] = df[emoji_column].apply(
        lambda x: ast.literal_eval(x) if isinstance(x, str) else x
    )
    
    # Count all emoji occurrences
    emoji_counter = Counter(
        [emoji for sublist in df[emoji_column] for emoji in sublist]
    )
    
    # Get the most frequent emoji
    top_emoji_series = pd.Series(dict(emoji_counter.most_common(top_n)))
    
    # Plot horizontal bar chart
    top_emoji_series.plot(
        kind='barh', 
        title=f'Top {top_n} Emojis', 
        color=color
    )
    
    plt.xlabel('Frequency')
    plt.ylabel('Emoji')
    plt.tight_layout()
    plt.show()

 

# No need to do Enhanced Sentiment Analysis Including Emojis based on the results.

In [5]:
# Split tweets into financial and non-financial

def classify_financial_text(text):
    """
    Determine if text contains financial terms
    
    Args:
        text (str): Input text
        
    Returns:
        bool: If text contains financial terms, return True, otherwise return False
    """
    try:
        # Predefined financial terms regex
        financial_pattern = r'\b(?:\$[A-Z]{1,5}\b|stock(?:\s*price|s?)|price\s*target|market\s*(?:cap|value)|' \
                          r'shares|share\s*price|earnings(?:\s*report|per\s*share|call)|semiconductor|chip\s*industry|' \
                          r'invest(?:ing|ment)|trading|portfolio|dividend|buyback|' \
                          r'\b(?:bull|bear)(?:ish|market)\b|valuation|P/E|price-to-earnings|' \
                          r'analyst\s*rating|upgrade|downgrade|(?:financial|quarterly)\s*results|' \
                          r'volume|liquidity|SEC\s*filing|10-[KQ]|IPO|FPO|secondary\s*offering)'
        
        # Convert input to string and check if it contains financial terms
        return bool(re.search(financial_pattern, str(text), flags=re.IGNORECASE))
    except Exception as e:
        print(f"Error in classify_financial_text: {str(e)}")
        return False

In [6]:
class SentimentAnalyzer:
    def __init__(self, finbert_model, roberta_model):
        self.finbert = finbert_model
        self.roberta = roberta_model

    def is_initialized(self):
        """Check if all required models are initialized"""
        return all([self.finbert is not None, self.roberta is not None])

    def analyze_financial_text(self, texts, batch_size=32):
        results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            try:
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                output = self.finbert(batch, batch_size=batch_size)
                results.extend(output)
            except Exception as e:
                print(f"FinBERT Error in batch {i}: {e}")
                results.extend([{'label': 'ERROR', 'score': 0}] * len(batch))
            finally:
                gc.collect()
        return results

    def analyze_non_financial_text(self, texts, batch_size=32):
        results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            try:
                output = self.roberta(batch, batch_size=batch_size)
                results.extend(output)
            except Exception as e:
                print(f"RoBERTa Error in batch {i}: {e}")
                results.extend([{'label': 'ERROR', 'score': 0}] * len(batch))
        return results

In [7]:
from tqdm import tqdm 
tqdm.pandas() 



def main():
    try:
        # 1. Load data
        print("Loading data...")
        df = load_and_filter_data("../2_data/cleaned_nvda.csv")
        if df is None or df.empty:
            print("Error: Failed to load data or data is empty")
            return
        
        # 2. Validate data
        print("Validating data...")
        if not validate_data(df):
            print("Error: Data validation failed")
            return
        
        # 3. Get descriptive statistics
        print("Calculating descriptive statistics...")
        stats_df = get_descriptive_stats(df)

        # Print statistics
        print("\nBasic Statistics:")
        print(f"Total tweets: {stats_df.loc['total_tweets', 'Value']}")
        print(f"Unique tweets: {stats_df.loc['unique_tweets', 'Value']}")
        print(f"Average length (words): {stats_df.loc['avg_length', 'Value']:.1f}")
        print(f"Tweets with emojis: {stats_df.loc['emoji_count', 'Value']}")

        # Print common words
        print("\nTop 20 Most Common Words:")
        common_words = stats_df.loc['most_common_words', 'Value']
        if isinstance(common_words, list) and len(common_words) > 0:
            for word, count in common_words:
                print(f"{word}: {count}")
        else:
            print("No common words data available")

        if 'avg_tweet_length' in stats_df.index:
            print(f"\nAverage tweet length (chars): {stats_df.loc['avg_tweet_length', 'Value']:.1f}")
        
        # 4. Initialize sentiment analyzers
        print("\nInitializing sentiment analyzers...")

        # FinBERT pipeline for financial tweets
        finbert_pipeline = pipeline("sentiment-analysis", 
                                  model="yiyanghkust/finbert-tone",
                                  truncation=True,
                                  max_length=512)

        # RoBERTa pipeline for non-financial tweets
        tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
        model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
        roberta_pipeline = pipeline("sentiment-analysis", 
                                  model=model, 
                                  tokenizer=tokenizer, 
                                  truncation=True, 
                                  max_length=512)

        # Initialize SentimentAnalyzer instance
        analyzer = SentimentAnalyzer(finbert_pipeline, roberta_pipeline)

        if not analyzer.is_initialized():
            print("Error: Failed to initialize sentiment analyzers")
            return
        
        # 5. Classify financial and non-financial texts
        print("\nClassifying financial and non-financial texts...")
        df['is_financial'] = df['Cleaned_Tweet'].apply(classify_financial_text)
        
        df_fin = df[df['is_financial']].copy()
        df_nonfin = df[~df['is_financial']].copy()
        
        print(f"Financial tweets: {len(df_fin)}")
        print(f"Non-financial tweets: {len(df_nonfin)}")
        
        # 6. Perform sentiment analysis
        print("\nPerforming sentiment analysis...")
        
        # Financial texts: FinBERT
        print("Analyzing financial texts with FinBERT...")
        fin_texts = df_fin['Cleaned_Tweet'].tolist()
        fin_results = []
        for text in tqdm(fin_texts, desc="FinBERT Sentiment", ncols=80):
            fin_results.append(analyzer.analyze_financial_text([text])[0]) 
        df_fin = pd.concat([
            df_fin.reset_index(drop=True),
            pd.DataFrame(fin_results).rename(columns={'label': 'sentiment_label'})
        ], axis=1)

        # Map FinBERT scores
        df_fin['sentiment_score'] = df_fin['sentiment_label'].str.lower().map(
            {'positive': 1, 'neutral': 0, 'negative': -1}
        ).clip(-1, 1)
        df_fin['category'] = 'financial'

        # Non-financial texts: RoBERTa
        print("Analyzing non-financial texts with RoBERTa...")
        nonfin_texts = df_nonfin['Cleaned_Tweet'].tolist()
        roberta_results = []        
        for text in tqdm(nonfin_texts, desc="RoBERTa Sentiment", ncols=80):
            roberta_results.append(analyzer.analyze_non_financial_text([text])[0])
        df_nonfin = pd.concat([
            df_nonfin.reset_index(drop=True),
            pd.DataFrame(roberta_results).rename(columns={'label': 'sentiment_label'})
        ], axis=1)
        
        # Map RoBERTa scores (POSITIVE, NEUTRAL, NEGATIVE)
        # Map RoBERTa sentiment labels to numeric score like FinBERT
        roberta_label_map = {
            'LABEL_0': -1,  # negative
            'LABEL_1':  0,  # neutral
            'LABEL_2':  1   # positive
        }
        df_nonfin['sentiment_score'] = df_nonfin['sentiment_label'].map(roberta_label_map).astype(float)
        df_nonfin['category'] = 'non_financial'

        # 7. Merge results
        print("\nMerging results...")


        # Get the global date range
        all_dates = pd.to_datetime(pd.concat([df_fin['Date'], df_nonfin['Date']]))
        date_range = pd.date_range(
            start=all_dates.min().date(), 
            end=all_dates.max().date(),
            freq='D'
        )
        
        # Merge all sentiment results
        df_tweet_sentiment = pd.concat([df_fin, df_nonfin], ignore_index=True)

        # Save tweet-level sentiments
        cols_to_save = ['Date', 'Processed_Tweet', 'sentiment_score', 'sentiment_label', 'category']
        for col in ['username', 'likes']:
            if col in df_tweet_sentiment.columns and col not in cols_to_save:
                cols_to_save.append(col)

        df_tweet_sentiment[cols_to_save].to_csv("../2_data/tweet_level_sentiment.csv", index=False)
        print("Saved tweet-level sentiment file: tweet_level_sentiment.csv")

        def create_daily_sentiment(df_combined, model_name):
            """General daily aggregation function"""
            df_combined['Date'] = pd.to_datetime(df_combined['Date'])
            daily_df = (
                df_combined.groupby(df_combined['Date'].dt.date)['sentiment_score']
                .agg(['mean', 'count', 'std'])
                .reindex(date_range)
                .fillna(method='ffill')
                .reset_index()
            )
            daily_df.columns = ['date', 'avg_sentiment_score', 'tweet_count', 'sentiment_std']
            daily_df['model_type'] = model_name
            return daily_df

        # Create daily sentiment DataFrames
        print("\nCreating daily sentiment aggregates...")
        df_daily_fin = create_daily_sentiment(df_fin, 'FinBERT')
        df_daily_roberta = create_daily_sentiment(df_nonfin, 'RoBERTa')
        
        # Create combined daily sentiment
        df_combined = pd.concat([df_fin, df_nonfin], ignore_index=True)
        df_daily_combined = create_daily_sentiment(df_combined, 'FinBERT+RoBERTa')

        # 8. Save results
        print("\nSaving results...")
        df_daily_combined.to_csv("../2_data/daily_sentiment_combined.csv", index=False)
        pd.concat([df_daily_fin, df_daily_roberta]).to_csv("../2_data/all_daily_sentiment.csv", index=False)
        print("Saved daily sentiment files")

        # 9. Validate output
        print("\nFinal Score Ranges:")
        print(f"FinBERT: [{df_fin['sentiment_score'].min():.2f}, {df_fin['sentiment_score'].max():.2f}]")
        print(f"RoBERTa: [{df_nonfin['sentiment_score'].min():.2f}, {df_nonfin['sentiment_score'].max():.2f}]")

        print("\nAnalysis completed successfully!")

    except Exception as e:
        print(f"\nError in main execution: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

if __name__ == "__main__":
    main()

Loading data...
Validating data...
Calculating descriptive statistics...

Basic Statistics:
Total tweets: 266660
Unique tweets: 222153
Average length (words): 9.1
Tweets with emojis: 78706

Top 20 Most Common Words:
nvda: 280198
amd: 26862
go: 25249
buy: 20566
stock: 18875
call: 18530
today: 17261
market: 15638
like: 15356
day: 15277
get: 14733
aapl: 13734
spi: 13520
week: 13168
see: 12474
look: 12459
short: 11609
tsla: 11524
time: 11333
trade: 11252

Average tweet length (chars): 49.8

Initializing sentiment analyzers...


Device set to use mps:0
Device set to use mps:0



Classifying financial and non-financial texts...
Financial tweets: 38365
Non-financial tweets: 228295

Performing sentiment analysis...
Analyzing financial texts with FinBERT...


FinBERT Sentiment: 100%|████████████████| 38365/38365 [1:02:36<00:00, 10.21it/s]


Analyzing non-financial texts with RoBERTa...


RoBERTa Sentiment: 100%|█████████████| 228295/228295 [10:40:50<00:00,  5.94it/s]



Merging results...
Saved tweet-level sentiment file: tweet_level_sentiment.csv

Creating daily sentiment aggregates...

Saving results...
Saved daily sentiment files

Final Score Ranges:
FinBERT: [-1.00, 1.00]
RoBERTa: [-1.00, 1.00]

Analysis completed successfully!


  .fillna(method='ffill')
  .fillna(method='ffill')
  .fillna(method='ffill')


In [None]:
non_zero_likes = merged_df[merged_df['likes_num'] > 0]
print(non_zero_likes['likes_num'].describe())
print(len(non_zero_likes))

count    282114.00000
mean          2.56799
std           2.76649
min           1.00000
25%           1.00000
50%           2.00000
75%           3.00000
max         201.00000
Name: likes_num, dtype: float64
282114
