In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

In [None]:
df = spark.read.csv("/content/drive/MyDrive/BDA/twitter_training.csv", header=False, inferSchema=True)


In [None]:
df.show(5)

+----+-----------+--------+--------------------+
| _c0|        _c1|     _c2|                 _c3|
+----+-----------+--------+--------------------+
|2401|Borderlands|Positive|im getting on bor...|
|2401|Borderlands|Positive|I am coming to th...|
|2401|Borderlands|Positive|im getting on bor...|
|2401|Borderlands|Positive|im coming on bord...|
|2401|Borderlands|Positive|im getting on bor...|
+----+-----------+--------+--------------------+
only showing top 5 rows



In [None]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



In [None]:
df = df.withColumnRenamed("_c0", "ID")\
       .withColumnRenamed("_c1", "platform")\
       .withColumnRenamed("_c2", "sentiment")\
       .withColumnRenamed("_c3", "tweet_content")

In [None]:
df.show(5)

+----+-----------+---------+--------------------+
|  ID|   platform|sentiment|       tweet_content|
+----+-----------+---------+--------------------+
|2401|Borderlands| Positive|im getting on bor...|
|2401|Borderlands| Positive|I am coming to th...|
|2401|Borderlands| Positive|im getting on bor...|
|2401|Borderlands| Positive|im coming on bord...|
|2401|Borderlands| Positive|im getting on bor...|
+----+-----------+---------+--------------------+
only showing top 5 rows



In [None]:
df_clean = df.dropDuplicates(["ID", "tweet_content"])

In [None]:
df_clean = df_clean.dropna()

In [None]:
df_clean.show(5)

+---+--------+---------+--------------------+
| ID|platform|sentiment|       tweet_content|
+---+--------+---------+--------------------+
|  1|  Amazon| Negative|          <unk> wtf.|
|  1|  Amazon| Negative|       @ amazon wtf.|
|  2|  Amazon| Negative|I am really disap...|
|  2|  Amazon| Negative|I mean ’ wa m rea...|
|  2|  Amazon| Negative|I'm really disapp...|
+---+--------+---------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer

In [None]:
df_clean = df_clean.withColumn("cleaned_tweet", lower(col("tweet_content")))
df_clean = df_clean.withColumn("cleaned_tweet", regexp_replace(col("cleaned_tweet"), "[^a-zA-Z\\s]", ""))

In [None]:
tokenizer = Tokenizer(inputCol="cleaned_tweet", outputCol="tokens")
df_clean = tokenizer.transform(df_clean)

In [None]:
df_clean.select("ID", "tokens").show(5)

+---+--------------------+
| ID|              tokens|
+---+--------------------+
|  1|          [unk, wtf]|
|  1|     [, amazon, wtf]|
|  2|[i, am, really, d...|
|  2|[i, mean, , wa, m...|
|  2|[im, really, disa...|
+---+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import when

In [None]:
df_clean = df_clean.withColumn("sentiment_label", when(col("sentiment") == "Positive", 1).otherwise(0))

In [None]:
df_clean.select("ID", "sentiment", "sentiment_label").show(5)

+---+---------+---------------+
| ID|sentiment|sentiment_label|
+---+---------+---------------+
|  1| Negative|              0|
|  1| Negative|              0|
|  2| Negative|              0|
|  2| Negative|              0|
|  2| Negative|              0|
+---+---------+---------------+
only showing top 5 rows



In [None]:
df_filtered = df_clean.filter((col("sentiment") == "Positive") | (col("sentiment") == "Negative"))

In [None]:
df_filtered = df_filtered.withColumn("sentiment_label", when(col("sentiment") == "Positive", 1).otherwise(0))

In [None]:
df_filtered.select("ID", "sentiment", "sentiment_label").show(5)

+---+---------+---------------+
| ID|sentiment|sentiment_label|
+---+---------+---------------+
|  1| Negative|              0|
|  1| Negative|              0|
|  2| Negative|              0|
|  2| Negative|              0|
|  2| Negative|              0|
+---+---------+---------------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline

In [None]:
tokenizer = Tokenizer(inputCol="tweet_content", outputCol="words")

In [None]:
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=1000)

In [None]:
idf = IDF(inputCol="raw_features", outputCol="features")

In [None]:
pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf])

In [None]:
model = pipeline.fit(df_filtered)

In [None]:
df_features = model.transform(df_filtered)

In [None]:
train_data, test_data = df_features.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
lr = LogisticRegression(labelCol="sentiment_label", featuresCol="features")

In [None]:
lr_model = lr.fit(train_data)

In [None]:
predictions = lr_model.transform(test_data)

In [None]:
predictions.select("ID", "sentiment", "sentiment_label", "prediction").show(5)

+---+---------+---------------+----------+
| ID|sentiment|sentiment_label|prediction|
+---+---------+---------------+----------+
|  2| Negative|              0|       0.0|
|  4| Negative|              0|       0.0|
|  4| Negative|              0|       0.0|
|  9| Negative|              0|       0.0|
| 12| Positive|              1|       0.0|
+---+---------+---------------+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol="sentiment_label", rawPredictionCol="prediction")

In [None]:
accuracy = evaluator.evaluate(predictions)

In [None]:
print("Model Accuracy: ", accuracy)

Model Accuracy:  0.7536641495447212
