In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SentimentAnalysis") \
    .master("local[*]") \
    .config("spark.executor.memory", "12g") \
    .config("spark.driver.memory", "12g") \
    .getOrCreate()

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("TweetID", IntegerType(), True),
    StructField("Entity", StringType(), True),
    StructField("Sentiment", StringType(), True),
    StructField("Content", StringType(), True)
])

df = spark.read.csv("./Spark/twitter_training.csv", header=False, schema=schema)
df.show()


+-------+-----------+---------+--------------------+
|TweetID|     Entity|Sentiment|             Content|
+-------+-----------+---------+--------------------+
|   2401|Borderlands| Positive|im getting on bor...|
|   2401|Borderlands| Positive|I am coming to th...|
|   2401|Borderlands| Positive|im getting on bor...|
|   2401|Borderlands| Positive|im coming on bord...|
|   2401|Borderlands| Positive|im getting on bor...|
|   2401|Borderlands| Positive|im getting into b...|
|   2402|Borderlands| Positive|So I spent a few ...|
|   2402|Borderlands| Positive|So I spent a coup...|
|   2402|Borderlands| Positive|So I spent a few ...|
|   2402|Borderlands| Positive|So I spent a few ...|
|   2402|Borderlands| Positive|2010 So I spent a...|
|   2402|Borderlands| Positive|                 was|
|   2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|   2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|   2403|Borderlands|  Neutral|Rock-Hard La Varl...|
|   2403|Borderlands|  Neutral|Rock-Hard La Vi

In [7]:
df_filtered = df.filter(df["Content"].isNotNull())
df = df.fillna({"Content": "default_value"})

In [8]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
import re

def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'<.*?>', '', text)  
    text = re.sub(r'http\S+|www\S+', '', text)  
    text = re.sub(r'\d+', '', text)  
    text = re.sub(r'[^\w\s]', '', text)  
    return text

preprocess_text_udf = spark.udf.register("preprocess_text", preprocess_text)
df = df.withColumn("Content", preprocess_text_udf("Content"))

tokenizer = Tokenizer(inputCol="Content", outputCol="words")
df_words = tokenizer.transform(df)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df_filtered_words = remover.transform(df_words)

hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures")
featurizedData = hashingTF.transform(df_filtered_words)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.show()

+-------+-----------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|TweetID|     Entity|Sentiment|             Content|               words|            filtered|         rawFeatures|            features|
+-------+-----------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   2401|Borderlands| Positive|im getting on bor...|[im, getting, on,...|[im, getting, bor...|(262144,[31015,92...|(262144,[31015,92...|
|   2401|Borderlands| Positive|I am coming to th...|[i, am, coming, t...|[coming, borders,...|(262144,[12409,14...|(262144,[12409,14...|
|   2401|Borderlands| Positive|im getting on bor...|[im, getting, on,...|[im, getting, bor...|(262144,[31015,68...|(262144,[31015,68...|
|   2401|Borderlands| Positive|im coming on bord...|[im, coming, on, ...|[im, coming, bord...|(262144,[12409,31...|(262144,[12409,31...|
|   2401|Borderlands| Positive|im getting

In [9]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

indexer = StringIndexer(inputCol="Sentiment", outputCol="label")
df_final = indexer.fit(rescaledData).transform(rescaledData)

In [10]:
train_data, test_data = df_final.randomSplit([0.8, 0.2], seed=1234)

In [15]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='label')

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])  
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) 
             .addGrid(lr.maxIter, [10, 50, 100]) 
             .build())

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds = 3)  

cvModel = crossval.fit(train_data)
predictions = cvModel.transform(test_data)
accuracy = evaluator.evaluate(predictions)

print("Logistic Regression Accuracy: %f" % accuracy)

bestModel = cvModel.bestModel
print("Best model parameters:")
print("regParam:", bestModel._java_obj.getRegParam())
print("elasticNetParam:", bestModel._java_obj.getElasticNetParam())
print("maxIter:", bestModel._java_obj.getMaxIter())

Logistic Regression Accuracy: 0.866391
Best model parameters:
regParam: 0.01
elasticNetParam: 0.0
maxIter: 10


In [13]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(featuresCol='features', labelCol='label')

paramGrid = (ParamGridBuilder()
             .addGrid(nb.smoothing, [0.0, 1.0, 2.0])  
             .build())

crossval = CrossValidator(estimator=nb,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  

cvModel = crossval.fit(train_data)

predictions = cvModel.transform(test_data)

accuracy = evaluator.evaluate(predictions)

print("Naive Bayes Classifier Accuracy: %f" % accuracy)

bestModel = cvModel.bestModel
print("Best model parameters:")
print("Smoothing parameter:", bestModel._java_obj.getSmoothing())

Naive Bayes Classifier Accuracy: 0.825080
Best model parameters:
Smoothing parameter: 1.0
