In [74]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import NaiveBayes, LogisticRegression, RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from emoji import replace_emoji
import re

In [63]:
# Build (or reuse) the Spark session for this notebook: local master,
# 4G of driver memory requested, Kryo configured as the serializer.
spark = (
    SparkSession.builder
    .appName("Sentiment Analysis")
    .master("local[*]")
    .config("spark.driver.memory", "4G")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

In [64]:
# Load the training CSV (first row is the header) and derive an integer
# `label` column as target / 4, so a target of 0 stays 0 and 4 becomes 1.
train_df = (
    spark.read
    .option("header", "true")
    .csv("./dataset/train.csv")
    .withColumn("label", (col("target") / 4).cast("int"))
)

In [65]:
# Expose the training frame to Spark SQL under the name "data",
# then print a preview of its rows through a SQL query.
train_df.createOrReplaceTempView("data")
(
    spark
    .sql("SELECT * FROM data")
    .show()
)

+------+----------+--------------------+--------+---------------+--------------------+-----+
|target|       ids|                date|    flag|           user|                text|label|
+------+----------+--------------------+--------+---------------+--------------------+-----+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|    0|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|    0|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|    0|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|    0|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|    0|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|    0|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |    0|
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLT

In [66]:
def clean_text(text):
    """Normalize a raw tweet before tokenization.

    Steps, in order: strip emoji, strip URLs (``http...`` and ``www.``
    forms), strip every character that is neither a word character nor
    whitespace, then lowercase and trim surrounding whitespace.

    Args:
        text: Raw tweet text, or None for a missing value.

    Returns:
        The cleaned string; "" when ``text`` is None.
    """
    if text is None:
        return ""

    text = replace_emoji(text, '')
    # The dot after "www" is escaped so that only a literal "www." prefix is
    # treated as a URL. Unescaped, "." matched any character and the pattern
    # deleted ordinary words such as "awwwww".
    text = re.sub(r"http\S+|www\.\S+", "", text)
    text = re.sub(r"[^\w\s]", "", text)

    return text.lower().strip()

# Wrap the Python cleaner as a Spark UDF that returns strings.
clean_udf = udf(f=clean_text, returnType=StringType())

# Add the cleaned tweet text next to the raw `text` column.
raw_text = col("text")
train_df = train_df.withColumn("clean_text", clean_udf(raw_text))

In [67]:
# Split the cleaned text on runs of non-word characters (the pattern marks
# the gaps between tokens, not the tokens themselves).
tokenizer = RegexTokenizer(
    inputCol="clean_text",
    outputCol="tokens",
    pattern="\\W+",
    gaps=True,
)

# Drop stop words from the token list (default stop-word list).
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_text")

# Hash the remaining tokens into a 50000-dimensional term-frequency vector.
hashingTF = HashingTF(
    inputCol="filtered_text",
    outputCol="rawfeatures",
    numFeatures=50000,
)

# Rescale the raw term frequencies by inverse document frequency.
idf = IDF(inputCol="rawfeatures", outputCol="features")

In [68]:
# Chain the four feature stages, fit them on the training frame, and
# replace `train_df` with its transformed version (adds the `features` column).
preprocessing_pipeline = Pipeline().setStages([tokenizer, remover, hashingTF, idf])
model = preprocessing_pipeline.fit(train_df)
train_df = model.transform(train_df)
# Optional persistence of the fitted pipeline (currently disabled):
# model.write().overwrite().save("./preprocessing_pipeline")

                                                                                

In [69]:
# Prepare the test set the same way as the training set: read the CSV,
# drop rows whose target is "2" (presumably the neutral class - confirm
# against the dataset), clean the text, and derive `label` as target / 4.
test_df = (
    spark.read
    .option("header", "true")
    .csv("./dataset/test.csv")
    .where(col("target") != "2")
    .withColumn("clean_text", clean_udf(col("text")))
    .withColumn("label", (col("target") / 4).cast("int"))
)
# Reuse the pipeline fitted on the training data; nothing is refit on test rows.
test_df = model.transform(test_df)

In [70]:
# Evaluators for the two reported metrics, both comparing `label` with `prediction`.
accuracyEvaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
f1Evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

# Fit multinomial Naive Bayes on the training features and score the test set.
nb = NaiveBayes(featuresCol="features", labelCol="label", modelType="multinomial")
nbModel = nb.fit(train_df)
predictions = nbModel.transform(test_df)

accuracy = accuracyEvaluator.evaluate(predictions)
f1 = f1Evaluator.evaluate(predictions)

print(
    "Naive Bayes Metrics",
    "-------------------",
    f"Accuracy: {accuracy:.2f}",
    f"F-1 score: {f1:.2f}",
    sep="\n",
)

25/05/07 22:29:09 WARN DAGScheduler: Broadcasting large task binary with size 1655.9 KiB


Naive Bayes Metrics
-------------------
Accuracy: 0.83
F-1 score: 0.83


25/05/07 22:29:09 WARN DAGScheduler: Broadcasting large task binary with size 1655.9 KiB


In [71]:
# Evaluators for the two reported metrics, both comparing `label` with `prediction`.
accuracyEvaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
f1Evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

# Fit logistic regression on the training features and score the test set.
lr = LogisticRegression(featuresCol="features", labelCol="label")
lrModel = lr.fit(train_df)
predictions = lrModel.transform(test_df)

accuracy = accuracyEvaluator.evaluate(predictions)
f1 = f1Evaluator.evaluate(predictions)

print(
    "Logistic Regression Metrics",
    "-------------------",
    f"Accuracy: {accuracy:.2f}",
    f"F-1 score: {f1:.2f}",
    sep="\n",
)

25/05/07 22:30:13 WARN DAGScheduler: Broadcasting large task binary with size 1271.5 KiB
25/05/07 22:30:13 WARN DAGScheduler: Broadcasting large task binary with size 1271.5 KiB


Logistic Regression Metrics
-------------------
Accuracy: 0.81
F-1 score: 0.80


In [75]:
# Fit a linear support-vector classifier on the training features and score
# it on the test set with the same accuracy / F1 evaluators as the other models.
# NOTE(review): the recorded run of this cell ended in a driver heartbeat
# timeout and a KeyboardInterrupt, so no metrics were captured for it.
svc = LinearSVC(featuresCol="features", labelCol="label")
svcModel = svc.fit(train_df)
predictions = svcModel.transform(test_df)
accuracyEvaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = accuracyEvaluator.evaluate(predictions)
f1Evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1 = f1Evaluator.evaluate(predictions)
# Header names this model; it previously said "Logistic Regression Metrics"
# (copy-paste from the cell above), which mislabelled the SVC results.
print("Linear SVC Metrics")
print("-------------------")
print(f"Accuracy: {accuracy:.2f}")
print(f"F-1 score: {f1:.2f}")

25/05/07 23:02:24 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 528458 ms exceeds timeout 120000 ms
25/05/07 23:02:24 WARN SparkContext: Killing executors is not supported by current scheduler.
25/05/07 23:02:28 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

KeyboardInterrupt: 

In [72]:
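# NOTE: Random Forest is disabled. The recorded run below logged repeated
# "Not enough space to cache" MemoryStore warnings and was interrupted
# (KeyboardInterrupt) before producing metrics.
# rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=20, maxDepth=10, seed=42)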
# rfModel = rf.fit(train_df)
# predictions = rfModel.transform(test_df)
# accuracyEvaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# accuracy = accuracyEvaluator.evaluate(predictions)
# f1Evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
# f1 = f1Evaluator.evaluate(predictions)
# print("Random Forest Metrics")
# print("-------------------")
# print(f"Accuracy: {accuracy:.2f}")
# print(f"F-1 score: {f1:.2f}")

25/05/07 22:33:31 WARN DAGScheduler: Broadcasting large task binary with size 1378.2 KiB
25/05/07 22:34:00 WARN DAGScheduler: Broadcasting large task binary with size 1738.4 KiB
25/05/07 22:34:03 WARN MemoryStore: Not enough space to cache rdd_805_6 in memory! (computed 181.5 MiB so far)
25/05/07 22:34:03 WARN BlockManager: Persisting block rdd_805_6 to disk instead.
25/05/07 22:34:03 WARN MemoryStore: Not enough space to cache rdd_805_4 in memory! (computed 274.0 MiB so far)
25/05/07 22:34:03 WARN BlockManager: Persisting block rdd_805_4 to disk instead.
25/05/07 22:34:03 WARN MemoryStore: Not enough space to cache rdd_805_3 in memory! (computed 274.0 MiB so far)
25/05/07 22:34:03 WARN BlockManager: Persisting block rdd_805_3 to disk instead.
25/05/07 22:34:03 WARN MemoryStore: Not enough space to cache rdd_805_7 in memory! (computed 274.0 MiB so far)
25/05/07 22:34:03 WARN BlockManager: Persisting block rdd_805_7 to disk instead.
25/05/07 22:34:03 WARN MemoryStore: Not enough space t

KeyboardInterrupt: 

[Stage 226:>                                                        (0 + 8) / 8]

In [None]:
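# NOTE: disabled export snippet. `train_transformed`, `test_transformed` and the
# `tfidf_features` column are not defined in the cells above (the frames there are
# `train_df` / `test_df` and the feature column is `features`); update before re-enabling.
# train_transformed.select("clean_text", "tfidf_features").write.mode("overwrite").parquet("./output/transformed_train")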
# test_transformed.select("clean_text", "tfidf_features").write.mode("overwrite").parquet("./output/transformed_test")

                                                                                