In [1]:
import sys, time, pickle
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from IPython.core.display import display, HTML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import numpy as np

from sklearn.feature_selection import mutual_info_regression

import matplotlib.pyplot as plt

# Enlarge the default matplotlib figure size for every plot in this notebook.
plt.rcParams['figure.figsize'] = [30, 20]

# Application name and master URL of the cluster this notebook connects to.
conf = SparkConf()
conf.setAppName('Steam Random Forest Regressor')
conf.setMaster('spark://sparkmaster:7077')

# Executor sizing, registered as system properties before the context is created.
for prop_name, prop_value in (
    ('spark.executor.memory', '2g'),     # memory per executor
    ('spark.executor.cores', '6'),       # cores per executor
    ('spark.executor.instances', '3'),   # per worker (computer)
):
    SparkContext.setSystemProperty(prop_name, prop_value)

# Background reading on executor sizing and dynamic allocation:
# https://spark.apache.org/docs/3.0.0-preview/configuration.html#dynamic-allocation
# https://stackoverflow.com/questions/26168254/how-to-set-amount-of-spark-executors
# https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2/

# Alternative (currently disabled): let Spark scale the executor count dynamically.
# SparkContext.setSystemProperty("spark.shuffle.service.enabled", "True") # required for dynamic allocation below
# SparkContext.setSystemProperty("spark.dynamicAllocation.enabled", "True")
# SparkContext.setSystemProperty("spark.executor.cores", "4")
# SparkContext.setSystemProperty("spark.dynamicAllocation.minExecutors", "1")
# SparkContext.setSystemProperty("spark.dynamicAllocation.maxExecutors", "5")
# SparkContext.setSystemProperty('spark.executor.memory', '2g') # memory per executor

# Start the context, then wrap it in the SQLContext used later to build Spark DataFrames.
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [3]:
# List the effective Spark settings as (key, value) pairs. Uses the public
# SparkContext.getConf() accessor rather than the private `_conf` attribute.
sc.getConf().getAll()

[('spark.executor.memory', '2g'),
 ('spark.driver.host', 'jupyterlab'),
 ('spark.app.id', 'app-20210419213454-0005'),
 ('spark.driver.port', '35941'),
 ('spark.executor.instances', '3'),
 ('spark.app.name', 'Steam Random Forest Regressor'),
 ('spark.executor.id', 'driver'),
 ('spark.master', 'spark://sparkmaster:7077'),
 ('spark.executor.cores', '6'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.startTime', '1618889694183'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [None]:
# Load the pickled inputs of this notebook: the pandas DataFrame, the feature
# list and the assembler stage that the pipelines below start with.
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load files produced by a trusted upstream run.
def _load_pickle(path):
    """Unpickle and return the object stored at `path`, closing the file afterwards."""
    with open(path, "rb") as handle:
        return pickle.load(handle)


df = _load_pickle("df.p")
feature_list = _load_pickle("feature_list.p")
assembler = _load_pickle("assembler.p")

In [None]:
# Each regressor reads the "features" column and predicts "days_until_discount".
# Every pipeline runs the shared assembler stage first, then its regressor.
# NOTE(review): presumably the assembler is what produces "features" — confirm.

# Random forest
rf = RandomForestRegressor(featuresCol="features", labelCol="days_until_discount")
pipeline = Pipeline(stages=[assembler, rf])

# Gradient boosted trees
gbt = GBTRegressor(featuresCol="features", labelCol="days_until_discount")
gbt_pipeline = Pipeline(stages=[assembler, gbt])

# Linear regression
lr = LinearRegression(featuresCol="features", labelCol="days_until_discount")
linear_pipeline = Pipeline(stages=[assembler, lr])

In [None]:
# TODO: tweak these parameters more


def _int_steps(start, stop, num):
    """Return `num` evenly spaced values from `start` to `stop`, each truncated to an int."""
    return [int(value) for value in np.linspace(start=start, stop=stop, num=num)]


# Random Forest: 8 candidate tree counts x 8 candidate depths.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, _int_steps(10, 200, 8))
    .addGrid(rf.maxDepth, _int_steps(1, 20, 8))
    .build()
)
# .addGrid(rf.maxBins, ?)  # should this also be set?

# Gradient Boosted Tree: 2 candidate iteration counts x 8 candidate depths.
gbt_paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, _int_steps(5, 25, 2))
    .addGrid(gbt.maxDepth, _int_steps(1, 20, 8))
    .build()
)
# .addGrid(gbt.maxBins, ?)  # should this also be set?

# Linear Regression: regularisation strength, intercept on/off, elastic-net mix.
linear_paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

In [None]:
# One evaluator shared by all three searches; it scores predictions against the
# "days_until_discount" label and leaves the metric at the evaluator's default.
evaluator = RegressionEvaluator(labelCol="days_until_discount")

# 3-fold cross-validation over the random-forest grid.
crossval = CrossValidator(
    numFolds=3,
    evaluator=evaluator,
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
)

# 3-fold cross-validation over the gradient-boosted-tree grid.
gbt_crossval = CrossValidator(
    numFolds=3,
    evaluator=evaluator,
    estimator=gbt_pipeline,
    estimatorParamMaps=gbt_paramGrid,
)

# 3-fold cross-validation over the linear-regression grid.
linear_crossval = CrossValidator(
    numFolds=3,
    evaluator=evaluator,
    estimator=linear_pipeline,
    estimatorParamMaps=linear_paramGrid,
)

In [None]:
# Print the pandas summary of the loaded frame as a sanity check before splitting.
df.info()

In [None]:
# 80/20 train/test split of the pandas frame. The fixed random_state makes the
# split, and every metric computed from it, reproducible across kernel restarts.
trainingData = df.sample(frac=0.8, random_state=42)
# Every row not drawn into the training sample becomes the test set.
# NOTE(review): rows are dropped by index label, so this assumes df's index is unique — confirm.
testData = df.drop(trainingData.index)

In [None]:
# Convert both pandas splits into Spark DataFrames, rebinding the same names.
# NOTE(review): because the names are rebound from pandas to Spark frames, this
# cell is meant to run exactly once after the split cell.
trainingData, testData = (
    sqlContext.createDataFrame(pandas_split)
    for pandas_split in (trainingData, testData)
)

In [None]:
# Print summary statistics for the training split (a Spark DataFrame at this point).
trainingData.summary().show()

In [None]:
# Peek at the start of the training split to sanity-check the conversion.
# NOTE(review): on a Spark DataFrame, head() with no argument presumably returns
# only the first Row — confirm whether more rows were intended.
trainingData.head()

In [None]:
# Print summary statistics for the test split (a Spark DataFrame at this point).
testData.summary().show()

In [None]:
# Peek at the start of the test split to sanity-check the conversion.
# NOTE(review): on a Spark DataFrame, head() with no argument presumably returns
# only the first Row — confirm whether more rows were intended.
testData.head()

In [None]:
# TODO: look into partitions
# https://luminousmen.com/post/spark-partitions
# https://www.dezyre.com/article/how-data-partitioning-in-spark-helps-achieve-more-parallelism/297



In [None]:
# calculate mutual information
# https://www.kaggle.com/ryanholbrook/mutual-information

def make_mi_scores(X, y, discrete_features):
    """Score every column of `X` by its mutual information with `y`.

    Args:
        X: pandas DataFrame of features; its column names label the scores.
        y: target values handed to `mutual_info_regression`.
        discrete_features: forwarded unchanged to `mutual_info_regression`.

    Returns:
        pandas Series named "MI Scores", indexed by column name and sorted
        from the highest score to the lowest.
    """
    raw_scores = mutual_info_regression(X, y, discrete_features=discrete_features)
    labelled_scores = pd.Series(raw_scores, index=X.columns, name="MI Scores")
    return labelled_scores.sort_values(ascending=False)

# Restrict the analysis to the int / float columns of the pandas frame.
integer_df = df.select_dtypes(include=['int', 'float'])

# The label column is the target; the features are the remaining numeric
# columns with "appid" removed.
y = integer_df["days_until_discount"].copy()
X = integer_df.drop(columns=["appid", "days_until_discount"])

# Boolean mask over X's columns: True where the column dtype is integer.
discrete_features = (X.dtypes == int)

mi_scores = make_mi_scores(X, y, discrete_features)
# Display every second score of the ranked list as a quick overview.
mi_scores.iloc[::2]

In [None]:
def plot_mi_scores(scores):
    """Draw a horizontal bar chart of mutual-information scores with pyplot.

    Args:
        scores: pandas Series of scores whose index holds the feature names.
    """
    # Sorted ascending, so the highest score is placed at the largest y position.
    ordered = scores.sort_values(ascending=True)
    bar_positions = np.arange(len(ordered))
    feature_names = ordered.index.tolist()
    plt.barh(bar_positions, ordered)
    plt.yticks(bar_positions, feature_names)
    plt.title("Mutual Information Scores")


# Open a dedicated figure with its own size and dpi, then draw the
# mutual-information bar chart onto it.
plt.figure(dpi=100, figsize=(10, 14))
plot_mi_scores(mi_scores)

In [None]:
# Show the first 20 entries of mi_scores, i.e. the 20 highest scores, since
# make_mi_scores returns them sorted in descending order.
mi_scores.head(n=20)

In [None]:
# Fit the random-forest cross-validation search on the training split and time it.
starttime = time.time()
cvModel = crossval.fit(trainingData)
endtime = time.time()

print(f"Random Forest Training took: {(endtime - starttime) / 60} minutes")

# Persist the fitted cross-validator model, replacing any earlier save at this path.
cvModel.write().overwrite().save("/work/steam-randomforest-model")

In [None]:
# Fit the gradient-boosted-tree cross-validation search on the training split and time it.
starttime = time.time()
gbt_model = gbt_crossval.fit(trainingData)
endtime = time.time()

print(f"Gradient Boosted Tree Training took: {(endtime - starttime) / 60} minutes")

# Persist the fitted cross-validator model, replacing any earlier save at this path.
gbt_model.write().overwrite().save("/work/steam-gbt-model")

In [None]:
# Fit the linear-regression cross-validation search on the training split and time it.
starttime = time.time()
linear_model = linear_crossval.fit(trainingData)
endtime = time.time()

print(f"Linear Regression Training took: {(endtime - starttime) / 60} minutes")

# Persist the fitted cross-validator model, replacing any earlier save at this path.
linear_model.write().overwrite().save("/work/steam-linear-model")

In [None]:
# Apply each fitted cross-validation model to the held-out test split.
predictions, gbt_predictions, linear_predictions = (
    fitted_model.transform(testData)
    for fitted_model in (cvModel, gbt_model, linear_model)
)

In [None]:
# Shut down the SparkContext now that the notebook no longer needs the cluster.
sc.stop()