In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, TimestampType, DateType, BooleanType
from pyspark.sql import functions as f
from pyspark.sql.functions import isnan, when, count, col, year, lower, regexp_replace, length

In [0]:
spark = SparkSession.builder.appName('Eluvio').getOrCreate()

In [0]:
#load data 
path = '/FileStore/tables/Eluvio_DS_Challenge.csv'
df = spark.read.csv(path, header = 'true')

In [0]:
df.show()

##### Check for and remove missing values

In [0]:
print((df.count(), len(df.columns)))

In [0]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [0]:
data = df.dropna()

In [0]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

In [0]:
print((data.count(), len(data.columns)))

##### Convert column data types

In [0]:
# check data type
data.printSchema()

In [0]:
data = data.withColumn("date_created",data.date_created.cast(DateType()))\
.withColumn('up_votes', data.up_votes.cast(IntegerType()))\
.withColumn('down_votes', data.down_votes.cast(IntegerType()))\
.withColumn('over_18', data.over_18.cast(BooleanType()))

In [0]:
data.printSchema()

##### Quick look at each variable

In [0]:
data = data.withColumn('year_created', year(data['date_created']))

In [0]:
data['year_created', 'up_votes'].groupBy('year_created').mean('up_votes').show()
# there are significant variation in number of up_votes from 2009 to 2016
# but it can be seperate into three period, 2008-2010, 2011-2013, 2014-2016

In [0]:
data.groupBy('year_created').count().show()

In [0]:
data.groupBy('category').count().show()
# the number of titles belong to worldnews class is far more than others
# so we can took it as only one category

In [0]:
data.groupBy('over_18').count().show()

##### Research Question: Popularity prediction using data from 2014-2016

###### extract data from 2014-2016

In [0]:
# Keep only titles created in 2014-2016, plus the columns needed for modelling.
# The years are combined with Column.isin: Python's `or` on SQL strings would
# short-circuit to the first (non-empty) string and filter on 2014 alone.
df = data.filter(col('year_created').isin(2014, 2015, 2016)).select('up_votes', 'title')

###### label data with two classes: popular, unpopular

In [0]:
# take median as threshold
threshold = df.approxQuantile('up_votes', [0.5], 0.25)

In [0]:
threshold
# half of the videos cannot get at least 2 up_votes

In [0]:
# add class to the datafrom
df = df.withColumn('class', f.when((df['up_votes']>threshold[0]),'popular').otherwise('unpopular'))

In [0]:
df.show()

###### Clean the text data

In [0]:
# to lowercase
df = df.withColumn('text', lower(df['title']))

In [0]:
# remove numbers and punctuations
df = df.withColumn('text_only', regexp_replace(df['text'],'\W', ' '))

In [0]:
# check text length (see if there is clear difference in length of two classes)
df = df.withColumn('length', length(df['text_only']))
df.groupBy('class').mean('length').show()
# length difference can be ignored 

###### create data processing pipeline

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [0]:
tokenizer = Tokenizer(inputCol = 'text_only', outputCol = 'token_text')
stop_remover = StopWordsRemover(inputCol = 'token_text', outputCol = 'stop_tokens')
hashing_tf = HashingTF(inputCol = 'stop_tokens', outputCol = 'rawFeatures')
idf = IDF(inputCol = 'rawFeatures', outputCol = 'tf_idf')
class_to_numeric = StringIndexer(inputCol = 'class', outputCol = 'Label')

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
clean_up = VectorAssembler(inputCols = ['tf_idf'], outputCol = 'features')

In [0]:
from pyspark.ml import Pipeline

In [0]:
data_prep_pipe = Pipeline(stages = [class_to_numeric, tokenizer, stop_remover, hashing_tf, idf, clean_up])

In [0]:
cleaner = data_prep_pipe.fit(df)

In [0]:
clean_data = cleaner.transform(df)

In [0]:
clean_data = clean_data.select('features','Label')

###### Modeling and prediction

In [0]:
# Split into training (70%) and test (30%) sets. The fixed seed makes the split,
# and therefore the accuracies reported below, reproducible across
# Restart & Run All (given the same input partitioning).
# NOTE(review): `cleaner` (IDF weights and label index) was fit on all of `df`
# before this split, so test titles influence the IDF weights. TODO: fit the
# preprocessing pipeline on the training rows only.
training, test = clean_data.randomSplit([0.7, 0.3], seed=42)

In [0]:
# select model
from pyspark.ml.classification import (NaiveBayes, LogisticRegression, 
                                       DecisionTreeClassifier, RandomForestClassifier, GBTClassifier)

In [0]:
nb = NaiveBayes(labelCol = 'Label',featuresCol = 'features')
rf = RandomForestClassifier(labelCol = 'Label',featuresCol = 'features')
# gbt = GBTClassifier(maxIter = 10, labelCol = 'Label')

In [0]:
nb_model = nb.fit(training)
nb_predictions = nb_model.transform(test)

In [0]:
rf_model = rf.fit(training)
rf_predictions = rf_model.transform(test)

In [0]:
# gbt_model = gbt.fit(training)
# gbt_predictions = gbt_model.transform(test)

###### evaluation

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
evaluator = MulticlassClassificationEvaluator(labelCol = 'Label', predictionCol = 'prediction', 
                                              metricName = 'accuracy')

In [0]:
nb_accuracy = evaluator.evaluate(nb_predictions)

In [0]:
print("Accuracy of NaiveBayes Classifier is = %g"% (nb_accuracy))

In [0]:
rf_accuracy = evaluator.evaluate(rf_predictions)
print("Accuracy of Random Forest Classifier is = %g"% (rf_accuracy))