
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Linear Regression: Improving our model

In this notebook we will add more features to our model and discuss how to handle categorical features.

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this lesson you:<br>
 - One Hot Encode categorical variables
 - Use the Pipeline API
 - Save and load models

In [0]:
%run "./Includes/Classroom-Setup"

In [0]:
filePath = "dbfs:/mnt/training/airbnb/sf-listings/sf-listings-2019-03-06-clean.delta/"
airbnbDF = spark.read.format("delta").load(filePath)

## Train/Test Split

Let's use the same 80/20 split with the same seed as the previous notebook so we can compare our results apples to apples (unless you changed the cluster configuration, since `randomSplit` can assign rows differently when the data is partitioned differently).

In [0]:
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
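Why would a cluster change matter? `randomSplit` does not sort the data and cut it at exactly 80%. Roughly, it normalizes the weights into cumulative ranges and samples each partition with a seeded random generator, so the split sizes are only approximately 80/20 and the exact assignment depends on how rows are partitioned. The plain-Python sketch below is not Spark's implementation; `random_split` is a hypothetical helper, and row order stands in for partitioning.

```python
import random


def random_split(rows, weights, seed):
    """Simplified sketch of weighted random splitting (not Spark's actual code).

    Each row gets one uniform draw in [0, 1); the row goes to the split whose
    cumulative-weight interval contains that draw.
    """
    total = sum(weights)
    # Normalize the weights into cumulative upper bounds, e.g. [0.8, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point round-off

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(10_000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 8000 and 2000

# Same rows and same seed, but a different row order (analogous to a different
# partitioning) consumes the random draws differently, so the sets differ
train2, _ = random_split(list(reversed(range(10_000))), [0.8, 0.2], seed=42)
print(set(train) == set(train2))
```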

## Categorical Variables

There are a few ways to handle categorical features:
* Assign them a numeric value
* Create "dummy" variables (also known as One Hot Encoding)
* Generate embeddings (mainly used for textual data)
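To see why assigning plain numbers can mislead a linear model, here is a small plain-Python comparison of the first two options. The room-type labels are hypothetical values assumed for illustration, not read from this dataset. Numeric codes impose an order and distances between categories that the data never implied; one-hot vectors keep every pair of categories equally far apart.

```python
import math

# Hypothetical category values, assumed for illustration
room_types = ["Entire home/apt", "Private room", "Shared room"]

# Option 1: assign each category an arbitrary numeric value
codes = {cat: float(i) for i, cat in enumerate(room_types)}


# Option 2: one-hot ("dummy") encoding, one 0/1 slot per category
def one_hot(cat):
    return [1.0 if cat == c else 0.0 for c in room_types]


# Numeric codes make "Shared room" look twice as far from "Entire home/apt"
# as "Private room" is, an ordering the data never implied
print(abs(codes["Entire home/apt"] - codes["Private room"]))  # 1.0
print(abs(codes["Entire home/apt"] - codes["Shared room"]))   # 2.0

# One-hot vectors keep every pair of distinct categories equally far apart
# (Euclidean distance sqrt(2) for both pairs below)
print(math.dist(one_hot("Entire home/apt"), one_hot("Private room")))
print(math.dist(one_hot("Entire home/apt"), one_hot("Shared room")))
```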

### One Hot Encoder
Here, we are going to One Hot Encode (OHE) our categorical variables. Spark has no `dummies`-style function (such as pandas' `get_dummies`), so OHE is a two-step process. First, we use `StringIndexer` to map a string column of labels to an ML column of label indices [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.feature.StringIndexer).

Then, we can apply the `OneHotEncoder` to the output of the `StringIndexer` [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoder)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.feature.OneHotEncoder).
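The two steps can be sketched in plain Python, assuming Spark's defaults: `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`, with ties broken alphabetically), and `OneHotEncoder` uses `dropLast=True`, so the last (least frequent) label becomes the all-zeros vector. The helper names and the cancellation-policy values are hypothetical, and Spark's real encoder emits sparse vectors rather than Python lists.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Sketch of StringIndexer's default ordering (stringOrderType="frequencyDesc"):
    the most frequent label gets index 0.0, with ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_encode(index, num_categories, drop_last=True):
    """Sketch of OneHotEncoder on a single index, as a dense list.

    With dropLast=True (Spark's default) the vector has num_categories - 1
    slots, and the last index maps to the all-zeros vector.
    """
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    i = int(index)
    if i < size:
        vec[i] = 1.0
    return vec


# Hypothetical training values for a cancellation-policy column
policies = ["strict", "flexible", "moderate", "strict", "flexible", "strict"]
mapping = fit_string_indexer(policies)
print(mapping)  # {'strict': 0.0, 'flexible': 1.0, 'moderate': 2.0}

for label in mapping:
    print(label, one_hot_encode(mapping[label], len(mapping)))
# strict [1.0, 0.0]
# flexible [0.0, 1.0]
# moderate [0.0, 0.0]
```

Dropping the last slot keeps the one-hot columns of a feature from always summing to 1, which would otherwise be perfectly collinear with the linear regression's intercept.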

In [0]:
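from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Treat every string-typed column as categorical
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

# Map each label to a numeric index (multi-column inputCols/outputCols require Spark 3.0+).
# handleInvalid="skip" drops rows with unseen or null labels instead of raising an error.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
# Expand each index column into a sparse one-hot vector column
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)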
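The cell above sets `handleInvalid="skip"`. Here is a plain-Python sketch of what the three documented modes do with a label the indexer never saw during fitting, or a null (represented here by `None`): `"error"` (the default) fails, `"skip"` drops the row, and `"keep"` maps it to an extra index equal to the number of known labels. The helper and the labels are hypothetical. One consequence of `"skip"`: test rows with unseen categories silently disappear from the predictions, so the evaluation covers slightly fewer rows.

```python
def apply_string_indexer(labels, mapping, handle_invalid="error"):
    """Sketch of how StringIndexer.transform treats invalid labels in one column.

    labels: the column's values (None stands in for a null).
    mapping: label -> index, learned when the indexer was fitted.
    Returns (label, index) pairs for the rows that survive.
    """
    out = []
    for label in labels:
        if label in mapping:
            out.append((label, mapping[label]))
        elif handle_invalid == "skip":
            continue  # the whole row is filtered out
        elif handle_invalid == "keep":
            out.append((label, float(len(mapping))))  # extra bucket at index numLabels
        else:
            # "error" (the default): Spark fails the job; this sketch raises ValueError
            raise ValueError(f"Unseen or null label: {label!r}")
    return out


mapping = {"strict": 0.0, "flexible": 1.0, "moderate": 2.0}  # hypothetical fitted mapping
test_labels = ["flexible", "super_strict_30", None, "strict"]

print(apply_string_indexer(test_labels, mapping, "skip"))
# [('flexible', 1.0), ('strict', 0.0)]
print(apply_string_indexer(test_labels, mapping, "keep"))
# [('flexible', 1.0), ('super_strict_30', 3.0), (None, 3.0), ('strict', 0.0)]
```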

## Vector Assembler

Now we can combine our OHE categorical features with our numeric features.

In [0]:
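from pyspark.ml.feature import VectorAssembler

# Numeric features: every double-typed column except the label ("price")
numericCols = [field for (field, dataType) in trainDF.dtypes if dataType == "double" and field != "price"]
# Concatenate the one-hot vectors and the numeric columns into a single "features" vector
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")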
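`VectorAssembler` concatenates its input columns in `inputCols` order: each numeric column adds one slot and each vector column adds all of its slots. That is why the feature vectors displayed later have `length -> 99` (the one-hot vector sizes plus the number of numeric columns). A plain-Python sketch, using a hypothetical `assemble` helper that returns the same `(length, indices, values)` layout as those sparse vectors:

```python
def assemble(pieces):
    """Sketch of VectorAssembler: concatenate inputs in inputCols order.

    Each piece is either a float (one numeric column, one slot) or a
    (size, {index: value}) sparse vector (e.g. a one-hot column).
    Returns (length, indices, values); zero entries are left out, as in
    a sparse vector.
    """
    length, indices, values = 0, [], []
    for piece in pieces:
        if isinstance(piece, tuple):
            size, entries = piece
            for i in sorted(entries):
                if entries[i] != 0.0:
                    indices.append(length + i)  # shift by the slots already used
                    values.append(entries[i])
            length += size
        else:
            if piece != 0.0:
                indices.append(length)
                values.append(float(piece))
            length += 1
    return length, indices, values


# Hypothetical row: a one-hot column of size 2, a one-hot column of size 3
# holding its dropped (all-zeros) category, then two numeric columns
row = [(2, {0: 1.0}), (3, {}), 37.77, 0.0]
print(assemble(row))  # (7, [0, 5], [1.0, 37.77])
```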

## Linear Regression

Now that we have all of our features, let's build a linear regression model.

In [0]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="price", featuresCol="features")
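With its defaults (`regParam=0.0`, `fitIntercept=True`), `LinearRegression` fits ordinary least squares: it picks the intercept and weights that minimize the sum of squared errors between `price` and the prediction. Spark solves this in a distributed way (the default `solver="auto"` uses the normal equations when possible and falls back to an iterative optimizer otherwise). The NumPy sketch below solves the same objective on a tiny hypothetical dataset; `fit_ols` is an illustrative helper, not part of Spark.

```python
import numpy as np


def fit_ols(X, y):
    """Ordinary least squares with an intercept: minimize ||y - (X @ w + b)||^2."""
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float)
    # Prepend a column of ones so the first coefficient is the intercept
    X1 = np.column_stack([np.ones(len(X)), X])
    coef, *_ = np.linalg.lstsq(X1, y, rcond=None)
    return coef[0], coef[1:]


# Hypothetical noise-free data: price = 50 + 30 * bedrooms + 10 * bathrooms
X = [[1, 1], [2, 1], [3, 2], [4, 2], [2, 2]]
y = [50 + 30 * bedrooms + 10 * bathrooms for bedrooms, bathrooms in X]

intercept, weights = fit_ols(X, y)
print(intercept, weights)  # recovers roughly 50 and [30, 10]
```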

## Pipeline

Let's put all these stages in a Pipeline. A `Pipeline` is a way of organizing all of our transformers and estimators [Python](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline)/[Scala](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.ml.Pipeline).

This way, we don't have to remember to apply the same transformations, in the same order, to our test dataset.

In [0]:
from pyspark.ml import Pipeline

stages = [stringIndexer, oheEncoder, vecAssembler, lr]
pipeline = Pipeline(stages=stages)

pipelineModel = pipeline.fit(trainDF)
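Conceptually, `Pipeline.fit` walks the stages in order: transformers are applied as-is, each estimator is fitted on the output of the stages before it, and the resulting `PipelineModel` replays all the fitted stages on new data. The toy classes below (`ToyPipeline`, `ToyPipelineModel`, `MeanCenterer`, `Shift`) are hypothetical and not part of `pyspark.ml`; they sketch that control flow on plain lists. The mean is learned from the training data only and then reused on new data, just as `stringIndexer` learns its label mapping from `trainDF` alone.

```python
class ToyPipeline:
    """Toy sketch of Pipeline.fit semantics (hypothetical, not pyspark.ml)."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        # Index of the last estimator; stages after it never need fitted input
        last_est = max((i for i, s in enumerate(self.stages) if hasattr(s, "fit")), default=-1)
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fitted on data already transformed by earlier stages
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < last_est:
                data = model.transform(data)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        # Replay every fitted stage in the original order
        for stage in self.stages:
            data = stage.transform(data)
        return data


class Shift:
    """Transformer: adds a fixed constant to every value."""

    def __init__(self, delta):
        self.delta = delta

    def transform(self, data):
        return [x + self.delta for x in data]


class MeanCenterer:
    """Estimator: learns the mean at fit time, returns a Shift that removes it."""

    def fit(self, data):
        return Shift(-sum(data) / len(data))


model = ToyPipeline([Shift(1.0), MeanCenterer()]).fit([1.0, 2.0, 3.0])
# The mean (3.0) was learned on the shifted training data [2.0, 3.0, 4.0]
print(model.transform([10.0]))  # [8.0]
```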

## Saving Models

We can save our models to persistent storage (e.g., DBFS) so that, if our cluster goes down, we don't have to retrain them.

In [0]:
pipelinePath = userhome + "/machine-learning-p/lr_pipeline_model"
pipelineModel.write().overwrite().save(pipelinePath)

## Loading Models

When you load a saved model, you need to know its type (was it a linear regression or a logistic regression model?), because each model class has its own `load` method, such as `LinearRegressionModel.load`.

For this reason, we recommend you always put your transformers/estimators into a Pipeline, so you can always load the generic `PipelineModel` back in.

In [0]:
from pyspark.ml import PipelineModel

savedPipelineModel = PipelineModel.load(pipelinePath)

## Apply model to test set

In [0]:
predDF = savedPipelineModel.transform(testDF)

display(predDF.select("features", "price", "prediction"))

| features | price | prediction |
|---|---|---|
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 12, 43, 68, 69, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.73615, -122.41245, 2.0, 1.0, 1.0, 2.0, 1.0, 194.0, 91.0, 9.0, 9.0, 10.0, 10.0, 9.0, 9.0))` | 86.0 | 42.87569317935413 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 11, 45, 67, 69, 73, 74, 75, 76, 77, 78, 79, 80, 82, 83, 84, 85, 86, 87, 88, 92, 93, 94, 95, 96, 97, 98), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.76702, -122.43518, 2.0, 1.0, 1.0, 1.0, 3.0, 98.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))` | 190.0 | 207.7985698871562 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 28, 42, 68, 69, 73, 74, 75, 76, 77, 78, 79, 80, 82, 83, 84, 85, 86, 87, 88, 92, 93, 94, 95, 96, 97, 98), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.78424, -122.39925, 2.0, 1.0, 1.0, 1.0, 180.0, 98.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))` | 100.0 | 66.44237254263498 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 19, 43, 67, 69, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.7787, -122.4554, 4.0, 2.0, 2.0, 2.0, 3.0, 6.0, 100.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0))` | 325.0 | 310.5514803297665 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 17, 43, 68, 69, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.79256, -122.42135, 1.0, 1.0, 1.0, 1.0, 140.0, 2.0, 60.0, 7.0, 6.0, 8.0, 8.0, 9.0, 7.0))` | 200.0 | 14.257843854888051 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 14, 42, 67, 69, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.75369, -122.42577, 2.0, 1.0, 1.0, 1.0, 30.0, 2.0, 100.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0))` | 200.0 | 159.09416506921116 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 24, 43, 68, 69, 73, 74, 75, 76, 77, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.71969, -122.44378, 2.0, 1.0, 2.0, 1.0, 24.0, 86.0, 9.0, 9.0, 10.0, 10.0, 9.0, 9.0))` | 80.0 | -19.64792383243548 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 23, 42, 68, 69, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 90), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.79586, -122.43035, 1.0, 1.0, 1.0, 1.0, 30.0, 1.0, 80.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 1.0))` | 160.0 | 161.3381097101519 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 9, 42, 67, 69, 73, 74, 75, 76, 77, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.7752, -122.43765, 3.0, 1.0, 1.0, 90.0, 6.0, 100.0, 9.0, 9.0, 10.0, 10.0, 10.0, 9.0))` | 132.0 | 112.37329941804636 |
| `Map(vectorType -> sparse, length -> 99, indices -> List(0, 3, 6, 9, 44, 68, 69, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.77814, -122.44079, 2.0, 1.0, 1.0, 1.0, 3.0, 5.0, 100.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0))` | 100.0 | 215.76290468771185 |


## Evaluate model

![](https://files.training.databricks.com/images/r2d2.jpg) How is our R2 doing?

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"RMSE is {rmse}")
print(f"R2 is {r2}")
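For reference, the two metrics follow the standard definitions: RMSE = sqrt(mean((y - y_hat)^2)) and R2 = 1 - SS_res / SS_tot, where SS_res is the sum of squared residuals and SS_tot is the sum of squared deviations of the labels from their mean. These should match what `RegressionEvaluator` reports for `"rmse"` and `"r2"` in its default configuration. A plain-Python sketch with hypothetical helper names and toy numbers:

```python
import math


def rmse(labels, preds):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / n)


def r2(labels, preds):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, preds))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


labels = [100.0, 200.0, 300.0]
preds = [110.0, 190.0, 320.0]
print(rmse(labels, preds))  # sqrt((100 + 100 + 400) / 3) = sqrt(200), about 14.142
print(r2(labels, preds))    # 1 - 600 / 20000 = 0.97
```

Because RMSE is in the label's units (dollars here) while R2 is unitless, looking at both gives a fuller picture than either alone.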

As you can see, our RMSE decreased when compared to the model without one-hot encoding, and the R2 increased as well!

&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>