# Airbnb NY Listing Price Prediction: Spark Model Tuning

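Distributed hyperparameter tuning of a RandomForest regression model with the `spark.ml` library (`ParamGridBuilder` + `CrossValidator`), logging every candidate model to MLflow.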

In [0]:
import os
import mlflow
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
### Define Global Variables
def _require_env(name):
    """Return the value of environment variable `name`, failing fast if it is unset or empty.

    Without this check a missing variable only surfaces later as an opaque
    ``TypeError`` (string + None) or a SQL parse error in the USE statements below.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable '{name}' must be set to a non-empty value (got {value!r})")
    return value

# Unity Catalog location of the gold table, supplied through the environment
catalog_ = _require_env('CATALOG_NAME')
schema_ = _require_env('SCHEMA_NAME')
spark.sql("USE CATALOG "+catalog_)
spark.sql("USE SCHEMA "+schema_)

SEED_ = 111                            # shared by randomSplit, RandomForestRegressor and CrossValidator
Target_Var_ = 'price_log'              # label column the model is trained and evaluated on
experiment_name_ = 'Airbnb_NY_Tuning'  # MLflow experiment name (last component of the experiment path)

### Read Gold Table

In [0]:
# Import gold data: load the full gold table (resolved in the catalog/schema selected above)
# and preview its first three rows.
gold_data = spark.table("airbnb_ny_gold_data")
first_rows = gold_data.take(3)
display(first_rows)

In [0]:
# Double-check for the presence of NA (null) values
# display(dbutils.data.summarize(gold_data))

In [0]:
# Double-check DataTypes - pyspark.ml.regression.RandomForestRegressor supports both Float and Double.
# Emit one line per column of the gold table showing its Spark SQL type.
dtype_report = [
    f"Column '{struct_field.name}' has data type: {struct_field.dataType}"
    for struct_field in gold_data.schema.fields
]
for report_line in dtype_report:
    print(report_line)

### Split into Train & Test

In [0]:
### Split
# Hold out 15% of the gold rows for testing; the fixed SEED_ keeps the split repeatable
# (for the same input data and partitioning).
split_weights = [0.85, 0.15]
train_df, test_df = gold_data.randomSplit(weights=split_weights, seed=SEED_)

### Configure Hyperparameter Tuning with RandomForestRegressor

In [0]:
### Create Features
# Columns that must never be used as predictors: the row identifier, the raw price and the
# label itself (price / price_log would leak the target into the features).
excluded_cols_ = {"id", "price", "price_log", Target_Var_}
# RandomForestRegressor accepts both float and double inputs, so keep numeric columns of either
# type; filtering on "float" alone would silently drop every double-typed feature.
feature_dtypes_ = ("float", "double")
feature_cols = [
    name
    for name, dtype in train_df.dtypes
    if dtype in feature_dtypes_ and name not in excluded_cols_
]
# Fail here with a clear message rather than deep inside VectorAssembler/fit
assert feature_cols, f"No {feature_dtypes_} feature columns found in train_df"
# "features" is the default featuresCol of RandomForestRegressor below
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

### Create Base Model
RF_ = RandomForestRegressor(labelCol = Target_Var_, seed = SEED_)

In [0]:
### Create the Parameter Space
# 4 depths x 5 forest sizes x 3 feature-subset strategies = 60 candidate parameter maps
max_depths = [2, 5, 10, 20]  # Spark caps maxDepth at 30
forest_sizes = [10, 30, 100, 200, 300]
subset_strategies = ['onethird', 'sqrt', 'log2']

grid_builder = ParamGridBuilder()
grid_builder.addGrid(RF_.maxDepth, max_depths)
grid_builder.addGrid(RF_.numTrees, forest_sizes)
grid_builder.addGrid(RF_.featureSubsetStrategy, subset_strategies)
paramGrid_ = grid_builder.build()

### Create the Evaluator
# Score each fold with R^2 between the label column (Target_Var_) and the model's "prediction"
# output column, which is therefore on the same (log) scale as the label, not a raw price.
evaluator_ = RegressionEvaluator(metricName="r2", predictionCol="prediction", labelCol=Target_Var_)

### Create the CV object
# 3-fold cross-validation of RF_ over every parameter map in paramGrid_, scored by evaluator_.
# parallelism = number of models trained concurrently; rather than using all available cores,
# keep it at a moderate value (up to 10), as advised in https://spark.apache.org/docs/latest/ml-tuning.html
cv_options = {"numFolds": 3, "parallelism": 10, "seed": SEED_}
cv_ = CrossValidator(estimator=RF_, estimatorParamMaps=paramGrid_, evaluator=evaluator_, **cv_options)

#### Create the Pipeline
# Stage 1 assembles the "features" vector; stage 2 runs the cross-validated random forest on it
stages_ = [vec_assembler, cv_]
Pipeline_ = Pipeline(stages=stages_)

### Launch the pipeline in an MLflow run

In [0]:
### Set up the MLflow Experiment
# Resolve the workspace folder that holds the experiment instead of hard-coding a personal
# path (which breaks for every other user and exposes an e-mail address).
# EXPERIMENT_DIR overrides the folder; otherwise default to the running user's home folder.
experiment_dir_ = os.getenv('EXPERIMENT_DIR')
if not experiment_dir_:
    # current_user() is a Databricks SQL function returning the user executing the statement
    current_user_ = spark.sql("SELECT current_user()").first()[0]
    experiment_dir_ = f"/Users/{current_user_}"
experiment_path = f"{experiment_dir_.rstrip('/')}/{experiment_name_}"

# Reuse the experiment if it already exists, otherwise create it
experiment = mlflow.get_experiment_by_name(experiment_path)

if experiment is not None:
    experiment_id = experiment.experiment_id
else:
    experiment_id = mlflow.create_experiment(name=experiment_path)

print(experiment_id)

In [0]:
### Launch the run
# Autologging is disabled so that only the runs created explicitly below are recorded
mlflow.autolog(disable=True)
with mlflow.start_run(experiment_id=experiment_id, run_name="RF_Spark_cv"):

    # Fit the Pipeline object (VectorAssembler -> CrossValidator over the RF parameter grid)
    Pipeline_fitted = Pipeline_.fit(train_df)

    # Extract cv step from pipeline (the fitted CrossValidator is the last stage)
    cv_fitted = Pipeline_fitted.stages[-1]

    # Start one nested run per candidate parameter map to log its params and CV metric.
    # avgMetrics[i] is the fold-averaged metric of getEstimatorParamMaps()[i] (same order).
    param_maps_ = cv_fitted.getEstimatorParamMaps()
    for i in range(len(param_maps_)):

      with mlflow.start_run(experiment_id=experiment_id, run_name="RF_model_"+str(i+1), nested=True):

        ## Log Params
        # Param.name is the bare parameter name (e.g. 'maxDepth'), without the
        # '<estimator uid>__' prefix that str(Param) carries; log them in one batch.
        params_ = param_maps_[i]
        mlflow.log_params({p.name: value for p, value in params_.items()})

        ## Log Metric as avg across folds
        avg_metric = cv_fitted.avgMetrics[i]
        mlflow.log_metric("Obj Metric", avg_metric)