In [1]:
# Configuration: DBR 7.0 ML and spark 3.0.0
# =============================

In [2]:
# Installing Libraries
# ====================

!pip install textblob

In [3]:
# Importing Necessary Libraries
# =============================

import pyspark
from pyspark.sql.functions import col
from pyspark.sql import functions as F, SparkSession
from pyspark.sql.types import FloatType

In [4]:
# Starting a Spark Session
# ========================

# getOrCreate() reuses an already-active SparkSession if there is one, otherwise builds a new one
spark = SparkSession.builder.getOrCreate()

In [5]:
# Loading DataFrame from Pre-saved Parquet File
# =============================================

# Importing libraries
from pyspark.sql.types import *

# Defining the schema as (column name, Spark type, nullable) triples
schema_spec = [
    ("author", StringType(), True),
    ("author_cakeday", BooleanType(), False),
    ("created_utc_day", IntegerType(), True),
    ("created_utc_hr", IntegerType(), True),
    ("brand_safe", BooleanType(), True),
    ("can_gild", BooleanType(), True),
    ("domain", StringType(), True),
    ("is_crosspostable", BooleanType(), True),
    ("no_follow", BooleanType(), True),
    ("num_comments", LongType(), True),
    ("log_num_comments", DoubleType(), True),
    ("over_18", BooleanType(), True),
    ("subreddit_id", StringType(), True),
    ("commentsUrl", StringType(), True),
    ("whitelist_status", StringType(), False),
    ("suggested_sort", StringType(), False),
    ("titleClean", StringType(), True),
    ("commentsClean", StringType(), False),
    ("score", LongType(), True),
    ("log_score", DoubleType(), True),
    ("test", BooleanType(), False),
]
schema = StructType([StructField(name, dtype, nullable) for name, dtype, nullable in schema_spec])

# Two parquet files are available; both contain the training data:
# - filePath    : training data plus the initial test set
# - filePathOOT : training data plus the final out-of-time (OOT) test set
filePath = "dbfs:/FileStore/df/train_test_data.parquet"
filePathOOT = "dbfs:/FileStore/df/train_OOT_data.parquet"

# Pass whichever of the two paths above is wanted to .load()
parquet_reader = spark.read.format("parquet").option("header", "true").schema(schema)
loaded_df = parquet_reader.load(filePath)

# Discard every row that has a null in any column
clean_df = loaded_df.na.drop(how="any")

# Training rows are the ones whose "test" flag is False
train_df = clean_df.where(col("test") == False)

In [6]:
# Statistical summary of number of comments and scores for training data set
# Log transform applied to num_comments and score to reduce skewness of data
# ==========================================================================

# Same four columns feed both the describe() table and the skewness table
summary_columns = ["num_comments", "log_num_comments", "score", "log_score"]
train_df.select(*summary_columns).describe().show()
train_df.agg(*[F.skewness(name) for name in summary_columns]).show()

In [7]:
# Undersampling the majority data based on score 
# because a vast majority of the training data has a score of 0
# =============================================================

# Defining a function to undersample
def removeMajority(df, targetcol="scoreOutlier", targetval=0.0, removal=0.9):
  """Undersample the training rows whose `targetcol` equals `targetval`.

  Rows are split on the "test" flag. Among training rows (test == False),
  those with targetcol == targetval are sampled with `sampleBy` at a rate of
  1 - removal (fixed seed 69); all other training rows are kept. The result
  is the union of: test rows, remaining training rows, sampled majority rows
  (in that order).

  Args:
    df: DataFrame containing `targetcol` and a boolean "test" column.
    targetcol: name of the column identifying the majority class.
    targetval: value of `targetcol` that marks a majority row.
    removal: fraction of majority training rows to drop.

  Returns:
    The undersampled DataFrame.
  """
  is_train = col("test") == False
  majority_train = df.where((col(targetcol) == targetval) & is_train)
  minority_train = df.where((col(targetcol) != targetval) & is_train)
  kept_majority = majority_train.sampleBy(col=targetcol, fractions={targetval: 1-removal}, seed=69)
  undersampled_train = minority_train.union(kept_majority)
  return df.where(col("test") == True).union(undersampled_train)

# Create a new training dataframe with majority undersampled
train_df2 = removeMajority(df=train_df, targetcol="score", targetval=0, removal=0.75)

# Gather row counts and skewness before / after undersampling
rows_before = train_df.agg(F.count("test")).first()[0]
rows_after = train_df2.agg(F.count("test")).first()[0]
score_skew_before = train_df.agg(F.skewness("log_score")).first()[0]
score_skew_after = train_df2.agg(F.skewness("log_score")).first()[0]
comments_skew_before = train_df.agg(F.skewness("log_num_comments")).first()[0]
comments_skew_after = train_df2.agg(F.skewness("log_num_comments")).first()[0]

# Report the change in number of training datapoints
print("No. of training datapoints before undersampling = {}".format(rows_before))
print("No. of training datapoints after undersampling = {}".format(rows_after))

# Report the change in skewness
print("Skewness of log_score data before undersampling = {}".format(score_skew_before))
print("Skewness of log_score data after undersampling = {}".format(score_skew_after))
print("Skewness of log_num_comments data before undersampling = {}".format(comments_skew_before))
print("Skewness of log_num_comments data after undersampling = {}".format(comments_skew_after))

In [8]:
# Display Training Data Before Undersampling for Plots
# ====================================================

# train_df is the training subset as loaded, i.e. before removeMajority() is applied
display(train_df)

In [9]:
# Display Training Data After Undersampling for Plots
# ===================================================

# train_df2 is the output of removeMajority() on train_df
display(train_df2)

In [10]:
# Removing Outliers Using Z-Score
# We are using the log_num_comments and log_score features to determine outliers
# Datapoints in which both these feature values exceed mean + 3*std_dev (upper tail only) are discarded
# =====================================================================================================

# Computing quartiles and z-scores for the 2 chosen numerical data fields - log_score and log_num_comments
from pyspark.sql.types import BooleanType
# Quantiles at probabilities 0.00 and 0.75 (relative error 0.0) over the training rows
train_rows = train_df2.where(col("test") == False)
qtlScore = train_rows.approxQuantile("log_score", [0.00, 0.75], 0.0)
qtlComments = train_rows.approxQuantile("log_num_comments", [0.00, 0.75], 0.0)

# Standard deviation and mean of both log features, fetched in a single row
stdevScore, meanScore, stdevComments, meanComments = train_df2.select(
    F.stddev("log_score"), F.mean("log_score"),
    F.stddev("log_num_comments"), F.mean("log_num_comments"),
).first()

# Defining an outlier detection function based on z-score
def outlier(comments, score):
  """Z-score outlier test on the upper tail.

  Reads the module-level meanScore / stdevScore / meanComments / stdevComments.

  Args:
    comments: the row's log_num_comments value.
    score: the row's log_score value.

  Returns:
    True only when BOTH values are greater than their mean + 3 standard
    deviations; False otherwise. Values far below the mean are never flagged.
  """
  return (score > meanScore+(3*stdevScore)) and (comments > meanComments+(3*stdevComments))
# Use F.udf: the bare name `udf` is only imported in a later cell, so relying on it
# here fails with NameError when the notebook is run top to bottom on a fresh kernel.
outlier_udf = F.udf(outlier, BooleanType())

# Defining an outlier detection function based on IQR
def outlierIQR(comments, score):
  """Quantile-based outlier test on the upper tail.

  Reads the module-level qtlScore / qtlComments lists and compares against
  their element at index 1.

  Args:
    comments: the row's log_num_comments value.
    score: the row's log_score value.

  Returns:
    True only when BOTH values are greater than their respective threshold;
    False otherwise.
  """
  return (score > qtlScore[1]) and (comments > qtlComments[1])
# Use F.udf: the bare name `udf` is only imported in a later cell, so relying on it
# here fails with NameError when the notebook is run top to bottom on a fresh kernel.
outlierIQR_udf = F.udf(outlierIQR, BooleanType())

# Flag outliers with both rules; only the z-score flags are used to filter rows
outlier_inputs = (col("log_num_comments"), col("log_score"))
df3 = train_df2.withColumn("Outlier", outlier_udf(*outlier_inputs))
df3IQR = train_df2.withColumn("Outlier", outlierIQR_udf(*outlier_inputs))
df4Zscore = df3.where(col("Outlier") == False)

print("Choosing Z-score to detect and eliminate outliers...")

# Row counts before / after dropping the flagged rows
rows_with_outliers = df3.agg(F.count("test")).first()[0]
rows_without_outliers = df4Zscore.agg(F.count("test")).first()[0]
print("No. of training datapoints before dropping outliers = {}".format(rows_with_outliers))
print("No. of training datapoints after dropping outliers = {}".format(rows_without_outliers))

# Skewness before / after dropping the flagged rows
score_skew_with = df3.agg(F.skewness("log_score")).first()[0]
score_skew_without = df4Zscore.agg(F.skewness("log_score")).first()[0]
comments_skew_with = df3.agg(F.skewness("log_num_comments")).first()[0]
comments_skew_without = df4Zscore.agg(F.skewness("log_num_comments")).first()[0]
print("Skewness of log_score data before dropping outliers = {}".format(score_skew_with))
print("Skewness of log_score data after dropping outliers = {}".format(score_skew_without))
print("Skewness of log_num_comments data before dropping outliers = {}".format(comments_skew_with))
print("Skewness of log_num_comments data after dropping outliers = {}".format(comments_skew_without))

In [11]:
# Display Outliers Detected Based on Z-score
# ==========================================

# df3 carries the boolean "Outlier" column produced by outlier_udf
display(df3)

In [12]:
# Display Outliers Detected Based on IQR
# ======================================

# df3IQR carries the boolean "Outlier" column produced by outlierIQR_udf
display(df3IQR)

In [13]:
# Splitting Training DataFrame into Training and Validation Sets
# ==============================================================

# Share of rows assigned to the training side of each split
trainSplitRatio = 0.7
split_weights = [trainSplitRatio, 1-trainSplitRatio]

# Rows with score == 0 and rows with score != 0 are split separately
# (different seeds) so both groups appear in the same proportion on each side
majority_rows = df4Zscore.where(col("score") == 0)
minority_rows = df4Zscore.where(col("score") != 0)
train_set0, val_set0 = majority_rows.randomSplit(split_weights, seed=420)
train_set, val_set = minority_rows.randomSplit(split_weights, seed=125)

# Recombine the two groups into the final training and validation datasets
train_dataset = train_set.union(train_set0).cache()
val_dataset = val_set.union(val_set0).cache()

In [14]:
# Additional Features Based on Window Functions
# =============================================

# Importing libraries
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

# Defining the respective windows.
# Partition by column NAME so the windows resolve against whichever dataframe they
# are applied to, instead of being tied to column objects taken from clean_df.
window1 = Window.partitionBy("subreddit_id")
window2 = Window.partitionBy("author")

def add_window_features(df):
  """Append the per-subreddit and per-author aggregate columns to `df`.

  Aggregates are computed over the rows of `df` itself (window1 = same
  subreddit_id, window2 = same author). Columns are added in a fixed order so
  every dataset ends up with an identical layout.

  Args:
    df: DataFrame with subreddit_id, author, num_comments, is_crosspostable
      and no_follow columns.

  Returns:
    `df` with eight additional columns.
  """
  crosspostable = col("is_crosspostable").cast(FloatType())
  no_follow = col("no_follow").cast(FloatType())
  window_features = [
    ("Avg_Comments_Subreddit", F.avg("num_comments").over(window1)),
    ("Sum_Comments_Subreddit", F.sum("num_comments").over(window1)),
    ("Avg_Comments_Author", F.avg("num_comments").over(window2)),
    ("Sum_Comments_Author", F.sum("num_comments").over(window2)),
    ("avg_is_crosspostable_subreddit", F.avg(crosspostable).over(window1)),
    ("avg_is_crosspostable_author", F.avg(crosspostable).over(window2)),
    ("avg_no_follow_subreddit", F.avg(no_follow).over(window1)),
    ("avg_no_follow_author", F.avg(no_follow).over(window2)),
  ]
  for feature_name, feature_expr in window_features:
    df = df.withColumn(feature_name, feature_expr)
  return df

# Separating the test data into a new dataframe
test_dataset = clean_df.where(col("test")==True)

# Adding window features to the training, validation and test datasets
train_dataset = add_window_features(train_dataset)
val_dataset = add_window_features(val_dataset)
test_dataset = add_window_features(test_dataset)

# Display training dataset
display(train_dataset)

In [15]:
# Dropping rows with [deleted] authors from the TRAINING dataset
# (val_dataset and test_dataset are not filtered in this cell)
df4 = train_dataset.where(col("author")!="[deleted]")

# Report how many training rows the filter removed
print("Rows with [deleted] authors removed from train dataset...\n")
print("Training datapoints before dropping [deleted] authors = {}".format(train_dataset.agg(F.count("author")).first()[0]))
print("Training datapoints after dropping [deleted] authors = {}\n".format(df4.agg(F.count("author")).first()[0]))

In [16]:
# Correlation Matrix to Eliminate Boolean / Numerical Columns
# ===========================================================

# Importing libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Columns by each type
allNumericColumns = ['log_num_comments']
allBooleanColumns = [
    'author_cakeday',
    'brand_safe',
    'is_crosspostable',
    'no_follow',
    'can_gild',
    'over_18',
]
corr_features = allNumericColumns + allBooleanColumns

# Correlations are computed on train_df (the training rows before undersampling)
dfcorr = train_df.select(corr_features)

# Correlation.corr works on a single vector column, so assemble one first
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=dfcorr.columns, outputCol=vector_col)
df_vector = assembler.transform(dfcorr).select(vector_col)

# The result is a one-row, one-column dataframe holding the correlation matrix
correlation_row = Correlation.corr(df_vector, vector_col).collect()[0]
matrix = correlation_row[0]
corrmatrix = matrix.toArray().tolist()

# One dataframe column per feature, in the same order as corr_features
dfcorrcoefficient = spark.createDataFrame(corrmatrix, corr_features)
display(dfcorrcoefficient)

In [17]:
# Transforming all datasets into feature vectors
# ==============================================

# Importing libraries
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
import string
import re

# Working references for the train / validation / test dataframes
df5, df5v, df5_t = df4, val_dataset, test_dataset

# Defining the target variable
labelColumn = 'log_score'

# Selected boolean features
booleanColumns = ['author_cakeday', 'brand_safe', 'is_crosspostable', 'no_follow', 'can_gild']

# Selected categorical features
categoricalColumns = ['domain', 'subreddit_id', 'whitelist_status', 'suggested_sort']

# Selected and boolean-derived numeric features
numericColumns = [
    'log_num_comments',
    'created_utc_day',
    'created_utc_hr',
    'Avg_Comments_Author',
    'Avg_Comments_Subreddit',
    'avg_is_crosspostable_subreddit',
    'avg_is_crosspostable_author',
    'avg_no_follow_subreddit',
    'avg_no_follow_author',
]

# Selected text features
textColumns = ["titleClean", "commentsClean"]

# Scaling the selected date features by a fixed divisor (day / 31, hour / 24);
# the scaled column is named "<field>Scaled" in all three dataframes
dateFields = {'created_utc_day': 31.0, 'created_utc_hr': 24.0}
dateColumns = []
for dateField, divisor in dateFields.items():
  scaled_name = dateField+"Scaled"
  scaled_expr = (1.0*col(dateField))/divisor
  df5 = df5.withColumn(scaled_name, scaled_expr)
  df5v = df5v.withColumn(scaled_name, scaled_expr)
  df5_t = df5_t.withColumn(scaled_name, scaled_expr)
  dateColumns.append(scaled_name)

# Initialize transformation pipeline stages
stages = []

# Encoding boolean features: each flag is cast to a string column "<name>_str" in all
# three dataframes so StringIndexer can index it, then one-hot encoded to "<name>classVec".
# handleInvalid="keep" makes the indexer tolerate values not seen during fit.
for boolCol in booleanColumns:
  df5 = df5.withColumn(boolCol+"_str", col(boolCol).cast(StringType()))
  df5v = df5v.withColumn(boolCol+"_str", col(boolCol).cast(StringType()))
  df5_t = df5_t.withColumn(boolCol+"_str", col(boolCol).cast(StringType()))
  stringIndexer0 = StringIndexer(inputCol = boolCol+"_str", outputCol = boolCol + 'Index').setHandleInvalid("keep")
  encoder0 = OneHotEncoder(inputCols = [stringIndexer0.getOutputCol()], \
                                   outputCols = [boolCol + "classVec"])
  stages += [stringIndexer0, encoder0]

# Encoding categorical features: index then one-hot encode to "<name>classVec"
for categoricalCol in categoricalColumns:
  stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index').setHandleInvalid("keep")
  encoder = OneHotEncoder(inputCols = [stringIndexer.getOutputCol()], \
                          outputCols = [categoricalCol + "classVec"])
  stages += [stringIndexer, encoder]

# Encoding text features: tokenize -> remove stop words -> TF-IDF (500 hashed buckets)
# and, from the same filtered tokens, 100-dimensional Word2Vec embeddings
for textCol in textColumns:
  tokenizer = Tokenizer(inputCol = textCol, outputCol = textCol + 'words')
  stopwords = StopWordsRemover().setInputCol(textCol + "words").setOutputCol(textCol+"filtered")
  #ngram = NGram(n=3, inputCol=stopwords.getOutputCol(), outputCol=textCol+"Ngrams")
  hashtf = HashingTF(numFeatures = 500, inputCol = textCol+"filtered", #2**16 \
                     outputCol = textCol + 'tf')
  idf = IDF(inputCol = hashtf.getOutputCol(), outputCol = textCol + 'idf', minDocFreq = 1)
  # Explicit seed: Word2Vec training is randomized, and without a fixed seed the
  # embeddings (and everything trained on them) can differ from one session to the next
  word_embeddings = Word2Vec(vectorSize=100, minCount=0, seed=42, \
                             inputCol=textCol+"filtered", outputCol=textCol+"Embeddings")
  stages += [tokenizer, stopwords, hashtf, idf, word_embeddings]

# Building feature extraction pipeline; it is fitted on the training dataframe only
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df5)
train_dataset = pipelineModel.transform(df5)

# Keep the top `num_features` slots of the combined one-hot categorical vector,
# ranked by a chi-square selector against the label column
num_features = 45
categoricalColumnsVec = [name + 'classVec' for name in categoricalColumns]

# Stage 1: concatenate the one-hot vectors; stage 2: select from the result
assembler_cat = VectorAssembler(
    inputCols=categoricalColumnsVec,
    outputCol='categoricalColumnsVecBefore',
)
selector_cat = ChiSqSelector(
    numTopFeatures=num_features,
    featuresCol="categoricalColumnsVecBefore",
    outputCol="categoricalColumnsVecAfter",
    labelCol=labelColumn,
)
cat_selection = Pipeline(stages=[assembler_cat, selector_cat])

# Fit on the training data and add "categoricalColumnsVecAfter" to it
cat_selection_model = cat_selection.fit(train_dataset)
train_dataset = cat_selection_model.transform(train_dataset)

# Removal of non ASCII characters from text features
# textFeats holds column-name prefixes: "<prefix>Clean" is the input column and each
# cleaning step below writes a new column named "<prefix>_<step>"
textFeats = ["title", "comments"]
def strip_non_ascii(data_str):
  """Return `data_str` reduced to the characters whose code point is in 1..126.

  Everything else (NUL, DEL and all non-ASCII characters) is dropped; the
  order of the remaining characters is unchanged.
  """
  kept_chars = [ch for ch in data_str if 0 < ord(ch) < 127]
  return ''.join(kept_chars)
# Wrap the cleaner as a string-returning Spark UDF and apply it to each "<prefix>Clean" column
strip_non_ascii_udf = udf(strip_non_ascii, StringType())
for text in textFeats:
  ascii_only = strip_non_ascii_udf(train_dataset[text+'Clean'])
  train_dataset = train_dataset.withColumn(text+'_non_asci', ascii_only)

# Fixing of abbreviations in text features
def fix_abbreviation(data_str):
  """Lower-case `data_str` and expand common apostrophe-less contractions and slang.

  The substitutions are applied one after another in the order listed, so a
  later rule sees the output of the earlier ones. Leading and trailing
  whitespace is stripped from the result.

  Args:
    data_str: text to normalise.

  Returns:
    The normalised text.
  """
  # (regex, replacement) pairs; order matters
  substitutions = (
    (r'\bthats\b', 'that is'),
    (r'\bits\b', 'it is'),
    (r'\bhes\b', 'he is'),
    (r'\bshes\b', 'she is'),
    (r'\btheyre\b', 'they are'),
    (r'\bthatd\b', 'that would'),
    (r'\bive\b', 'i have'),
    (r'\bim\b', 'i am'),
    (r'\bya\b', 'yeah'),
    (r'\bcant\b', 'can not'),
    (r'\bdont\b', 'do not'),
    (r'\bwont\b', 'will not'),
    (r'\bisnt\b', 'is not'),
    (r'\barent\b', 'are not'),
    (r'\bwerent\b', 'were not'),
    (r'\bwouldnt\b', 'would not'),
    (r'\bshouldnt\b', 'should not'),
    (r'\bcouldnt\b', 'could not'),
    (r'\bwasnt\b', 'was not'),
    (r'\byoure\b', 'you are'),
    (r'\bid\b', 'i would'),
    # no word boundaries: also matches "wtf" embedded in a longer token
    (r'wtf', 'what the fuck'),
    (r'\bwth\b', 'what the hell'),
    (r'\bomg\b', 'oh my god'),
    (r'\blol\b', 'laughing'),
    (r'\brofl\b', 'laughing'),
    (r'\blmao\b', 'laughing'),
    (r'\br\b', 'are'),
    (r'\bu\b', 'you'),
    (r'\bur\b', 'your'),
    # replacement is upper-case even though the input was lower-cased
    (r'\bk\b', 'OK'),
    (r'\bsux\b', 'sucks'),
    (r'\bno+\b', 'no'),
    (r'\bcoo+\b', 'cool'),
    # standalone "rt" token only; without the leading \b this pattern also removed
    # the "rt" at the end of ordinary words ("start" -> "sta", "heart" -> "hea")
    (r'\brt\b', ''),
  )
  data_str = data_str.lower()
  for pattern, replacement in substitutions:
    data_str = re.sub(pattern, replacement, data_str)
  return data_str.strip()
# String-returning UDF; reads "<prefix>_non_asci" and writes "<prefix>_fixed_abbrev"
fix_abbreviation_udf = udf(fix_abbreviation, StringType())
for text in textFeats:
  expanded = fix_abbreviation_udf(train_dataset[text+'_non_asci'])
  train_dataset = train_dataset.withColumn(text+'_fixed_abbrev', expanded)

# Removal of features irrelevant to sentiment analysis
def remove_features(data_str):
  """Strip tokens that carry no sentiment from `data_str`.

  Steps, in order: lower-case; replace URLs, @mentions, punctuation characters
  and digit runs with a space; then keep only whitespace-separated words that
  are longer than one character and consist solely of [a-z0-9_.].

  Args:
    data_str: text to clean.

  Returns:
    The kept words joined by single spaces ("" when nothing is kept).
  """
  # Raw strings so that \w, \d and \. reach the regex engine without relying on
  # Python passing unknown string escapes through (which emits a warning)
  url_re = re.compile(r'https?://(www.)?\w+\.\w+(/\w+)*/?')
  punc_re = re.compile('[%s]' % re.escape(string.punctuation))
  num_re = re.compile(r'(\d+)')
  mention_re = re.compile(r'@(\w+)')
  alpha_num_re = re.compile(r"^[a-z0-9_.]+$")

  data_str = data_str.lower()
  # URLs and mentions must go before punctuation, otherwise "://" and "@" are already gone
  for noise_re in (url_re, mention_re, punc_re, num_re):
    data_str = noise_re.sub(' ', data_str)

  # split() discards repeated whitespace, so joining the kept words yields single spaces
  kept_words = [word for word in data_str.split() if len(word) > 1 and alpha_num_re.match(word)]
  return " ".join(kept_words)
# String-returning UDF; reads "<prefix>_fixed_abbrev" and writes "<prefix>_removed"
remove_features_udf = udf(remove_features, StringType())
for text in textFeats:
  stripped = remove_features_udf(train_dataset[text+'_fixed_abbrev'])
  train_dataset = train_dataset.withColumn(text+'_removed', stripped)

# Sentiment analysis function
from pyspark.sql.types import FloatType
from textblob import TextBlob

def sentiment_analysis(text):
  """Bucket the TextBlob polarity of `text` into three classes.

  Returns:
    1.0 when polarity > 0.1, 0.0 when -0.1 < polarity <= 0.1, -1.0 otherwise.
  """
  polarity = TextBlob(text).sentiment.polarity
  if polarity > 0.1:
    return 1.0
  if polarity > -0.1:
    return 0.0
  return -1.0
# Float-returning UDF; reads "<prefix>_removed" and writes "<prefix>_sentiment_score"
sentiment_analysis_udf = udf(sentiment_analysis , FloatType())
for text in textFeats:
  sentiment = sentiment_analysis_udf(train_dataset[text+'_removed'])
  train_dataset = train_dataset.withColumn(text+'_sentiment_score', sentiment)

# Title length = log(1 + number of tokens in the stop-word-filtered title)
title_token_count = F.size(col("titleCleanfiltered"))
train_dataset = train_dataset.withColumn("titleLength", F.log(1+title_token_count))

# Calculating average comment length
def commentLength(num, comment):
  """Average number of characters per comment.

  Args:
    num: number of comments.
    comment: string whose length is divided by `num`.

  Returns:
    len(comment) / num as a float when num > 0, otherwise 0.0.
  """
  if not num > 0:
    return 0.0
  return 1.0*len(comment)/num
# Float-returning UDF; the stored feature is log(1 + average comment length)
commentLength_udf = udf(commentLength, FloatType())
avg_comment_length = commentLength_udf(col("num_comments"), col("commentsClean"))
train_dataset = train_dataset.withColumn("commentLength", F.log(1+avg_comment_length))

# Input columns of the final feature vector, in assembly order
assemblerInputs = (
    ["categoricalColumnsVecAfter"]
    + [b + 'classVec' for b in booleanColumns]
    + numericColumns
    + dateColumns
    + ['title_sentiment_score', 'titleLength']
    + ['comments_sentiment_score', 'commentLength']
    + [t + 'Embeddings' for t in textColumns]
)
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
train_dataset = assembler.transform(train_dataset)

# Keep only training rows whose stop-word-filtered title has more than one token
has_multiple_title_tokens = F.size(col('titleCleanfiltered')) > 1
train_dataset = train_dataset.where(has_multiple_title_tokens)

# The model only needs the feature vector and the label
train_set = train_dataset.select('features', labelColumn).cache()

def _add_text_features(df):
  """Add the text-derived columns to `df`, in the same order for every dataset.

  For both the title and comments prefixes: "<prefix>_non_asci",
  "<prefix>_fixed_abbrev", "<prefix>_removed", "<prefix>_sentiment_score";
  then "titleLength" and "commentLength".
  """
  prefixes = ("title", "comments")
  # remove non ASCII characters
  for prefix in prefixes:
    df = df.withColumn(prefix+'_non_asci', strip_non_ascii_udf(df[prefix+'Clean']))
  # expand abbreviations
  for prefix in prefixes:
    df = df.withColumn(prefix+'_fixed_abbrev', fix_abbreviation_udf(df[prefix+'_non_asci']))
  # remove features irrelevant to sentiment analysis
  for prefix in prefixes:
    df = df.withColumn(prefix+'_removed', remove_features_udf(df[prefix+'_fixed_abbrev']))
  # sentiment analysis
  for prefix in prefixes:
    df = df.withColumn(prefix+"_sentiment_score", sentiment_analysis_udf(df[prefix+'_removed'] ))
  # title length and average comment length
  df = df.withColumn("titleLength", F.log(1+F.size(col("titleCleanfiltered"))))
  df = df.withColumn("commentLength", F.log(1+commentLength_udf(col("num_comments"), col("commentsClean"))))
  return df

# Apply the pipelines fitted on the training data to the validation and test datasets
val_set = cat_selection_model.transform(pipelineModel.transform(df5v))
test_dataset = cat_selection_model.transform(pipelineModel.transform(df5_t))

# Text features, then assembly into the "features" vector; the full frames are cached.
# Unlike the training data, these frames are not filtered on title token count.
val_set = assembler.transform(_add_text_features(val_set))
val_set.cache()
test_dataset = assembler.transform(_add_text_features(test_dataset))
test_dataset.cache()

# The models only need the feature vector and the label
val_set = val_set.select('features', labelColumn).cache()
test_set = test_dataset.select('features', labelColumn).cache()


In [18]:
# Displaying Training Dataset for Plotting
# ========================================

# train_dataset here is the fully transformed training frame (all intermediate columns plus "features")
display(train_dataset)

In [19]:
# Peek at the first 5 rows of each modelling set; each holds only "features" and the label column
train_set.show(5)
val_set.show(5)
test_set.show(5)

In [20]:
# Defining Performance Measurement Functions
# ==========================================

# Importing libraries
from pyspark.ml.evaluation import RegressionEvaluator

# Root mean square error
def rmse(df, predCol="predictedScore", actCol="actualScore"):
  """Root mean square error of `predCol` against `actCol`.

  Returns:
    [rmse over all rows, rmse over rows where actCol == 0,
     rmse over rows where actCol != 0]
  """
  evaluator = RegressionEvaluator(labelCol=actCol, predictionCol=predCol, metricName="rmse")
  subsets = (df, df.where(col(actCol)==0), df.where(col(actCol)!=0))
  return [evaluator.evaluate(subset) for subset in subsets]

# Symmetric mean absolute percentage error
def smape(df, predCol="predictedScore", actCol="actualScore"):
  """Symmetric mean absolute percentage error of `predCol` against `actCol`.

  Per-row value: 100 * |pred - act| / (0.5 * (|act| + |pred|)), and 0 when both
  the actual and the predicted value are 0 (an exact prediction).

  Args:
    df: DataFrame holding the prediction and actual columns.
    predCol: name of the prediction column.
    actCol: name of the actual-value column.

  Returns:
    [smape over all rows, smape over rows where actCol == 0,
     smape over rows where actCol != 0]
  """
  abs_error = F.abs(col(predCol) - col(actCol))
  half_scale = 0.5*(F.abs(col(actCol))+F.abs(col(predCol)))
  # With act == pred == 0 the ratio is 0/0, which Spark evaluates to NULL by default;
  # F.mean skips NULLs, so those exact predictions used to vanish from the average.
  # Score them as 0% error instead.
  sm_expr = F.when(half_scale == 0, F.lit(0.0)).otherwise(100*(abs_error/half_scale))
  sm = df.withColumn("sm", sm_expr)

  def mean_sm(frame):
    # Average of the per-row values in `frame`
    return frame.agg(F.mean("sm")).first()[0]

  smape_total = mean_sm(sm)
  smape_zero = mean_sm(sm.where(col(actCol)==0))
  smape_nonzero = mean_sm(sm.where(col(actCol)!=0))
  return [smape_total, smape_zero, smape_nonzero]

# Mean absolute error
def mae(df, predCol="predictedScore", actCol="actualScore"):
  """Mean absolute error of `predCol` against `actCol`.

  Returns:
    [mae over all rows, mae over rows where actCol == 0,
     mae over rows where actCol != 0]
  """
  evaluator = RegressionEvaluator(labelCol=actCol, predictionCol=predCol, metricName="mae")
  subsets = (df, df.where(col(actCol)==0), df.where(col(actCol)!=0))
  return [evaluator.evaluate(subset) for subset in subsets]

# Shared evaluators: R2 on the log-scale label, MAE on the back-transformed score columns
evaluatorR2 = RegressionEvaluator(
    labelCol="log_score",
    predictionCol="prediction",
    metricName="r2",
)
evaluatorMAE = RegressionEvaluator(
    labelCol="actualScore",
    predictionCol="predictedScore",
    metricName="mae",
)

In [21]:
# Training Models for Manual Hyperparameter Search
# Due to Cluster Limitations [initializing results list]
# ======================================================

# Accumulators that the training / test cells below append to.
# Re-running this cell resets them and discards every result recorded so far.
results_rf = []
results_gbt = []
test_results = []

In [22]:
# Training RF Model for Manual Hyperparameter Search
# Due to Cluster Limitations [training the model]
# ==================================================

# Importing libraries
from pyspark.ml.regression import RandomForestRegressor

# Hyperparameters (edit and re-run this cell to record another point of the manual search)
num_trees = 13
max_depth = 6

print("Training Random Forest Regressor Model...")
# Explicit seed: the forest's sampling is random, so a fixed seed is required for
# the recorded train / validation errors to be reproducible across sessions
rf = RandomForestRegressor(featuresCol="features", \
						   labelCol="log_score", \
						   numTrees=num_trees, \
						   minInstancesPerNode=5, \
						   maxDepth=max_depth, \
						   minInfoGain=0.00, \
						   featureSubsetStrategy="auto", \
						   seed=42)

model_rf = rf.fit(train_set)
print("Model Trained")

# Transform validation data; predictions and labels are mapped back with round(exp(x) - 1)
predictions = model_rf.transform(val_set)
output = predictions.withColumn("predictedScore", F.round(F.exp(col("prediction"))-1,0))
output = output.withColumn("actualScore", F.round(F.exp(col("log_score"))-1,0))
print("Validation Data Transformed")

# Transform training data the same way
predictions_tr = model_rf.transform(train_set)
output_tr = predictions_tr.withColumn("predictedScore", F.round(F.exp(col("prediction"))-1,0))
output_tr = output_tr.withColumn("actualScore", F.round(F.exp(col("log_score"))-1,0))
print("Training Data Transformed")

# Printing errors (MAE between predictedScore and actualScore)
mae_train = evaluatorMAE.evaluate(output_tr)
print("Train MAE = {}".format(mae_train))
mae_val = evaluatorMAE.evaluate(output)
print("Validation MAE = {}".format(mae_val))

# Append results to list
results_rf.append((num_trees, max_depth, mae_train, mae_val))

In [23]:
# Training RF Model for Manual Hyperparameter Search
# Due to Cluster Limitations [plotting results - Trees]
# =====================================================

# Build a table from every run recorded so far in results_rf
results_rfdf = spark.createDataFrame(results_rf, ["numTrees", "maxDepth", "MAE_train", "MAE_val"])
# Only runs with maxDepth == 5 are shown.
# NOTE(review): the training cell currently sets max_depth = 6, so its run is filtered out here — confirm
display(results_rfdf.where(col("maxDepth")==5))

In [24]:
# Training RF Model for Manual Hyperparameter Search
# Due to Cluster Limitations [plotting results - Depth]
# =====================================================

# Build a table from every run recorded so far in results_rf
results_rfdf = spark.createDataFrame(results_rf, ["numTrees", "maxDepth", "MAE_train", "MAE_val"])
# Only runs with numTrees == 13 are shown, so the remaining variation is in maxDepth
display(results_rfdf.where(col("numTrees")==13))

In [25]:
# Apply Tuned RF Model to Test Data
# =================================

# Score the test set and map predictions / labels back with round(exp(x) - 1)
predictions_tst = model_rf.transform(test_set)
predicted_score = F.round(F.exp(col("prediction"))-1,0)
actual_score = F.round(F.exp(col("log_score"))-1,0)
output_tst = predictions_tst.withColumn("predictedScore", predicted_score)
output_tst = output_tst.withColumn("actualScore", actual_score)
print("Test Data Transformed")

# mae() returns [total, zero-score rows, non-zero-score rows]
mae_test = mae(output_tst)
test_results.append(("Random Forest", *mae_test))
print("Test MAE = {}".format(mae_test))

In [26]:
# Manual GBT Hyperparameter Search: train one configuration per run
# (manual search is used due to cluster limitations)
# =================================================================

# Estimator class for gradient boosted trees
from pyspark.ml.regression import GBTRegressor

# Configuration evaluated by this run; edit and re-run to extend the search
maxIter, maxDepth = 10, 4

print("Training Gradient Boosted Trees Regressor Model...")
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="log_score",
    maxIter=maxIter,
    maxDepth=maxDepth,
)
model_gbt = gbt.fit(train_set)
print("Model Trained")

# Validation split: score it and derive score columns from the log-scale
# columns as exp(x) - 1 rounded to zero decimal places.
predictions_gb = model_gbt.transform(val_set)
output_gb = (
    predictions_gb
    .withColumn("predictedScore", F.round(F.exp(F.col("prediction")) - 1, 0))
    .withColumn("actualScore", F.round(F.exp(F.col("log_score")) - 1, 0))
)
print("Validation Data Transformed")

# Training split: same transformation, used to compare against validation.
predictions_gbtr = model_gbt.transform(train_set)
output_gbtr = (
    predictions_gbtr
    .withColumn("predictedScore", F.round(F.exp(F.col("prediction")) - 1, 0))
    .withColumn("actualScore", F.round(F.exp(F.col("log_score")) - 1, 0))
)
print("Training Data Transformed")

# Errors from evaluatorMAE (configured earlier in the notebook);
# validation is evaluated and reported before training.
mae_valid = evaluatorMAE.evaluate(output_gb)
print("Validation MAE = {}".format(mae_valid))
mae_train = evaluatorMAE.evaluate(output_gbtr)
print("Training MAE = {}".format(mae_train))

# Record this run for the plotting cells
results_gbt.append((maxIter, maxDepth, mae_train, mae_valid))

In [27]:
# Manual GBT Hyperparameter Search [plotting results - maxIter]
# (manual search is used due to cluster limitations)
# =============================================================

# Each entry of results_gbt is a (maxIter, maxDepth, mae_train, mae_valid)
# tuple appended by the manual GBT training cell; the schema names them.
results_gbtdf = spark.createDataFrame(
    data=results_gbt,
    schema=["maxIter", "maxDepth", "MAE_train", "MAE_val"],
)

# Show only the runs recorded with maxDepth == 1 (the iteration sweep).
display(results_gbtdf.filter(F.col("maxDepth") == 1))

In [28]:
# Manual GBT Hyperparameter Search [plotting results - Depth]
# (manual search is used due to cluster limitations)
# ===========================================================

# Rebuild the results DataFrame so this cell does not depend on the
# maxIter plotting cell having been run first.
results_gbtdf = spark.createDataFrame(
    data=results_gbt,
    schema=["maxIter", "maxDepth", "MAE_train", "MAE_val"],
)

# Show only the runs recorded with maxIter == 10 (the depth sweep).
display(results_gbtdf.filter(F.col("maxIter") == 10))

In [29]:
# Tuned GBT Model: Evaluation on the Test Data
# ============================================

# Score the test set, then derive score columns from the log-scale columns
# as exp(x) - 1 rounded to zero decimal places.
predictions_gbtst = model_gbt.transform(test_set)
output_gbtst = (
    predictions_gbtst
    .withColumn("predictedScore", F.round(F.exp(F.col("prediction")) - 1, 0))
    .withColumn("actualScore", F.round(F.exp(F.col("log_score")) - 1, 0))
)
print("Test Data Transformed")

# mae() is defined elsewhere in the notebook. Its first three entries are
# recorded under the "Gradient Boosted Trees" label; the results cell names
# them MAE_total, MAE_zero and MAE_nonzero.
mae_test = mae(output_gbtst)
test_results.append(
    ("Gradient Boosted Trees", mae_test[0], mae_test[1], mae_test[2])
)
print("Test MAE = {}".format(mae_test))

In [30]:
# Test-Set Results per Model (displayed for plotting)
# ===================================================

# test_results holds one (model label, three MAE values) tuple per evaluated
# model, appended by the test-evaluation cells.
results_testdf = spark.createDataFrame(
    data=test_results,
    schema=["Model", "MAE_total", "MAE_zero", "MAE_nonzero"],
)
display(results_testdf)

In [31]:
# Hyperparameter Tuning for RF Model Using Parameter Grid Search
# ==============================================================

# Importing Libraries
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Instantiating an RF model
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="log_score",
    minInstancesPerNode=5,
    minInfoGain=0.00,
    featureSubsetStrategy="auto",
)

# Setting parameters for grid search (3 x 3 = 9 candidate configurations)
num_trees = [11, 12, 13]
max_depth = [7, 8, 9]
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, num_trees)
    .addGrid(rf.maxDepth, max_depth)
    .build()
)

# Train-validation split: TrainValidationSplit makes its own 70/30 split of
# the data it is given, so the notebook's train and validation sets are
# merged before fitting.
tvs = TrainValidationSplit(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    parallelism=10,
    evaluator=evaluatorMAE,
    trainRatio=0.70,
)

model_rf = tvs.fit(train_set.union(val_set))

# Best model parameters
best_rfmodel = model_rf.bestModel
# The estimator handed to TrainValidationSplit is the bare regressor, so
# bestModel is the fitted regression model itself and has no `stages`
# attribute. Only unwrap the last stage when a PipelineModel was tuned.
rf_stage = best_rfmodel.stages[-1] if hasattr(best_rfmodel, "stages") else best_rfmodel
param_dict = rf_stage.extractParamMap()
print("Tuned Model Parameters:")
hyperparameters = {}
for k, v in param_dict.items():
  hyperparameters[k.name] = v
  print("Hyper-parameter {} = {}".format(k.name, hyperparameters[k.name]))

# Transform test data: derive score columns from the log-scale columns as
# exp(x) - 1 rounded to zero decimal places.
predictions_tst = best_rfmodel.transform(test_set)
output_tst = (
    predictions_tst
    .withColumn("predictedScore", F.round(F.exp(col("prediction")) - 1, 0))
    .withColumn("actualScore", F.round(F.exp(col("log_score")) - 1, 0))
)
print("Test Data Transformed")
# mae() is defined elsewhere in the notebook; its first three entries are
# reported below as total / zero / nonzero.
mae_tst = mae(output_tst)
print("Test MAE (total): {}".format(mae_tst[0]))
print("Test MAE (zero): {}".format(mae_tst[1]))
print("Test MAE (nonzero): {}\n".format(mae_tst[2]))

# Transform training data (the same merged train + validation data used for tuning)
predictions_tr = best_rfmodel.transform(train_set.union(val_set))
output_tr = (
    predictions_tr
    .withColumn("predictedScore", F.round(F.exp(col("prediction")) - 1, 0))
    .withColumn("actualScore", F.round(F.exp(col("log_score")) - 1, 0))
)
print("Training Data Transformed")
mae_tr = mae(output_tr)
print("Training MAE (total): {}".format(mae_tr[0]))
print("Training MAE (zero): {}".format(mae_tr[1]))
print("Training MAE (nonzero): {}\n".format(mae_tr[2]))

In [32]:
# Hyperparameter Tuning for GBT Model Using Parameter Grid Search
# ===============================================================

# Importing Libraries
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Instantiating a GBT model.
# This must be GBTRegressor, not RandomForestRegressor: the grid below tunes
# maxIter, a parameter that only the boosted-trees estimator defines.
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="log_score",
    minInstancesPerNode=5,
)

# Setting parameters for grid search (3 x 3 = 9 candidate configurations)
max_iter = [11, 12, 13]
max_depth = [2, 3, 4]
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, max_iter)
    .addGrid(gbt.maxDepth, max_depth)
    .build()
)

# Train-validation split: TrainValidationSplit makes its own 70/30 split of
# the data it is given, so the notebook's train and validation sets are
# merged before fitting.
tvs = TrainValidationSplit(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    parallelism=10,
    evaluator=evaluatorMAE,
    trainRatio=0.70,
)

model_gbt = tvs.fit(train_set.union(val_set))

# Best model parameters
best_gbtmodel = model_gbt.bestModel
# The estimator handed to TrainValidationSplit is the bare regressor, so
# bestModel is the fitted regression model itself and has no `stages`
# attribute. Only unwrap the last stage when a PipelineModel was tuned.
gbt_stage = best_gbtmodel.stages[-1] if hasattr(best_gbtmodel, "stages") else best_gbtmodel
param_dict = gbt_stage.extractParamMap()
print("Tuned Model Parameters:")
hyperparameters = {}
for k, v in param_dict.items():
  hyperparameters[k.name] = v
  print("Hyper-parameter {} = {}".format(k.name, hyperparameters[k.name]))

# Transform test data: derive score columns from the log-scale columns as
# exp(x) - 1 rounded to zero decimal places.
predictions_tst = best_gbtmodel.transform(test_set)
output_tst = (
    predictions_tst
    .withColumn("predictedScore", F.round(F.exp(col("prediction")) - 1, 0))
    .withColumn("actualScore", F.round(F.exp(col("log_score")) - 1, 0))
)
print("Test Data Transformed")
# mae() is defined elsewhere in the notebook; its first three entries are
# reported below as total / zero / nonzero.
mae_tst = mae(output_tst)
print("Test MAE (total): {}".format(mae_tst[0]))
print("Test MAE (zero): {}".format(mae_tst[1]))
print("Test MAE (nonzero): {}\n".format(mae_tst[2]))

# Transform training data (the same merged train + validation data used for tuning)
predictions_tr = best_gbtmodel.transform(train_set.union(val_set))
output_tr = (
    predictions_tr
    .withColumn("predictedScore", F.round(F.exp(col("prediction")) - 1, 0))
    .withColumn("actualScore", F.round(F.exp(col("log_score")) - 1, 0))
)
print("Training Data Transformed")
mae_tr = mae(output_tr)
print("Training MAE (total): {}".format(mae_tr[0]))
print("Training MAE (zero): {}".format(mae_tr[1]))
print("Training MAE (nonzero): {}\n".format(mae_tr[2]))