Let's load the SF Airbnb dataset and look at its schema.

In [0]:
filePath = "/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb.csv"

rawDF = spark.read.csv(filePath, header="true", inferSchema="true", multiLine="true", escape='"')

display(rawDF)

In [0]:
rawDF.printSchema()

For the sake of simplicity, we'll only keep certain columns from this dataset.

In [0]:
columnsToKeep = [
  "host_is_superhost",
  "cancellation_policy",
  "instant_bookable",
  "host_total_listings_count",
  "neighbourhood_cleansed",
  "latitude",
  "longitude",
  "property_type",
  "room_type",
  "accommodates",
  "bathrooms",
  "bedrooms",
  "beds",
  "bed_type",
  "minimum_nights",
  "number_of_reviews",
  "review_scores_rating",
  "review_scores_accuracy",
  "review_scores_cleanliness",
  "review_scores_checkin",
  "review_scores_communication",
  "review_scores_location",
  "review_scores_value",
  "price"]

baseDF = rawDF.select(columnsToKeep)
baseDF.cache().count()
display(baseDF)

## Fixing Data Types

Take a look at the schema above. You'll notice that the `price` field was inferred as a string, because its values contain `$` and `,` characters. For our task, we need it to be a numeric (double type) field.

Let's fix that.

In [0]:
from pyspark.sql.functions import col, translate

# Remove "$" and "," characters from the price strings, then cast the result to double
fixedPriceDF = baseDF.withColumn("price", translate(col("price"), "$,", "").cast("double"))

display(fixedPriceDF)
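`translate(col("price"), "$,", "")` maps each character in the second argument to the character at the same position in the third. Because the replacement string is empty, every `$` and `,` is simply deleted before the cast. The plain-Python sketch below mirrors that cleanup with `str.translate`. The helper `parse_price` and the sample strings are hypothetical, for illustration only. Returning `None` for unparseable input is meant to mimic how a Spark cast yields null when ANSI mode is off.

```python
# Plain-Python sketch of the price cleanup (not Spark code).
# Deletion table: '$' and ',' map to None, so str.translate removes them.
_STRIP_CHARS = str.maketrans("", "", "$,")


def parse_price(raw):
    """Strip '$' and ',' and convert to float; return None if it cannot be parsed."""
    if raw is None:
        return None
    try:
        return float(raw.translate(_STRIP_CHARS))
    except ValueError:
        # e.g. "n/a" or an empty string after stripping
        return None


for sample in ["$150.00", "$1,250.00", "n/a", None]:
    print(repr(sample), "->", parse_price(sample))
```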

## Data Exploration

pandas is a handy tool for data exploration before the actual modeling starts. However, there are a few caveats when using pandas:

* The dataset must fit in the memory of a single machine
* Any data cleanup and transformation code has to be translated into Spark for production runs

## Koalas
Let's use Koalas to perform pandas-style data exploration, but in a distributed environment.

In [0]:
import databricks.koalas as ks
# Before we start, we need to convert the Spark DataFrame into a Koalas DataFrame
# https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.to_koalas.html?highlight=to_koalas
#
# ksDF = <Your code goes here>

# Let's try to understand our data by displaying some stats. You'd normally do that using https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.describe.html in pandas

# FIXME
# <Your answer goes here>

## Dealing with missing values

If we look at the count row in the output above, we'll notice that the counts differ between columns. `describe()` only considers numeric columns and counts only non-null values, so differing counts mean that some columns contain nulls. There are many different ways to handle null values. Sometimes a null can actually be a key indicator of the thing you are trying to predict (e.g., if you don't fill in certain portions of a form, the probability of it getting approved decreases).

Some ways to handle nulls:
* Drop any records that contain nulls
* Numeric:
  * Replace them with mean/median/zero/etc.
* Categorical:
  * Replace them with the mode
  * Create a special category for null
* Use techniques like ALS (alternating least squares), which are designed to impute missing values

**If you do ANY imputation techniques for categorical/numerical features, you MUST include an additional field specifying that field was imputed (think about why this is necessary)**
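To make this rule concrete, here is a plain-Python sketch of median imputation that also records an indicator flag for every filled-in value. The function name and the sample values are hypothetical; in this notebook the same effect is achieved with Koalas (or Spark's `Imputer`) plus one `_na` column per imputed field. The flag matters because, after imputation, filled-in values are indistinguishable from real ones, and the fact that a value was missing may itself carry signal. Note that Spark's `Imputer` computes an approximate median, whereas `statistics.median` is exact.

```python
from statistics import median


def impute_with_indicator(values):
    """Replace None with the median of the observed values.

    Returns (imputed_values, indicator), where indicator[i] is 1.0 if
    values[i] was missing and got imputed, else 0.0.
    """
    observed = [v for v in values if v is not None]
    fill = median(observed)  # raises StatisticsError if every value is missing
    imputed = [fill if v is None else v for v in values]
    indicator = [1.0 if v is None else 0.0 for v in values]
    return imputed, indicator


bedrooms = [3.0, None, 1.0, 2.0, None]
filled, bedrooms_na = impute_with_indicator(bedrooms)
print(filled)       # [3.0, 2.0, 1.0, 2.0, 2.0]
print(bedrooms_na)  # [0.0, 1.0, 0.0, 0.0, 1.0]
```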

In [0]:
# Let's determine whether any DataFrame column contains empty values. In pandas, you'd probably use the following two methods for this task
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.isna.html
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.any.html

# FIXME
# display(<Your answer here>)

## Impute: Cast to Double

If we had used pure Spark for data imputation, SparkML's `Imputer` would have required the input fields to be of type double (Spark 2.x accepts only double or float columns) [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.Imputer)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.feature.Imputer). As an exercise, let's cast the fields we need to impute (those containing missing values) to double.

In [0]:
# In Spark, this operation would have looked like this:
#
# from pyspark.sql.functions import col
# from pyspark.sql.types import IntegerType
#
# integerColumns = [x.name for x in fixedPriceDF.schema.fields if x.dataType == IntegerType()]
#
# doublesDF = fixedPriceDF
# for c in integerColumns:
#   doublesDF = doublesDF.withColumn(c, col(c).cast("double"))

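# Columns that contain at least one missing value
imputeCols = ksDF.columns[ksDF.isna().any().to_pandas()].to_list()
# Map each of those columns to the "double" dtype, in the dict form that astype() accepts
conversionDict = dict(zip(imputeCols, ["double"] * len(imputeCols)))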

# API reference
# https://pandas.pydata.org/pandas-docs/dev/reference/api/pandas.DataFrame.astype.html

# FIXME
# doublesDF = <Your answer here>

# Print types in the DF
display(doublesDF.dtypes)

Let's add an indicator (dummy) column for each column where we impute values. This will help us trace which values were imputed for further investigation. For example:

| bedrooms | bedrooms_na |
|:--------:|:-----------:|
|    3     |     0       |
|   null   |     1       |

In [0]:
# In Spark, this would have looked like this:

# from pyspark.sql.functions import col, when

# imputeCols = [
#   "bedrooms",
#   "bathrooms",
#   "beds", 
#   "review_scores_rating",
#   "review_scores_accuracy",
#   "review_scores_cleanliness",
#   "review_scores_checkin",
#   "review_scores_communication",
#   "review_scores_location",
#   "review_scores_value"
# ]

# for c in imputeCols:
#   doublesDF = doublesDF.withColumn(c + "_na", when(col(c).isNull(), 1.0).otherwise(0.0))

# API Reference
#
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.isnull.html
# https://pandas.pydata.org/pandas-docs/dev/reference/api/pandas.DataFrame.astype.html

for c in imputeCols:
  # FIXME
  # doublesDF[c + "_na"] = <Your answer here>
  pass  # placeholder so the cell parses; remove once the line above is filled in

display(doublesDF)

In [0]:
# Spark version of data imputation
# 
# from pyspark.ml.feature import Imputer
# imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
# imputedDF = imputer.fit(doublesDF).transform(doublesDF)

# API References 
# https://pandas.pydata.org/pandas-docs/dev/reference/api/pandas.DataFrame.fillna.html
# https://pandas.pydata.org/pandas-docs/dev/reference/api/pandas.DataFrame.median.html

#FIXME
# imputedDF = doublesDF.<Your answer here>

display(imputedDF)

## Getting rid of extreme values

Let's take a look at the *min* and *max* values of the `price` column:

In [0]:
# API reference
#
# https://pandas.pydata.org/pandas-docs/dev/reference/api/pandas.DataFrame.describe.html

# FIXME
#display(<Your answer here>)

There are some very expensive listings, but we'll assume they are legitimate. We can certainly filter out the "free" Airbnbs, though.

First, let's see how many listings have a *price* of zero.

In [0]:
# Spark code
#
# imputedDF.filter(col("price") == 0).count()

# API reference 
#
# https://pandas.pydata.org/pandas-docs/dev/reference/api/pandas.DataFrame.count.html

# FIXME
# print(<Your answer here>)

Now only keep rows with a strictly positive *price*.

In [0]:
# Spark code for reference
#
# posPricesDF = imputedDF.filter(col("price") > 0)

# API reference
#
# https://pandas.pydata.org/pandas-docs/dev/user_guide/10min.html#boolean-indexing

# FIXME
# posPricesDF = <Your answer here>

Let's take a look at the *min* and *max* values of the *minimum_nights* column:

In [0]:
display(posPricesDF["minimum_nights"].describe())

Let's see how many times each value occurs in the *minimum_nights* column:

In [0]:
# Spark code
#
#display(posPricesDF
# .groupBy("minimum_nights").count()
# .orderBy(col("count").desc(), col("minimum_nights"))
#)

# API reference
#
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.value_counts.html

# FIXME
#display(<Your answer here>)
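The Spark reference code counts how often each *minimum_nights* value appears and sorts by count (descending), breaking ties by the value itself (ascending). Here is a plain-Python sketch of that ordering with `collections.Counter`, using made-up values and a hypothetical helper name. pandas/Koalas `value_counts()` also sorts by count, but it does not necessarily break ties the same way.

```python
from collections import Counter


def value_counts(values):
    """Return (value, count) pairs sorted by count desc, then value asc."""
    counts = Counter(values)
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))


minimum_nights = [1, 2, 2, 30, 1, 2, 365, 30, 1000]
for value, count in value_counts(minimum_nights):
    print(value, count)
```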

A minimum stay of one year seems to be a reasonable limit here. Let's filter out the records where *minimum_nights* is greater than 365:

In [0]:
# Spark code
# cleanDF = posPricesDF.filter(col("minimum_nights") <= 365)

# API reference
#
# https://pandas.pydata.org/pandas-docs/dev/user_guide/10min.html#boolean-indexing

# FIXME
# cleanDF = <Your answer here>
display(cleanDF)

OK, our data is cleansed now. Let's save this DataFrame to a file so that we can start building models with it.

In [0]:
outputPath = "/tmp/datapalooza-2021/sf-airbnb/sf-airbnb-clean.parquet"

cleanDF.to_spark().write.mode("overwrite").parquet(outputPath)

# Modeling

Key advantages of ML on Spark:
- No need to downsample the training dataset
- Spark's distributed execution can improve performance on large datasets
- The same code can be used for data engineering in research and in production

Important MLlib concepts:
- Transformer. Applies rule-based transformations to either prepare data for model training or generate predictions using a trained MLlib model.
- Estimator. Learns (aka “fits”) parameters from input DataFrame via a .fit() method and returns a Model, which is a transformer.
- Pipeline. Organizes a series of transformers and estimators into a single model.
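To make the distinction concrete, here is a toy, plain-Python sketch of the three concepts. None of these classes come from MLlib: `MeanCenterer`, `MeanCenterModel`, `Scale`, `ToyPipeline`, and `ToyPipelineModel` are hypothetical names chosen for illustration, and rows are plain dicts rather than DataFrames. The estimator learns a parameter in `fit()` and returns a model, which is a transformer; the pipeline fits its stages in order, feeding each stage the output of the previous one. MLlib's `Pipeline` follows roughly the same idea.

```python
from statistics import mean


class MeanCenterModel:
    """Transformer produced by fitting: subtracts a learned mean from one column."""
    def __init__(self, column, learned_mean):
        self.column = column
        self.learned_mean = learned_mean

    def transform(self, rows):
        return [{**row, self.column: row[self.column] - self.learned_mean} for row in rows]


class MeanCenterer:
    """Estimator: fit() learns the column mean and returns a MeanCenterModel."""
    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        return MeanCenterModel(self.column, mean(row[self.column] for row in rows))


class Scale:
    """Rule-based transformer with no fit step: multiplies one column by a constant."""
    def __init__(self, column, factor):
        self.column = column
        self.factor = factor

    def transform(self, rows):
        return [{**row, self.column: row[self.column] * self.factor} for row in rows]


class ToyPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Pipeline: fits estimators in order, passing transformed data forward."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted into models; plain transformers are used as-is.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)


train = [{"price": 100.0}, {"price": 200.0}, {"price": 300.0}]
model = ToyPipeline([MeanCenterer("price"), Scale("price", 0.5)]).fit(train)
print(model.transform([{"price": 250.0}]))  # [{'price': 25.0}]
```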

## Training and test data sets

Let's use the dataset we created in the previous step to derive our training and test sets. We'll use an 80/20 train/test split and 42 as the seed value.
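`randomSplit([.8, .2], seed=42)` assigns each row to a split at random, so the 80/20 proportions are approximate rather than exact, and the same seed reproduces the same split only as long as the data and its partitioning stay the same. The sketch below is a simplified plain-Python version of a seeded per-row split; `random_split` is a hypothetical helper, and Spark's actual implementation samples per partition, so its assignments will differ.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 8000 / 2000, but not exactly
```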

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

airbnbDF = spark.read.parquet(outputPath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]

assemblerInputs = indexOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
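Conceptually, `StringIndexer` maps each category to a numeric index. With the default `stringOrderType="frequencyDesc"`, the most frequent label gets `0.0` (recent Spark versions break frequency ties alphabetically). `VectorAssembler` then concatenates the indexed and numeric columns into a single `features` vector, and with `handleInvalid="skip"` rows whose category was not seen during fitting are dropped. A plain-Python sketch on made-up rows follows; `fit_string_indexer` and `assemble_features` are hypothetical helpers, and features are plain lists rather than Spark vectors.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map labels to float indices: most frequent first, ties broken alphabetically."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def assemble_features(rows, indexers, numeric_cols):
    """Build feature lists: indexed categoricals first, then numeric columns.

    Rows with a label missing from an indexer (unseen or None) are skipped,
    similar to handleInvalid="skip".
    """
    features = []
    for row in rows:
        if any(row[col] not in index for col, index in indexers.items()):
            continue
        vector = [index[row[col]] for col, index in indexers.items()]
        vector += [float(row[col]) for col in numeric_cols]
        features.append(vector)
    return features


train_rows = [
    {"room_type": "Entire home/apt", "bedrooms": 2.0},
    {"room_type": "Private room", "bedrooms": 1.0},
    {"room_type": "Entire home/apt", "bedrooms": 3.0},
]
indexers = {"room_type": fit_string_indexer(r["room_type"] for r in train_rows)}
test_rows = [
    {"room_type": "Private room", "bedrooms": 1.0},
    {"room_type": "Hotel room", "bedrooms": 1.0},  # unseen label: skipped
]
print(indexers["room_type"])  # {'Entire home/apt': 0.0, 'Private room': 1.0}
print(assemble_features(test_rows, indexers, ["bedrooms"]))  # [[1.0, 1.0]]
```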

## Random Forest

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Please use the following params for RandomForestRegressor: maxDepth=5, numTrees=20, maxBins=40, seed=42

# API reference
#
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.RandomForestRegressor.html

# FIXME
# rf = <Your answer here>

# And let's create a pipeline to train the model
# API reference
#
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Pipeline.html#pyspark.ml.Pipeline

# FIXME
# pipeline = <Your answer here>

# and train the model 
# API reference
#
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Pipeline.html#pyspark.ml.Pipeline.fit

# FIXME
# rfModel = <Your answer here>

Let's see how the model performs.

In [0]:
# Let's use our test data to generate some predictions
# API reference
#
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.PipelineModel.html#pyspark.ml.PipelineModel.transform

# FIXME
# predictionDF = rfModel.<Your answer here>
display(predictionDF)

In [0]:
# Let's calculate RMSE for prediction results
#
# API reference 
#
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html#pyspark.ml.evaluation.RegressionEvaluator
#

evaluator = RegressionEvaluator(labelCol="price", 
                                predictionCol="prediction", 
                                metricName="rmse")

# RMSE is an error measure in the units of price (lower is better), not an accuracy
rmse = evaluator.evaluate(predictionDF)

print(f"RMSE is {rmse}")
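`RegressionEvaluator` with `metricName="rmse"` computes the root mean squared error between `price` and `prediction`; later cells also use `metricName="r2"`, the coefficient of determination. The plain-Python sketch below spells out both formulas on three made-up points. The helper names `rmse_of` and `r2_of` are hypothetical.

```python
import math


def rmse_of(labels, predictions):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)


def r2_of(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


price = [100.0, 150.0, 200.0]
prediction = [110.0, 150.0, 180.0]
# Residuals are -10, 0, 20, so SS_res = 500; SS_tot = 2500 + 0 + 2500 = 5000
print(rmse_of(price, prediction))  # sqrt(500 / 3), about 12.91
print(r2_of(price, prediction))    # 1 - 500 / 5000, about 0.9
```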

## Grid Search

There are many hyperparameters we could tune, and configuring them manually would take a long time.

Let's use Spark's `ParamGridBuilder` to search for the optimal hyperparameters more systematically [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.tuning.ParamGridBuilder).

Let's define a grid of hyperparameters to test:
  - maxDepth: max depth of the decision tree (Use the values `2, 4, 6`)
  - numTrees: number of decision trees (Use the values `10, 100`)

In [0]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [2, 4, 6])
            .addGrid(rf.numTrees, [10, 100])
            .build())

## Cross Validation

We are also going to use 3-fold cross-validation to identify the optimal combination of `maxDepth` and `numTrees`.

![crossValidation](https://files.training.databricks.com/images/301/CrossValidation.png)

With 3-fold cross-validation, we train on 2/3 of the data, and evaluate with the remaining (held-out) 1/3. We repeat this process 3 times, so each fold gets the chance to act as the validation set. We then average the results of the three rounds.
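The fold mechanics can be sketched in plain Python: assign every row to exactly one of k folds, then for each fold train on the other k - 1 folds and validate on the held-out one. This simplified version (the helper `k_fold_indices` is hypothetical) uses shuffled, near-equal folds. Spark's `CrossValidator` assigns rows to folds at random using the given `seed`, so its fold sizes are only approximately equal.

```python
import random


def k_fold_indices(n, k, seed):
    """Yield (train_idx, val_idx) pairs; each index is validated exactly once."""
    indices = list(range(n))
    random.Random(seed).shuffle(indices)
    folds = [indices[i::k] for i in range(k)]
    for i in range(k):
        val_idx = sorted(folds[i])
        # Training set: every fold except the held-out one
        train_idx = sorted(j for f, fold in enumerate(folds) if f != i for j in fold)
        yield train_idx, val_idx


splits = list(k_fold_indices(n=9, k=3, seed=42))
for train_idx, val_idx in splits:
    print(len(train_idx), len(val_idx))  # 6 3 for each of the 3 rounds
```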

We pass in the `estimator` (pipeline), `evaluator`, and `estimatorParamMaps` to `CrossValidator` so that it knows:
- Which model to use
- How to evaluate the model
- What hyperparameters to set for the model

We can also set the number of folds we want to split our data into (3), and set a seed so we all get the same split of the data [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.tuning.CrossValidator).

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

evaluator = RegressionEvaluator(labelCol="price", 
                                predictionCol="prediction", 
                                metricName="rmse")

cv = CrossValidator(estimator=pipeline, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, 
                    numFolds=3, 
                    seed=42)

**Question**: How many models are we training right now?

In [0]:
cvModel = cv.fit(trainDF)

Let's take a look at the model with the best hyperparameter configuration

In [0]:
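# The best pipeline's stages are [StringIndexer, VectorAssembler, RandomForestRegressionModel]
print(cvModel.bestModel.stages[2].getMaxDepth())
# On the fitted model, getNumTrees is a property rather than a method, hence no parentheses
print(cvModel.bestModel.stages[2].getNumTrees)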

In [0]:
print(cvModel.avgMetrics)

Let's see how it does on the test dataset.

In [0]:
predDF = cvModel.transform(testDF)

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")


# Tracking Models with MLflow

MLflow is pre-installed on the Databricks Runtime for ML. If you are not using the ML Runtime, you will need to install mlflow. MLflow has four main components:

- Tracking. Provides APIs to record parameters, metrics, code versions, models, and artifacts such as plots and text.
- Projects. A standardized format to package your data science projects and their dependencies to run on other platforms. It helps you manage the model training process.
- Models. A standardized format to package models to deploy to diverse execution environments. It provides a consistent API for loading and applying models, regardless of the algorithm or library used to build the model.
- Registry. A repository to keep track of model lineage, model versions, stage transitions, and annotations.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

filePath = "dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

categoricalCols = [field for (field, dataType) in trainDF.dtypes 
                   if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols, 
                              outputCols=indexOutputCols, 
                              handleInvalid="skip")

numericCols = [field for (field, dataType) in trainDF.dtypes 
               if ((dataType == "double") & (field != "price"))]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, 
                               outputCol="features")

rf = RandomForestRegressor(labelCol="price", maxBins=40, maxDepth=5, 
                           numTrees=100, seed=42)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])

## MLflow

Before you deploy your machine learning model, you should ensure that you can reproduce and track the model’s performance. For us, end-to-end reproducibility of machine learning solutions means that we need to be able to reproduce the code that generated a model, the environment used in training, the data it was trained on, and the model itself. Every data scientist loves to remind you to set your seeds so you can reproduce your experiments (e.g., for the train/test split, when using models with inherent randomness such as random forests).

In [0]:
import mlflow
import mlflow.spark
import pandas as pd

with mlflow.start_run(run_name="random-forest") as run:
  # Log params: Num Trees and Max Depth
  mlflow.log_param("num_trees", rf.getNumTrees())
  mlflow.log_param("max_depth", rf.getMaxDepth())
 
  # Log model
  pipelineModel = pipeline.fit(trainDF)
  mlflow.spark.log_model(pipelineModel, "model")

  # Log metrics: RMSE and R2
  predDF = pipelineModel.transform(testDF)
  regressionEvaluator = RegressionEvaluator(predictionCol="prediction", 
                                            labelCol="price")
  rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
  r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
  mlflow.log_metrics({"rmse": rmse, "r2": r2})

  # Log artifact: Feature Importance Scores
  rfModel = pipelineModel.stages[-1]
  pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(), 
                                    rfModel.featureImportances)), 
                          columns=["feature", "importance"])
              .sort_values(by="importance", ascending=False))
  # First write to local filesystem, then tell MLflow where to find that file
  pandasDF.to_csv("feature-importance.csv", index=False)
  mlflow.log_artifact("feature-importance.csv")

## MLflowClient

In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()
runs = client.search_runs(run.info.experiment_id,
                          order_by=["attributes.start_time desc"], 
                          max_results=1)
run_id = runs[0].info.run_id
runs[0].data.metrics

## Generate Batch Predictions

Let's load the model back in to generate batch predictions.

In [0]:
# Load saved model with MLflow
pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")

# Generate Predictions
inputPath = "dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet"
inputDF = spark.read.parquet(inputPath)
predDF = pipelineModel.transform(inputDF)
display(predDF)