# DS 5110 Group Project
Team: Alexandra Cathcart (adc6fs), Benjamin Feciura (bmf3bw), Jeremey Donovan (jdd5dw), Jordan Hiatt (jdh2e)

Original data: https://www.kaggle.com/reddit/reddit-comments-may-2015


## Includes & Spark Setup

In [1]:
import pandas as pd

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, lower, size, split, udf, when
from pyspark.sql.types import ArrayType, IntegerType,  StringType, StructType

#from pyspark import SparkContext
spark = SparkSession.builder.getOrCreate()
sc=spark.sparkContext

## Data Import and Pre-Processing

### Data Import

In [2]:
# Import the reddit data
full_path = '/project/ds5559/r-slash-group8/sample.csv'

df = spark.read.csv(full_path,  inferSchema=True, header = True)

In [3]:
# Import the Bad Word data
schema = StructType().add("badWord",StringType(),True)
dfBW=spark.read.format("csv").schema(schema).load('bad_words.csv')
#  dfBW.show(5)  # not showing since words are quite vulgar

# Also create in list format
listBW=list(dfBW.select('badWord').toPandas()['badWord']) 
# listBW


In [4]:
# Create a regex with all the bad words
# if there is an issue, try \\\\b instead; just \b probably has issues
listBW=list(map(lambda line: "\\b" + line + "\\b",listBW))
delim='|'
strBW=delim.join(listBW)



### Filtering

In [5]:
# Drop unneeded cols from dataframe
df=df.drop('_c0','created_utc','subreddit_id','link_id','name','score_hidden','author_flair_css_class', 'gilded', \
        'author_flair_text','id','archived','retrieved_on', 'edited','controversiality','parent_id','score')

# convert integer cols (ups, downs, and gilded) to integers
# Note: we could have done this by defining a schema before the csv read
df=df.withColumn("ups",df.ups.cast(IntegerType()))
df=df.withColumn("downs",df.downs.cast(IntegerType()))
#df=df.withColumn("gilded",df.gilded.cast(IntegerType()))  # Removed gilded since not used in this analysis

# Confirm new schema
df.printSchema()
df.show(5)

root
 |-- ups: integer (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- removal_reason: string (nullable = true)
 |-- downs: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- distinguished: string (nullable = true)

+----+---------+--------------+-----+------------+--------------------+-------------+
| ups|subreddit|removal_reason|downs|      author|                body|distinguished|
+----+---------+--------------+-----+------------+--------------------+-------------+
|   4|soccer_jp|            NA|    0|       rx109|                くそ|         null|
|null|     null|          null| null|        null|                null|         null|
|   0|     null|          null| null|        null|                null|         null|
|   4|      nba|            NA|    0|   WyaOfWade|gg this one's ove...|           NA|
|   0| politics|            NA|    0|Wicked_Truth|Are you really im...|           NA|
+----+---------+------------

In [6]:
# Count the number of rows before removing NA
df.count()
# There are 15,317,725 rows

15317725

In [7]:
# Remove rows where up, down, or body is null. We  do this since inference of these values is not applicable
df=df.filter(df['ups'].isNotNull())
df=df.filter(df['downs'].isNotNull())
df=df.filter(df['body'].isNotNull())

df.show(5)

+---+---------+--------------+-----+--------------+--------------------+-------------+
|ups|subreddit|removal_reason|downs|        author|                body|distinguished|
+---+---------+--------------+-----+--------------+--------------------+-------------+
|  4|soccer_jp|            NA|    0|         rx109|                くそ|         null|
|  4|      nba|            NA|    0|     WyaOfWade|gg this one's ove...|           NA|
|  0| politics|            NA|    0|  Wicked_Truth|Are you really im...|           NA|
|  3|AskReddit|            NA|    0|      jesse9o3|No one has a Euro...|           NA|
|  3|AskReddit|            NA|    0|beltfedshooter|"That the kid ""....|           NA|
+---+---------+--------------+-----+--------------+--------------------+-------------+
only showing top 5 rows



In [8]:
# Remove rows where the author was '[deleted]' 
df=df.filter(df['author']!='[deleted]')

# Remove author "0"
df=df.filter(df['author']!='0')


# Remove rows where the author was 'AutoModerator'
# see https://www.reddit.com/wiki/automoderator
df=df.filter(df['author']!='AutoModerator')

In [9]:
# Count the number of rows AFTER removing NA
df.count()
# There now 9,226,090 rows

9226090

### Binning & Feature Engineering

In [10]:
# Lowercase all body text
df=df.withColumn('body',lower(col('body')))

In [11]:
# Even though we dropped the column, adding score back into dataframe by computing it
df=df.withColumn('score',df['ups']-df['downs'])
df=df.withColumn("score",df.score.cast(IntegerType()))
df.show(5)

+---+---------+--------------+-----+--------------+--------------------+-------------+-----+
|ups|subreddit|removal_reason|downs|        author|                body|distinguished|score|
+---+---------+--------------+-----+--------------+--------------------+-------------+-----+
|  4|soccer_jp|            NA|    0|         rx109|                くそ|         null|    4|
|  4|      nba|            NA|    0|     WyaOfWade|gg this one's ove...|           NA|    4|
|  0| politics|            NA|    0|  Wicked_Truth|are you really im...|           NA|    0|
|  3|AskReddit|            NA|    0|      jesse9o3|no one has a euro...|           NA|    3|
|  3|AskReddit|            NA|    0|beltfedshooter|"that the kid ""....|           NA|    3|
+---+---------+--------------+-----+--------------+--------------------+-------------+-----+
only showing top 5 rows



In [12]:
# Determine a scoreSentiment as either postive, neutral, or negative.
# This will be our response variable

# Drop scoreSentiment if it already exists
df=df.drop('scoreSentiment')

# Set up bucketizer
splits = [-float("inf"), -0.1,0.1, float("inf")]
bkt = Bucketizer(splits=splits, inputCol="score", outputCol="scoreSentiment")

# Transform to add scoreSentiment: 0=negative; 1=neutral; 2=positive.
df=bkt.transform(df)

# !!! Cannot shift to -1,0,1 since LR must start with 0 !!!
# To make things more clear, shift to -1=negative; 0=neutral; 1=positive
#df=df.withColumn("scoreSentiment", \
#                 when(df['scoreSentiment']==0,-1) \
#                 .when(df['scoreSentiment']==1,0) \
#                 .otherwise(1)
#                ) 

df.show(2)



+---+---------+--------------+-----+---------+--------------------+-------------+-----+--------------+
|ups|subreddit|removal_reason|downs|   author|                body|distinguished|score|scoreSentiment|
+---+---------+--------------+-----+---------+--------------------+-------------+-----+--------------+
|  4|soccer_jp|            NA|    0|    rx109|                くそ|         null|    4|           2.0|
|  4|      nba|            NA|    0|WyaOfWade|gg this one's ove...|           NA|    4|           2.0|
+---+---------+--------------+-----+---------+--------------------+-------------+-----+--------------+
only showing top 2 rows



In [13]:
# Flag comments containing bad words
df=df.withColumn('bwFlag',col('body').rlike(strBW))

In [14]:
# Append bodyWordCount
df=df.withColumn("bodyWordCount", size(split(df['body'], ' ')))
#df.show(5)

In [15]:
# Though not the cleanest thing to do from a data sci perspective, we
# are going to drop the neutral sentiment rows so we can do binomial
# rather than multinomial regression; neutral currently "1"
df=df.filter(df['scoreSentiment']!=1)
# Shift positive from 2 to 1
df=df.withColumn("scoreSentiment", \
                 when(df['scoreSentiment']==2,1) \
                 .when(df['scoreSentiment']==0,0) \
                 .otherwise(-1)
                ) 
# we should never have the otherwise case!!!

In [16]:
# Cross-validator explicity wants response to be called "label"
# so copying scoreSentiment to label in all DFs
df=df.withColumn("label", df["scoreSentiment"])

## Data Splitting & Sampling

In [26]:
seed=314
trainDF,testDF=df.randomSplit([0.8,0.2],seed)

## EDA

In [32]:
# How many comments have bad words?
# Confirm the flagging worked by looking at how many comments contain bad words vs good
# NOTE: This has a rather long runtime!!!
df.groupby('bwFlag').agg({"bwFlag":"count"}).show()
#df.filter(df['bwFlag']==True).show(5,False)

+------+-------------+
|bwFlag|count(bwFlag)|
+------+-------------+
|  true|       392771|
| false|      8433257|
+------+-------------+



In [33]:
# How many authors are there?
df.select(countDistinct('author')).show()
# There are 1,216,598 authors

+----------------------+
|count(DISTINCT author)|
+----------------------+
|               1216598|
+----------------------+



In [20]:
# Show the top 10 authors with sum of ups and downs
df.groupby('author').agg({"author":"count","ups":"sum","downs":"sum","score":"sum"}).sort(col('count(author)').desc()).show(10)

+-------------------+----------+----------+-------------+--------+
|             author|sum(score)|sum(downs)|count(author)|sum(ups)|
+-------------------+----------+----------+-------------+--------+
|      TheNitromeFan|     10445|         0|         3997|   10445|
|        TweetPoster|      7090|         0|         3589|    7090|
|        autowikibot|      6420|         0|         3210|    6420|
|         PoliticBot|      3159|         0|         3142|    3159|
|TweetsInCommentsBot|      9965|         0|         3130|    9965|
|     atomicimploder|      7363|         0|         2616|    7363|
|       Removedpixel|      5333|         0|         2265|    5333|
|          TrollaBot|      2640|         0|         2247|    2640|
|          havoc_bot|      2120|         0|         2102|    2120|
|     MTGCardFetcher|      3089|         0|         2084|    3089|
+-------------------+----------+----------+-------------+--------+
only showing top 10 rows



Odd that the preceding authors have no down but this is correct

In [21]:
# Show authors with the lowest scores
df.groupby('author').agg({"score":"sum","ups":"sum","downs":"sum"}).sort(col('sum(score)').asc()).show(10)

+----------------+----------+----------+--------+
|          author|sum(score)|sum(downs)|sum(ups)|
+----------------+----------+----------+--------+
|    ItWillBeMine|     -6839|         0|   -6839|
|        blaghart|     -4233|         0|   -4233|
|       Shanondoa|     -3555|         0|   -3555|
|   bad_driverman|     -3053|         0|   -3053|
|      RSneedsEoC|     -2192|         0|   -2192|
|   b00gymonster1|     -2050|         0|   -2050|
|      frankenham|     -2024|         0|   -2024|
|   SaddharKadham|     -1485|         0|   -1485|
|letters_numbers-|     -1412|         0|   -1412|
|     djroomba322|     -1392|         0|   -1392|
+----------------+----------+----------+--------+
only showing top 10 rows



In [22]:
# Get a summary of score sentiment by label
#tmpDF.groupby('scoreSentiment').agg({"scoreSentiment":"count"}).show()
df.groupby('scoreSentiment').agg({"scoreSentiment":"count"}).show()

+--------------+---------------------+
|scoreSentiment|count(scoreSentiment)|
+--------------+---------------------+
|           0.0|               394008|
|           1.0|               400062|
|           2.0|              8432020|
+--------------+---------------------+



In [23]:
# Show graphical Distribution of sentiment (TBD)

## Model: Predict Sentiment from body

### Set up pipeline

In [17]:
# Create TF (Term Frequency) feature
tok = Tokenizer(inputCol="body", outputCol="words")
htf = HashingTF(inputCol="words", outputCol="tf")  # numFeatures will be a hyper-parameter  

#testing
tmpDF=tok.transform(df)
tmpDF=htf.transform(tmpDF)
tmpDF.select('words','tf').show(2)

+--------------------+--------------------+
|               words|                  tf|
+--------------------+--------------------+
|              [くそ]|(262144,[85691],[...|
|[gg, this, one's,...|(262144,[5674,905...|
+--------------------+--------------------+
only showing top 2 rows



In [18]:
# Create w2v (word to vec) feature

# the comment string needs to be turned into a vector for w2v to work
# unfortunately, VectorAssember does not work on string so we need a UDF

# Create UDF (note: split(anything,0) simply means don't split)
str_to_vec=spark.udf.register("str_to_vec",
                             lambda row:row.split("#",0),
                             ArrayType(StringType()))

# set up the tranformation
rva=SQLTransformer(statement="SELECT *, str_to_vec(body) bodyVec FROM __THIS__")

w2v = Word2Vec(inputCol='bodyVec', outputCol='w2v')  # not setting minCount 

# testing
tmpDF=rva.transform(df)
model=w2v.fit(tmpDF)
tmpDF=model.transform(tmpDF)
tmpDF.show(2)

+---+---------+--------------+-----+---------+--------------------+-------------+-----+--------------+------+-------------+-----+--------------------+--------------------+
|ups|subreddit|removal_reason|downs|   author|                body|distinguished|score|scoreSentiment|bwFlag|bodyWordCount|label|             bodyVec|                 w2v|
+---+---------+--------------+-----+---------+--------------------+-------------+-----+--------------+------+-------------+-----+--------------------+--------------------+
|  4|soccer_jp|            NA|    0|    rx109|                くそ|         null|    4|             1| false|            1|    1|              [くそ]|[0.0,0.0,0.0,0.0,...|
|  4|      nba|            NA|    0|WyaOfWade|gg this one's ove...|           NA|    4|             1| false|           12|    1|[gg this one's ov...|[0.0,0.0,0.0,0.0,...|
+---+---------+--------------+-----+---------+--------------------+-------------+-----+--------------+------+-------------+-----+---------------

In [19]:
# Assemble predictors
va=VectorAssembler(inputCols=['tf','w2v','bwFlag','bodyWordCount'],outputCol='features')

In [20]:
# Set up the regression model; regParam & elasticNetParam will be hyper-parameters
# CrossVal currently requires the labelCol to be precisely called 'label'
#lr = LogisticRegression(labelCol='scoreSentiment',maxIter=10)
lr = LogisticRegression(labelCol='label',maxIter=10)


In [21]:
# Build the pipeline
#pipeline=Pipeline(stages=[bkt,tok,htf,rva,w2v,va,lr])  # took out bkt since this is pre-EDA
pipeline=Pipeline(stages=[tok,htf,rva,w2v,va,lr])

### Set up hyperparameter tuning & Cross-Validation

In [22]:
# Set up the parameter grid

paramGrid = ParamGridBuilder() \
    .addGrid(htf.numFeatures, [100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()


# This paramGrid for testing
"""
paramGrid = ParamGridBuilder() \
    .addGrid(htf.numFeatures, [200]) \
    .addGrid(lr.regParam, [0.3]) \
    .addGrid(lr.elasticNetParam, [0.5]) \
    .build()
"""

'\nparamGrid = ParamGridBuilder()     .addGrid(htf.numFeatures, [200])     .addGrid(lr.regParam, [0.3])     .addGrid(lr.elasticNetParam, [0.5])     .build()\n'

In [23]:
# Too inspect paramGrid, uncomment next 4 lines
"""
print('-'*30)
#print('paramGrid', paramGrid, '\n')
#print('len(paramGrid): {}'.format(len(paramGrid)))
print('-'*30)
"""

"\nprint('-'*30)\n#print('paramGrid', paramGrid, '\n')\n#print('len(paramGrid): {}'.format(len(paramGrid)))\nprint('-'*30)\n"

In [24]:
# Treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# Using the pipeline as the estimator slows things down but is necessary if tuning featurziers.  If not, set the 
# model specification as the estimator with estimator=lr (I think; though not sure if that means lr needs to be removed from pipeline)
numFolds=5
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol='label'),
                          numFolds=numFolds)

### Train the Model

In [27]:
# Determine parallelism 
# This resource: see https://databricks.com/session/model-parallelism-in-spark-ml-cross-validation
# says that best practice is parallelism = (# cores)/(# partitions) but generally not more than 10
numPartitions=trainDF.rdd.getNumPartitions()
numCores=sc.defaultParallelism
parallelism=int(round(numCores/numPartitions,0))
# also see https://stackoverflow.com/questions/42171499/get-current-number-of-partitions-of-a-dataframe

# constrain to between 1 and 10
if parallelism<1:
    parallelism=1
elif parallelism > 10:
    parallelism=10

# -----------------------------------------------------------------------------
    
"""
# Another thing we can do is treat cores as fixed and repartition to get a target parallelism
# while avoiding memory issues that occur when != cores/partitions
# in the future: verify cores/partitions is correct; might want to do something to avoid having
# too few partitions
parallelism=2
targetNumPartitions=int(round(numCores/parallelism,0))
if (targetNumPartitions>=1):
    if (targetNumPartitions<numPartitions):
        trainDF = trainDF.coalesce(targetNumPartitions) # no shuffling but can only be used for decreasing numPartitions
    else: 
        trainDF = trainDF.repartition(targetNumPartitions)  # this involves shuffling to less efficient
""" 

# -----------------------------------------------------------------------------


# However, elsewhere, you typically see that partitions should be 2x to 4x the number of cores!
# So, we could just override (note: 4 yielded memory errors)
parallelism=2


In [28]:
# print out parallelism
parallelism

2

In [29]:
# Cache trainDF to speed up cross validation; we could use .select(colnames...) to use less memory
# Cache & persist failed with 32GB of memory
#trainDF=trainDF.cache()
#trainDF=trainDF.persist(StorageLevel.MEMORY_AND_DISK_ONLY)
#trainDF.count()  # call count to actually cache the data


In [None]:
# Run cross-validation, and choose the best set of parameters. Print the training time.
import time
t0 = time.time()
if parallelism==1:
    cvModel = crossval.fit(trainDF) # train models (no parallelism)
else:    
    cvModel = crossval.setParallelism(parallelism).fit(trainDF) # train models in parallel
print("train time:", time.time() - t0)
print('-'*30)
# Took 3580 secs (~1hr) to run single params set with 5 fold on 8 cores with 32 GB memmory & no parallelism & no cache/persist

In [None]:
# release the cache
#trainDF.unpersist()

In [None]:
# Save the model
cvModel.save("spark-log-reg-model")
pipeline.save("spark-log-reg-pipeline")

In [39]:
# Load the model and the pipeline  (should these be preceded by "val")
crossValidatorModel = CrossValidatorModel.load("spark-log-reg-model")
#sameModel = PipelineModel.load("/path-to-my-pipeline/spark-log-reg-transfer-pipeline")

# CODE ONLY OK ABOVE THIS POINT !!!!

In [30]:
# Fit the multinomial logistic regression model; this is old - before implemementing cross validation
mlrModel=pipeline.fit(trainDF)

In [24]:
# Training Summary
# source: https://spark.apache.org/docs/latest/ml-classification-regression.html

# Fix source: https://stackoverflow.com/questions/37278999/logistic-regression-with-spark-ml-data-frames
lrm=mlrModel.stages[-1]

# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrm.coefficientMatrix))
print("Intercept: " + str(lrm.interceptVector))

trainingSummary = lrm.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))


Coefficients: 
3 X 300 CSRMatrix

Intercept: [-1.0264586714522934,-1.0107704204551655,2.037229091907459]
objectiveHistory:
0.35298803317465105
False positive rate by label:
label 0: 0.0
label 1: 0.0
label 2: 1.0
True positive rate by label:
label 0: 0.0
label 1: 0.0
label 2: 1.0
Precision by label:
label 0: 0.0
label 1: 0.0
label 2: 0.9139359489937812
Recall by label:
label 0: 0.0
label 1: 0.0
label 2: 1.0
F-measure by label:
label 0: 0.0
label 1: 0.0
label 2: 0.9550329513109017
Accuracy: 0.9139359489937812
FPR: 0.9139359489937812
TPR: 0.9139359489937812
F-measure: 0.8728389466766606
Precision: 0.8352789188631633
Recall: 0.9139359489937812


In [25]:
# Make preductions on the test data
mlrPrediction=mlrModel.transform(testDF)

In [26]:
mlrPrediction.select('scoreSentiment','prediction').show(3)

+--------------+----------+
|scoreSentiment|prediction|
+--------------+----------+
|           0.0|       2.0|
|           0.0|       2.0|
|           0.0|       2.0|
+--------------+----------+
only showing top 3 rows



### TBD: Evaluate the predictions. Judging from the training though, it seems to over-predict category 2 "positive" --- which is the most prevalent

In [27]:
# Stuff with ngrams not currently used

#May need to drop col when rerunning
#df=df.drop('body2grams')
#df=df.drop('body3grams')

# Create 2grams
#ngram = NGram(n=2, inputCol="words", outputCol="body2grams")
#df = ngram.transform(df)

# Create 3grams
#ngram = NGram(n=3, inputCol="words", outputCol="body3grams")
#df = ngram.transform(df)

In [28]:
# NOT USED since scoreSentiment is multinomial response not predictor
# OneHotEncoding of Score_sentiment
# since it is already numeric, no need for StringIndexer
#encoder = OneHotEncoder(inputCol="score_sentiment", outputCol="scoreSentimentVec")
#model = encoder.fit(df)
#df = model.transform(df)

## Save notebook as PDF document

In [76]:
# Save notebook as PDF document
!jupyter nbconvert --to pdf `pwd`/*.ipynb

[NbConvertApp] Converting notebook /sfs/qumulo/qhome/jdd5dw/ds5110-project/Jeremey_code.ipynb to pdf
[NbConvertApp] Writing 57104 bytes to notebook.tex
[NbConvertApp] Building PDF
[NbConvertApp] Running xelatex 3 times: ['xelatex', 'notebook.tex', '-quiet']
[NbConvertApp] Running bibtex 1 time: ['bibtex', 'notebook']
[NbConvertApp] PDF successfully created
[NbConvertApp] Writing 68569 bytes to /sfs/qumulo/qhome/jdd5dw/ds5110-project/Jeremey_code.pdf
[NbConvertApp] Converting notebook /sfs/qumulo/qhome/jdd5dw/ds5110-project/test_file.ipynb to pdf
[NbConvertApp] Writing 26544 bytes to notebook.tex
[NbConvertApp] Building PDF
[NbConvertApp] Running xelatex 3 times: ['xelatex', 'notebook.tex', '-quiet']
[NbConvertApp] CRITICAL | xelatex failed: ['xelatex', 'notebook.tex', '-quiet']
This is XeTeX, Version 3.14159265-2.6-0.99999 (TeX Live 2019/dev/Debian) (preloaded format=xelatex)
 restricted \write18 enabled.
entering extended mode
(./notebook.tex
LaTeX2e <2018-12-01>
(/usr/share/texlive/t