# Massive Data Final Project

## Author: Douglas Hummel-Price | Created: 04.29.20

## Goal: analyze Reddit comments. See the write-up for more complete information.

### Initial Setup

In [1]:
## Import necessary libraries
from pyspark.sql.types import *

## Start Spark Session
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
## Reuse the running SparkContext when there is one. Calling the constructor
## first (`SparkContext()`) raises "Cannot run multiple SparkContexts at once"
## whenever a context already exists, e.g. when this cell is re-run in a live
## kernel; the `getOrCreate` classmethod is safe to call repeatedly.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("project").getOrCreate()

import json
from pyspark.sql import Row
import pyspark.sql.functions as F

## Import necessary libraries for modeling
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model
from pyspark.sql import types

In [2]:
## Verify that Spark initiated
## (a bare `spark` as the cell's last expression displays the session object)
spark

In [3]:
## This prevents some errors that I had been encountering related to AWS limits.

## Fetch the Hadoop configuration of the running SparkContext through its
## underlying Java context (`_jsc` is a private PySpark attribute).
hc = sc._jsc.hadoopConfiguration()

## setting the config is the same for both languages
## NOTE(review): the keys below use the prefix "fs.sa", while every path read
## in this notebook uses the "s3://" scheme. Presumably the S3A connector keys
## ("fs.s3a.*") were intended -- TODO confirm these settings take effect.
hc.set("fs.sa.impl", "org.apache.hadoop.fs.sa.S3FileSystem")
## Set the maximum-connections value for that prefix to 10000.
hc.setInt("fs.sa.connection.maximum", 10000)

### Read in sample reddit LZO data files

In [4]:
## Read every LZO-compressed Reddit comment file matching the glob as an RDD
## of (key, value) records, using the LZO text input format.
json_lzo = sc.newAPIHadoopFile(
    path="s3://bigdatateaching/reddit/lzo/RC_*.lzo",
    inputFormatClass="com.hadoop.mapreduce.LzoTextInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.Text",
)

In [5]:
## Let Spark infer the schema from the sample file, then rebuild it as a
## standalone StructType by round-tripping through its JSON representation.
sample_path = "s3://bigdatateaching/reddit/sample-data/1m-line-sample.json"
data = spark.read.json(sample_path)

schema = data.schema.json()              # schema serialised as a JSON string
schema_fields = json.loads(schema)       # ... parsed back into a dict
new_schema = StructType.fromJson(schema_fields)

new_schema

StructType(List(StructField(archived,BooleanType,true),StructField(author,StringType,true),StructField(author_cakeday,BooleanType,true),StructField(author_created_utc,LongType,true),StructField(author_flair_background_color,StringType,true),StructField(author_flair_css_class,StringType,true),StructField(author_flair_richtext,ArrayType(StructType(List(StructField(a,StringType,true),StructField(e,StringType,true),StructField(t,StringType,true),StructField(u,StringType,true))),true),true),StructField(author_flair_template_id,StringType,true),StructField(author_flair_text,StringType,true),StructField(author_flair_text_color,StringType,true),StructField(author_flair_type,StringType,true),StructField(author_fullname,StringType,true),StructField(author_patreon_flair,BooleanType,true),StructField(body,StringType,true),StructField(can_gild,BooleanType,true),StructField(can_mod_post,BooleanType,true),StructField(collapsed,BooleanType,true),StructField(collapsed_reason,StringType,true),StructFiel

In [6]:
## Print the schema of the sample data as an indented tree
data.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_created_utc: long (nullable = true)
 |-- author_flair_background_color: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_richtext: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- e: string (nullable = true)
 |    |    |-- t: string (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- author_flair_template_id: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_flair_text_color: string (nullable = true)
 |-- author_flair_type: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- author_patreon_flair: boolean (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- can_mod_post: boolean (nullable = true)
 |-

### Run this when ready for full data

In [7]:
## Each Hadoop record is a (key, value) pair: keep only the value (the raw
## text of the line) in a single-column DataFrame, parse it as JSON with the
## schema taken from the sample file, then flatten the parsed struct so that
## every field becomes a top-level column.
raw_lines = json_lzo.map(lambda record: Row(record[1])).toDF(['raw'])
parsed = raw_lines.select(F.from_json('raw', new_schema).alias('json'))
data = parsed.select('json.*')

### Count of NAs on full original dataset

In [None]:
# Count the missing values in every column.
# `when` without `otherwise` yields null for rows that do not match, and
# `count` ignores nulls, so each aggregate is the number of null entries.
null_counts = [
    F.count(F.when(F.isnull(name), name)).alias(name)
    for name in data.columns
]
data.select(null_counts).show()

|                      Variable |                    Number NA |
|------------------------------:|-----------------------------:|
|                      archived |                    129352624 |
|                        author |                            0 |
|                author_cakeday |                    474710486 |
|            author_created_utc |                     42738322 |
| author_flair_background_color |                    323529952 |
|        author_flair_css_class |                    371349643 |
|         author_flair_richtext |                     42737313 |
|      author_flair_template_id |                    414663455 |
|             author_flair_text |                    368524948 |
|       author_flair_text_color |                    323529952 |
|             author_flair_type |                     42737313 |
|               author_fullname |                     42737313 |
|          author_patreon_flair |                     49013039 |
|                          body |                            0 |
|                      can_gild |                            0 |
|                  can_mod_post |                            0 |
|                     collapsed |                            0 |
|              collapsed_reason |                    469533816 |
|              controversiality |                            0 |
|                   created_utc |                            0 |
|                 distinguished |                    468689710 |
|                        edited |                            0 |
|                        gilded |                            0 |
|                      gildings |                            0 |
|                            id |                            0 |
|                  is_submitter |                            0 |
|                       link_id |                            0 |
|                     no_follow |                            0 |
|                     parent_id |                            0 |
|                     permalink |                            0 |
|                removal_reason |                    476258225 |
|                  retrieved_on |                            0 |
|                         score |                            0 |
|                  send_replies |                            0 |
|                      stickied |                            0 |
|                     subreddit |                            0 |
|                  subreddit_id |                            0 |
|       subreddit_name_prefixed |                            0 |
|                subreddit_type |                            0 |

### Drop columns that won't be used

In [8]:
## Columns that are not used in the analysis.
droplist = [
    "archived", "author_cakeday", "author_created_utc",
    "author_flair_background_color", "author_flair_css_class",
    "author_flair_richtext", "author_flair_template_id", "author_flair_text",
    "author_flair_text_color", "author_flair_type", "author_fullname",
    "author_patreon_flair", "collapsed_reason", "distinguished",
    "removal_reason", "author", "body", "retrieved_on", "gildings",
    "id", "link_id", "parent_id", "permalink", "subreddit", "subreddit_id",
    "subreddit_name_prefixed", "created_utc", "can_mod_post",
]

## `drop` accepts several column names at once, so a single call replaces the
## previous loop of one-column drops (and no loop variable is left behind).
data = data.drop(*droplist)
data.printSchema()

root
 |-- can_gild: boolean (nullable = true)
 |-- collapsed: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- no_follow: boolean (nullable = true)
 |-- score: long (nullable = true)
 |-- send_replies: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit_type: string (nullable = true)



### Convert a few columns
The variable "edited" is "false" if the comment has not been edited; otherwise it holds the date of the last edit in UTC format. We therefore recode it as a binary indicator: 0 when the value is "false" and 1 otherwise.

In [9]:
## Create UDF to do this
def makebool(value):
    """Recode the raw ``edited`` value as a 0/1 indicator.

    Returns 0 only when ``value`` equals the string "false"; every other
    value (including ``None``) returns 1.
    """
    return 0 if value == "false" else 1
    
## Wrap makebool as a Spark UDF whose declared return type is integer
udfedit = F.udf(makebool,IntegerType())

In [10]:
## NOTE: this cell overwrites `data` and is meant to run once per kernel.
## Re-running it would apply `udfedit` to the already converted integer
## column, and makebool returns 1 for anything other than the string "false".

## Recode "edited" as 0/1
data = data.withColumn("edited", udfedit("edited"))

## Convert each boolean to 0/1
ints = ["can_gild", "collapsed", "is_submitter", "no_follow", "send_replies", "stickied"]
for x in ints:
    data = data.withColumn(x, data[x].cast(types.IntegerType()))

## Make a column with the absolute value of the score and the logged absolute value of the score
data = data.withColumn('abs_score', F.abs(data.score))
data = data.withColumn('logged_score', F.log(data.abs_score))

## Convert the nulls produced by taking the log of 0 to 0.
## Only `logged_score` is filled: a blanket `fillna(0)` would also write zeros
## into any other numeric column that happened to contain nulls and hide them
## from the missing-value check that follows.
## (After this step a score of 0 and a score of +/-1 both have logged_score 0.)
data = data.fillna(0, subset=['logged_score'])

data.printSchema() ## Use this to verify

root
 |-- can_gild: integer (nullable = true)
 |-- collapsed: integer (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- edited: integer (nullable = true)
 |-- gilded: long (nullable = true)
 |-- is_submitter: integer (nullable = true)
 |-- no_follow: integer (nullable = true)
 |-- score: long (nullable = true)
 |-- send_replies: integer (nullable = true)
 |-- stickied: integer (nullable = true)
 |-- subreddit_type: string (nullable = true)
 |-- abs_score: long (nullable = true)
 |-- logged_score: double (nullable = false)



In [11]:
## cache and count the data
## (`cache` only marks the DataFrame for caching; `count` is the action that
## runs the job, and its result is displayed as the cell output)
data.cache()
data.count()

476259744

In [12]:
# Re-check for missing values after the conversions above.
# `when` without `otherwise` yields null for rows that do not match, and
# `count` ignores nulls, so each aggregate is the number of null entries.
remaining_nulls = [
    F.count(F.when(F.isnull(name), name)).alias(name)
    for name in data.columns
]
data.select(remaining_nulls).show()

+--------+---------+----------------+------+------+------------+---------+-----+------------+--------+--------------+---------+------------+
|can_gild|collapsed|controversiality|edited|gilded|is_submitter|no_follow|score|send_replies|stickied|subreddit_type|abs_score|logged_score|
+--------+---------+----------------+------+------+------------+---------+-----+------------+--------+--------------+---------+------------+
|       0|        0|               0|     0|     0|           0|        0|    0|           0|       0|             0|        0|           0|
+--------+---------+----------------+------+------+------------+---------+-----+------------+--------+--------------+---------+------------+



## Descriptive statistics for revised dataset

In [13]:
## Summary statistics (count, mean, stddev, min, max) for every column
data.describe().show()

+-------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+------------------+------------------+-------------------+--------------------+---------------+-----------------+------------------+
|summary|           can_gild|          collapsed|    controversiality|              edited|              gilded|       is_submitter|         no_follow|             score|       send_replies|            stickied| subreddit_type|        abs_score|      logged_score|
+-------+-------------------+-------------------+--------------------+--------------------+--------------------+-------------------+------------------+------------------+-------------------+--------------------+---------------+-----------------+------------------+
|  count|          476259744|          476259744|           476259744|           476259744|           476259744|          476259744|         476259744|         476259744|          476259744|           4762

In [14]:
import pandas as pd

## Load the pipe-delimited descriptive-statistics table from a local text file
## and discard the two unnamed columns pandas creates when parsing it.
## NOTE(review): "descriptivetableraw.txt" is not written anywhere in this
## notebook; presumably it was saved by hand from the describe() output above
## -- confirm before a fresh Restart & Run All.
descpretty = pd.read_csv("descriptivetableraw.txt", sep="|").drop(
    columns=["Unnamed: 0", "Unnamed: 15"]
)
descpretty

Unnamed: 0,summary,can_gild,collapsed,controversiality,edited,gilded,is_submitter,no_follow,score,send_replies,stickied,subreddit_type,abs_score,logged_score
0,count,476259700.0,476259700.0,476259700.0,476259700.0,476259744,476259700.0,476259700.0,476259700.0,476259700.0,476259700.0,476259744,476259700.0,476259700.0
1,mean,0.984118,0.05190769,0.02198926,0.02773437,2.260384199089478...,0.08922097,0.7766563,9.162615,0.9861357,0.005540876,,9.713969,0.9188468
2,stddev,0.125019,0.2218407,0.1466483,0.1642108,0.017837149968014967,0.2850624,0.4164869,137.1076,0.1169276,0.07423055,,137.0697,1.138322
3,min,0.0,0.0,0.0,0.0,0,0.0,0.0,-22280.0,0.0,0.0,gold_restricted,0.0,0.0
4,max,1.0,1.0,1.0,1.0,66,1.0,1.0,90192.0,1.0,1.0,user,90192.0,11.4097


In [15]:
## Save the cleaned descriptive table as CSV (without the pandas index column)
descpretty.to_csv("Pretty_Descriptives.csv",index=False)

In [17]:
## Print out counts for the simple variables and save each table as CSV
columns = ['controversiality', 'subreddit_type', 'edited', 'can_gild', 'collapsed',
           'is_submitter',  'no_follow', 'send_replies', 'stickied']
for col in columns:
    print(f'The counts for the variable {col}:')
    ## Cache the small aggregated result so the group-by over the full dataset
    ## runs once and is shared by `show` and `toPandas`, instead of being
    ## recomputed for each of them.
    counts = data.groupby(col).count().cache()
    counts.show()
    counts.toPandas().to_csv("Binary_"+ col+'_distribution.csv',index=False)
    counts.unpersist()

The counts for the variable controversiality:
+----------------+---------+
|controversiality|    count|
+----------------+---------+
|               0|465787145|
|               1| 10472599|
+----------------+---------+

The counts for the variable subreddit_type:
+---------------+---------+
| subreddit_type|    count|
+---------------+---------+
|gold_restricted|        3|
|           user|   721502|
|         public|463303117|
|     restricted| 12235122|
+---------------+---------+

The counts for the variable edited:
+------+---------+
|edited|    count|
+------+---------+
|     1| 13208764|
|     0|463050980|
+------+---------+

The counts for the variable can_gild:
+--------+---------+
|can_gild|    count|
+--------+---------+
|       1|468695795|
|       0|  7563949|
+--------+---------+

The counts for the variable collapsed:
+---------+---------+
|collapsed|    count|
+---------+---------+
|        1| 24721545|
|        0|451538199|
+---------+---------+

The counts for the var

In [19]:
## Save the frequency table of each numeric variable as
## "Cont_<variable>_distribution.csv"
distros = ['gilded', 'score', 'abs_score']
for col in distros:
    value_counts = data.groupby(col).count().toPandas()
    value_counts.to_csv(f"Cont_{col}_distribution.csv", index=False)
  


In [20]:
## Same frequency tables, restricted to comments flagged as controversial
controversial = data.where(F.col('controversiality') == 1)
controdistros = ['score', 'abs_score']
for col in controdistros:
    contro_counts = controversial.groupby(col).count().toPandas()
    contro_counts.to_csv(f"Cont_{col}_contro_distribution.csv", index=False)

## Now we can build our pipeline

In [21]:
## Choose the data the models are built on. The two commented-out lines are
## smaller alternatives (first 1000 rows / a 0.01% sample without replacement)
## for trying the pipeline out; the active line uses the full dataset.
# df = data.limit(1000)
# df = data.sample(False,.0001)
df = data

In [22]:
## Random 80/20 train/test split; the fixed seed (24) keeps it repeatable.
splitted_data = df.randomSplit([0.80, 0.20], seed=24)
train_data, test_data = splitted_data

print(f"Number of training records: {train_data.count()}")
print(f"Number of testing records : {test_data.count()}")

Number of training records: 381016616
Number of testing records : 95243128


In [27]:
## Label column for every model
target = "edited"

## Control variables shared by all multi-variable models. The one-hot encoded
## subreddit type always goes last, after the score variable (if any).
_base_controls = ["can_gild", "collapsed", "is_submitter", "send_replies", "stickied",
                  "no_follow", "controversiality", "gilded"]
_subreddit_dummy = "subreddit_type_O"

## Controls plus one version of the score
features_score = _base_controls + ["score", _subreddit_dummy]
features_abs_score = _base_controls + ["abs_score", _subreddit_dummy]
features_logged_score = _base_controls + ["logged_score", _subreddit_dummy]

## One version of the score on its own
justscore = ["score"]
justabs = ["abs_score"]
just_logged_score = ["logged_score"]

## Controls only
justcontrols = _base_controls + [_subreddit_dummy]

## Parallel lists: feature sets and the names they are reported/saved under
featurelistname = ["features_score", "features_abs_score", "features_logged_score",
                   "justscore", "justabs", "just_logged_score", "justcontrols"]
featurelist = [features_score, features_abs_score, features_logged_score,
               justscore, justabs, just_logged_score, justcontrols]

# these objects are used to test the pipeline loop
# featurelist = [justscore,justabs]
# featurelistname = ["justscore","abs_score"]

In [28]:
## Convert string columns to numbered categories for the logit
## (rows whose subreddit_type is invalid for the fitted indexer are skipped)
stringIndexer_sub = StringIndexer(inputCol="subreddit_type", outputCol="subreddit_typeI").setHandleInvalid("skip")

## Convert index categories to dummies with a one-hot encoder
encoder = OneHotEncoderEstimator(inputCols=["subreddit_typeI"],
                                 outputCols=["subreddit_type_O"])

## The evaluators do not depend on the feature set, so build them once
## instead of once per model.
## Note on reading the results: accuracy alone says little here, because the
## "edited" distribution printed earlier shows roughly 97% of comments are
## unedited; AUC is the more informative of the two metrics.
evaluator = BinaryClassificationEvaluator(labelCol=target, rawPredictionCol="rawPrediction")
evaluatorACC = MulticlassClassificationEvaluator(labelCol=target, predictionCol="prediction", metricName="accuracy")

## One (name, variables, AUC, accuracy) tuple per fitted model
models = []

## Fit, evaluate and save one logistic regression per feature set
for features,featname in zip(featurelist,featurelistname):

    print(f'Currently modelling "{featname}" which contains the following variables:\n{sorted(features)}')

    ## Put together our features
    vectorAssembler_features = VectorAssembler(
        inputCols=features,
        outputCol="features")

    ## Create the Logit object
    lg = LogisticRegression(labelCol=target, featuresCol="features")

    ## Create Pipeline
    pipeline_lr = Pipeline(stages = [stringIndexer_sub, encoder,
                           vectorAssembler_features, lg])
    ## Run the pipeline
    model_lr = pipeline_lr.fit(train_data)

    ## Create predictions on the test data, evaluate them using the evaluators.
    predictions = model_lr.transform(test_data)
    AUC = evaluator.evaluate(predictions)
    accuracy = evaluatorACC.evaluate(predictions)

    ## Save model information to list
    model = (featname,features,AUC,accuracy)
    models.append(model)

    print(f'\nThe AUC for model "{featname}" is: \n{AUC}\n')
    print("Accuracy = %g" % accuracy)
    print("Test Error = %g" % (1.0 - accuracy))
    print("\n")

    ## Save model itself. A plain `save` raises if the target path already
    ## exists, which would abort a re-run of this cell right after an expensive
    ## fit; overwrite the previously saved model of the same name instead.
    model_lr.write().overwrite().save("s3://dhp-massive-data/Project/Final_models/"+featname)

Currently modelling "features_score" which contains the following variables:
['can_gild', 'collapsed', 'controversiality', 'gilded', 'is_submitter', 'no_follow', 'score', 'send_replies', 'stickied', 'subreddit_type_O']

The AUC for model "features_score" is: 
0.6195354460002964

Accuracy = 0.972258
Test Error = 0.0277417
Currently modelling "features_abs_score" which contains the following variables:
['abs_score', 'can_gild', 'collapsed', 'controversiality', 'gilded', 'is_submitter', 'no_follow', 'send_replies', 'stickied', 'subreddit_type_O']

The AUC for model "features_abs_score" is: 
0.6225319834913726

Accuracy = 0.972258
Test Error = 0.0277417
Currently modelling "features_logged_score" which contains the following variables:
['can_gild', 'collapsed', 'controversiality', 'gilded', 'is_submitter', 'logged_score', 'no_follow', 'send_replies', 'stickied', 'subreddit_type_O']

The AUC for model "features_logged_score" is: 
0.6265093401784506

Accuracy = 0.972261
Test Error = 0.027739

In [29]:
## Show the collected results as a table, one row per fitted model
summary_columns = ["model_name", "variables", "AUC", "Accuracy"]
pd.DataFrame(models, columns=summary_columns)

Unnamed: 0,model_name,variables,AUC,Accuracy
0,features_score,"[can_gild, collapsed, is_submitter, send_repli...",0.619535,0.972258
1,features_abs_score,"[can_gild, collapsed, is_submitter, send_repli...",0.622532,0.972258
2,features_logged_score,"[can_gild, collapsed, is_submitter, send_repli...",0.626509,0.972261
3,justscore,[score],0.533891,0.972257
4,justabs,[abs_score],0.565673,0.972256
5,just_logged_score,[logged_score],0.569289,0.972263
6,justcontrols,"[can_gild, collapsed, is_submitter, send_repli...",0.613041,0.97226


In [30]:
## Write the same results table to CSV (without the pandas index column)
results_table = pd.DataFrame(models, columns=["model_name", "variables", "AUC", "Accuracy"])
results_table.to_csv("AUC_and_Accuracy.csv", index=False)

In [31]:
## Shut down the Spark session now that all results have been written out
spark.stop()