In [1]:
import re
import string

from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

In [2]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("TwitterSentimentAnalysis")
    .getOrCreate()
)

In [4]:
# Load the training CSV. The file has no header row, so column types are
# inferred and the four columns are named explicitly right after the read.
TRAIN_CSV_PATH = "/content/twitter_training.csv"
TRAIN_COLUMNS = ("ID", "Topic", "Sentiment", "Tweet")

df_train = (
    spark.read
    .csv(TRAIN_CSV_PATH, header=False, inferSchema=True)
    .toDF(*TRAIN_COLUMNS)
)

# Text cleaning function
def clean_text(text):
    """Normalize a raw tweet before tokenization.

    Lower-cases the text, removes every character listed in
    ``string.punctuation`` and removes all runs of digits.

    Args:
        text: The raw tweet. Any value that is not a ``str`` (for example
            ``None``) is treated as missing.

    Returns:
        The cleaned string, or ``""`` when ``text`` is not a ``str``.
    """
    if not isinstance(text, str):  # Ensure text is a string
        return ""
    text = text.lower()
    # re.escape is required here: string.punctuation contains a backslash, "]",
    # "^" and "-", all of which are special inside a character class. Without
    # escaping, the backslash followed by "]" is parsed as an escaped "]" and
    # backslashes themselves are never stripped from the text.
    text = re.sub(f"[{re.escape(string.punctuation)}]", "", text)
    text = re.sub(r"\d+", "", text)
    return text

# Wrap the Python cleaner as a string-returning Spark UDF and use it to derive
# the "Cleaned_Tweet" column from the raw "Tweet" column.
clean_text_udf = udf(clean_text, returnType=StringType())
df_train = df_train.withColumn(
    "Cleaned_Tweet",
    clean_text_udf(col("Tweet")),
)

# --- Pipeline stages, listed in the order they run ---

# Encode the string "Sentiment" column as a numeric "label" column.
indexer = StringIndexer(
    inputCol="Sentiment",
    outputCol="label",
)

# Split the cleaned tweet into words, then drop stop words.
tokenizer = Tokenizer(
    inputCol="Cleaned_Tweet",
    outputCol="words",
)
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered_words",
)

# Hash the remaining words into a fixed-size term-frequency vector and
# re-weight it with IDF to produce the final "features" column.
hashingTF = HashingTF(
    inputCol="filtered_words",
    outputCol="rawFeatures",
    numFeatures=10000,
)
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)

# Naive Bayes classifier left at its defaults; it is expected to pick up the
# "features" and "label" columns produced above (PySpark's default column names).
nb = NaiveBayes()

# Assemble everything into a single estimator.
pipeline = Pipeline(
    stages=[
        indexer,
        tokenizer,
        remover,
        hashingTF,
        idf,
        nb,
    ]
)

# Hold out 20% of the rows for testing; the fixed seed keeps the split repeatable.
splits = df_train.randomSplit([0.8, 0.2], seed=42)
train_data, test_data = splits

# Fit every pipeline stage on the training split only.
model = pipeline.fit(train_data)

# Score the held-out split with the fitted pipeline.
predictions = model.transform(test_data)

# Evaluation: accuracy of the "prediction" column against the indexed "label"
# column on the held-out predictions.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy:.2f}")

# Preview a few rows; "prediction" is the numeric class index, not the sentiment string.
predictions.select("Sentiment", "Cleaned_Tweet", "prediction").show(10)

Accuracy: 0.66
+----------+--------------------+----------+
| Sentiment|       Cleaned_Tweet|prediction|
+----------+--------------------+----------+
|  Negative|          amazon wtf|       0.0|
|  Negative|i am really disap...|       0.0|
|  Negative|im really disappo...|       0.0|
|   Neutral|admit it subs cra...|       2.0|
|  Negative| amazon probably ...|       0.0|
|  Negative|amazon probably s...|       0.0|
|Irrelevant|youve purchased  ...|       0.0|
|   Neutral|love speculative ...|       3.0|
|  Negative|amazon be having ...|       0.0|
|  Negative|amazon be having ...|       2.0|
+----------+--------------------+----------+
only showing top 10 rows



In [5]:
# The first fitted pipeline stage is the StringIndexer model; the position of
# each entry in its `labels` list is the numeric index assigned to that sentiment.
labels_map = model.stages[0].labels
print("Label Encoding Mapping:")
for index, sentiment in enumerate(labels_map):
    print(f"{sentiment} -> {index}")

Label Encoding Mapping:
Negative -> 0
Positive -> 1
Neutral -> 2
Irrelevant -> 3


In [6]:
# Report how many partitions back the training DataFrame's underlying RDD.
num_partitions = df_train.rdd.getNumPartitions()
print(num_partitions)


2
