# Connect to Hive

In [1]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = "team25"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

sc = spark.sparkContext

In [2]:
spark

# list Hive databases

In [3]:
print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES;").show()

[Database(name='default', description='Default Hive database', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/apps/hive/warehouse'), Database(name='root_db', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/root/root_db'), Database(name='team0_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team0/project/hive/warehouse'), Database(name='team12_hive_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team12/project/hive/warehouse'), Database(name='team13_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team13/project/hive/warehouse'), Database(name='team14_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team14/project/hive/warehouse'), Database(name='team15_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team15/project/hive/warehouse'), Database(name='team16_projectdb', description

# Specify the input and output features

In [4]:
numerical_columns = ["travelDuration", "isBasicEconomy", "isRefundable",
                     "isNonStop", "seatsRemaining", "totalTravelDistance", "segmentsDurationInSeconds", "day_sin",
                     "day_cos", "month_sin", "month_cos"]
categorical_columns = ["startingAirport", "destinationAirport", "segmentsCabinCode", "segmentsAirlineCode"]

features = numerical_columns + categorical_columns

# The output/target of our model
label = "baseFare"

# Read hive tables

In [5]:
data = spark.read.format("avro").table('team25_projectdb.flights_opt')


In [6]:
num_rows = data.count()
num_columns = len(data.columns)
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")
data.printSchema()
data.show(10)


Number of rows: 4106938
Number of columns: 27
root
 |-- legid: string (nullable = true)
 |-- searchdate: date (nullable = true)
 |-- flightdate: date (nullable = true)
 |-- destinationairport: string (nullable = true)
 |-- farebasiscode: string (nullable = true)
 |-- travelduration: string (nullable = true)
 |-- elapseddays: integer (nullable = true)
 |-- isbasiceconomy: boolean (nullable = true)
 |-- isrefundable: boolean (nullable = true)
 |-- isnonstop: boolean (nullable = true)
 |-- basefare: decimal(10,2) (nullable = true)
 |-- totalfare: decimal(10,2) (nullable = true)
 |-- seatsremaining: integer (nullable = true)
 |-- totaltraveldistance: string (nullable = true)
 |-- segmentsdeparturetimeepochseconds: string (nullable = true)
 |-- segmentsdeparturetimeraw: string (nullable = true)
 |-- segmentsarrivaltimeepochseconds: string (nullable = true)
 |-- segmentsarrivaltimeraw: string (nullable = true)
 |-- segmentsarrivalairportcode: string (nullable = true)
 |-- segmentsdepartureai

# Preprocessing the data

## Feature Selection

In [7]:
from pyspark.sql.functions import udf, col, count, when, dayofmonth, month, sin, cos, lit, regexp_extract
from pyspark.sql.types import StringType, IntegerType, FloatType
import pyspark.sql.functions as F
from collections import Counter
import re
from math import pi


# Define user-defined functions for transformations
def find_most_frequent(codes):
    if codes is None:
        return None
    codes_list = codes.split('||')
    most_common = Counter(codes_list).most_common(1)
    return most_common[0][0]

def sum_list_val(codes):
    if codes is None:
        return None
    codes_list = str(codes).split('||')
    return sum(float(val) for val in codes_list if val != "None")

def time_parse(duration):
    total_minutes = 0
    pattern = r"P(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?"
    match = re.match(pattern, duration)
    if match:
        days, hours, minutes = match.groups()
        total_minutes += int(days) * 1440 if days else 0
        total_minutes += int(hours) * 60 if hours else 0
        total_minutes += int(minutes) if minutes else 0
    return total_minutes * 60

# Register UDFs
find_most_frequent_udf = udf(find_most_frequent, StringType())
sum_list_val_udf = udf(sum_list_val, FloatType())
time_parse_udf = udf(time_parse, IntegerType())




In [8]:
# Data Cleaning: Drop unnecessary columns
#data = data.drop(*columns_to_drop)

# Feature Engineering: Apply UDFs, handle nulls, extract date features, and create cyclic features
data = data.withColumn("segmentsCabinCode", find_most_frequent_udf(col("segmentsCabinCode")))
data = data.withColumn("segmentsAirlineCode", find_most_frequent_udf(col("segmentsAirlineCode")))
data = data.withColumn("travelDuration", time_parse_udf(col("travelDuration")))
data = data.withColumn("segmentsDurationInSeconds", sum_list_val_udf(col("segmentsDurationInSeconds")))
data = data.withColumn("totalTravelDistance", sum_list_val_udf(col("segmentsdistance")))

for categorical_col in categorical_columns:
    data = data.withColumn(categorical_col, F.when(col(categorical_col).isNull(), "Unknown").otherwise(col(categorical_col)))

data = data.withColumn("flightDate", col("flightDate").cast("date"))
data = data.withColumn("month", month("flightDate"))
data = data.withColumn("day", dayofmonth("flightDate"))
data = data.withColumn("day_sin", sin((2 * pi * col("day") / 31)))
data = data.withColumn("day_cos", cos((2 * pi * col("day") / 31)))
data = data.withColumn("month_sin", sin(2 * pi * (col("month") - 1) / 12))
data = data.withColumn("month_cos", cos(2 * pi * (col("month") - 1) / 12))

data = data.withColumn("label", col("baseFare"))

In [9]:
columns_to_cast = ["isBasicEconomy", "isRefundable", "isNonStop"]

# Cast each column to float
for column in columns_to_cast:
    data = data.withColumn(column, when(data[column] == True, 1).otherwise(0))

# Cast string to float where necessary
for column in numerical_columns + ["baseFare"]:
    data = data.withColumn(column, col(column).cast("float"))

#data = data.drop("flightDate", "day", "month")
data.show()

+--------------------+----------+----------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+---------------+-----+---+--------------+-----------+-------------+----------+------+
|               legid|searchdate|flightDate|destinationAirport|farebasiscode|travelDuration|elapseddays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalfare|seatsRemaining|totalTravelDistance|segmentsdeparturetimeepochseconds|segmentsdeparturetimeraw|segmentsarrivaltimeepochseconds|segmentsarrivaltimeraw|segmentsarrivalairportcode|segmentsdepartureairportcode| segmentsairlinename|segmentsAirlineCode|segmentseq

## Building Pipline

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer

# Set up the ML pipeline
indexers = [StringIndexer(inputCol=c,
                          outputCol="{0}_indexed".format(c), handleInvalid='keep') for c in categorical_columns]

cat_imputers = [Imputer(inputCol=indexer.getOutputCol(),
                        outputCol="{0}_imputed".format(indexer.getOutputCol()), strategy="mode") for indexer in indexers]

encoders = [OneHotEncoder(inputCol=imputer.getOutputCol(),
                          outputCol="{0}_encoded".format(imputer.getInputCol())) for imputer in cat_imputers]

num_imputer = Imputer(inputCols=numerical_columns,
                      outputCols=["{0}_imputed".format(c) for c in numerical_columns], strategy="mean")

assemblerInputs = [encoder.getOutputCol() for encoder in encoders] + ["{0}_imputed".format(c) for c in numerical_columns]

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="nonscaledfeatures")

scaler = StandardScaler(inputCol="nonscaledfeatures", outputCol="features")

pipeline = Pipeline(stages=indexers + cat_imputers + encoders + [num_imputer, assembler, scaler])

model = pipeline.fit(data)

transformed_data = model.transform(data)

transformed_data.show()

+--------------------+----------+----------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+---------------+-----+---+-----------+-----------+-------------+----------+------+-----------------------+--------------------------+-------------------------+---------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+----------------------+-------------------

In [11]:
# Show null values
transformed_data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+-----+----------+----------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+-------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+---------------+-----+---+-------+-------+---------+---------+-----+
|legid|searchdate|flightDate|destinationAirport|farebasiscode|travelDuration|elapseddays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalfare|seatsRemaining|totalTravelDistance|segmentsdeparturetimeepochseconds|segmentsdeparturetimeraw|segmentsarrivaltimeepochseconds|segmentsarrivaltimeraw|segmentsarrivalairportcode|segmentsdepartureairportcode|segmentsairlinename|segmentsAirlineCode|segmentsequipmentdescription|segmentsDurationInSeconds|segm

# Split the dataset

In [12]:
#  split the data into 60% training and 40% test (it is not stratified)
(train_data, test_data) = transformed_data.randomSplit([0.7, 0.3], seed = 10)

def run(command):
    import os
    return os.popen(command).read()

train_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

test_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

''

# First model

## Build a model

In [13]:
from pyspark.ml.regression import GBTRegressor

# Train a GBT model.
gbt = GBTRegressor(featuresCol="features", maxIter=10)


# Fit the data to the pipeline stages
model_gbt = gbt.fit(train_data)

## Predict for test data

In [14]:
predictions = model_gbt.transform(test_data)
predictions.show()

+--------------------+----------+----------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+---------------+-----+---+-----------+-----------+-------------+------------+------+-----------------------+--------------------------+-------------------------+---------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+----------------------+-----------------

## Evaluate the model

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator1_rmse.evaluate(predictions)
r2 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))
print("R^2 on test data = {}".format(r2))

Root Mean Squared Error (RMSE) on test data = 117.93243361017127
R^2 on test data = 0.5843321639400942


## Hyperparameter optimization

In [16]:
model_gbt.params

[Param(parent='GBTRegressor_a44f392bfe19', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='GBTRegressor_a44f392bfe19', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='GBTRegressor_a44f392bfe19', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'a

In [17]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


grid = ParamGridBuilder()
grid = grid.addGrid(
                    model_gbt.maxDepth, [3, 4, 5])\
                    .addGrid(model_gbt.lossType , ["squared", "absolute"]
                    )\
                    .build()

cv = CrossValidator(estimator = gbt, 
                    estimatorParamMaps = grid, 
                    evaluator = evaluator1_rmse,
                    parallelism = 5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

GBTRegressionModel: uid=GBTRegressor_a44f392bfe19, numTrees=10, numFeatures=57

## Best model 1


In [18]:
from pprint import pprint
model1 = bestModel
pprint(model1.extractParamMap())

{Param(parent='GBTRegressor_a44f392bfe19', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: variance'): 'variance',
 Param(parent='GBTRegressor_a44f392bfe19', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='GBTRegressor_a44f392bfe19', name='labelCol', doc='label column name.'): 'label',
 Param(parent='GBTRegressor_a44f392bfe19', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='GBTRegressor_a44f392bfe19', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpoin

## Save the model to HDFS

In [19]:
model1.write().overwrite().save("project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 models/model1")

''

## Predict for test data using best model1

In [20]:
predictions = model1.transform(test_data)
predictions.show()

+--------------------+----------+----------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+---------------+-----+---+-----------+-----------+----------+--------------+------+-----------------------+--------------------------+-------------------------+---------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+----------------------+------------------

In [21]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model1_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

''

## Evaluate the best model1

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse1 = evaluator1_rmse.evaluate(predictions)
r21 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse1))
print("R^2 on test data = {}".format(r21))

Root Mean Squared Error (RMSE) on test data = 119.95159936371351
R^2 on test data = 0.5717973116498679


# Second model

## Build a model

In [23]:
from pyspark.ml.regression import RandomForestRegressor

# Create Linear Regression Model
rf = RandomForestRegressor()

# Fit the data to the pipeline stages
model_rf = rf.fit(train_data)

## Predict for test data

In [24]:
predictions = model_rf.transform(test_data)
predictions.show()

+--------------------+----------+----------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+---------------+-----+---+-----------+-----------+-------------+--------------+-------+-----------------------+--------------------------+-------------------------+---------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+----------------------+--------------

## Evaluate the model

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 130.1153372693691
R^2 on test data = 0.4943987826924128


## Hyperparameter optimization

In [26]:
model_rf.params

[Param(parent='RandomForestRegressor_c11ddb253bc9', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'),
 Param(parent='RandomForestRegressor_c11ddb253bc9', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='RandomForestRegressor_c11ddb253bc9', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='RandomForestRegressor_c11ddb253bc9', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto'

In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


grid = ParamGridBuilder()
grid = grid.addGrid(model_rf.subsamplingRate, [0.3, 0.6, 1.0]).addGrid(model_rf.numTrees, [30, 40, 50]).build()

cv = CrossValidator(estimator = rf, 
                    estimatorParamMaps = grid, 
                    evaluator = evaluator2_rmse,
                    parallelism = 5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

RandomForestRegressionModel: uid=RandomForestRegressor_c11ddb253bc9, numTrees=20, numFeatures=57

## Best model 2


In [28]:
from pprint import pprint
model2 = bestModel
pprint(model2.extractParamMap())

{Param(parent='RandomForestRegressor_c11ddb253bc9', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='RandomForestRegressor_c11ddb253bc9', name='leafCol', doc='Leaf indices column name. Predicted leaf index of each instance in each tree by preorder.'): '',
 Param(parent='RandomForestRegressor_c11ddb253bc9', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='RandomForestRegressor_c11ddb253bc9', name='maxMemoryInMB', doc='Maximum memory in MB allocated to histogram aggrega

## Save the model to HDFS

In [29]:
model2.write().overwrite().save("project/models/model2")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

''

## Predict for test data using best model2

In [30]:
predictions = model2.transform(test_data)
predictions.show()

+--------------------+----------+----------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+---------------+-----+---+-----------+-----------+-------------+------------+------+-----------------------+--------------------------+-------------------------+---------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+-------------------------------+----------------------------------+---------------------------------+-----------------------------------+----------------------+-----------------

In [31]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model2_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

''

## Evaluate the best model2

In [32]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 129.0482092550575
R^2 on test data = 0.5007361130917505


# Compare best models

In [33]:
models = [[str(model1),rmse1, r21], [str(model2),rmse2, r22]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

+------------------------------------------------------------------------------------------------+------------------+------------------+
|model                                                                                           |RMSE              |R2                |
+------------------------------------------------------------------------------------------------+------------------+------------------+
|GBTRegressionModel: uid=GBTRegressor_a44f392bfe19, numTrees=10, numFeatures=57                  |119.95159936371351|0.5717973116498679|
|RandomForestRegressionModel: uid=RandomForestRegressor_c11ddb253bc9, numTrees=20, numFeatures=57|129.0482092550575 |0.5007361130917505|
+------------------------------------------------------------------------------------------------+------------------+------------------+



In [None]:
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

''

In [None]:
import json
with open('./data/model1ParamMap.json', 'w') as json_file:
    json.dump(model1.extractParamMap(), json_file)


In [None]:
with open('./data/model2ParamMap.json', 'w') as json_file:
    json.dump(model2.extractParamMap(), json_file)
