### Possibly interesting features based on https://news.ycombinator.com/item?id=36590226
- Time of day [Done]
- Analysis on title: TF-IDF? Remove stopwords? Any other importance measures, and one-hot encode the important words?

In [None]:
import os

import pandas as pd

from pyspark import SparkContext

from pyspark.sql import SQLContext, Window, SparkSession
from pyspark.sql.functions import col,sum,desc,when,udf, percent_rank

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder, Tokenizer, StopWordsRemover, CountVectorizer, IDF, PCA, HashingTF


from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Local Spark session on two worker threads. The driver memory is raised to
# 8g to deal with running out of memory.
spark = (
    SparkSession.builder
    .master('local[2]')
    .config("spark.driver.memory", "8g")
    .appName('PySparkShell')
    .getOrCreate()
)

In [None]:
# Bare expression: in a notebook cell this displays the session object as the
# cell's output.
spark

In [None]:
# Change file path
# Absolute path of the project directory on the author's machine; edit it
# before running anywhere else. Later cells build the data and model paths
# from `wd`.
wd = "/Users/hydraze/Library/CloudStorage/GoogleDrive-tohziyu2@gmail.com/My Drive/Studies/KU Leuven/Courses/Classes/Y1S2/Advanced Analytics in Business/Project/3/AdvancedAnalytics_Streaming-Text-Analytics"
# Make the project directory the working directory of this process.
os.chdir(wd)

In [None]:
# Read everything under <wd>/data as JSON into one DataFrame.
df = spark.read.json(f"{wd}/data/*")

In [None]:
# # Try to speed things up by coalescing the partitions
# """
# Source: https://stackoverflow.com/questions/35800795/number-of-partitions-in-rdd-and-performance-in-spark
# Too few partitions You will not utilize all of the cores available in the cluster.
# Too many partitions There will be excessive overhead in managing many small tasks.
# Between the two the first one is far more impactful on performance. Scheduling too many smalls tasks is a relatively small impact at this point for partition counts below 1000. If you have on the order of tens of thousands of partitions then spark gets very slow.
# """
# df = df.coalesce(10)

In [None]:
# # Number of rows:
# df.count() # All data: 12345

In [None]:
# Keep a single row per distinct "aid" value, then count what is left.
df = df.dropDuplicates(["aid"])
df.count()  # 10035

In [None]:
# # Max and min posted-at date
# min_date = df.agg({"posted_at": "min"}).collect()[0][0]
# max_date = df.agg({"posted_at": "max"}).collect()[0][0]
# print(f"min posted at date: {min_date}")
# print(f"max posted at date: {max_date}")

In [None]:
# Recode the label as an integer: 1 where frontpage is true, 0 in every
# other case.
df = df.withColumn('frontpage', when(col('frontpage') == True, 1).otherwise(0))

In [None]:
# Cleaning steps which cannot be fit into a pipeline. Each one only looks at
# the row's own values, so they will not cause data leakage.
# They will have to be re-implemented in the script doing streaming predictions.

# Layout of the `posted_at` strings parsed below.
_POSTED_AT_FORMAT = '%Y-%m-%d %H:%M:%S'


def _format_posted_at(posted_at, out_format):
    """Re-render a `posted_at` string using the strftime pattern `out_format`.

    Returns None when `posted_at` is None, so that a single row without a
    timestamp yields a null feature instead of raising inside the UDF and
    aborting the whole Spark job. The indexers applied to these columns later
    are configured with handleInvalid="keep", which places nulls in their own
    bucket.

    Raises ValueError if a non-null value does not match _POSTED_AT_FORMAT.
    """
    if posted_at is None:
        return None
    return datetime.strptime(posted_at, _POSTED_AT_FORMAT).strftime(out_format)


# Extracting type of post: Show HN
df = df.withColumn('isShowHN', when(df.title.contains("Show HN"), 1).otherwise(0))

# Extracting time of day: zero-padded hour as a string ("00" .. "23")
extract_time_of_day_udf = udf(lambda x: _format_posted_at(x, '%H'))

df = df.withColumn('time_of_day', extract_time_of_day_udf(df.posted_at))

# Extracting day of week: "0" (Sunday) .. "6" (Saturday)
weekDay = udf(lambda x: _format_posted_at(x, '%w'))

df = df.withColumn('day_of_week', weekDay(df.posted_at))

# Fill null values in the text columns with empty strings
df = df.na.fill({"title": "", "source_title": "", "source_text": ""})

### Modelling time

In [None]:
# Time-based train/test split, to prevent data leakage: every row gets its
# percent rank in posted_at order, and the rank threshold cuts the data set
# into an earlier part (train) and a later part (test).
posted_at_order = Window.partitionBy().orderBy("posted_at")

# Computationally heavy step, so the ranked rows are collected once and the
# DataFrame is rebuilt from them.
ranked_rows = df.withColumn("rank", percent_rank().over(posted_at_order)).collect()
df = spark.createDataFrame(ranked_rows)

# The helper column is only needed for the cut, so it is dropped afterwards.
train = df.filter("rank <= .8").drop("rank")
test = df.filter("rank > .8").drop("rank")


In [None]:
# Cleaning stages that can live inside a Pipeline. The fitted pipeline then
# applies every one of these steps to the test set automatically.
NUM_COL = ['votes', 'comments']

# --- Numerical variables: assemble into one vector, then standardise -------
numerical_vector_assembler = VectorAssembler(
    inputCols=NUM_COL,
    outputCol='num_col_vector',
)
std_scaler = StandardScaler(
    inputCol='num_col_vector',
    outputCol='scaled_num_col_vector',
    withMean=True,
    withStd=True,
)

# --- Categorical variables: index the values, then one-hot encode ----------
# handleInvalid="keep" routes invalid values into an extra bucket instead of
# raising an error.
isShowHN_indexer = StringIndexer(
    inputCol='isShowHN',
    outputCol='isShowHN_index',
    handleInvalid="keep",
)
isShowHN_ohe = OneHotEncoder(
    inputCol='isShowHN_index',
    outputCol='isShowHN_OHE',
    handleInvalid="keep",
)

time_of_day_indexer = StringIndexer(
    inputCol='time_of_day',
    outputCol='time_of_day_index',
    handleInvalid="keep",
)
time_of_day_ohe = OneHotEncoder(
    inputCol='time_of_day_index',
    outputCol='time_of_day_OHE',
    handleInvalid="keep",
)

day_of_week_indexer = StringIndexer(
    inputCol='day_of_week',
    outputCol='day_of_week_index',
    handleInvalid="keep",
)
day_of_week_ohe = OneHotEncoder(
    inputCol='day_of_week_index',
    outputCol='day_of_week_OHE',
    handleInvalid="keep",
)

# --- Text columns (title and source_title) ---------------------------------
# 1. Tokenization
tokenizer_title = Tokenizer(inputCol="title", outputCol="title_tokens")
tokenizer_source = Tokenizer(
    inputCol="source_title",
    outputCol="source_title_tokens",
)

# 2. Stop word removal
remover_title = StopWordsRemover(
    inputCol="title_tokens",
    outputCol="title_filtered",
)
remover_source = StopWordsRemover(
    inputCol="source_title_tokens",
    outputCol="source_title_filtered",
)

# 3. Hashing the remaining tokens into 20 term-frequency features
hashingTF_title = HashingTF(
    inputCol="title_filtered",
    outputCol="title_rawFeatures",
    numFeatures=20,
)
hashingTF_source = HashingTF(
    inputCol="source_title_filtered",
    outputCol="source_rawFeatures",
    numFeatures=20,
)

# 4. TF-IDF weighting of the hashed features
idf_title = IDF(inputCol="title_rawFeatures", outputCol="title_features")
idf_source = IDF(inputCol="source_rawFeatures", outputCol="source_features")

# 5. PCA - cut the hashed features by half (20 -> 10 components)
pca_title = PCA(
    k=10,
    inputCol="title_features",
    outputCol="title_pcaFeatures",
)
pca_source = PCA(
    k=10,
    inputCol="source_features",
    outputCol="source_pcaFeatures",
)

# --- Final assembly ---------------------------------------------------------
# Concatenate the scaled numerical vector, the one-hot encoded categorical
# variables and the reduced text features into final_feature_vector.
overall_assembler = VectorAssembler(
    inputCols=[
        'scaled_num_col_vector',
        'isShowHN_OHE',
        'time_of_day_OHE',
        'day_of_week_OHE',
        'title_pcaFeatures',
        'source_pcaFeatures',
    ],
    outputCol='final_feature_vector',
)


In [None]:
# Estimators and their hyper-parameter grids. All three read
# final_feature_vector as features and frontpage as the label. weightCol is
# not set on any of them (the weightCol="frontpage" option stays disabled).

# Logistic regression: grid over regularisation strength and elastic-net mix.
lr = LogisticRegression(
    featuresCol='final_feature_vector',
    labelCol='frontpage',
    family="binomial",
    maxIter=100,
)

param_grid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.2, 0.6, 0.8, 1])
    .addGrid(lr.elasticNetParam, [0.0, 0.2, 0.8, 1.0])
    .build()
)

# Random forest: grid over the number of trees and the impurity measure.
rfc = RandomForestClassifier(
    featuresCol='final_feature_vector',
    labelCol='frontpage',
    maxDepth=30,
    seed=42,
)

param_grid_rfc = (
    ParamGridBuilder()
    .addGrid(rfc.numTrees, [10, 20, 160, 640])
    .addGrid(rfc.impurity, ['gini', 'entropy'])
    .build()
)

# Linear SVC: grid over regularisation strength only.
svc = LinearSVC(
    featuresCol='final_feature_vector',
    labelCol='frontpage',
)

param_grid_svc = (
    ParamGridBuilder()
    .addGrid(svc.regParam, [0.001, 0.1, 1, 10, 1000])
    .build()
)

In [None]:
# Compile final pipelines: the shared preprocessing stages, in dependency
# order, followed by one classifier per pipeline.
cleaning_stages = [
    # Numerical features
    numerical_vector_assembler, std_scaler,
    # Categorical features (index, then one-hot encode)
    isShowHN_indexer, isShowHN_ohe,
    time_of_day_indexer, time_of_day_ohe,
    day_of_week_indexer, day_of_week_ohe,
    # Text features: tokens -> stop word removal -> hashing -> IDF -> PCA
    tokenizer_title, tokenizer_source,
    remover_title, remover_source,
    hashingTF_title, hashingTF_source,
    # idf_source and pca_title are two separate stages; without the comma
    # between them the list referenced an undefined name.
    idf_title, idf_source,
    pca_title, pca_source,
    # Everything merged into final_feature_vector
    overall_assembler,
]

pipeline_lr = Pipeline(stages=cleaning_stages + [lr])
pipeline_rfc = Pipeline(stages=cleaning_stages + [rfc])
pipeline_svc = Pipeline(stages=cleaning_stages + [svc])

In [None]:
# Evaluator used both for model selection and for the test scores; it reads
# the true label from the frontpage column.
evaluator = BinaryClassificationEvaluator(labelCol="frontpage")

kfolds = 2
seed = 42
n_cores = 4

# Settings shared by the three cross-validators.
cv_settings = dict(
    evaluator=evaluator,
    numFolds=kfolds,
    parallelism=n_cores,
    seed=seed,
)

# One CrossValidator per model, pairing each pipeline with its own grid.
crossval_lr = CrossValidator(
    estimator=pipeline_lr,
    estimatorParamMaps=param_grid_lr,
    **cv_settings,
)

crossval_rfc = CrossValidator(
    estimator=pipeline_rfc,
    estimatorParamMaps=param_grid_rfc,
    **cv_settings,
)

crossval_svc = CrossValidator(
    estimator=pipeline_svc,
    estimatorParamMaps=param_grid_svc,
    **cv_settings,
)

In [None]:
# Training and evaluating LR: cross-validate over the LR grid on the training
# split.
cvModel_lr = crossval_lr.fit(train)

# bestModel is the pipeline refit with the winning parameter combination.
best_model_lr = cvModel_lr.bestModel
# avgMetrics holds one cross-validated score per grid entry, in grid order,
# so index 0 is only the first combination tried. The evaluator's metric is
# "larger is better", hence the selected model's score is the maximum.
best_score_lr = max(cvModel_lr.avgMetrics)



In [None]:
# Summary of the selected LR pipeline and its cross-validation score.
print("Best LR model: ", best_model_lr)
print("Best LR score: ", best_score_lr)
print("\n")

# The classifier is the last pipeline stage; list every parameter it holds.
best_lr_params = best_model_lr.stages[-1].extractParamMap()
for parameter, value in best_lr_params.items():
    print(f"{str(parameter):50s}, {value}")
print("\n")

# Test scores: score the held-out split with the same evaluator.
test_pred_lr = best_model_lr.transform(test)
test_score_lr = evaluator.evaluate(test_pred_lr)
print("Test scores for LR", test_score_lr)

In [None]:
# Training and evaluating RFC: cross-validate over the RFC grid on the
# training split.
cvModel_rfc = crossval_rfc.fit(train)

# bestModel is the pipeline refit with the winning parameter combination.
best_model_rfc = cvModel_rfc.bestModel
# avgMetrics holds one cross-validated score per grid entry, in grid order,
# so index 0 is only the first combination tried. The evaluator's metric is
# "larger is better", hence the selected model's score is the maximum.
best_score_rfc = max(cvModel_rfc.avgMetrics)


In [None]:
# Summary of the selected RFC pipeline and its cross-validation score.
print("Best RFC model: ", best_model_rfc)
print("Best RFC score: ", best_score_rfc)
print("\n")

# The classifier is the last pipeline stage; list every parameter it holds.
best_rfc_params = best_model_rfc.stages[-1].extractParamMap()
for parameter, value in best_rfc_params.items():
    print(f"{str(parameter):50s}, {value}")
print("\n")

# Test scores: score the held-out split with the same evaluator.
test_pred_rfc = best_model_rfc.transform(test)
test_score_rfc = evaluator.evaluate(test_pred_rfc)
print("Test scores for RFC", test_score_rfc)

In [None]:
# Training and evaluating SVC: cross-validate over the SVC grid on the
# training split.
cvModel_svc = crossval_svc.fit(train)

# bestModel is the pipeline refit with the winning parameter combination.
best_model_svc = cvModel_svc.bestModel
# avgMetrics holds one cross-validated score per grid entry, in grid order,
# so index 0 is only the first combination tried. The evaluator's metric is
# "larger is better", hence the selected model's score is the maximum.
best_score_svc = max(cvModel_svc.avgMetrics)



In [None]:
# Summary of the selected SVC pipeline and its cross-validation score.
print("Best SVC model: ", best_model_svc)
print("Best SVC score: ", best_score_svc)
print("\n")

# The classifier is the last pipeline stage; list every parameter it holds.
best_svc_params = best_model_svc.stages[-1].extractParamMap()
for parameter, value in best_svc_params.items():
    print(f"{str(parameter):50s}, {value}")
print("\n")

# Test scores: score the held-out split with the same evaluator.
test_pred_svc = best_model_svc.transform(test)
test_score_svc = evaluator.evaluate(test_pred_svc)
print("Test scores for SVC", test_score_svc)

In [None]:
# Save the best model based on test scores. The candidates are best_model_lr,
# best_model_rfc and best_model_svc; the best one, trained on all training
# data, is the SVC pipeline.
mPath = f"{wd}/models/best_model"
# overwrite() replaces whatever was saved at that path before.
best_model_svc.write().overwrite().save(mPath)