# Machine Learning with Spark MLlib

In this lab we will analyze San Francisco Airbnb listings data. The dataset was cleaned in advance, so we can use it directly.

In [0]:
# Load the pre-cleaned SF Airbnb listings stored as Parquet. If the path does not exist
# in your workspace, upload the data yourself and change the location and file name below.
filePath = "dbfs:/mnt/training/airbnb/sf-listings/sf-listings-2019-03-06-clean.parquet/"
airbnbDF = spark.read.format("parquet").load(filePath)

In [0]:
# Look at the data.
# display() is the notebook's rich table renderer. It is not defined in this file
# (presumably provided by the Databricks notebook environment).
display(airbnbDF)

In [0]:
# Count the rows in the dataset. count() is an action, so it triggers a Spark job.
num_rows = airbnbDF.count()
print(num_rows)

## Demo: Linear Regression

In [0]:
# Split the dataset into train (80%) and test (20%) sets.
# The fixed seed makes the split reproducible across runs.
trainDF, testDF = airbnbDF.randomSplit(weights=[0.8, 0.2], seed=123)
# Mark the training set for caching, then materialize it with count()
# so that later fits can reuse it.
trainDF.cache()
print(trainDF.count())

In [0]:
# Preview the training data. It has many columns, but the baseline model
# uses only `accommodates` to predict `price`.
baseline_cols = ["price", "accommodates"]
display(trainDF.select(*baseline_cols))

In [0]:
# Compute basic statistics (count, mean, stddev, min, quartiles, max) on the columns
# used by the baseline model, and check them for outliers before fitting.
baseline_summary = trainDF.select("price", "accommodates").summary()
display(baseline_summary)

In [0]:
# Define a linear regression estimator on the raw `accommodates` column and try to fit it.
# Why doesn't it work? Spark ML estimators expect `featuresCol` to name a single Vector
# column, but `accommodates` is a plain numeric column, so fit() rejects it.
# The failure is caught and printed so the notebook still runs top to bottom;
# the next cell fixes it with VectorAssembler.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="accommodates", labelCol="price")
try:
    lr.fit(trainDF)
except Exception as err:  # expected: featuresCol must be a vector-typed column
    print(f"fit() failed as expected: {type(err).__name__}: {err}")

In [0]:
# Pack `accommodates` into a single vector column named `features`, which is the input
# format Spark ML estimators expect.
# VectorAssembler is a Transformer: .transform() returns a NEW DataFrame with the
# extra column appended and leaves trainDF itself unchanged.
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(outputCol="features", inputCols=["accommodates"])
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("accommodates", "features").show()

In [0]:
# The same assembler pattern with two input columns: each row's `features`
# vector now holds [accommodates, bedrooms], in that order.
vecAssembler_2 = VectorAssembler(outputCol="features", inputCols=["accommodates", "bedrooms"])
vecTrainDF_2 = vecAssembler_2.transform(trainDF)
shown_cols = ["accommodates", "bedrooms", "features"]
vecTrainDF_2.select(*shown_cols).show()

In [0]:
# Which parameters can the estimator take? explainParams() returns a newline-separated
# description; splitting it gives one list entry per line, which is easier to read.
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="price", featuresCol="features")
param_lines = lr.explainParams().split('\n')
param_lines


In [0]:
# Fit the estimator on the vectorized training data.
# The result is a fitted model (a Transformer) that can produce predictions.
lrModel = lr.fit(dataset=vecTrainDF)
lrModel

In [0]:
# Read the learned parameters. With a single input feature the model is a line:
# price = m * accommodates + b.
m, b = lrModel.coefficients[0], lrModel.intercept

print(f"The formula for the linear regression line is y = {m:.2f}x + {b:.2f}")

In [0]:
# Cross-validate the linear regression and grid-search its regularization.
# NOTE: elasticNetParam (the L1/L2 mixing ratio) only has an effect when regParam > 0.
# With the default regParam=0.0, both elasticNetParam values train the same
# unregularized model, so regParam is searched as well.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

params = (ParamGridBuilder()
          .addGrid(lr.regParam, [0.0, 0.1, 1.0])
          .addGrid(lr.elasticNetParam, [0.0, 1.0])
          .build())
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")

cv = CrossValidator(estimator=lr, evaluator=evaluator, estimatorParamMaps=params, numFolds=3, seed=11)
# setParallelism(2): evaluate up to two parameter settings concurrently.
cvModel = cv.setParallelism(2).fit(vecTrainDF)

# Mean R2 across the 3 folds for each parameter combination (same order as `params`).
for paramMap, avgR2 in zip(params, cvModel.avgMetrics):
    settings = {param.name: value for param, value in paramMap.items()}
    print(settings, avgR2)
print(cvModel.bestModel.coefficients)
print(cvModel.bestModel.intercept)

In [0]:
# Apply the cross-validated model to the test set. The test rows must go through the
# same VectorAssembler as the training rows so they also have a `features` column.
vecTestDF = vecAssembler.transform(testDF)
predDF = cvModel.transform(vecTestDF)

preview_cols = ["accommodates", "features", "price", "prediction"]
predDF.select(*preview_cols).show()

In [0]:
# ALTERNATIVE: chain the data preparation and the model definition into a single Pipeline.
# Only the stages go into the pipeline; the CrossValidator stays outside it.
# Fitting the pipeline on trainDF fits every stage in order. The fitted PipelineModel
# then applies the same stages to the raw test frame.
from pyspark.ml import Pipeline

vecAssembler = VectorAssembler(outputCol="features", inputCols=["accommodates"])
lr = LinearRegression(labelCol="price", featuresCol="features")
stages = [vecAssembler, lr]
pipeline = Pipeline(stages=stages)

pipelineModel = pipeline.fit(trainDF)

predDF = pipelineModel.transform(testDF)
predDF.select("accommodates", "features", "price", "prediction").show()

In [0]:
# How good is the model? RegressionEvaluator compares the predicted price with the
# actual price across all test rows and summarizes overall model quality as one
# number, here R2.
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(metricName="r2", labelCol="price", predictionCol="prediction")
r2 = regressionEvaluator.evaluate(dataset=predDF)
print(f"Rsquare is {r2}")

## Lab: Random Forest Regression

Build and evaluate a Random Forest model the same way we did with Linear Regression.

In [0]:
# Split the data into train and test sets. Use the same split weights and seed as for linear regression, so we can compare the models at the end.


In [0]:
# Select three numeric attributes (accommodates, bathrooms, review_scores_rating) and predict the price as before. Name the dataframe trainDF.


In [0]:
# Vectorize the attribute (feature) columns with VectorAssembler


In [0]:
# Build a RandomForestRegressor. Set labelCol (as before) and, in addition, set maxDepth=7. Read about maxDepth and the other parameters using rf.explainParams().split('\n').


In [0]:
# Fit the model to the dataframe with the vectorized features


In [0]:
# Cross Validation 


In [0]:
# Prepare test frame


In [0]:
# Apply the model to the test data


In [0]:
# Create a pipeline from all of the stages before (data preparation + model definition). Apply the pipeline to the test frame.


In [0]:
# Evaluate the quality of the model using R2 and RMSE


In [0]:
# Which feature is the most important for our model?
# NOTE(review): rfModel is not defined in the demo cells; it is expected to be created in
# the Random Forest lab cells above. cvModel must likewise be rebound there to the
# Random Forest CrossValidator. The linear-regression cvModel from the demo presumably
# has no featureImportances (only tree-based models expose it), so this cell will fail
# until the lab is completed.
print('Feature importance for rfModel:', rfModel.featureImportances)
print('Feature importance for cvModel:', cvModel.bestModel.featureImportances)

In [0]:
# Show the feature importances as a pandas table: one row per assembler input column,
# paired positionally with its importance.
# The table is tiny, so it does not need to be distributed and pandas is enough.
# NOTE(review): vecAssembler must be the Random Forest assembler from the lab cells;
# otherwise the feature names will not match rfModel's importances.
import pandas as pd

feature_rows = list(zip(vecAssembler.getInputCols(), rfModel.featureImportances))
featuresDF = pd.DataFrame(feature_rows, columns=["feature", "importance"])
featuresDF