In [1]:
import findspark
findspark.init()

In [2]:
import pyspark.sql.functions as f
from pyspark.sql.functions import col, lit
from pyspark.sql import SparkSession

import seaborn as sns

In [3]:
spark = SparkSession.builder.appName("reddit").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/03 01:13:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/03 01:13:50 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [4]:
spark

In [151]:
df = spark.read.parquet('s3://cn490-project/reddit/ym_partition=202106')

# Clean data

### Timestamps

In [152]:
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import IntegerType

df = df.withColumn("created_utc_ts", from_unixtime(col("created_utc"),"MM-dd-yyyy HH:mm:ss")) \
    .withColumn("author_created_utc_ts", from_unixtime(col("author_created_utc"),"MM-dd-yyyy HH:mm:ss")) \

In [153]:
# Get Date
df = df.withColumn("created_date", to_date(to_timestamp("created_utc_ts",'MM-dd-yyyy HH:mm:ss'))) \
        .withColumn("author_created_date", to_date(to_timestamp("author_created_utc_ts",'MM-dd-yyyy HH:mm:ss'))) \

In [154]:
# Get Time
df = df.withColumn("created_time", date_format(to_timestamp("created_utc_ts",'MM-dd-yyyy HH:mm:ss'), 'HH:mm:ss')) \
    .withColumn("author_created_time", date_format(to_timestamp("author_created_utc_ts",'MM-dd-yyyy HH:mm:ss'), 'HH:mm:ss')) \

In [155]:
# Get Hour
df = df.withColumn("created_hour", hour(to_timestamp("created_utc_ts",'MM-dd-yyyy HH:mm:ss'))) \
        .withColumn("author_created_hour", hour(to_timestamp("author_created_utc_ts",'MM-dd-yyyy HH:mm:ss')))

### Feature engineering

In [156]:
# Account age in days
df = df.withColumn("acc_age_days", (col("created_utc") - col("author_created_utc"))/86400)

# Log transform
df = df.withColumn("log_age", log(df.acc_age_days+1)).fillna(0,subset=['log_age'])

In [157]:
# Account created hour minus post created hour 
df = df.withColumn("diff_created", pow(col("created_hour")-col("author_created_hour"),2)).fillna(-1, subset=['diff_created'])

In [158]:
# Removed body
df = df.withColumn("rm_body", F.when(col("body")=="[removed]",True).otherwise(False))
# Deleted Body
df = df.withColumn("deleted_body", F.when(col("body")=="[deleted]",True).otherwise(False))

# Negative score
df = df.withColumn("neg_score", F.when(col("score") < 0, True).otherwise(False))

# Post got deleted
df = df.withColumn("deleted_post", F.when(col("author")=="[deleted]",True).otherwise(False))

# Comment collapsed because low score
df = df.withColumn("low_score", F.when(col("collapsed_reason").isNotNull(),True).otherwise(False))

# Has flair
df = df.withColumn("has_flair", F.when(col("author_flair_text").isNotNull(),True).otherwise(False))

# Is moderator
df = df.withColumn("is_mod", F.when(col("distinguished")=='moderator',True).otherwise(False))

In [159]:
# Day of week and posted on weekend
df = df.withColumn('day_of_week', date_format("created_date", "EEEE")) \
        .withColumn("posted_on_weekend", 
                F.when(
                    (date_format("created_date", "EEEE") =='Sunday')| \
                    (date_format("created_date", "EEEE") =='Saturday') \
                     ,1).otherwise(0)) 


In [160]:
df = df.fillna({'edited':False})

# Recast boolean to integer
df = df.withColumn('is_submitter', df.is_submitter.cast(IntegerType())) \
        .withColumn('deleted_post', df.deleted_post.cast(IntegerType())) \
        .withColumn('neg_score', df.neg_score.cast(IntegerType())) \
        .withColumn('has_flair', df.has_flair.cast(IntegerType())) \
        .withColumn('locked', df.locked.cast(IntegerType())) \
        .withColumn('can_gild', df.can_gild.cast(IntegerType())) \
        .withColumn('can_mod_post', df.can_mod_post.cast(IntegerType())) \
        .withColumn('edited', df.edited.cast(IntegerType())) \
        .withColumn('stickied', df.stickied.cast(IntegerType())) \
        .withColumn('gilded', df.gilded.cast(IntegerType())) \
        .withColumn('low_score', df.low_score.cast(IntegerType())) \
        .withColumn('send_replies', df.send_replies.cast(IntegerType())) \
        .withColumn('rm_body', df.rm_body.cast(IntegerType())) \
        .withColumn('deleted_body', df.deleted_body.cast(IntegerType())) 

In [161]:
# Number of comments by same author in thread
df = df.withColumn('user_comments_in_thread', F.count('id').over(Window.partitionBy('link_id', 'author')))

# Total score by same author in thread
df = df.withColumn('user_score_in_thread', F.sum('score').over(Window.partitionBy('link_id', 'author')))

# Number of total comments in thread
df = df.withColumn('thread_total_comments', F.count('id').over(Window.partitionBy('link_id')))

In [162]:
# Number of distinct users in thread

df = df.withColumn('users_in_thread', F.approx_count_distinct('author').over(Window.partitionBy('link_id')))

### Tokenize text columns

In [163]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

tokenizer = Tokenizer(inputCol="body", outputCol="tokenized_body")

countTokens = udf(lambda words: len(words), IntegerType())
df = tokenizer.transform(df)

In [164]:
df = df.withColumn("comment_length", countTokens(col("tokenized_body"))) 

In [165]:
# replace null with empty string
df = df.fillna({'author_flair_text':" "})

flair_tokenizer = Tokenizer(inputCol="author_flair_text", outputCol="tokenized_flair")
df = flair_tokenizer.transform(df)

In [166]:
df = df.withColumn("flair_length", size(col("tokenized_flair")))

# Model

## Classifier for deleted post

In [170]:
data = df.select('is_submitter',
                 'day_of_week',
                 'score','log_age',
                 'deleted_post','neg_score','diff_created',
                 'has_flair','comment_length','flair_length','is_mod',
                 'user_comments_in_thread','user_score_in_thread','users_in_thread','thread_total_comments',
                 'can_gild','can_mod_post','edited','stickied','locked','gilded',
                 'rm_body','deleted_body','send_replies'
                )

data.createOrReplaceTempView("data")

In [172]:
data.printSchema()

root
 |-- is_submitter: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- score: long (nullable = true)
 |-- log_age: double (nullable = false)
 |-- deleted_post: integer (nullable = false)
 |-- neg_score: integer (nullable = false)
 |-- diff_created: double (nullable = false)
 |-- has_flair: integer (nullable = false)
 |-- comment_length: integer (nullable = true)
 |-- flair_length: integer (nullable = false)
 |-- is_mod: boolean (nullable = false)
 |-- user_comments_in_thread: long (nullable = false)
 |-- user_score_in_thread: long (nullable = true)
 |-- users_in_thread: long (nullable = false)
 |-- thread_total_comments: long (nullable = false)
 |-- can_gild: integer (nullable = true)
 |-- can_mod_post: integer (nullable = true)
 |-- edited: integer (nullable = true)
 |-- stickied: integer (nullable = true)
 |-- locked: integer (nullable = true)
 |-- gilded: integer (nullable = true)
 |-- rm_body: integer (nullable = false)
 |-- deleted_body: integer (nullabl

In [173]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)
print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))

                                                                                

Number of training records: 260670




Number of testing records : 65305


                                                                                

In [174]:
from pyspark.ml.feature import OneHotEncoder,VectorAssembler, StringIndexer, IndexToString, MinMaxScaler, VectorIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml import Pipeline, Model
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [175]:
stringIndexer_day = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_ix")
onehot_day = OneHotEncoder(inputCol="day_of_week_ix", outputCol="day_of_week_vec")


In [176]:
labelIndexer = StringIndexer(inputCol="is_submitter", outputCol="indexedLabel")

In [177]:
vectorAssembler_features =VectorAssembler(
    inputCols=['day_of_week_vec',
               'score','log_age',
               'deleted_post','neg_score','diff_created',
               'has_flair','comment_length','flair_length','is_mod',
               'user_comments_in_thread','user_score_in_thread','users_in_thread','thread_total_comments',
               'can_gild','can_mod_post','edited','stickied','locked','gilded',
               'rm_body','deleted_body','send_replies'],
               outputCol="features")

In [178]:
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="features", maxIter=20, seed=42)

In [179]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.fit(data).labels)

In [180]:
# Pipeline
pipeline_gbt = Pipeline(stages=[stringIndexer_day,
                               onehot_day,
                               labelIndexer,
                               vectorAssembler_features,
                               gbt, labelConverter])

In [181]:
# Tune the learning rate
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.stepSize, [0.001,0.01,0.1]) \
    .build() 


In [182]:
# Build the evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="weightedFMeasure", metricLabel=1.0, beta = 2.0)

In [183]:
# Cross validation with 5 folds
crossval = CrossValidator(estimator=pipeline_gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Train model

In [184]:
# CV on train data
cvModel = crossval.fit(train_data)

22/05/03 02:40:30 WARN BlockManager: Asked to remove block broadcast_15131, which does not exist
                                                                                

### Predict on train data

In [185]:
prediction_train = cvModel.transform(train_data)

In [189]:
# Print out scores
print("F-2 score: ", evaluator.evaluate(prediction_train,  {evaluator.metricName: "weightedFMeasure"}), "\n")
print("Weighted Precision: ", evaluator.evaluate(prediction_train,  {evaluator.metricName: "weightedPrecision"}), "\n")
print("Weighted Recall: ", evaluator.evaluate(prediction_train,  {evaluator.metricName: "weightedRecall"}), "\n","\n","\n")


print("F-1 Score: ", evaluator.evaluate(prediction_train,  {evaluator.metricName: "f1"}), "\n")
print("Precision: ", evaluator.evaluate(prediction_train,  {evaluator.metricName: "precisionByLabel"}), "\n")
print("Recall: ",evaluator.evaluate(prediction_train,  {evaluator.metricName: "recallByLabel"}), "\n")

                                                                                

F-2 score:  0.9286666560590882 



                                                                                

Weighted Precision:  0.9263207547203822 



                                                                                

Weighted Recall:  0.9300057438253877 
 
 



                                                                                

F-1 Score:  0.9271786498585414 



                                                                                

Precision:  0.786638005755296 





Recall:  0.6517205721836044 



                                                                                

### Predict on test data

In [190]:
prediction = cvModel.transform(test_data)

In [191]:
# Print out scores
print("F-2 score: ", evaluator.evaluate(prediction,  {evaluator.metricName: "weightedFMeasure"}), "\n")
print("Weighted Precision: ", evaluator.evaluate(prediction,  {evaluator.metricName: "weightedPrecision"}), "\n")
print("Weighted Recall: ", evaluator.evaluate(prediction,  {evaluator.metricName: "weightedRecall"}), "\n","\n","\n")


print("F-1 Score: ", evaluator.evaluate(prediction,  {evaluator.metricName: "f1"}), "\n")
print("Precision: ", evaluator.evaluate(prediction,  {evaluator.metricName: "precisionByLabel"}), "\n")
print("Recall: ",evaluator.evaluate(prediction,  {evaluator.metricName: "recallByLabel"}), "\n")

                                                                                

F-2 score:  0.926080355158036 



                                                                                

Weighted Precision:  0.9235119050585092 



                                                                                

Weighted Recall:  0.9274971075973776 
 
 



                                                                                

F-1 Score:  0.9244899814995238 



                                                                                

Precision:  0.7751304102636402 





Recall:  0.6390793909101476 



                                                                                

In [192]:
evaluator.isLargerBetter()

True

### Feature Importance

In [193]:
# Print out the stages of the best model
cvModel.bestModel.stages

[StringIndexerModel: uid=StringIndexer_ab9d2d246f56, handleInvalid=error,
 OneHotEncoderModel: uid=OneHotEncoder_3018d2040cc3, dropLast=true, handleInvalid=error,
 StringIndexerModel: uid=StringIndexer_0b20d132e019, handleInvalid=error,
 VectorAssembler_bdc03cd2938d,
 GBTClassificationModel: uid = GBTClassifier_543f140c01a7, numTrees=20, numClasses=2, numFeatures=28,
 IndexToString_01ed38c8a743]

In [194]:
# get the vector assembler
va = cvModel.bestModel.stages[-3]
# get the tree
tree = cvModel.bestModel.stages[-2]

list(zip(va.getInputCols(), tree.featureImportances))

[('day_of_week_vec', 0.0),
 ('score', 0.00031662353954156017),
 ('log_age', 2.106649406499217e-05),
 ('deleted_post', 0.0),
 ('neg_score', 0.0),
 ('diff_created', 0.0),
 ('has_flair', 0.02561544625895525),
 ('comment_length', 0.10782892774386349),
 ('flair_length', 0.031015483417720535),
 ('is_mod', 0.0),
 ('user_comments_in_thread', 0.009897865269599607),
 ('user_score_in_thread', 0.0219226793591167),
 ('users_in_thread', 0.03074720662993639),
 ('thread_total_comments', 0.02092841363957716),
 ('can_gild', 0.0012807442970613416),
 ('can_mod_post', 0.31203774088523223),
 ('edited', 0.05566625350984668),
 ('stickied', 0.07808590541612836),
 ('locked', 0.30463564353935574),
 ('gilded', 0.0),
 ('rm_body', 0.0),
 ('deleted_body', 0.0),
 ('send_replies', 0.0)]

In [195]:
display(tree)

GBTClassificationModel: uid = GBTClassifier_543f140c01a7, numTrees=20, numClasses=2, numFeatures=28

# Save the best model

In [196]:
cvModel.bestModel.write().overwrite().save("s3://cn490-project/model_location/gbt_small_f2")

                                                                                

In [None]:
spark.stop()