<a href="https://colab.research.google.com/github/SirishaSurnam/SentimentAnalysis_Spark/blob/main/BDA_Sentiment140_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Twitter Data Analysis using Apache Spark on Cloud

In [1]:
!rm -rf spark-4.0.1-bin-hadoop3*

In [5]:

# ********************************** Spark 2.3.x Env Setup For Google Collab ********************************** #

# download spark 3.5
!wget -q -O spark-3.5.0-bin-hadoop3.tgz https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# extract spark archive
!tar xf spark-3.5.0-bin-hadoop3.tgz

# install findspark
!pip install -q findspark

# ********************************** end of setup ********************************** #


tar: This does not look like a tar archive

gzip: stdin: unexpected end of file
tar: Child returned status 1
tar: Error is not recoverable: exiting now


In [46]:
!ls -lh

total 8.0K
drwx------ 5 root root 4.0K Sep 16 12:16 drive
drwxr-xr-x 1 root root 4.0K Sep  9 13:46 sample_data


In [6]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# initialize SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("App_Name") \
    .getOrCreate()

# set the log level to WARN
spark.sparkContext.setLogLevel('WARN')

print("✅ Spark session created successfully!")

✅ Spark session created successfully!


In [7]:
# Sentiment140 Dataset Analysis using Apache Spark
# Dataset: https://www.kaggle.com/datasets/kazanova/sentiment140

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import re

In [8]:
dataset_path = "training.1600000.processed.noemoticon.csv"


In [20]:
class Sentiment140Analyzer:
    def __init__(self, app_name="Sentiment140Analysis"):
        """Initialize Spark session for Sentiment140 dataset analysis"""
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
            .getOrCreate()

        self.spark.sparkContext.setLogLevel("WARN")
        self.clean_text_udf = self.create_text_cleaning_udf()


    def define_sentiment140_schema(self):
        """Define schema for Sentiment140 dataset"""
        return StructType([
            StructField("target", IntegerType(), True),      # Sentiment label (0=negative, 4=positive)
            StructField("ids", StringType(), True),          # Tweet ID
            StructField("date", StringType(), True),         # Date of tweet
            StructField("flag", StringType(), True),         # Query flag
            StructField("user", StringType(), True),         # Username
            StructField("text", StringType(), True)          # Tweet text
        ])

    def load_sentiment140_dataset(self, file_path):
        """Load Sentiment140 dataset from CSV file"""
        schema = self.define_sentiment140_schema()

        # Load the CSV file (no headers in original dataset)
        df = self.spark.read.csv(file_path, schema=schema, encoding='latin1')

        print(f"Dataset loaded successfully!")
        print(f"Total records: {df.count()}")
        df.show(5)

        return df

    def preprocess_sentiment140_data(self, df):
        """Preprocess Sentiment140 dataset"""
        print("Preprocessing Sentiment140 data...")

        # Convert sentiment labels: 0 -> 0 (negative), 4 -> 1 (positive)
        df = df.withColumn("sentiment_label",
                          when(col("target") == 0, 0)  # Negative
                          .when(col("target") == 4, 1)  # Positive
                          .otherwise(2))  # Neutral (though not in original dataset)

        # Convert date to proper timestamp
        df = df.withColumn("timestamp", to_timestamp(col("date"), "EEE MMM dd HH:mm:ss Z yyyy"))

        # Extract date components for analysis
        df = df.withColumn("year", year("timestamp")) \
               .withColumn("month", month("timestamp")) \
               .withColumn("day", dayofmonth("timestamp")) \
               .withColumn("hour", hour("timestamp"))

        # Clean tweet text
        df = df.withColumn("cleaned_text", self.clean_text_udf(col("text")))

        # Filter out empty tweets after cleaning
        df = df.filter(col("cleaned_text").isNotNull() & (length(col("cleaned_text")) > 10))

        # Add sentiment labels as string for better readability
        df = df.withColumn("sentiment",
                          when(col("sentiment_label") == 0, "negative")
                          .when(col("sentiment_label") == 1, "positive")
                          .otherwise("neutral"))

        print(f"Data preprocessed. Records after cleaning: {df.count()}")
        return df

    def create_text_cleaning_udf(self):
        """Create UDF for text cleaning"""
        def clean_text(text):
            if text is None:
                return None

            # Convert to lowercase
            text = text.lower()

            # Remove URLs
            text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)

            # Remove user mentions and hashtags symbols
            text = re.sub(r'@\w+|#', '', text)

            # Remove extra whitespace
            text = re.sub(r'\s+', ' ', text).strip()

            # Remove non-alphabetic characters (keep spaces)
            text = re.sub(r'[^a-zA-Z\s]', '', text)

            return text if len(text.strip()) > 0 else None

        return udf(clean_text, StringType())


    def exploratory_data_analysis(self, df):
        """Perform comprehensive EDA on Sentiment140 dataset"""
        print("\n=== EXPLORATORY DATA ANALYSIS ===")

        # Basic statistics
        print("1. Dataset Overview:")
        print(f"Total tweets: {df.count():,}")
        print(f"Unique users: {df.select('user').distinct().count():,}")

        # Sentiment distribution
        print("\n2. Sentiment Distribution:")
        sentiment_dist = df.groupBy("sentiment").count().orderBy("count", ascending=False)
        sentiment_dist.show()

        # Store for visualization
        sentiment_pandas = sentiment_dist.toPandas()

        # Temporal analysis
        print("\n3. Temporal Distribution:")
        temporal_dist = df.groupBy("year", "month").count().orderBy("year", "month")
        temporal_dist.show(20)

        # Hourly patterns
        print("\n4. Hourly Tweet Patterns:")
        hourly_dist = df.groupBy("hour").count().orderBy("hour")
        hourly_dist.show(24)

        # Top users by tweet count
        print("\n5. Most Active Users:")
        top_users = df.groupBy("user").count().orderBy(desc("count")).limit(10)
        top_users.show()

        # Text length analysis
        print("\n6. Text Length Statistics:")
        text_stats = df.select(length("cleaned_text").alias("text_length")) \
                      .describe("text_length")
        text_stats.show()

        # Average text length by sentiment
        print("\n7. Average Text Length by Sentiment:")
        avg_length = df.groupBy("sentiment") \
                      .agg(avg(length("cleaned_text")).alias("avg_text_length")) \
                      .orderBy("avg_text_length", ascending=False)
        avg_length.show()

        return {
            'sentiment_dist': sentiment_pandas,
            'temporal_dist': temporal_dist.toPandas(),
            'hourly_dist': hourly_dist.toPandas()
        }

    def word_frequency_analysis(self, df):
        """Analyze word frequencies by sentiment"""
        print("\n=== WORD FREQUENCY ANALYSIS ===")

        # Tokenize words
        tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
        words_df = tokenizer.transform(df)

        # Remove stop words
        remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
        filtered_df = remover.transform(words_df)

        # Explode words and analyze frequency by sentiment
        exploded_df = filtered_df.select("sentiment", explode("filtered_words").alias("word"))

        # Overall word frequency
        print("Top 20 Most Common Words:")
        word_freq = exploded_df.groupBy("word").count() \
                              .filter(length("word") > 2) \
                              .orderBy(desc("count")) \
                              .limit(20)
        word_freq.show()

        # Word frequency by sentiment
        print("Top Words by Sentiment - Positive:")
        positive_words = exploded_df.filter(col("sentiment") == "positive") \
                                   .groupBy("word").count() \
                                   .filter(length("word") > 2) \
                                   .orderBy(desc("count")) \
                                   .limit(15)
        positive_words.show()

        print("Top Words by Sentiment - Negative:")
        negative_words = exploded_df.filter(col("sentiment") == "negative") \
                                   .groupBy("word").count() \
                                   .filter(length("word") > 2) \
                                   .orderBy(desc("count")) \
                                   .limit(15)
        negative_words.show()

        return {
            'overall_words': word_freq.toPandas(),
            'positive_words': positive_words.toPandas(),
            'negative_words': negative_words.toPandas()
        }

    def build_sentiment_classifier(self, df):
        """Build and train sentiment classification models"""
        print("\n=== BUILDING SENTIMENT CLASSIFIER ===")

        # Prepare features
        tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="words")
        remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
        hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=2000)
        idf = IDF(inputCol="raw_features", outputCol="features")

        # Reduce data
        df = df.sample(0.1, seed=42)
        # Split data
        train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

        print(f"Training set: {train_data.count():,} tweets")
        print(f"Test set: {test_data.count():,} tweets")

        # Logistic Regression Pipeline
        print("\n1. Training Logistic Regression Model...")
        lr = LogisticRegression(featuresCol="features", labelCol="sentiment_label", maxIter=30)
        lr_pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

        lr_model = lr_pipeline.fit(train_data)
        lr_predictions = lr_model.transform(test_data)

        # Evaluate Logistic Regression
        evaluator = MulticlassClassificationEvaluator(labelCol="sentiment_label",
                                                    predictionCol="prediction",
                                                    metricName="accuracy")
        lr_accuracy = evaluator.evaluate(lr_predictions)

        '''
        # Random Forest Pipeline
        print("2. Training Random Forest Model...")
        rf = RandomForestClassifier(featuresCol="features", labelCol="sentiment_label", numTrees=50)
        rf_pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, rf])

        rf_model = rf_pipeline.fit(train_data)
        rf_predictions = rf_model.transform(test_data)
        rf_accuracy = evaluator.evaluate(rf_predictions)
        '''

        print(f"\nModel Performance:")
        print(f"Logistic Regression Accuracy: {lr_accuracy:.4f}")
        # print(f"Random Forest Accuracy: {rf_accuracy:.4f}")

        '''
        # Detailed evaluation for best model
        best_model = lr_model if lr_accuracy > rf_accuracy else rf_model
        best_predictions = lr_predictions if lr_accuracy > rf_accuracy else rf_predictions
        best_name = "Logistic Regression" if lr_accuracy > rf_accuracy else "Random Forest"
        '''
        best_model = lr_model
        best_predictions = lr_predictions
        best_name = "Logistic Regression"

        print(f"\nDetailed evaluation for {best_name}:")

        # Confusion Matrix
        confusion_matrix = best_predictions.groupBy("sentiment_label", "prediction").count()
        confusion_matrix.show()

        # Precision, Recall, F1-Score
        precision_evaluator = MulticlassClassificationEvaluator(labelCol="sentiment_label",
                                                               predictionCol="prediction",
                                                               metricName="weightedPrecision")
        recall_evaluator = MulticlassClassificationEvaluator(labelCol="sentiment_label",
                                                            predictionCol="prediction",
                                                            metricName="weightedRecall")
        f1_evaluator = MulticlassClassificationEvaluator(labelCol="sentiment_label",
                                                        predictionCol="prediction",
                                                        metricName="f1")

        precision = precision_evaluator.evaluate(best_predictions)
        recall = recall_evaluator.evaluate(best_predictions)
        f1_score = f1_evaluator.evaluate(best_predictions)

        print(f"Weighted Precision: {precision:.4f}")
        print(f"Weighted Recall: {recall:.4f}")
        print(f"F1-Score: {f1_score:.4f}")

        #  'accuracy': lr_accuracy if lr_accuracy > rf_accuracy else rf_accuracy,

        return {
            'best_model': best_model,
            'best_predictions': best_predictions,
            'model_name': best_name,
            'accuracy': lr_accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1_score,
            'confusion_matrix': confusion_matrix.toPandas()
        }

    def advanced_sentiment_analysis(self, df):
        """Advanced sentiment analysis with custom insights"""
        print("\n=== ADVANCED SENTIMENT ANALYSIS ===")

        # Sentiment by time patterns
        print("1. Sentiment Patterns by Hour:")
        hourly_sentiment = df.groupBy("hour", "sentiment").count() \
                            .orderBy("hour", "sentiment")
        hourly_sentiment.show(50)

        # User sentiment consistency
        print("2. User Sentiment Consistency:")
        user_sentiment = df.groupBy("user", "sentiment").count()
        user_variety = user_sentiment.groupBy("user") \
                                   .agg(countDistinct("sentiment").alias("sentiment_types"),
                                        sum("count").alias("total_tweets")) \
                                   .filter(col("total_tweets") >= 5) \
                                   .orderBy(desc("total_tweets"))

        print("Users with most diverse sentiment (both positive and negative):")
        diverse_users = user_variety.filter(col("sentiment_types") >= 2).limit(10)
        diverse_users.show()

        # Long vs short tweets sentiment
        print("3. Sentiment by Tweet Length:")
        df_with_length = df.withColumn("text_length", length("cleaned_text")) \
                          .withColumn("length_category",
                                     when(col("text_length") < 50, "short")
                                     .when(col("text_length") < 100, "medium")
                                     .otherwise("long"))

        length_sentiment = df_with_length.groupBy("length_category", "sentiment").count() \
                                        .orderBy("length_category", "sentiment")
        length_sentiment.show()

        return {
            'hourly_sentiment': hourly_sentiment.toPandas(),
            'user_variety': user_variety.toPandas(),
            'length_sentiment': length_sentiment.toPandas()
        }

    def predict_new_tweets(self, model, new_texts):
        """Predict sentiment for new tweets using trained model"""
        print("\n=== PREDICTING NEW TWEETS ===")

        # Create DataFrame with new texts
        new_data = self.spark.createDataFrame([(text,) for text in new_texts], ["text"])
        new_data = new_data.withColumn("cleaned_text", self.clean_text_udf(col("text")))

        # Make predictions
        predictions = model.transform(new_data)

        # Show results
        results = predictions.select("text", "prediction",
                                   when(col("prediction") == 0, "negative")
                                   .when(col("prediction") == 1, "positive")
                                   .otherwise("neutral").alias("predicted_sentiment"))

        print("Predictions for new tweets:")
        results.show(truncate=False)

        return results.toPandas()

    def generate_insights_report(self, df, analysis_results):
        """Generate comprehensive insights report"""
        print("\n" + "="*50)
        print("SENTIMENT140 DATASET ANALYSIS REPORT")
        print("="*50)

        total_tweets = df.count()
        unique_users = df.select("user").distinct().count()

        print(f"\nDATASET OVERVIEW:")
        print(f"• Total Tweets Analyzed: {total_tweets:,}")
        print(f"• Unique Users: {unique_users:,}")
        print(f"• Average Tweets per User: {total_tweets/unique_users:.1f}")

        # Sentiment insights
        sentiment_dist = df.groupBy("sentiment").count().collect()
        sentiment_dict = {row['sentiment']: row['count'] for row in sentiment_dist}

        print(f"\nSENTIMENT DISTRIBUTION:")
        for sentiment, count in sentiment_dict.items():
            percentage = (count / total_tweets) * 100
            print(f"• {sentiment.title()}: {count:,} ({percentage:.1f}%)")

        # Time insights
        peak_hour = df.groupBy("hour").count().orderBy(desc("count")).first()
        print(f"\nTEMPORAL INSIGHTS:")
        print(f"• Peak Activity Hour: {peak_hour['hour']:02d}:00 ({peak_hour['count']:,} tweets)")

        # Model performance
        if 'accuracy' in analysis_results:
            print(f"\nMODEL PERFORMANCE:")
            print(f"• Best Model: {analysis_results['model_name']}")
            print(f"• Accuracy: {analysis_results['accuracy']:.1%}")
            print(f"• F1-Score: {analysis_results['f1_score']:.3f}")

        print("\n" + "="*50)

    def visualize_results(self, analysis_data):
        """Create visualizations for the analysis results"""
        print("\nGenerating visualizations...")

        # Sentiment distribution pie chart
        plt.figure(figsize=(15, 10))

        # Subplot 1: Sentiment Distribution
        plt.subplot(2, 3, 1)
        sentiment_data = analysis_data['sentiment_dist']
        plt.pie(sentiment_data['count'], labels=sentiment_data['sentiment'], autopct='%1.1f%%')
        plt.title('Sentiment Distribution')

        # Subplot 2: Hourly Tweet Patterns
        plt.subplot(2, 3, 2)
        hourly_data = analysis_data['hourly_dist']
        plt.bar(hourly_data['hour'], hourly_data['count'])
        plt.title('Tweets by Hour of Day')
        plt.xlabel('Hour')
        plt.ylabel('Tweet Count')

        # Subplot 3: Word Frequency
        plt.subplot(2, 3, 3)
        if 'overall_words' in analysis_data:
            words_data = analysis_data['overall_words'].head(10)
            plt.barh(words_data['word'], words_data['count'])
            plt.title('Top 10 Most Common Words')
            plt.xlabel('Frequency')

        plt.tight_layout()
        plt.savefig('sentiment140_analysis.png', dpi=300, bbox_inches='tight')
        print("Visualizations saved as 'sentiment140_analysis.png'")

    def save_results(self, df, results, output_path):
        """Save analysis results to files"""
        print(f"\nSaving results to {output_path}...")

        # Save processed dataset
        df.write.mode("overwrite").parquet(f"{output_path}/processed_sentiment140.parquet")

        # Save analysis results as JSON
        import json
        results_json = {}
        for key, value in results.items():
            if hasattr(value, 'to_dict'):
                results_json[key] = value.to_dict()
            elif isinstance(value, (int, float, str)):
                results_json[key] = value

        with open(f"{output_path}/analysis_results.json", 'w') as f:
            json.dump(results_json, f, indent=2)

        print("Results saved successfully!")

    def close(self):
        """Close Spark session"""
        self.spark.stop()

In [11]:
# Part 3: Data Loading and Preprocessing

import kagglehub
import os

print("Starting Sentiment140 Dataset Analysis with Apache Spark")
print("="*60)

# Initialize analyzer
analyzer = Sentiment140Analyzer()

# Step 0: Download the dataset using kagglehub
print("Downloading dataset from Kaggle...")
download_path = kagglehub.dataset_download("kazanova/sentiment140")
dataset_path = os.path.join(download_path, "training.1600000.processed.noemoticon.csv")

# Configuration
output_path = "sentiment140_results"

# Step 1: Load dataset
print(f"\nStep 1: Loading Sentiment140 dataset from {dataset_path}...")
df = analyzer.load_sentiment140_dataset(dataset_path)

# Step 2: Preprocess data
print("\nStep 2: Preprocessing data...")
df = analyzer.preprocess_sentiment140_data(df)

Starting Sentiment140 Dataset Analysis with Apache Spark
Downloading dataset from Kaggle...
Using Colab cache for faster access to the 'sentiment140' dataset.

Step 1: Loading Sentiment140 dataset from /kaggle/input/sentiment140/training.1600000.processed.noemoticon.csv...
Dataset loaded successfully!
Total records: 1600000
+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|

In [12]:
# Part 4: EDA and Word Frequency Analysis

print("\nStep 3: Performing Exploratory Data Analysis...")
eda_results = analyzer.exploratory_data_analysis(df)

print("\nStep 4: Analyzing word frequencies...")
word_analysis = analyzer.word_frequency_analysis(df)


Step 3: Performing Exploratory Data Analysis...

=== EXPLORATORY DATA ANALYSIS ===
1. Dataset Overview:
Total tweets: 1,559,073
Unique users: 649,936

2. Sentiment Distribution:
+---------+------+
|sentiment| count|
+---------+------+
| negative|783531|
| positive|775542|
+---------+------+


3. Temporal Distribution:
+----+-----+-------+
|year|month|  count|
+----+-----+-------+
|NULL| NULL|1559073|
+----+-----+-------+


4. Hourly Tweet Patterns:
+----+-------+
|hour|  count|
+----+-------+
|NULL|1559073|
+----+-------+


5. Most Active Users:
+---------------+-----+
|           user|count|
+---------------+-----+
|       lost_dog|  549|
|        webwoke|  345|
|    VioletsCRUK|  276|
|SallytheShizzle|  272|
|    mcraddictal|  259|
|       tsarnick|  246|
|    what_bugs_u|  246|
|      DarkPiano|  226|
|   SongoftheOss|  220|
|      Jayme1988|  217|
+---------------+-----+


6. Text Length Statistics:
+-------+------------------+
|summary|       text_length|
+-------+---------------

In [44]:
# Checking  Display a few examples from the raw 'date' column
df.select("date").show(5, truncate=False)

+----------------------------+
|date                        |
+----------------------------+
|Mon Apr 06 22:19:45 PDT 2009|
|Mon Apr 06 22:19:49 PDT 2009|
|Mon Apr 06 22:19:53 PDT 2009|
|Mon Apr 06 22:19:57 PDT 2009|
|Mon Apr 06 22:19:57 PDT 2009|
+----------------------------+
only showing top 5 rows



In [21]:
# Part 5: Model Building and Evaluation

print("\nStep 5: Building sentiment classification models...")
model_results = analyzer.build_sentiment_classifier(df)


Step 5: Building sentiment classification models...

=== BUILDING SENTIMENT CLASSIFIER ===
Training set: 1,247,647 tweets
Test set: 311,426 tweets

1. Training Logistic Regression Model...


SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.DATETIME_PATTERN_RECOGNITION] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to recognize 'EEE MMM dd HH:mm:ss XXX yyyy' pattern in the DateTimeFormatter. 1) You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from 'https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html'.

In [22]:
# Part 6: Advanced Analysis and Prediction

print("\nStep 6: Performing advanced sentiment analysis...")
advanced_results = analyzer.advanced_sentiment_analysis(df)

print("\nStep 7: Testing model with new tweets...")
test_tweets = [
    "I love this new product! It's amazing!",
    "This is terrible service. Very disappointed.",
    "The weather is nice today.",
    "Best day ever! So happy right now!",
    "Worst experience of my life. Avoid this place."
]
predictions = analyzer.predict_new_tweets(model_results['best_model'], test_tweets)


Step 6: Performing advanced sentiment analysis...

=== ADVANCED SENTIMENT ANALYSIS ===
1. Sentiment Patterns by Hour:
+----+---------+------+
|hour|sentiment| count|
+----+---------+------+
|NULL| negative|783531|
|NULL| positive|775542|
+----+---------+------+

2. User Sentiment Consistency:
Users with most diverse sentiment (both positive and negative):
+---------------+---------------+------------+
|           user|sentiment_types|total_tweets|
+---------------+---------------+------------+
|        webwoke|              2|         345|
|    VioletsCRUK|              2|         276|
|SallytheShizzle|              2|         272|
|    mcraddictal|              2|         259|
|       tsarnick|              2|         246|
|      DarkPiano|              2|         226|
|   SongoftheOss|              2|         220|
|      Jayme1988|              2|         217|
|         keza34|              2|         215|
|    Karen230683|              2|         213|
+---------------+-------------

NameError: name 'model_results' is not defined

In [None]:
# Part 7: Final Reporting and Cleanup

# Step 8: Generate comprehensive report
all_results = {**eda_results, **word_analysis, **model_results, **advanced_results}
analyzer.generate_insights_report(df, model_results)

# Step 9: Create visualizations
analyzer.visualize_results({**eda_results, **word_analysis})

# Step 10: Save results
analyzer.save_results(df, all_results, output_path)

print("\n✅ Analysis completed successfully!")
print(f"📊 Check '{output_path}' folder for detailed results")
print("📈 Visualizations saved as 'sentiment140_analysis.png'")

analyzer.close()

# Additional utility functions for Sentiment140 dataset

def download_sentiment140_dataset():
    """Instructions to download Sentiment140 dataset"""
    instructions = """
    To download the Sentiment140 dataset:
    
    1. Go to: https://www.kaggle.com/datasets/kazanova/sentiment140
    2. Click 'Download' (requires Kaggle account)
    3. Extract the ZIP file
    4. The main file is: training.1600000.processed.noemoticon.csv
    5. Update the dataset_path variable in main() function
    
    Alternative download via Kaggle API:
    pip install kaggle
    kaggle datasets download -d kazanova/sentiment140
    """
    return instructions

def sample_sentiment140_for_testing(input_path, output_path, sample_size=10000):
    """Create a smaller sample for testing purposes"""
    analyzer = Sentiment140Analyzer("SampleCreator")
    
    # Load full dataset
    df = analyzer.load_sentiment140_dataset(input_path)
    
    # Take a random sample
    sample_df = df.sample(fraction=sample_size/df.count(), seed=42)
    
    # Save sample
    sample_df.write.mode("overwrite").csv(output_path, header=True)
    
    print(f"Sample of {sample_size} tweets saved to {output_path}")
    analyzer.close()

# Cloud deployment specific configurations
CLOUD_CONFIGS = {
    "aws_emr": {
        "dataset_path": "s3://your-bucket/sentiment140/training.1600000.processed.noemoticon.csv",
        "output_path": "s3://your-bucket/results/sentiment140/",
        "spark_configs": {
            "spark.executor.memory": "4g",
            "spark.driver.memory": "2g",
            "spark.executor.cores": "2"
        }
    },
    "gcp_dataproc": {
        "dataset_path": "gs://your-bucket/sentiment140/training.1600000.processed.noemoticon.csv",
        "output_path": "gs://your-bucket/results/sentiment140/",
        "spark_configs": {
            "spark.executor.memory": "4g",
            "spark.driver.memory": "2g"
        }
    },
    "azure_hdinsight": {
        "dataset_path": "abfss://container@account.dfs.core.windows.net/sentiment140/training.csv",
        "output_path": "abfss://container@account.dfs.core.windows.net/results/sentiment140/",
        "spark_configs": {
            "spark.executor.memory": "4g",
            "spark.driver.memory": "2g"
        }
    }
}