In [1]:
### Importing required libraries 
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pandas as pd
import numpy as np
import pickle
import warnings
warnings.filterwarnings('ignore')

### Import Vader sentiment labeler 
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

### Spark ML 
from pyspark.ml import Pipeline, Model
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression, OneVsRest, NaiveBayes, MultilayerPerceptronClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

spark = SparkSession.builder.appName("reddit-bigdata-project").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/13 23:41:06 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/04/13 23:41:13 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [2]:
spark

In [11]:
### Reading in the data
df_sub_processed = spark.read.parquet("s3a://ppol567-llj40-bucket-4/worldnews/submissions_preprocessed")
df_com_processed = spark.read.parquet("s3a://ppol567-llj40-bucket-4/comments_preprocessed")

                                                                                

In [13]:
df_sub_processed.printSchema()
print(df_sub_processed.count())
df_com_processed.printSchema()
print(df_com_processed.count())

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- created_date: string (nullable = true)
 |-- date_clean: date (nullable = true)
 |-- Live_Thread: boolean (nullable = true)
 |-- War_Dummy: boolean (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- string_form: string (nullable = false)



                                                                                

146327
root
 |-- id: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- created_date: string (nullable = true)
 |-- date_clean: date (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- string_form: string (nullable = false)





16171595


                                                                                

In [None]:
### Creating a new column for the sentence to apply sentence embeddings and implement goal specific processing. 
df_sub_processed = df_sub_processed.withColumn("string_form", f.concat_ws(" ", col("finished_clean")))
df_com_processed = df_com_processed.withColumn("string_form", f.concat_ws(" ", col("finished_clean")))
df_com_processed.show(5)

[Stage 2:>                                                          (0 + 1) / 1]

+-------+-------------+------------+----------+--------------------+----------------+--------------------+--------------------+
|     id|submission_id|created_date|date_clean|                body|controversiality|      finished_clean|         string_form|
+-------+-------------+------------+----------+--------------------+----------------+--------------------+--------------------+
|hytxcbv|       t3pr7i|  02-28-2022|2022-02-28|Also calling the ...|               0|[call, people, ho...|call people hosta...|
|hytxccf|       t3jzfh|  02-28-2022|2022-02-28|Had to read this ...|               0|              [read]|                read|
|hytxcd8|       t3pr7i|  02-28-2022|2022-02-28|I think it is up ...|               0|[23rds, limit, av...|23rds limit avenu...|
|hytxcdm|       t139g6|  02-28-2022|2022-02-28|I see your answer...|               0|[answer, question...|answer question w...|
|hytxcdq|       t3gw0b|  02-28-2022|2022-02-28|How sweet that yo...|               0|[sweet, worry, of..

                                                                                

In [47]:
### Labelling only ~3.1 million comments 
df_sentiment_label = df_com_processed.sample(False, 0.2, seed = 9)
df_sentiment_label.count()

                                                                                

3233187

In [48]:
### Further cleaning of text 
### Removing comments from subreddit moderators 
df_sentiment_label = df_sentiment_label.withColumn("moderator", col("string_form").rlike("(?i)moderator"))
df_sentiment_label_clean = df_sentiment_label.filter(col("moderator") == False)

### Removing empty rows 
df_sentiment_label_clean = df_sentiment_label_clean.filter(f.col("string_form") != "")

In [49]:
df_sentiment_label_clean.count()

                                                                                

3177282

In [50]:
### Instantiating the Vader Sentiment Analyzer 
sa_model = SentimentIntensityAnalyzer()

### Registering the method 
udf_1 = f.udf(lambda z : sa_model.polarity_scores(z))
udf_2 = f.udf(lambda z : z["compound"])
udf_3 = f.udf(lambda z : z["pos"])
udf_4 = f.udf(lambda z : z["neg"])
udf_5 = f.udf(lambda z : z["neu"])

df_sentiment_label_clean = df_sentiment_label_clean.withColumn("Polarity_Scores", udf_1(f.col("string_form")))\
                                                   .withColumn("Compound_Score", udf_2(f.col("Polarity_Scores"))) \
                                                   .withColumn("Positive_Score", udf_3(f.col("Polarity_Scores"))) \
                                                   .withColumn("Negative_Score", udf_4(f.col("Polarity_Scores"))) \
                                                   .withColumn("Neutral_Score", udf_5(f.col("Polarity_Scores"))) \
                                                   .withColumn("Sentiment_Label", when(f.col("Compound_Score") < -0.05, "Negative") \
                                                                                  .when(f.col("Compound_Score") > 0.05, "Positive")  \
                                                                                  .otherwise("Neutral"))

In [52]:
df_sentiment_label_clean.select("id", "submission_id", "finished_clean", "string_form", \
                                "Polarity_Scores", "Compound_Score", "Positive_Score", \
                                "Negative_Score", "Neutral_Score", "Sentiment_Label").write.parquet("s3a://ppol567-llj40-bucket-4/worldnews/sentiment_labels")

23/04/10 15:33:44 WARN DAGScheduler: Broadcasting large task binary with size 1053.6 KiB
                                                                                

In [45]:
### Displaying the classes
df_sentiment_label_clean.groupby("Sentiment Label").count().show()



+---------------+-------+
|Sentiment Label|  count|
+---------------+-------+
|       Positive| 952184|
|        Neutral| 878283|
|       Negative|1346815|
+---------------+-------+



                                                                                

In [53]:
spark.stop()

### Supervised Sentiment Analysis

In [6]:
### Importing the data 
df_sentiment = spark.read.parquet("s3a://ppol567-llj40-bucket-4/worldnews/sentiment_labels")

In [4]:
print(df_sentiment.count())
df_sentiment.printSchema()



3177282
root
 |-- id: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- string_form: string (nullable = true)
 |-- Polarity_Scores: string (nullable = true)
 |-- Compound_Score: string (nullable = true)
 |-- Positive_Score: string (nullable = true)
 |-- Negative_Score: string (nullable = true)
 |-- Neutral_Score: string (nullable = true)
 |-- Sentiment_Label: string (nullable = true)



                                                                                

In [10]:
### Splitting into training and testing data 
train_data, test_data = df_sentiment.randomSplit([0.8, 0.2], seed = 9)

### Applying TF-IDF to the string 
h_t = HashingTF(inputCol = "finished_clean", outputCol = "tf_features")
tf_train = h_t.transform(train_data)

### Inverse document frequency 
idf = IDF(inputCol = "tf_features", outputCol = "TF_IDF_features", minDocFreq = 1)
### Fitting only the train_data dataset 
idf_model = idf.fit(tf_train)


### Applying string indexer to the target array Sentiment_Label 
string_indexer_target = StringIndexer(inputCol = "Sentiment_Label", outputCol = "label")
string_idx_model = string_indexer_target.fit(train_data)

### Setting the feature matrix 
vectorAssembler_features = VectorAssembler(
    inputCols=["TF_IDF_features"], 
    outputCol= "features")

### Instantiating the model (Hyperparameter tuning of Logistic Regression necessary) 
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
ovr = OneVsRest(classifier = lr, labelCol = "label", featuresCol = "features")

### Multinomial Naive Bayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol = "label", featuresCol = "features")

### Support Vector Machines 
l_svc = LinearSVC(maxIter=10, regParam=0.1)
ovr_svc = OneVsRest(classifier = l_svc, labelCol = "label", featuresCol = "features")

### Creating the labelConvertor transformer 
labelConverter = IndexToString(inputCol = "prediction", 
                               outputCol = "predicted_sentiment", 
                               labels= ["Negative", "Neutral", "Positive"])



                                                                                

In [14]:
### Defining the pipeline for Logistic Regression 
pipeline_lr = Pipeline(stages=[h_t, idf_model, 
                               string_indexer_target, 
                               vectorAssembler_features, 
                               ovr, labelConverter])

### Defining the pipeline for Naive Bayes 
pipeline_nb = Pipeline(stages=[h_t, idf_model, 
                               string_indexer_target, 
                               vectorAssembler_features, 
                               nb, labelConverter])

### Defining the pipeline for Linear Support Vector Machines 
pipeline_svc = Pipeline(stages=[h_t, idf_model, 
                                string_indexer_target, 
                                vectorAssembler_features, 
                                ovr_svc, labelConverter])


In [22]:
### Applying five cross validation for Logistic Regression 
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator = pipeline_lr,
                          estimatorParamMaps = paramGrid_lr,
                          evaluator = MulticlassClassificationEvaluator(),
                          numFolds = 5) 

cvModel = crossval.fit(train_data)

### Getting the best model 
best_model = cvModel.bestModel
prediction_cv = best_model.transform(test_data)

### Getting the accuracy and printing it 
evaluator_cv = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
accuracy_lr = evaluator_cv.evaluate(prediction_cv)
evaluator_cv_pre = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "precisionByLabel")
precision_lr = evaluator_cv_pre.evaluate(prediction_cv)
evaluator_cv_rec = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "recallByLabel")
recall_lr = evaluator_cv_rec.evaluate(prediction_cv)
evaluator_cv_f1 = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "f1")
f1_lr = evaluator_cv_f1.evaluate(prediction_cv)


print("Accuracy = %g" % accuracy_lr)
print("Test Error = %g" % (1.0 - accuracy_lr))
print("Precision = %g" % precision_lr)
print("Recall = %g" % recall_lr)
print("F1 Score = %g" % f1_lr)

metrics_lr_dict = {"Accuracy" : [accuracy_lr],
                   "Precision" : [precision_lr],
                   "Recall" : [recall_lr], 
                   "F1 Score" : [f1_lr]}

metrics_lr_df = pd.DataFrame(metrics_lr_dict)
metrics_lr_df

23/04/13 02:07:39 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB
23/04/13 02:08:01 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB
23/04/13 02:08:21 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB
23/04/13 02:08:40 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB

Accuracy = 0.907908
Test Error = 0.0920925
Precision = 0.91925
Recall = 0.927843
F1 Score = 0.907904


                                                                                

Unnamed: 0,Accuracy,Precision,Recall,F1 Score
0,0.907908,0.91925,0.927843,0.907904


In [19]:
### Fitting and transforming the Naive Bayes Model 
nb_model = pipeline_nb.fit(train_data)

prediction_nb = nb_model.transform(test_data)

## Evaluating the metrics 
evaluator_nb = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
accuracy_nb = evaluator_nb.evaluate(prediction_nb)
evaluator_nb_pre = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "precisionByLabel")
precision_nb = evaluator_nb_pre.evaluate(prediction_nb)
evaluator_nb_rec = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "recallByLabel")
recall_nb = evaluator_nb_rec.evaluate(prediction_nb)
evaluator_nb_f1 = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "f1")
f1_nb = evaluator_nb_f1.evaluate(prediction_nb)

print("Accuracy = %g" % accuracy_nb)
print("Test Error = %g" % (1.0 - accuracy_nb))
print("Precision = %g" % precision_nb)
print("Recall = %g" % recall_nb)
print("F1 Score = %g" % f1_nb)

metrics_nb_dict = {"Accuracy" : [accuracy_nb],
                   "Precision" : [precision_nb],
                   "Recall" : [recall_nb], 
                   "F1 Score" : [f1_nb]}

metrics_nb_df = pd.DataFrame(metrics_nb_dict)
metrics_nb_df

Accuracy = 0.724425
Test Error = 0.275575
Precision = 0.741275
Recall = 0.797289
F1 Score = 0.723131


Unnamed: 0,Accuracy,Precision,Recall,F1 Score
0,0.724425,0.741275,0.797289,0.723131


In [38]:
### Applying five cross validation for Support Vector Machines 
paramGrid_svc = ParamGridBuilder() \
    .addGrid(l_svc.regParam, [1, 0.1, 0.01]) \
    .build()

crossval_svc = CrossValidator(estimator = pipeline_svc,
                          estimatorParamMaps = paramGrid_svc,
                          evaluator = MulticlassClassificationEvaluator(),
                          numFolds = 5)  

svc_model_cv = crossval_svc.fit(train_data)

best_model = svc_model_cv.bestModel
prediction_cv_svc = best_model.transform(test_data)

evaluator_cv_svc = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
accuracy_svc = evaluator_cv_svc.evaluate(prediction_cv_svc)
evaluator_svc_pre = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "precisionByLabel")
precision_svc = evaluator_svc_pre.evaluate(prediction_cv_svc)
evaluator_svc_rec = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "recallByLabel")
recall_svc = evaluator_svc_rec.evaluate(prediction_cv_svc)
evaluator_svc_f1 = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "f1")
f1_svc = evaluator_svc_f1.evaluate(prediction_cv_svc)


print("Accuracy = %g" % accuracy_svc)
print("Test Error = %g" % (1.0 - accuracy_svc))
print("Precision = %g" % precision_svc)
print("Recall = %g" % recall_svc)
print("F1 Score = %g" % f1_svc)

metrics_svc_dict = {"Accuracy" : [accuracy_svc],
                   "Precision" : [precision_svc],
                   "Recall" : [recall_svc], 
                   "F1 Score" : [f1_svc]}

metrics_svc_df = pd.DataFrame(metrics_svc_dict)
metrics_svc_df

Accuracy = 0.916524
Test Error = 0.0834762
Precision = 0.926347
Recall = 0.939276
F1 Score = 0.916418


Unnamed: 0,Accuracy,Precision,Recall,F1 Score
0,0.916524,0.926347,0.939276,0.916418


In [43]:
metrics = [metrics_lr_df, metrics_nb_df, metrics_svc_df]

metrics_df = pd.concat(metrics)
metrics_df.index = ["Logistic Regression Cross Validation", "Mulitnomial Naive Bayes", "SVM Cross Validation"]
metrics_df = metrics_df.round(2)
### Saving to csv 
metrics_df.to_csv("sentiment_metrics.csv")

In [39]:
### Saving the SVM cross-validation model 
svc_model_cv.save("s3a://ppol567-llj40-bucket-4/models/svc_model_cv")

23/04/13 05:03:20 WARN TaskSetManager: Stage 3801 contains a task of very large size (4187 KiB). The maximum recommended task size is 1000 KiB.
23/04/13 05:03:38 WARN TaskSetManager: Stage 3812 contains a task of very large size (4187 KiB). The maximum recommended task size is 1000 KiB.
23/04/13 05:03:51 WARN TaskSetManager: Stage 3823 contains a task of very large size (2097 KiB). The maximum recommended task size is 1000 KiB.
23/04/13 05:03:55 WARN TaskSetManager: Stage 3827 contains a task of very large size (2097 KiB). The maximum recommended task size is 1000 KiB.
23/04/13 05:03:59 WARN TaskSetManager: Stage 3831 contains a task of very large size (2097 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [44]:
### Loading in the saved model and assessing the accuracy 
svc_model_read = CrossValidatorModel.read().load("s3a://ppol567-llj40-bucket-4/models/svc_model_cv")
best_model_svc = svc_model_read.bestModel

      
prediction_cv_svc_read = svc_model_read.bestModel.transform(test_data)
evaluator_cv_read = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
accuracy = evaluator_cv_read.evaluate(prediction_cv_svc_read)

print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

23/04/13 05:14:27 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB

Accuracy = 0.916524
Test Error = 0.0834762


                                                                                

In [48]:
prediction_cv_svc_read.groupby("predicted_sentiment").count().show()

23/04/13 05:23:11 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB
23/04/13 05:23:50 WARN DAGScheduler: Broadcasting large task binary with size 26.4 MiB
23/04/13 05:23:51 WARN DAGScheduler: Broadcasting large task binary with size 26.4 MiB

+-------------------+------+
|predicted_sentiment| count|
+-------------------+------+
|           Positive|175427|
|            Neutral|187431|
|           Negative|273144|
+-------------------+------+



                                                                                

In [49]:
prediction_cv_svc_read.select("id", "predicted_sentiment").show(5)

23/04/13 05:25:17 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB
[Stage 3896:>                                                       (0 + 1) / 1]

+-------+-------------------+
|     id|predicted_sentiment|
+-------+-------------------+
|hsgr728|           Positive|
|hsgrg1t|            Neutral|
|hsgrhpz|           Negative|
|hsgrv8w|            Neutral|
|hsgs6gc|           Negative|
+-------+-------------------+
only showing top 5 rows



                                                                                

In [51]:
### Getting the anti-join of to get the unlabled comments
print(df_sentiment.count())
df_sentiment.printSchema()
print(df_com_processed.count())
df_com_processed.printSchema()


                                                                                

3177282
root
 |-- id: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- string_form: string (nullable = true)
 |-- Polarity_Scores: string (nullable = true)
 |-- Compound_Score: string (nullable = true)
 |-- Positive_Score: string (nullable = true)
 |-- Negative_Score: string (nullable = true)
 |-- Neutral_Score: string (nullable = true)
 |-- Sentiment_Label: string (nullable = true)





16171595
root
 |-- id: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- created_date: string (nullable = true)
 |-- date_clean: date (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)



                                                                                

In [52]:
df_sentiment_req = df_sentiment.select("id", "submission_id", "finished_clean")
df_com_req = df_com_processed.select("id", "submission_id", "finished_clean")
### Applying anti-join 
df_unlabel = df_com_req.join(df_sentiment_req, ((df_com_req.id == df_sentiment_req.id) & \
                             (df_com_req.submission_id == df_sentiment_req.submission_id) \
                             & (df_com_req.finished_clean == df_sentiment_req.finished_clean)), "leftanti")
df_unlabel.count()

                                                                                

12994313

In [55]:
### Generating sentiment labels for unlabeled comments 
df_unlabel = df_unlabel.withColumn("string_form", f.concat_ws(" ", f.col("finished_clean")))

### Further cleaning of text 
### Removing comments from subreddit moderators 
df_unlabel = df_unlabel.withColumn("moderator", f.col("string_form").rlike("(?i)moderator"))
df_unlabel_clean = df_unlabel.filter(f.col("moderator") == False)

### Removing empty rows 
df_unlabel_clean = df_unlabel_clean.filter(f.col("string_form") != "")

In [67]:
### The data is now ready to be labeled using our best performing model 
best_model_svc = svc_model_read.bestModel

### Avoiding the String-Indexer stage 
prediction_unlabel_clean = svc_model_read.bestModel.stages[0].transform(df_unlabel_clean) ## Term-Frequency
prediction_unlabel_clean = svc_model_read.bestModel.stages[1].transform(prediction_unlabel_clean) ## Inverse Document Frequency 
prediction_unlabel_clean = svc_model_read.bestModel.stages[3].transform(prediction_unlabel_clean) ## Vector Assembler
prediction_unlabel_clean = svc_model_read.bestModel.stages[4].transform(prediction_unlabel_clean) ## Model 
prediction_unlabel_clean = svc_model_read.bestModel.stages[5].transform(prediction_unlabel_clean) ## Label Convertor 

In [70]:
prediction_unlabel_clean.groupby("predicted_sentiment").count().show()

23/04/13 06:08:01 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB
23/04/13 06:11:37 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB
23/04/13 06:11:38 WARN DAGScheduler: Broadcasting large task binary with size 26.5 MiB

+-------------------+-------+
|predicted_sentiment|  count|
+-------------------+-------+
|           Positive|3499815|
|            Neutral|3743620|
|           Negative|5471710|
+-------------------+-------+



                                                                                

In [71]:
prediction_unlabel_clean.select("id", "submission_id", "finished_clean", "string_form", "predicted_sentiment").write.parquet("s3a://ppol567-llj40-bucket-4/worldnews/sentiment_model_labels")

23/04/13 06:14:53 WARN DAGScheduler: Broadcasting large task binary with size 26.7 MiB
                                                                                

In [8]:
### Reading in the predicted label data and combining it with the labeled dataset 
prediction_label_df = spark.read.parquet("s3a://ppol567-llj40-bucket-4/worldnews/sentiment_model_labels")
### Renaming predicted_sentiment column as Sentiment_Label
prediction_label_df = prediction_label_df.withColumnRenamed("predicted_sentiment", "Sentiment_Label")
prediction_label_df.printSchema()

print(prediction_label_df.count())

df_sentiment.printSchema()
print(df_sentiment.count())

final_sentiment_df = df_sentiment.select("id", "submission_id", "finished_clean", "string_form", "Sentiment_Label").union(prediction_label_df)

root
 |-- id: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- string_form: string (nullable = true)
 |-- Sentiment_Label: string (nullable = true)



                                                                                

12715145
root
 |-- id: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- string_form: string (nullable = true)
 |-- Polarity_Scores: string (nullable = true)
 |-- Compound_Score: string (nullable = true)
 |-- Positive_Score: string (nullable = true)
 |-- Negative_Score: string (nullable = true)
 |-- Neutral_Score: string (nullable = true)
 |-- Sentiment_Label: string (nullable = true)

3177282


                                                                                

In [9]:
print(final_sentiment_df.count())
final_sentiment_df.printSchema()
final_sentiment_df.show(5)

                                                                                

15892427
root
 |-- id: string (nullable = true)
 |-- submission_id: string (nullable = true)
 |-- finished_clean: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- string_form: string (nullable = true)
 |-- Sentiment_Label: string (nullable = true)



[Stage 24:>                                                         (0 + 1) / 1]

+-------+-------------+--------------------+--------------------+---------------+
|     id|submission_id|      finished_clean|         string_form|Sentiment_Label|
+-------+-------------+--------------------+--------------------+---------------+
|iuzifkc|       ylmqn0|[gtan, oppressive...|gtan oppressive p...|       Positive|
|iuzifu3|       yljxj2|[disagree, argument]|   disagree argument|       Negative|
|iuzigtq|       ylmqn0|[gtwe, forgive, r...|gtwe forgive russ...|       Positive|
|iuzik4u|       yljxj2|[645, year, calcu...|645 year calculat...|        Neutral|
|iuzikej|       ykrn8h|           [urukhai]|             urukhai|        Neutral|
+-------+-------------+--------------------+--------------------+---------------+
only showing top 5 rows



                                                                                

In [15]:
### Saving final_sentiment_df to S3
final_sentiment_df.write.parquet("s3a://ppol567-llj40-bucket-4/worldnews/final_complete_sentiment_labels")

                                                                                

In [10]:
sentiment_count = final_sentiment_df.groupby("Sentiment_Label")\
                                    .agg(f.count("Sentiment_Label").alias("Number of Comments")) \
                                    .orderBy(f.col("Number of Comments").desc()).toPandas()

                                                                                

In [32]:
total = sentiment_count["Number of Comments"].sum()
sentiment_count["Percentage of Comments"] = (sentiment_count["Number of Comments"]/total * 100).map("{:,.2f}%".format)
sentiment_count

Unnamed: 0,Sentiment_Label,Number of Comments,Percentage of Comments
0,Negative,6818525,42.90%
1,Neutral,4621903,29.08%
2,Positive,4451999,28.01%


In [33]:
sentiment_count.to_csv("sentiment_labels_count.csv", index = False)

In [17]:
### Only accessing live thread submissions and comments 
df_live_sub = df_sub_processed.filter(f.col("Live_Thread") == 1)

### Getting the live thread ids 
df_sub_id = df_live_sub.select("id").collect()
df_sub_live_id = [row.id for row in df_sub_id]

### Extracting the live thread comments 
df_live_com = final_sentiment_df.filter(f.col("submission_id").isin(df_sub_live_id))
total = df_live_com.count()



                                                                                

In [19]:
sentiment_count_live_thread = df_live_com.groupby("Sentiment_Label")\
                                         .agg(f.count("Sentiment_Label").alias("Number of Comments"),
                                              (f.count("Sentiment_Label")/total*100).alias("Percentage of Comments")) \
                                         .orderBy(f.col("Number of Comments").desc()).toPandas()

                                                                                

In [20]:
sentiment_count_live_thread["Percentage of Comments"] = sentiment_count_live_thread["Percentage of Comments"].map("{:,.2f}%".format)
sentiment_count_live_thread

Unnamed: 0,Sentiment_Label,Number of Comments,Percentage of Comments
0,Negative,749444,41.91%
1,Positive,528800,29.57%
2,Neutral,509961,28.52%


In [23]:
sentiment_count_live_thread.to_csv("sentiment_labels_live_threads.csv", index = False)

In [24]:
spark.stop()