# Connect to Hive

In [1]:
import math
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import when, avg, coalesce, date_format, sin, cos

# Add here your team number teamx
team = "team7"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

# spark = SparkSession.builder\
#     .appName("{} - spark ML".format(team))\
#     .master("yarn")\
#     .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
#     .config("spark.sql.warehouse.dir", warehouse)\
#     .config("spark.sql.avro.compression.codec", "snappy")\
#     .enableHiveSupport()\
#     .getOrCreate()

spark = SparkSession.builder\
    .appName("{} - spark ML".format(team))\
    .master("yarn")\
    .config("spark.executor.instances", 32)\
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
    .config("spark.sql.warehouse.dir", warehouse)\
    .config("spark.sql.avro.compression.codec", "snappy")\
    .enableHiveSupport()\
    .getOrCreate()

sc = spark.sparkContext

In [2]:
spark

# list Hive databases

In [3]:
spark.sql("SHOW DATABASES;").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             root_db|
|     team0_projectdb|
|team12_hive_proje...|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
|    team22_projectdb|
|    team23_projectdb|
|    team25_projectdb|
|    team26_projectdb|
|    team28_projectdb|
|    team29_projectdb|
|     team2_projectdb|
+--------------------+
only showing top 20 rows



# Specify the input and output features

In [4]:
# We will use the following features
# Excluede 'description' because there will be no text processing here
# Excluded all 'id's because it not relevant for model
features = [
    'family', 'onpromotion', 'store_nbr',  # main table
    'dcoilwtico',  # oil table
    'city', 'state', 'type', 'cluster',  # stores table
    'transactions',  # transactions table
    'year', 'month', 'day']  # encoded dates

# The output/target of our model
label = 'sales'

# Read hive tables

In [5]:
main = spark.read.format("avro").table('team7_projectdb.main_part')
oil = spark.read.format("avro").table('team7_projectdb.oil')
hol_events = spark.read.format("avro").table('team7_projectdb.holidays_events')
stores = spark.read.format("avro").table('team7_projectdb.stores')
transactions = spark.read.format("avro").table('team7_projectdb.transactions')

# Feature selection

In [6]:
encode_main = ['family']
encode_stores = ['city', 'state', 'type_store']

# Feature extraction

In [7]:
# Table main
# Encode categorical features in table main
indexers_main = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in encode_main]
pipeline = Pipeline(stages=indexers_main)
main = pipeline.fit(main).transform(main).drop(*encode_main + ["id"])

# Table oil
# Fill missing values in oil table with average of neighbors
window = Window.rowsBetween(-1, 1)
oil = oil.withColumn("avg_dcoilwtico", avg(oil["dcoilwtico"]).over(window))
oil = oil.withColumn("dcoilwtico", coalesce(oil["dcoilwtico"], oil["avg_dcoilwtico"]))
oil = oil.drop(*["avg_dcoilwtico", "id"])

# Table holidays_events
# Convert boolean "transferred" column to integer column
hol_events = hol_events.withColumn('transferred', when(hol_events.transferred, 1).otherwise(0)).drop("id")

# Table stores
# Encode categorical features in table stores
stores = stores.withColumnRenamed("type", "type_store")
indexers_stores = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in encode_stores]
pipeline = Pipeline(stages=indexers_stores)
stores = pipeline.fit(stores).transform(stores).drop(*encode_stores)

# Table transactions
transactions = transactions.drop("id")

# Join tables
transformed = main \
    .join(oil, on="dates", how="left") \
    .join(hol_events, on="dates", how="left") \
    .join(transactions, on=["dates", "store_nbr"], how="left") \
    .join(stores, on="store_nbr", how="left") \

# Split dates to year, month, day
transformed = transformed. \
    withColumn("year", date_format("dates", "yyyy").cast('int')). \
    withColumn("month", date_format("dates", "MM").cast('int')). \
    withColumn("day", date_format("dates", "dd").cast('int')). \
    drop("dates")

# Sort dataframe by date
transformed = transformed.sort(["year", "month", "day"])

# Fill na
transformed = transformed.fillna(0)

# Encode cyclical month and days
transformed = transformed.withColumn("month_sin", sin(2 * math.pi * transformed.month / 12))
transformed = transformed.withColumn("month_cos", cos(2 * math.pi * transformed.month / 12))
transformed = transformed.withColumn("day_sin", sin(2 * math.pi * transformed.day / 31))
transformed = transformed.withColumn("day_cos", cos(2 * math.pi * transformed.day / 31))
transformed = transformed.drop(*["month", "day"])

# Assemble all features into single column
assembler = VectorAssembler(inputCols=[i for i in transformed.schema.names if i != "sales"], outputCol="features")
pipeline = Pipeline(stages=[assembler])
transformed = pipeline.fit(transformed).transform(transformed)
transformed = transformed.select(["sales", "features"]).withColumnRenamed("sales", "label")

# Display final table
transformed.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(18,[0,9,12,13,14...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
|  0.0|(18,[0,2,9,12,13,...|
+-----+--------------------+
only showing top 20 rows



# Split the dataset

In [8]:
train_data = transformed.limit(int(transformed.count() * 0.8))
test_data = transformed.subtract(train_data)


def run(command):
    import os
    return os.popen("cd ..\n" + command).read()


train_data.select("features", "label").\
    coalesce(1).\
    write.\
    mode("overwrite").\
    format("json").\
    save("project/data/train")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

test_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

''

# First model

## Build a model

In [9]:
# Create Linear Regression Model
lr = LinearRegression()

# Fit the data to the pipeline stages
model_lr = lr.fit(train_data)

## Predict for test data

In [10]:
predictions = model_lr.transform(test_data)
predictions.show()

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
|  0.0|(18,[0,2,9,10,11,...|206.27213973793187|
|  0.0|(18,[0,2,9,10,11,...| 188.6857160605723|
|  0.0|(18,[0,2,9,10,11,...|-29.65082859296308|
|  0.0|(18,[0,2,9,10,11,...|238.97975115597728|
|  0.0|(18,[0,2,9,10,11,...|270.62187663342047|
|  0.0|(18,[0,2,9,10,12,...| 241.6565731994233|
|  0.0|(18,[0,2,9,12,13,...|391.06250452183303|
|  0.0|(18,[0,2,9,12,13,...|264.18518732598386|
|  0.0|(18,[0,2,9,12,13,...| 502.9187520369924|
|  0.0|(18,[0,2,9,12,13,...| 494.9844296604424|
|  0.0|(18,[0,2,9,12,13,...| 342.0023355195299|
|  0.0|(18,[0,2,9,12,13,...| 314.3912482432461|
|  0.0|(18,[0,2,9,12,13,...| 510.0474777482741|
|  0.0|(18,[0,2,9,12,13,...| 323.4067162596439|
|  0.0|(18,[0,2,9,12,13,...|275.94262579175484|
|  0.0|(18,[0,2,9,12,13,...| 263.8783702622168|
|  0.0|(18,[0,2,9,12,13,...|195.05600305839107|
|  0.0|(18,[0,2,9,12,13,...| 514.7276531

## Evaluate the model

In [11]:
# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator1_rmse.evaluate(predictions)
r2 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))
print("R^2 on test data = {}".format(r2))

Root Mean Squared Error (RMSE) on test data = 1008.5613767098458
R^2 on test data = 0.3458129987714744


## Hyperparameter optimization

In [12]:
model_lr.params

[Param(parent='LinearRegression_60f14a9d22c3', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LinearRegression_60f14a9d22c3', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LinearRegression_60f14a9d22c3', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'),
 Param(parent='LinearRegression_60f14a9d22c3', name='featuresCol', doc='features column name.'),
 Param(parent='LinearRegression_60f14a9d22c3', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LinearRegression_60f14a9d22c3', name='labelCol', doc='label column name.'),
 Param(parent='LinearRegression_60f14a9d22c3', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'),
 Param(parent='LinearRegression_60f14a9d22c3', name='m

In [13]:
grid = ParamGridBuilder()
grid = grid.addGrid(model_lr.aggregationDepth, [2, 3, 4])\
           .addGrid(model_lr.loss, ["squaredError", "huber"])\
           .build()

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=evaluator1_r2,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

LinearRegressionModel: uid=LinearRegression_60f14a9d22c3, numFeatures=18

## Best model 1


In [14]:
model1 = bestModel
pprint(model1.extractParamMap())

{Param(parent='LinearRegression_60f14a9d22c3', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 1e-06,
 Param(parent='LinearRegression_60f14a9d22c3', name='solver', doc='The solver algorithm for optimization. Supported options: auto, normal, l-bfgs.'): 'auto',
 Param(parent='LinearRegression_60f14a9d22c3', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
 Param(parent='LinearRegression_60f14a9d22c3', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
 Param(parent='LinearRegression_60f14a9d22c3', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LinearRegression_60f14a9d22c3', name='maxIter', doc='max number of iterations (>= 0).'): 100,
 Param(parent='LinearRegression_60f14a9d22c3', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_60f14a9d22c3', name='predictionCol', doc='prediction column name.'): 'prediction',
 Pa

## Save the model to HDFS

In [15]:
model1.write().overwrite().save("project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 models/model1")

''

## Predict for test data using best model1

In [16]:
predictions = model1.transform(test_data)
predictions.show()

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
|  0.0|(18,[0,2,9,10,11,...|156.46585834069265|
|  0.0|(18,[0,2,9,10,11,...|149.06519265946554|
|  0.0|(18,[0,2,9,10,11,...|250.70277358208114|
|  0.0|(18,[0,2,9,10,11,...| 200.1891585885587|
|  0.0|(18,[0,2,9,10,11,...|-50.99772896027571|
|  0.0|(18,[0,2,9,10,11,...|-109.2437005925076|
|  0.0|(18,[0,2,9,10,11,...|317.80491369681476|
|  0.0|(18,[0,2,9,10,11,...|58.035079528650385|
|  0.0|(18,[0,2,9,10,11,...| 513.1632305907424|
|  0.0|(18,[0,2,9,10,11,...|156.57404779417084|
|  0.0|(18,[0,2,9,10,11,...|  71.5611123186718|
|  0.0|(18,[0,2,9,10,11,...| 452.1329683750846|
|  0.0|(18,[0,2,9,10,11,...|364.83214622467494|
|  0.0|(18,[0,2,9,10,11,...|184.61249398128712|
|  0.0|(18,[0,2,9,10,11,...| 437.7034495233911|
|  0.0|(18,[0,2,9,10,11,...|398.09099764444727|
|  0.0|(18,[0,2,9,10,11,...|209.71325532375886|
|  0.0|(18,[0,2,9,10,12,...| 137.3472029

In [17]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/model1_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

''

## Evaluate the best model1

In [18]:
# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse1 = evaluator1_rmse.evaluate(predictions)
r21 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse1))
print("R^2 on test data = {}".format(r21))

Root Mean Squared Error (RMSE) on test data = 1082.8878390771276
R^2 on test data = 0.28347856206431354


# Second model

## Build a model

In [19]:
# Create Linear Regression Model
gbt = GBTRegressor(maxBins=4993, seed=42)

# Fit the data to the pipeline stages
model_gbt = gbt.fit(train_data)

## Predict for test data

In [20]:
predictions = model_gbt.transform(test_data)
predictions.show()

+-----+--------------------+-------------------+
|label|            features|         prediction|
+-----+--------------------+-------------------+
|  0.0|(18,[0,2,9,10,11,...| 3.7753098067632216|
|  0.0|(18,[0,2,9,10,11,...| 14.217631103790527|
|  0.0|(18,[0,2,9,10,11,...|-2.3880762246987284|
|  0.0|(18,[0,2,9,10,11,...| -6.128442461716215|
|  0.0|(18,[0,2,9,10,11,...|-20.553011284843407|
|  0.0|(18,[0,2,9,10,11,...|  69.96485411963971|
|  0.0|(18,[0,2,9,10,11,...|-2.3880762246987284|
|  0.0|(18,[0,2,9,10,11,...|-10.731848870517034|
|  0.0|(18,[0,2,9,10,11,...|  43.06685039576997|
|  0.0|(18,[0,2,9,10,11,...| 28.423550310690015|
|  0.0|(18,[0,2,9,10,11,...| 24.471793547965735|
|  0.0|(18,[0,2,9,10,11,...|  16.44981971070481|
|  0.0|(18,[0,2,9,10,11,...| 102.53108441971517|
|  0.0|(18,[0,2,9,10,11,...|  6.123211652466696|
|  0.0|(18,[0,2,9,10,11,...| 12.498062947980529|
|  0.0|(18,[0,2,9,10,11,...|  -6.31517886844145|
|  0.0|(18,[0,2,9,10,11,...| -46.66011499153804|
|  0.0|(18,[0,2,9,10

In [21]:
predictions = predictions.withColumn("prediction", when(predictions.prediction < 0, 0).otherwise(predictions.prediction))

## Evaluate the model

In [22]:
# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 596.220163047231
R^2 on test data = 0.7703978943504188


## Hyperparameter optimization

In [23]:
model_gbt.params

[Param(parent='GBTRegressor_899d5f4ba5d9', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='GBTRegressor_899d5f4ba5d9', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='GBTRegressor_899d5f4ba5d9', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'a

In [24]:
grid = ParamGridBuilder()
grid = grid\
    .addGrid(model_gbt.maxDepth, [2, 5])\
    .addGrid(model_gbt.lossType, ['squared', 'absolute'])\
    .build()

cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=grid,
                    evaluator=evaluator2_rmse,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

GBTRegressionModel: uid=GBTRegressor_899d5f4ba5d9, numTrees=20, numFeatures=18

## Best model 2


In [25]:
model2 = bestModel
pprint(model2.extractParamMap())

{Param(parent='GBTRegressor_899d5f4ba5d9', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='GBTRegressor_899d5f4ba5d9', name='labelCol', doc='label column name.'): 'label',
 Param(parent='GBTRegressor_899d5f4ba5d9', name='lossType', doc='Loss function which GBT tries to minimize (case-insensitive). Supported options: squared, absolute'): 'squared',
 Param(parent='GBTRegressor_899d5f4ba5d9', name='leafCol', doc='Leaf indices column name. Predicted leaf index of each instance in each tree by preorder.'): '',
 Param(parent='GBTRegressor_899d5f4ba5d9', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest)

## Save the model to HDFS

In [26]:
model2.write().overwrite().save("project/models/model2")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

''

## Predict for test data using best model2

In [27]:
predictions = predictions.withColumn("prediction", when(predictions.prediction < 0, 0).otherwise(predictions.prediction))

In [28]:
predictions = model2.transform(test_data)
predictions.show()

+-----+--------------------+-------------------+
|label|            features|         prediction|
+-----+--------------------+-------------------+
|  0.0|(18,[0,2,9,10,11,...|  38.14515880052817|
|  0.0|(18,[0,2,9,10,11,...|  37.73350817060205|
|  0.0|(18,[0,2,9,10,11,...| -4.804385463457203|
|  0.0|(18,[0,2,9,10,11,...| 13.513694771588291|
|  0.0|(18,[0,2,9,10,11,...|  55.86531986291475|
|  0.0|(18,[0,2,9,10,11,...|  52.71693169878639|
|  0.0|(18,[0,2,9,10,11,...| -56.60343631227797|
|  0.0|(18,[0,2,9,10,11,...|-23.081423373772033|
|  0.0|(18,[0,2,9,10,11,...|  53.71329943194769|
|  0.0|(18,[0,2,9,10,11,...|  4.896379526389485|
|  0.0|(18,[0,2,9,10,11,...|  99.80818302650258|
|  0.0|(18,[0,2,9,10,11,...| 23.173417436704316|
|  0.0|(18,[0,2,9,10,11,...|  4.896379526389485|
|  0.0|(18,[0,2,9,10,11,...|-26.146546365804433|
|  0.0|(18,[0,2,9,10,11,...|-31.397062129813463|
|  0.0|(18,[0,2,9,10,11,...|-26.146546365804433|
|  0.0|(18,[0,2,9,10,11,...|  40.11414467379731|
|  0.0|(18,[0,2,9,10

In [29]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/model2_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

''

## Evaluate the best model2

In [30]:
# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 675.6849929246447
R^2 on test data = 0.7613964461484728


# Third model

In [31]:
# Create Linear Regression Model
rfr = RandomForestRegressor(maxBins=4993, seed=42)

# Fit the data to the pipeline stages
model_rfr = gbt.fit(train_data)

In [32]:
predictions = model_rfr.transform(test_data)
predictions.show()

+-----+--------------------+-------------------+
|label|            features|         prediction|
+-----+--------------------+-------------------+
|  0.0|(18,[0,2,9,10,11,...| 23.836159761211782|
|  0.0|(18,[0,2,9,10,11,...|  10.30450638673183|
|  0.0|(18,[0,2,9,10,11,...|  484.9025319453021|
|  0.0|(18,[0,2,9,10,11,...| 1963.3844006941072|
|  0.0|(18,[0,2,9,10,11,...|  66.43129750266876|
|  0.0|(18,[0,2,9,10,11,...|  9.513321416016899|
|  0.0|(18,[0,2,9,10,11,...|-14.162242391867778|
|  0.0|(18,[0,2,9,10,11,...|-3.9299335318062845|
|  0.0|(18,[0,2,9,10,11,...| -3.183196662098368|
|  0.0|(18,[0,2,9,10,11,...|  76.28277581146611|
|  0.0|(18,[0,2,9,10,11,...| 117.96027849990314|
|  0.0|(18,[0,2,9,10,11,...| 16.420251200464943|
|  0.0|(18,[0,2,9,10,11,...| 18.154493938185688|
|  0.0|(18,[0,2,9,10,11,...| 123.44271099478156|
|  0.0|(18,[0,2,9,10,11,...|  35.08327606301023|
|  0.0|(18,[0,2,9,10,11,...|  35.63237161413732|
|  0.0|(18,[0,2,9,10,11,...|  12.03620864513039|
|  0.0|(18,[0,2,9,10

In [33]:
predictions = predictions.withColumn("prediction", when(predictions.prediction < 0, 0).otherwise(predictions.prediction))

In [34]:
# Evaluate the performance of the model
evaluator3_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator3_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse3 = evaluator3_rmse.evaluate(predictions)
r23 = evaluator3_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse3))
print("R^2 on test data = {}".format(r23))

Root Mean Squared Error (RMSE) on test data = 546.2336557106364
R^2 on test data = 0.6526316922849664


In [35]:
model_rfr.params

[Param(parent='GBTRegressor_899d5f4ba5d9', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='GBTRegressor_899d5f4ba5d9', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='GBTRegressor_899d5f4ba5d9', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'a

In [36]:
grid = ParamGridBuilder()
grid = grid\
    .addGrid(model_rfr.maxDepth, [2, 5])\
    .addGrid(model_rfr.lossType, ['squared', 'absolute'])\
    .build()

cv = CrossValidator(estimator=rfr,
                    estimatorParamMaps=grid,
                    evaluator=evaluator3_rmse,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

RandomForestRegressionModel: uid=RandomForestRegressor_909411dde03c, numTrees=20, numFeatures=18

In [37]:
model3 = bestModel
pprint(model3.extractParamMap())

{Param(parent='RandomForestRegressor_909411dde03c', name='maxMemoryInMB', doc='Maximum memory in MB allocated to histogram aggregation. If too small, then 1 node will be split per iteration, and its aggregates may exceed this size.'): 256,
 Param(parent='RandomForestRegressor_909411dde03c', name='seed', doc='random seed.'): 42,
 Param(parent='RandomForestRegressor_909411dde03c', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1,
 Param(parent='RandomForestRegressor_909411dde03c', name='minWeightFractionPerNode', doc='Minimum fraction of the weighted sample count that each child must have after split. If a split causes the fraction of the total weight in the left or right child to be less than minWeightFractionPerNode, the split will be discarded as invalid. Should be in interval [0.0, 0.5).'): 0.0,
 P

In [38]:
model3.write().overwrite().save("project/models/model3")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model3 models/model3")

''

In [39]:
predictions = model3.transform(test_data)
predictions.show()

+-----+--------------------+------------------+
|label|            features|        prediction|
+-----+--------------------+------------------+
|  0.0|(18,[0,2,9,10,11,...| 80.42865093628491|
|  0.0|(18,[0,2,9,10,11,...| 65.19999343344023|
|  0.0|(18,[0,2,9,10,11,...| 65.86733253560615|
|  0.0|(18,[0,2,9,10,11,...| 67.43601823083797|
|  0.0|(18,[0,2,9,10,11,...| 66.46880808427643|
|  0.0|(18,[0,2,9,10,11,...| 85.09847854720779|
|  0.0|(18,[0,2,9,10,11,...|225.69454165831038|
|  0.0|(18,[0,2,9,10,11,...| 97.84538582587659|
|  0.0|(18,[0,2,9,10,11,...|242.21393954636537|
|  0.0|(18,[0,2,9,10,11,...| 59.31526951081719|
|  0.0|(18,[0,2,9,10,11,...|237.51580667753632|
|  0.0|(18,[0,2,9,10,11,...| 221.6188258043856|
|  0.0|(18,[0,2,9,10,11,...| 60.28247965737874|
|  0.0|(18,[0,2,9,10,11,...| 61.85116535261055|
|  0.0|(18,[0,2,9,10,11,...|175.58054334684405|
|  0.0|(18,[0,2,9,10,11,...|175.58054334684405|
|  0.0|(18,[0,2,9,10,11,...| 76.00756338928902|
|  0.0|(18,[0,2,9,10,12,...| 98.80185046

In [40]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/model3_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model3_predictions.csv/*.csv > output/model3_predictions.csv")

''

In [41]:
# Evaluate the performance of the model
evaluator3_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator3_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse3 = evaluator3_rmse.evaluate(predictions)
r23 = evaluator3_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse3))
print("R^2 on test data = {}".format(r23))

Root Mean Squared Error (RMSE) on test data = 787.2231305943947
R^2 on test data = 0.4588469834619404


# Compare best models

In [42]:
models = [[str(model1), rmse1, r21], [str(model2), rmse2, r22], [str(model3), rmse3, r23]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

+------------------------------------------------------------------------------------------------+------------------+-------------------+
|model                                                                                           |RMSE              |R2                 |
+------------------------------------------------------------------------------------------------+------------------+-------------------+
|LinearRegressionModel: uid=LinearRegression_60f14a9d22c3, numFeatures=18                        |1082.8878390771276|0.28347856206431354|
|GBTRegressionModel: uid=GBTRegressor_899d5f4ba5d9, numTrees=20, numFeatures=18                  |675.6849929246447 |0.7613964461484728 |
|RandomForestRegressionModel: uid=RandomForestRegressor_909411dde03c, numTrees=20, numFeatures=18|787.2231305943947 |0.4588469834619404 |
+------------------------------------------------------------------------------------------------+------------------+-------------------+



In [43]:
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

''