In [1]:
import os
import torch
import numpy as np
import pandas as pd
from transformers import BertTokenizer, BertModel
from torch import nn, optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# ================================
# 1️⃣ Initialize Spark Session
# ================================
spark = SparkSession.builder \
    .appName("SentimentAnalysis") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://master:9000") \
    .getOrCreate()

2025-04-02 14:32:10.644645: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-02 14:32:10.648970: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-02 14:32:10.737904: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-02 14:32:10.738815: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
25/04/02 14:32:16 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take e

In [2]:
# ================================
# 2️⃣ Load Dataset into Spark
# ================================
df = spark.read.csv("hdfs://localhost:9000/covid/twitter_dataset.csv", header=True, inferSchema=True)
df = df.na.drop()  # Drop missing values


                                                                                

In [3]:
df.show(5)
df.printSchema()
df.count()  # Check number of rows


+--------+----------+--------------------+--------------------+----+--------------+-------------+---------------+--------------------+--------------------+--------------------+--------------------+--------+-----+-----+-----+---------+
|      id|created_at|              source|       original_text|lang|favorite_count|retweet_count|original_author|            hashtags|       user_mentions|               place|         clean_tweet|compound|  neg|  neu|  pos|sentiment|
+--------+----------+--------------------+--------------------+----+--------------+-------------+---------------+--------------------+--------------------+--------------------+--------------------+--------+-----+-----+-----+---------+
|1.25e+18|2020-04-19|"<a href=""http:/...|RT @morethanmySLE...|  en|           0.0|        474.0|DrJeffreyPOlson|         coronavirus|       morethanmySLE|      JPO Aesthetics|break new york wo...| -0.5994|0.262|0.738|  0.0|      neg|
|1.25e+18|2020-04-19|"<a href=""http:/...|RT @ClevelandClin.

                                                                                

9609

In [4]:
df.groupBy("sentiment").count().show()




+---------+-----+
|sentiment|count|
+---------+-----+
|      pos| 3430|
|    0.422|    1|
|      0.0|   68|
|      neg| 2053|
|    0.268|    1|
|    0.271|    1|
|      neu| 4005|
|      1.0|    6|
|    0.419|    1|
|    0.806|    1|
|    0.219|    1|
|    0.238|    1|
|    0.231|    1|
|    0.184|    1|
|    0.894|    1|
|    0.359|    1|
|    0.242|    1|
|    0.672|    1|
|    0.177|    1|
|    0.627|    1|
+---------+-----+
only showing top 20 rows



                                                                                

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, row_number, rand, min
from pyspark.sql.window import Window

# 3️⃣ Fix Sentiment Column
# ================================
df = df.withColumn(
    "sentiment",
    when(col("sentiment") == "pos", "pos")
    .when(col("sentiment") == "neg", "neg")
    .when(col("sentiment") == "neu", "neu")
    .otherwise(
        when(col("sentiment") >= 0.05, "pos")
        .when(col("sentiment") <= -0.05, "neg")
        .otherwise("neu")
    ),
)

# Verify fixed sentiment categories
df.groupBy("sentiment").count().show()



                                                                                

+---------+-----+
|sentiment|count|
+---------+-----+
|      pos| 3483|
|      neg| 2053|
|      neu| 4073|
+---------+-----+



In [None]:
# ================================
# 4️⃣ Reduce Dataset to Balanced Samples
# ================================
# Get minimum class count


In [6]:
# Stratified sampling using window function

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ✅ Step 1: Check the smallest class count
sentiment_counts = df.groupBy("sentiment").count()
sentiment_counts.show()

min_class_count = sentiment_counts.agg(F.min("count")).collect()[0][0]

# ✅ Step 2: Apply stratified sampling
window_spec = Window.partitionBy("sentiment").orderBy(F.rand())
df = df.withColumn("row_num", F.row_number().over(window_spec))

# Keep only `min_class_count` samples per class
balanced_df = df.filter(F.col("row_num") <= min_class_count).drop("row_num")

# ✅ Step 3: Verify final distribution
balanced_df.groupBy("sentiment").count().show()


                                                                                

+---------+-----+
|sentiment|count|
+---------+-----+
|      pos| 3483|
|      neg| 2053|
|      neu| 4073|
+---------+-----+





+---------+-----+
|sentiment|count|
+---------+-----+
|      neg| 2053|
|      neu| 2053|
|      pos| 2053|
+---------+-----+



                                                                                

In [7]:
# Verify new balanced dataset count
balanced_df.groupBy("sentiment").count().show()




+---------+-----+
|sentiment|count|
+---------+-----+
|      neg| 2053|
|      neu| 2053|
|      pos| 2053|
+---------+-----+



                                                                                

In [9]:
# ================================
# 5️⃣ Convert to Pandas
# ================================
if balanced_df.count() > 0:
    pandas_df = balanced_df.toPandas()
    print(pandas_df.head())
else:
    print("🚨 Error: Balanced dataset is empty! Check your filtering conditions.")

                                                                                

         id  created_at                                             source  \
0  1.26e+18  2020-05-06  "<a href=""https://mobile.twitter.com"" rel=""...   
1  1.25e+18  2020-04-26  "<a href=""https://mobile.twitter.com"" rel=""...   
2  1.25e+18  2020-04-26  "<a href=""http://twitter.com/download/iphone"...   
3  1.26e+18  2020-05-22  "<a href=""https://mobile.twitter.com"" rel=""...   
4  1.26e+18  2020-05-12  "<a href=""http://twitter.com/download/iphone"...   

                                       original_text lang favorite_count  \
0  RT @nccdd: Did you miss our #COVID19 #webinars...   en            3.0   
1  RT @AdvocateKids: Our #HealthCareHeroes are pu...   en            0.0   
2  RT @iran_policy: #VirtualConference #Coronavir...   en            0.0   
3  RT @UNICEF_ECA: The #COVID19 pandemic has made...   en            0.0   
4  RT @OzraeliAvi: #BREAKING: Chinese human right...   en            0.0   

  retweet_count  original_author  \
0           0.0  DINorthCarolina   
1 

In [10]:
# 🚀 Debug: Check if the dataset is empty before conversion
print("✅ Original Dataset Count:", df.count())
print("✅ Balanced Dataset Count:", balanced_df.count())

# Check class distribution after balancing
balanced_df.groupBy("sentiment").count().show()


                                                                                

✅ Original Dataset Count: 9609


                                                                                

✅ Balanced Dataset Count: 6159




+---------+-----+
|sentiment|count|
+---------+-----+
|      neg| 2053|
|      neu| 2053|
|      pos| 2053|
+---------+-----+



                                                                                

In [None]:
#with word embeddings

In [11]:
pip install numpy pandas scikit-learn nltk


Note: you may need to restart the kernel to use updated packages.


In [12]:
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report

# Download stopwords if not already downloaded
nltk.download("stopwords")
stop_words = set(stopwords.words("english"))

# 1️⃣ Load your dataset
df = pandas_df.copy()  # Ensure it contains 'clean_tweet' and 'sentiment' columns

# 2️⃣ Text Preprocessing Function
def clean_text(text):
    text = text.lower()  # Convert to lowercase
    text = re.sub(r"http\S+|www\S+|https\S+", "", text, flags=re.MULTILINE)  # Remove URLs
    text = re.sub(r"[^\w\s]", "", text)  # Remove punctuation
    text = " ".join([word for word in text.split() if word not in stop_words])  # Remove stopwords
    return text

# Apply preprocessing to the text column
df["clean_tweet"] = df["clean_tweet"].astype(str).apply(clean_text)

# 3️⃣ Convert Sentiments into Numerical Labels
sentiment_mapping = {"pos": 1, "neg": 0, "neu": 2}
df["sentiment"] = df["sentiment"].map(sentiment_mapping)

# 4️⃣ Convert Text Data into Numerical Representation (TF-IDF)
vectorizer = TfidfVectorizer(max_features=5000)  # Use 5000 important words
X = vectorizer.fit_transform(df["clean_tweet"])
y = df["sentiment"]

# 5️⃣ Split Data into Training and Testing Sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 6️⃣ Train Models
models = {
    "Naïve Bayes": MultinomialNB(),
    "Logistic Regression": LogisticRegression(),
    "Support Vector Machine (SVM)": SVC(kernel="linear")
}

# Train and Evaluate
for model_name, model in models.items():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    print(f"\n🔹 Model: {model_name}")
    print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
    print(classification_report(y_test, y_pred, target_names=["Negative", "Positive", "Neutral"]))


[nltk_data] Downloading package stopwords to
[nltk_data]     /home/satvika/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!



🔹 Model: Naïve Bayes
Accuracy: 0.7508
              precision    recall  f1-score   support

    Negative       0.77      0.78      0.77       431
    Positive       0.72      0.79      0.75       408
     Neutral       0.77      0.67      0.72       393

    accuracy                           0.75      1232
   macro avg       0.75      0.75      0.75      1232
weighted avg       0.75      0.75      0.75      1232


🔹 Model: Logistic Regression
Accuracy: 0.8239
              precision    recall  f1-score   support

    Negative       0.90      0.79      0.84       431
    Positive       0.89      0.76      0.82       408
     Neutral       0.72      0.93      0.81       393

    accuracy                           0.82      1232
   macro avg       0.84      0.83      0.82      1232
weighted avg       0.84      0.82      0.82      1232


🔹 Model: Support Vector Machine (SVM)
Accuracy: 0.8531
              precision    recall  f1-score   support

    Negative       0.92      0.82      0.

In [13]:
pip install xgboost


Note: you may need to restart the kernel to use updated packages.


In [14]:
import pandas as pd
import numpy as np
import re
import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, classification_report

# Download stopwords if not already downloaded
nltk.download("stopwords")
stop_words = set(stopwords.words("english"))

# 1️⃣ Load your dataset
df = pandas_df.copy()  # Ensure it contains 'clean_tweet' and 'sentiment' columns

# 2️⃣ Text Preprocessing Function
def clean_text(text):
    text = text.lower()  # Convert to lowercase
    text = re.sub(r"http\S+|www\S+|https\S+", "", text, flags=re.MULTILINE)  # Remove URLs
    text = re.sub(r"[^\w\s]", "", text)  # Remove punctuation
    text = " ".join([word for word in text.split() if word not in stop_words])  # Remove stopwords
    return text

# Apply preprocessing to the text column
df["clean_tweet"] = df["clean_tweet"].astype(str).apply(clean_text)

# 3️⃣ Convert Sentiments into Numerical Labels
sentiment_mapping = {"pos": 1, "neg": 0, "neu": 2}
df["sentiment"] = df["sentiment"].map(sentiment_mapping)

# 4️⃣ Convert Text Data into Numerical Representation (TF-IDF)
vectorizer = TfidfVectorizer(max_features=5000)  # Use 5000 important words
X = vectorizer.fit_transform(df["clean_tweet"])
y = df["sentiment"]

# 5️⃣ Split Data into Training and Testing Sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 6️⃣ Define Models
models = {
    "Naïve Bayes": MultinomialNB(),
    "Logistic Regression": LogisticRegression(),
    "Support Vector Machine (SVM)": SVC(kernel="linear"),
    "Random Forest": RandomForestClassifier(n_estimators=100, random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(n_estimators=100, random_state=42),
    "XGBoost": XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')
}

# Train and Evaluate Models
for model_name, model in models.items():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"\n🔹 Model: {model_name}")
    print(f"Accuracy: {accuracy:.4f}")
    print(classification_report(y_test, y_pred, target_names=["Negative", "Positive", "Neutral"]))

    # Cross-validation
    scores = cross_val_score(model, X, y, cv=5)
    print(f"Cross-validation Accuracy: {scores.mean():.4f} ± {scores.std():.4f}")


[nltk_data] Downloading package stopwords to
[nltk_data]     /home/satvika/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!



🔹 Model: Naïve Bayes
Accuracy: 0.7508
              precision    recall  f1-score   support

    Negative       0.77      0.78      0.77       431
    Positive       0.72      0.79      0.75       408
     Neutral       0.77      0.67      0.72       393

    accuracy                           0.75      1232
   macro avg       0.75      0.75      0.75      1232
weighted avg       0.75      0.75      0.75      1232

Cross-validation Accuracy: 0.7350 ± 0.0037

🔹 Model: Logistic Regression
Accuracy: 0.8239
              precision    recall  f1-score   support

    Negative       0.90      0.79      0.84       431
    Positive       0.89      0.76      0.82       408
     Neutral       0.72      0.93      0.81       393

    accuracy                           0.82      1232
   macro avg       0.84      0.83      0.82      1232
weighted avg       0.84      0.82      0.82      1232

Cross-validation Accuracy: 0.8172 ± 0.0159

🔹 Model: Support Vector Machine (SVM)
Accuracy: 0.8531
          

Parameters: { "use_label_encoder" } are not used.




🔹 Model: XGBoost
Accuracy: 0.8563
              precision    recall  f1-score   support

    Negative       0.97      0.77      0.86       431
    Positive       0.92      0.81      0.86       408
     Neutral       0.74      1.00      0.85       393

    accuracy                           0.86      1232
   macro avg       0.88      0.86      0.86      1232
weighted avg       0.88      0.86      0.86      1232



Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.



Cross-validation Accuracy: 0.8522 ± 0.0135
