In [12]:
# Import required libraries
import pandas as pd
import numpy as np
import re
import os
import torch
from torch.utils.data import TensorDataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification
import matplotlib.pyplot as plt
import seaborn as sns
import json
import time
from datetime import datetime
from tqdm.notebook import tqdm
import tweepy
from google.colab import drive
import getpass
import threading
import queue

In [13]:
# Mount Google Drive to access saved model and store results
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
# Define paths
MODEL_PATH = '/content/drive/MyDrive/Grad_project/bert_sentiment_model.zip (Unzipped Files)'
RESULTS_PATH = '/content/drive/MyDrive/Grad_project/twitter_results'

In [15]:
# Make sure results directory exists
if not os.path.exists(RESULTS_PATH):
    os.makedirs(RESULTS_PATH, exist_ok=True)

In [16]:
# Class to handle Twitter API operations
class TwitterAPI:
    def __init__(self):
        """Initialize Twitter API client after getting credentials from user"""
        # Get Twitter API credentials securely
        print("Please enter your Twitter API credentials:")
        consumer_key = getpass.getpass("Consumer Key (API Key): ")
        consumer_secret = getpass.getpass("Consumer Secret (API Secret): ")
        access_token = getpass.getpass("Access Token: ")
        access_token_secret = getpass.getpass("Access Token Secret: ")
        bearer_token = getpass.getpass("Bearer Token: ")

        # Add option to check rate limits before proceeding
        print("\nIMPORTANT: Free Twitter API has strict rate limits.")
        choice = input("Do you want to check your current rate limit status before proceeding? (y/n): ").lower()

        # Authenticate with Twitter API v2
        self.client = tweepy.Client(
            bearer_token=bearer_token,
            consumer_key=consumer_key,
            consumer_secret=consumer_secret,
            access_token=access_token,
            access_token_secret=access_token_secret,
            wait_on_rate_limit=True  # Automatically wait when rate limit is reached
        )

        # Check rate limits if requested
        if choice == 'y':
            try:
                # Get rate limit status for search endpoint
                rate_limit = self.client.get_recent_tweets_count("test", granularity="day")
                print("\nRate limit information:")
                if hasattr(rate_limit, 'meta') and 'sent' in rate_limit.meta:
                    print(f"Request cost: {rate_limit.meta['sent']}")
                if hasattr(rate_limit, '_headers') and 'x-rate-limit-remaining' in rate_limit._headers:
                    print(f"Remaining requests: {rate_limit._headers['x-rate-limit-remaining']}")
                    print(f"Rate limit resets in: {rate_limit._headers['x-rate-limit-reset']} seconds")
                else:
                    print("Could not retrieve detailed rate limit information, but connection successful.")
            except tweepy.TweepyException as e:
                print(f"\nRate limit check failed: {e}")
                print("Proceeding anyway - the code will pause if limits are reached.")
        print("Twitter API authentication successful!")

    def search_tweets(self, query, max_results=100, lang='en', tweet_queue=None):
        """
        Search for tweets matching the query and handle pagination and rate limits

        Args:
            query (str): Search query for Twitter
            max_results (int): Maximum number of tweets to retrieve
            lang (str): Language filter code (e.g., 'en' for English)
            tweet_queue (Queue): Queue to store retrieved tweets for processing

        Returns:
            list: List of fetched tweets if tweet_queue is None, otherwise None
        """
        all_tweets = []
        total_fetched = 0
        next_token = None

        # Twitter API requires batch_size to be at least 10
        batch_size = min(100, max(10, max_results))  # Between 10 and 100

        # Add language filter to query if specified
        if lang:
            query = f"{query} lang:{lang}"

        # Fields we want to retrieve
        tweet_fields = ['created_at', 'public_metrics', 'author_id', 'lang']

        try:
            while total_fetched < max_results:
                # Calculate remaining tweets to fetch
                remaining = min(batch_size, max_results - total_fetched)

                # Ensure remaining is at least 10 for API requirement
                if remaining < 10:
                    if total_fetched >= max_results:
                        break  # We already have enough tweets
                    remaining = min(10, max_results - total_fetched)

                print(f"Fetching batch of {remaining} tweets (total: {total_fetched}/{max_results})...")

                # Search tweets with pagination
                response = self.client.search_recent_tweets(
                    query=query,
                    max_results=remaining,
                    tweet_fields=tweet_fields,
                    next_token=next_token
                )

                # If no tweets found, break the loop
                if not response.data:
                    print("No tweets found.")
                    break

                # Process each tweet
                batch_tweets = []
                for tweet in response.data:
                    # Skip non-English tweets if English filter is set
                    if lang == 'en' and tweet.lang != 'en':
                        continue

                    tweet_data = {
                        'id': tweet.id,
                        'text': tweet.text,
                        'created_at': tweet.created_at,
                        'author_id': tweet.author_id,
                        'retweet_count': tweet.public_metrics['retweet_count'],
                        'reply_count': tweet.public_metrics['reply_count'],
                        'like_count': tweet.public_metrics['like_count'],
                        'quote_count': tweet.public_metrics['quote_count'],
                        'lang': tweet.lang
                    }

                    # Either add to queue or collect in list
                    if tweet_queue:
                        tweet_queue.put(tweet_data)
                    else:
                        batch_tweets.append(tweet_data)

                # If not using queue, add batch to all_tweets list
                if not tweet_queue:
                    all_tweets.extend(batch_tweets)

                # Update total and pagination token
                total_fetched += len(response.data)

                # Check if we've reached the requested number of tweets
                if total_fetched >= max_results:
                    break

                # Check if we have more tweets to fetch via pagination
                if 'next_token' in response.meta:
                    next_token = response.meta['next_token']
                else:
                    print("No more pages available.")
                    break

                # Add a delay to avoid hitting rate limits
                print("Waiting 3 seconds before next API request...")
                time.sleep(3)

        except tweepy.TweepyException as e:
            print(f"Error fetching tweets: {e}")

        # Signal that we're done if using a queue
        if tweet_queue:
            tweet_queue.put(None)  # Sentinel value to indicate completion
            return None

        return all_tweets

In [17]:
# Class for sentiment analysis using BERT
class SentimentAnalyzer:
    def __init__(self, model_path):
        """
        Initialize the BERT model for sentiment analysis

        Args:
            model_path (str): Path to the saved BERT model
        """
        # Check if GPU is available
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Load tokenizer
        self.tokenizer = BertTokenizer.from_pretrained(model_path)

        # Load model
        self.model = BertForSequenceClassification.from_pretrained(model_path)
        self.model.to(self.device)
        self.model.eval()  # Set to evaluation mode

        # Define sentiment mapping
        # IMPORTANT: This mapping should match your BERT model's output classes
        self.sentiments = {0: 'Negative', 1: 'Positive', 2: 'Neutral'}

        print("Sentiment analysis model loaded successfully!")

    def clean_text(self, text):
        """
        Clean and preprocess text data

        Args:
            text (str): Text to clean

        Returns:
            str: Cleaned text
        """
        if not isinstance(text, str):
            text = str(text)

        # Convert to lowercase
        text = text.lower()

        # Remove URLs, HTML tags, and mentions
        text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
        text = re.sub(r'<.*?>', '', text)
        text = re.sub(r'@\w+', '', text)

        # Remove special characters but keep emoticons
        text = re.sub(r'[^\w\s:;)(><]', ' ', text)

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        return text

    def predict_sentiment(self, text, max_length=128):
        """
        Predict sentiment for a given text

        Args:
            text (str): Text to analyze
            max_length (int): Maximum length for tokenization

        Returns:
            str: Predicted sentiment label
        """
        # Clean the text
        cleaned = self.clean_text(text)

        # Tokenize the cleaned text
        inputs = self.tokenizer(
            cleaned,
            return_tensors='pt',
            truncation=True,
            padding='max_length',
            max_length=max_length
        ).to(self.device)

        # Get prediction from the model
        with torch.no_grad():
            outputs = self.model(**inputs)
            prediction = torch.argmax(outputs.logits, dim=1).item()

        return self.sentiments[prediction]

    def process_tweet_batch(self, tweets):
        """
        Process a batch of tweets and add sentiment predictions

        Args:
            tweets (list): List of tweet dictionaries

        Returns:
            list: Tweets with added sentiment predictions
        """
        for tweet in tweets:
            tweet['sentiment'] = self.predict_sentiment(tweet['text'])
        return tweets

    def process_tweets_from_queue(self, tweet_queue, result_queue):
        """
        Process tweets from a queue and add results to result queue

        Args:
            tweet_queue (Queue): Queue containing tweets to process
            result_queue (Queue): Queue to store processed tweets
        """
        while True:
            tweet = tweet_queue.get()
            if tweet is None:  # Check for sentinel value
                result_queue.put(None)  # Signal completion
                break

            tweet['sentiment'] = self.predict_sentiment(tweet['text'])
            result_queue.put(tweet)

In [18]:
# Class to handle results visualization and storage
class ResultsHandler:
    def __init__(self, results_path):
        """
        Initialize the results handler

        Args:
            results_path (str): Path to store results
        """
        self.results_path = results_path
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    def save_to_csv(self, tweets, topic):
        """
        Save tweets to CSV file

        Args:
            tweets (list): List of tweet dictionaries
            topic (str): Search topic

        Returns:
            str: Path to saved CSV file
        """
        # Check if we have any tweets to save
        if not tweets:
            print("No tweets to save to CSV.")
            filename = f"{self.results_path}/{topic.replace(' ', '_')}_{self.timestamp}_empty.csv"
            # Create an empty DataFrame with expected columns
            empty_df = pd.DataFrame(columns=['id', 'text', 'created_at', 'author_id',
                                            'retweet_count', 'reply_count', 'like_count',
                                            'quote_count', 'lang', 'sentiment'])
            empty_df.to_csv(filename, index=False)
            return filename

        # Convert list to DataFrame
        df = pd.DataFrame(tweets)

        # Format created_at column
        if 'created_at' in df.columns:
            df['created_at'] = pd.to_datetime(df['created_at']).dt.strftime('%Y-%m-%d %H:%M:%S')

        # Save to CSV
        filename = f"{self.results_path}/{topic.replace(' ', '_')}_{self.timestamp}.csv"
        df.to_csv(filename, index=False)
        print(f"Results saved to {filename}")

        return filename

    def generate_visualizations(self, tweets, topic):
        """
        Generate visualizations based on sentiment analysis

        Args:
            tweets (list): List of tweet dictionaries
            topic (str): Search topic

        Returns:
            str: Path to saved visualizations or None if no tweets
        """
        # Check if we have any tweets to visualize
        if not tweets:
            print("No tweets to visualize.")
            return None

        # Convert to DataFrame for easier manipulation
        df = pd.DataFrame(tweets)

        # Check if 'sentiment' column exists
        if 'sentiment' not in df.columns:
            print("Error: 'sentiment' column not found in data. Visualizations cannot be generated.")
            return None

        # Create directory for visualizations
        viz_dir = f"{self.results_path}/{topic.replace(' ', '_')}_{self.timestamp}_viz"
        os.makedirs(viz_dir, exist_ok=True)

        # 1. Sentiment distribution pie chart
        plt.figure(figsize=(10, 6))
        sentiment_counts = df['sentiment'].value_counts()
        plt.pie(sentiment_counts, labels=sentiment_counts.index, autopct='%1.1f%%', startangle=90, colors=sns.color_palette('viridis'))
        plt.title(f'Sentiment Distribution for Topic: "{topic}"')
        plt.tight_layout()
        plt.savefig(f"{viz_dir}/sentiment_pie.png")
        plt.close()

        # 2. Sentiment distribution bar chart
        plt.figure(figsize=(10, 6))
        # Fix the FutureWarning by properly using hue parameter
        ax = plt.axes()
        order = ['Positive', 'Neutral', 'Negative']
        sns.countplot(x='sentiment', data=df, palette='viridis', order=order, ax=ax)
        plt.title(f'Sentiment Count for Topic: "{topic}"')
        plt.xlabel('Sentiment')
        plt.ylabel('Count')
        plt.tight_layout()
        plt.savefig(f"{viz_dir}/sentiment_bar.png")
        plt.close()

        # 3. Average engagement metrics by sentiment
        plt.figure(figsize=(12, 8))
        metrics = ['like_count', 'retweet_count', 'reply_count', 'quote_count']

        # Calculate average metrics by sentiment
        sentiment_metrics = df.groupby('sentiment')[metrics].mean().reset_index()

        # Reshape for plotting
        sentiment_metrics_melted = pd.melt(sentiment_metrics, id_vars=['sentiment'], value_vars=metrics,
                                          var_name='Metric', value_name='Average Count')

        # Create the grouped bar chart
        sns.barplot(x='sentiment', y='Average Count', hue='Metric', data=sentiment_metrics_melted, palette='viridis')
        plt.title(f'Average Engagement by Sentiment for Topic: "{topic}"')
        plt.xlabel('Sentiment')
        plt.ylabel('Average Count')
        plt.legend(title='Engagement Metric')
        plt.tight_layout()
        plt.savefig(f"{viz_dir}/engagement_by_sentiment.png")
        plt.close()

        # 4. Language distribution (if non-English tweets are included)
        if len(df['lang'].unique()) > 1:
            plt.figure(figsize=(10, 6))
            lang_counts = df['lang'].value_counts().head(10)  # Show top 10 languages
            sns.barplot(x=lang_counts.index, y=lang_counts.values, palette='viridis')
            plt.title(f'Top Languages for Topic: "{topic}"')
            plt.xlabel('Language Code')
            plt.ylabel('Count')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig(f"{viz_dir}/language_distribution.png")
            plt.close()

        print(f"Visualizations saved to {viz_dir}")
        return viz_dir

In [19]:
# Main pipeline class
class TwitterSentimentPipeline:
    def __init__(self, model_path, results_path):
        """
        Initialize the complete Twitter sentiment analysis pipeline

        Args:
            model_path (str): Path to the sentiment analysis model
            results_path (str): Path to store results
        """
        self.twitter_api = TwitterAPI()
        self.sentiment_analyzer = SentimentAnalyzer(model_path)
        self.results_handler = ResultsHandler(results_path)

    def run_streaming_pipeline(self, topic, max_results=100, lang='en'):
        """
        Run the pipeline in streaming mode (process tweets as they come)

        Args:
            topic (str): Topic to search for
            max_results (int): Maximum number of tweets to retrieve
            lang (str): Language filter code (e.g., 'en' for English)

        Returns:
            tuple: (path to CSV file, path to visualizations)
        """
        # Validate max_results
        max_results = min(max(max_results, 10), 100)  # Between 10 and 100

        lang_filter = f"(language filter: {lang})" if lang else "(no language filter)"
        print(f"Starting sentiment analysis for topic: '{topic}' {lang_filter} (max {max_results} tweets)")

        # Create queues for tweet processing
        tweet_queue = queue.Queue()
        result_queue = queue.Queue()

        # Start tweet processing thread
        processing_thread = threading.Thread(
            target=self.sentiment_analyzer.process_tweets_from_queue,
            args=(tweet_queue, result_queue)
        )
        processing_thread.start()

        # Start tweet fetching (this will put tweets into the tweet_queue)
        self.twitter_api.search_tweets(topic, max_results, lang, tweet_queue)

        # Collect processed results
        all_results = []
        processed_count = 0

        # Create progress bar
        pbar = tqdm(total=max_results, desc="Processing tweets")

        while True:
            result = result_queue.get()
            if result is None:  # Check for sentinel value
                break

            all_results.append(result)
            processed_count += 1
            pbar.update(1)

            # Print current result (optional)
            print(f"Tweet: '{result['text'][:50]}...' - Sentiment: {result['sentiment']}")

        pbar.close()
        processing_thread.join()

        print(f"Processed {len(all_results)} tweets.")

        # Handle cases with no tweets
        if len(all_results) == 0:
            print("Warning: No tweets were processed. Results may be limited.")

        # Save results to CSV
        csv_path = self.results_handler.save_to_csv(all_results, topic)

        # Generate visualizations if we have results
        viz_path = None
        if all_results:
            viz_path = self.results_handler.generate_visualizations(all_results, topic)

        return csv_path, viz_path

In [20]:
# Function to test the pipeline
def test_pipeline():
    """Test the Twitter sentiment analysis pipeline with user input"""
    # Initialize the pipeline
    pipeline = TwitterSentimentPipeline(MODEL_PATH, RESULTS_PATH)

    # Get user input
    topic = input("Enter topic to search for on Twitter: ")

    try:
        print("\nNOTE: For free API tier, we recommend keeping this number small (10-50)")
        print("IMPORTANT: Twitter API requires a minimum of 10 tweets per request")
        max_results = int(input("Enter number of tweets to analyze (min 10, max 100): "))
        # Ensure between 10 and 100
        max_results = min(max(max_results, 10), 100)
    except ValueError:
        print("Invalid input. Using default of 10 tweets.")
        max_results = 10

    # Get language preference
    lang_choice = input("\nFilter tweets by language? (y/n): ").lower()
    lang = 'en' if lang_choice == 'y' else None

    if lang_choice == 'y':
        print("English language filter applied. Only English tweets will be analyzed.")
    else:
        print("No language filter applied. Tweets in all languages will be analyzed.")

    print("\nUsing streaming mode to process tweets as they come (better for handling rate limits)")
    print("\nIMPORTANT: If you see 'Rate limit exceeded' warning, don't worry!")
    print("The program will automatically wait and continue when Twitter allows more requests.")
    print("This is normal for free API tier accounts.")

    try:
        # Run streaming pipeline
        csv_path, viz_path = pipeline.run_streaming_pipeline(topic, max_results, lang)

        if csv_path:
            print("\nAnalysis completed!")
            print(f"Results saved to: {csv_path}")
            if viz_path:
                print(f"Visualizations saved to: {viz_path}")
            else:
                print("No visualizations were generated (likely no tweets found).")
        else:
            print("\nAnalysis failed. Please check the logs above for errors.")
    except Exception as e:
        print(f"\nAn error occurred during analysis: {e}")
        import traceback
        traceback.print_exc()

In [21]:
# Run the test function if this script is executed directly
if __name__ == "__main__":
    test_pipeline()

Please enter your Twitter API credentials:
Consumer Key (API Key): ··········
Consumer Secret (API Secret): ··········
Access Token: ··········
Access Token Secret: ··········
Bearer Token: ··········

IMPORTANT: Free Twitter API has strict rate limits.
Do you want to check your current rate limit status before proceeding? (y/n): y

Rate limit information:
Could not retrieve detailed rate limit information, but connection successful.
Twitter API authentication successful!
Using device: cpu
Sentiment analysis model loaded successfully!
Enter topic to search for on Twitter: dell

NOTE: For free API tier, we recommend keeping this number small (10-50)
IMPORTANT: Twitter API requires a minimum of 10 tweets per request
Enter number of tweets to analyze (min 10, max 100): 30

Filter tweets by language? (y/n): y
English language filter applied. Only English tweets will be analyzed.

Using streaming mode to process tweets as they come (better for handling rate limits)

The program will automat

Processing tweets:   0%|          | 0/30 [00:00<?, ?it/s]

Tweet: '@Val_Gadget Dell any day...' - Sentiment: Neutral
Tweet: 'RT @_MLFootball: 🚨🚨THIS IS WILD🚨🚨

#TEXANS STAR RE...' - Sentiment: Negative
Tweet: 'We're thrilled to inform you that you might be eli...' - Sentiment: Neutral
Tweet: 'I wish more articles were as informative as this o...' - Sentiment: Positive
Tweet: 'RT @BTC_for_Freedom: Imagine how much Bitcoin Mich...' - Sentiment: Negative
Tweet: 'Dell Rolls Out Trusted Device 5.0 with Enhanced Se...' - Sentiment: Positive
Tweet: '🔥 The BIGGЕST #Сryрtо #РUMР #Signаl is here! 🚀 Jоi...' - Sentiment: Neutral
Tweet: 'Tank Dell with the new ink!

 https://t.co/5tr3amp...' - Sentiment: Neutral
Tweet: 'RT @storagereview: The Dell PowerEdge R7715 is a 2...' - Sentiment: Neutral
Tweet: 'RT @Val_Gadget: Looking for a new laptop?

Between...' - Sentiment: Neutral
Tweet: '@MichaelDell @DellTech I’ve never owned anything b...' - Sentiment: Neutral
Tweet: 'Dangerous for wood, it has many ennemies.
#hanneto...' - Sentiment: Negative
Tweet: '🔥 The


Passing `palette` without assigning `hue` is deprecated and will be removed in v0.14.0. Assign the `x` variable to `hue` and set `legend=False` for the same effect.

  sns.countplot(x='sentiment', data=df, palette='viridis', order=order, ax=ax)


Visualizations saved to /content/drive/MyDrive/Grad_project/twitter_results/dell_20250503_235525_viz

Analysis completed!
Results saved to: /content/drive/MyDrive/Grad_project/twitter_results/dell_20250503_235525.csv
Visualizations saved to: /content/drive/MyDrive/Grad_project/twitter_results/dell_20250503_235525_viz
