In [1]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression 
# When working with PySpark, also need to start the PySpark session 

from pyspark.sql import SparkSession
# This a local computer custer so always have one cluster
spark = SparkSession.builder.appName('practiceML').getOrCreate()
# Imput dataset

df = spark.read.csv('converted_sf_housing_data.csv', header = True, inferSchema = True)
df.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms","number_of_reviews", "price").show(5)

+----------------------+---------------+--------+---------+-----------------+-----+
|neighbourhood_cleansed|      room_type|bedrooms|bathrooms|number_of_reviews|price|
+----------------------+---------------+--------+---------+-----------------+-----+
|      Western Addition|Entire home/apt|     1.0|      1.0|            180.0|170.0|
|        Bernal Heights|Entire home/apt|     2.0|      1.0|            111.0|235.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|             17.0| 65.0|
|        Haight Ashbury|   Private room|     1.0|      4.0|              8.0| 65.0|
|      Western Addition|Entire home/apt|     2.0|      1.5|             27.0|785.0|
+----------------------+---------------+--------+---------+-----------------+-----+
only showing top 5 rows



In [2]:
# Spliting data 80:20 and setting a random seed for  reproducibility, such that if we rerun this code we will 
# get the same data points going to our train and test datasets, respectively.The value of the seed itself shouldn’t matter,
# but data scientists often like setting it to 42 as that is the answer to the Ultimate Question of Life:
trainDF, testDF = df.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

# if the input data changes, then the result of the split (by random Split()) won’t be the same.
# So, you should cache the training data set because you will be accessing it many times 
# throughout the machine learning process

# Linear regression (like many other algorithms in Spark) requires that all the input features are 
# contained within a single vector in DataFrame. Thus, we need to transform the data.

# For the task of putting all of our features into a single vector, we will use the VectorAssembler transformer. 
# VectorAssembler takes a list of input columns and creates a new DataFrame with an additional column, 
# which we call features. It combines the values of those input columns into a single vector. 

There are 5780 rows in the training set, and 1366 in the test set


## Single Variate LinearRegression (only predicting using bedroom as input feature)

In [3]:
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(5)

+--------+--------+-----+
|bedrooms|features|price|
+--------+--------+-----+
|     1.0|   [1.0]|200.0|
|     1.0|   [1.0]|130.0|
|     1.0|   [1.0]| 95.0|
|     1.0|   [1.0]|250.0|
|     3.0|   [3.0]|250.0|
+--------+--------+-----+
only showing top 5 rows



In [4]:
# Linear regression can be extended to handle multiple independent variables. 
# If we had three features as input, x = [x1, x2, x3], then we could model y as y ≈ w0 + w1x1+ w2x2+ w3x3 + ε. 
# In this case, there is a separate coefficient (or weight) for each feature and a single intercept (w0 instead of b here). 
# The process of estimating the coefficients and intercept for model is called learning (or fitting) 
# the parameters for the model. 

# In Spark, LinearRegression is
# a type of estimator—it takes in a DataFrame and returns a Model. Estimators learn
# parameters from your data, have an estimator_name.fit() method, and are eagerly
# evaluated (i.e., kick off Spark jobs), whereas transformers are lazily evaluated. Some
# other examples of estimators include Imputer, DecisionTreeClassifier, and Random
# ForestRegressor.

lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)

# lr.fit() returns a LinearRegressionModel (lrModel), which is a transformer. In
# other words, the output of an estimator’s fit() method is a transformer. Once the
# estimator has learned the parameters, the transformer can apply these parameters to
# new data points to generate predictions

In [5]:
# Inspecting the parameters 
# In Python
m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f"""The formula for the linear regression line is price = {m}*bedrooms + {b}""")

The formula for the linear regression line is price = 123.68*bedrooms + 47.51


## Creating a Pipeline
If we want to apply model to the test set, then we need to prepare that data in the
same way as the training set (i.e., pass it through the vector assembler). Oftentimes
data preparation pipelines will have multiple steps, and it becomes cumbersome to
remember not only which steps to apply, but also the ordering of the steps. This is the
motivation for the Pipeline API: you simply specify the stages you want your data to
pass through, in order, and Spark takes care of the processing for you. They provide
the user with better code reusability and organization. In Spark, Pipelines are esti‐
mators, whereas ```PipelineModels```—fitted ```Pipelines```—are transformers.

Another advantage of using the Pipeline API is that it determines which stages are
estimators/transformers for you, so you don’t have to worry about specifying
```name.fit()``` versus ```name.transform()``` for each of the stages.
Since ```pipelineModel``` is a transformer, it is straightforward to apply it to our test data
set too:


In [6]:
# Build the pipeline 
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)

# Apply the pipelines 
predDF = pipelineModel.transform(testDF)
predDF.select("bedrooms", "features", "price", "prediction").show(5)

+--------+--------+-----+------------------+
|bedrooms|features|price|        prediction|
+--------+--------+-----+------------------+
|     1.0|   [1.0]| 85.0|171.18598011578285|
|     1.0|   [1.0]| 45.0|171.18598011578285|
|     1.0|   [1.0]| 70.0|171.18598011578285|
|     1.0|   [1.0]|128.0|171.18598011578285|
|     1.0|   [1.0]|159.0|171.18598011578285|
+--------+--------+-----+------------------+
only showing top 5 rows



## Multi-Variate LinearRegression (Using all features)
### One-hot encoding

Most machine learning models in MLlib expect numerical values as input, repre‐
sented as vectors. To convert categorical values into numeric values, we can use a
technique called one-hot encoding (OHE). Suppose we have a column called Animal
and we have three types of animals: Dog, Cat, and Fish. We can’t pass the string types
into our ML model directly, so we need to assign a numeric mapping, such as this:
```
Animal = {"Dog", "Cat", "Fish"}
"Dog" = 1, "Cat" = 2, "Fish" = 3
```
However, using this approach we’ve introduced some spurious relationships into our
data set that weren’t there before. For example, why did we assign Cat twice the value
of Dog? The numeric values we use should not introduce any relationships into our
data set. Instead, we want to create a separate column for every distinct value in our
Animal column:
```
"Dog" = [ 1, 0, 0]
"Cat" = [ 0, 1, 0]
"Fish" = [0, 0, 1]
```
If the animal is a dog, it has a one in the first column and zeros elsewhere. If it is a cat,
it has a one in the second column and zeros elsewhere. The ordering of the columns
is irrelevant. If you’ve used pandas before, you’ll note that this does the same thing as
```pandas.get_dummies()```.
If we had a zoo of 300 animals, would OHE massively increase consumption of mem‐
ory/compute resources? Not with Spark! Spark internally uses a ```SparseVector``` when
the majority of the entries are 0, as is often the case after OHE, so it does not waste
space storing 0 values. Let’s take a look at an example to better understand how
SparseVectors work:
```
DenseVector(0, 0, 0, 7, 0, 2, 0, 0, 0, 0)
SparseVector(10, [3, 5], [7, 2])
```
The ```DenseVector``` in this example contains 10 values, all but 2 of which are 0. To create a ```SparseVector```, we need to keep track of the size of the vector, the indices of the
nonzero elements, and the corresponding values at those indices. In this example the
size of the vector is 10, there are two nonzero values at indices 3 and 5, and the corresponding 
values at those indices are 7 and 2.

There are a few ways to one-hot encode your data with Spark. A common approach is
to use the ```StringIndexer``` and ```OneHotEncoder```. With this approach, the first step is to
apply the ```StringIndexer``` estimator to convert categorical values into category indices. 
These category indices are ordered by label frequencies, so the most frequent
label gets index 0, which provides us with reproducible results across various runs of
the same data.


In [7]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

# How does the StringIndexer handle new categories
# that appear in the test data set, but not in the training data set?

# There is a
# handleInvalid parameter that specifies how you want to handle them. The options
# are skip (filter out rows with invalid data), error (throw an error), or keep (put inva‐
# lid data in a special additional bucket, at index numLabels). For this example, we just
# skipped the invalid records.

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)

numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")


In [8]:
# feature preparation and model building into the pipeline
lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])

In [9]:
# # Inspecting the parameters 

# m = round(lrModel.coefficients[0], 2)
# b = round(lrModel.intercept, 2)
# print(f"""The formula for the linear regression line is price = {m}*bedrooms + {b}""")

pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)


+--------------------+-----+------------------+
|            features|price|        prediction|
+--------------------+-----+------------------+
|(98,[0,3,6,22,43,...| 85.0| 55.24365707389188|
|(98,[0,3,6,22,43,...| 45.0|23.357685914717877|
|(98,[0,3,6,22,43,...| 70.0|28.474464479034395|
|(98,[0,3,6,12,42,...|128.0| -91.6079079594947|
|(98,[0,3,6,12,43,...|159.0| 95.05688229945372|
+--------------------+-----+------------------+
only showing top 5 rows



## Evaluating Models

## Root Mean-Square Error (RMSE) Approach
RMSE score ranges from 0 to infinity. The closer to the zero, the better. 

**RMSE mathematical concept**
- Compuete the error (difference) ```Error = (Y- ȳ)``` and squared it so that positive and negative residuals do not cancle out  ```Squared Error (SE) = (Y- ȳ)^2```
- Sum up the squared error for all ```n``` no of records  ```Sum of Squared Errors (SSE) = Sum(i= 1 to n) (Y- ȳ)^2```
- However, the SSE grows with the number of records ```n``` in the data set, so we want to normalize it by the number of records. so, it becomes: ```Mean Squared Error (MSE) = 1/n(Sum(i= 1 to n) (Y- ȳ)^2)```
- MSE is on the scale of unit-squared. It requires to take the square root of the MSE to get the error back on the scale of the original unit, which gives the root-mean-square error (RMSE) ```Root Mean Squared Error (RMSE) = Sq.Root of MSE```

In [11]:
# Let’s evaluate our model using RMSE:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="price", 
    metricName="rmse")

rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")

RMSE is 220.6


**Interpreting the value of RMSE.***
*How do we know if 220.6 is a good value for the RMSE?*
The most common approach is to build a simple baseline model and compute its RMSE to compare against. 
A common baseline model for regression tasks is to compute the average value of the label on the training
set ȳ , then predict ȳ for every record in the test data set and compute the resulting RMSE



## DBC Notebook

The baseline model has RMSE of 240.7. so, we beat baseline. If it doesn't beat the baseline, then something probably
went wrong in your model builing process. If this were a classification problem, you might want to predict the
most prevalent class as your baseline model.


## R-Squared Approach
Despite the name R^2 containing “squared,” R-squared values range from negative infinity to 1.

# Computation Image from excel 

If the model perfectly predicts every data point, then the ```RSS = 0```, making ```R-Squared = 1```. And if 
```RSS = TSS``` then the fraction becomes 1/1 so R-Squared is 0. This is what happens if model performs the same as 
always predicting the average value YBar. 

But what if your model performs worse than always predicting ȳ and your RSS isreally large? Then your R-Squared
can actually be negative! If your R-Squared is negative, you should reevaluate your modeling process. The nice thing about using R-Squared is that you don’t necessarily need to define a baseline model to compare against.

If we want to change our regression evaluator to use R-Squared, instead of redefining the regression evaluator, we can set the metric name using the setter property. 

```
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")
```

In [12]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")

R2 is 0.16043316698848087


So, In this example R-Squared is positive, but it is very close to 0. One of the reasons why our model is not performing too well is because our label, price, appears to be ```log-normally distributed``` (https://en.wikipedia.org/wiki/Log-normal_distribution). If a distribution is log-normal, it means that if we take the logarithm of the value, the result looks like a normal distribution. Price is often log-normally distributed. If you think about rental prices in San Francisco, most cost around $200 per night, but there are some that rent for thousands of dollars a night! You can see the distribution of our Airbnb prices for our training Dataset. 
## Insert fig 10-7 and 10-8 from book page 329 and as well as the code from repo


## Hyperparameter Tuning
A hyperparameter is an attribute that you define about the model prior to training, and it is not learned during the training process (not to be confused with parameters, which are learned in the training process). The number of trees in your random forest is an example of a hyperparameter.
Let's focus on using tree-based models as an example for hyperparameter turning procedures, but the same concepts apply to other models as well. 

**Tree-Based Models**
A decision tree is a series of if-then-else rules learned from your data for classification or regression tasks. For decision trees, you don’t have to worry about standardizing or scaling your input features, because this has no impact on the splits—but you do have to be careful about how you prepare your categorical features.Tree-based methods can naturally handle categorical variables. In spark.ml, you just need to pass the categorical columns to the StringIndexer, and the decision tree can take care of the rest. 

In [13]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(labelCol="price")
# Filter for just numeric columns (and exclude price, our label)
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]

# Combine output of StringIndexer defined above and numeric columns
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# Combine stages into pipeline
stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline(stages=stages)
dt.setMaxBins(40)
pipelineModel = pipeline.fit(trainDF) # This line should error
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)


DecisionTreeRegressionModel: uid=DecisionTreeRegressor_e17af87064a5, depth=5, numNodes=47, numFeatures=33
  If (feature 12 <= 2.5)
   If (feature 12 <= 1.5)
    If (feature 5 in {1.0,2.0})
     If (feature 4 in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 104.23992784125075
      Else (feature 3 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0})
       Predict: 250.7111111111111
     Else (feature 4 not in {0.0,1.0,3.0,5.0,9.0,10.0,11.0,13.0,14.0,16.0,18.0,24.0})
      If (feature 3 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,27.0,33.0,35.0})
       Predict: 151.94179894179894
      Else (feat

In [14]:
# Understanding feature importance scores from our model to see the most important features
import pandas as pd
featureImp = pd.DataFrame(list(zip(vecAssembler.getInputCols(), 
                                   dtModel.featureImportances)),columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)

Unnamed: 0,feature,importance
12,bedrooms,0.283406
1,cancellation_policyIndex,0.167893
2,instant_bookableIndex,0.140081
4,property_typeIndex,0.128179
15,number_of_reviews,0.126233
3,neighbourhood_cleansedIndex,0.0562
9,longitude,0.03881
14,minimum_nights,0.029473
13,beds,0.015218
5,room_typeIndex,0.010905


## Ways to improve model performance
**Bootstrapping samples by rows**: Bootstrapping is a technique for simulating new data by sampling with replacement from your original data. Each decision tree is trained on a different bootstrap sample of your data set, which produces slightly different decision trees (M&M approach), and then you aggregate their predictions. This process is called ```bootstrap aggregating``` or ```bagging```.

**Random feature selection by columns**: The main drawback with bagging is that the trees are all highly correlated, and thus learn similar patterns in your data. To mitigate this problem, each time you want to make a split you only consider a random subset of the columns (1/3 of the features for ```RandomForestRegressor``` and ```SquareRoot of features``` for ```RandomForestClassifier```). Due to this randomness you introduce, you typically want each tree to be quite shallow. You might be thinking: each of these trees will perform worse than any single decision tree, so how could this approach possibly be better? It turns out that each of the trees learns something different about your data set, and combining this collection of “weak” learners into an ensemble makes the forest much more robust than a single decision tree. 

So how do we determine what the optimal number of trees in our random forest or the max depth of those trees should be? This process is called ```hyperparameter tuning```.

## Ways to tune hyperparameters
**k-Fold Cross-Validation**: For example, instead of splitting our data into an 80/20 train/test split, as we did earlier, we can do a 60/20/20 split to generate training, validation, and test data sets, respectively. With this approach we lose 25% of the training data (80% to 60%) which could have been used to help improve the model. 

Instead the spiltting the data set into training, validation, and test sets, we spilt into training and test sets as before -but we use the training data for both training and validation. To accomplish this, we split our training data into ```k``` subsets, or “folds” (e.g., three). Then, for a given hyperparameter configuration,
we train our model on ```k–1``` folds and evaluate on the remaining fold, repeating this process k times. 

|  |```k-2``` Folds| ```k-1``` Folds |```k``` Folds|
|---|--------|-------|-------|
|Pass 1:|Train|Train|Validate|
|Pass 2:|Train|Validate|Train|
|Pass 3:|Validate|Train|Train|

As this figure shows, if we split our data into three folds, our model is first trained on the first and second folds (or splits) of the data, and evaluated on the third fold. We then build the same model with the same hyperparameters on the first and third folds of the data, and evaluate its performance on the second fold. Lastly, we build the model on the second and third folds and evaluate it on the first fold. We then average the performance of those three (or k) validation data sets as a proxy of how well this model will perform on unseen data, as every data point had the chance to be part of the validation data set exactly once. Next, we repeat this process for all of our different hyperparameter configurations to identify the optimal one.

To perform a hyperparameter search in Spark, take the following steps :
- Define the estimator you want to evaluate.
- Specify which hyperparameters you want to vary, as well as their respective values, using the ```ParamGridBuilder```.
- Define an evaluator to specify which metric to use to compare the various models.
- Use the ```CrossValidator``` to perform cross-validation, evaluating each of the various models.

In [1]:
# Define a pipeline 
pipeline = Pipeline(stages = [StringIndexer, vecAssembler, rf])

from pyspark.ml.tuning import ParamGridBuilder
# vary our maxDepth to be 2, 4, or 6 and numTrees to be 10 and 100.
# retun the 3X2 matrix with different hyperparameter configurations 
paramGrid = (ParamGridBuilder().addgrid(rf.maxDepth, [2, 5, 6])
                             .addGrid(rf.numTrees, [10,100])
                             .build())

# Now that we have set up our hyperparameter grid, we need to define how to evaluate 
# each of the models to determine which one performed best. For this task we will use
# the RegressionEvaluator, and we’ll use RMSE as our metric of interest:

evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

# perfroming k-fold cross-validation and fit it with cross-validator to the training data set 
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline, evaluator=evaluator,estimatorParamMaps=paramGrid, numFolds=3, seed=42)
cvModel = cv.fit(trainDF)

# we just train total 19 models:  6 hyperparameters (3X2 metirc) with 3 folds. Plus, Spark retrains
# your model on the entire training data set once it has identified the optimal 
# hyperparameter configuration, so in the end we trained 19 models. If you want to retain the 
# intermediate models trained, you can set collect SubModels=True in the CrossValidator

# inspect the results of the cross-validator
# Best model the one with lowest RMSE
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

# Optimizing Pipelines
# to faster the trainin time, we can pass parallelism into the model. 
# Generally speaking, a value up to 10 should be sufficient for most clusters
cvModel = cv.setParallelism(4).fit(trainDF)

# Further spped up the training time: Nesting inside altogether
cv = CrossValidator(estimator=rf, 
                    evaluator=evaluator, 
                    estimatorParamMaps=paramGrid,
                    numFolds=3,
                    parallelism=4,
                    seed=42)
pipeline = Pipeline(stages=[stringIndexer, vecAssembler, cv])
pipelineModel = pipeline.fit(trainDF)


NameError: name 'Pipeline' is not defined

## Managing, Deploying, and Scaling Machine Learning Pipelines with Apache Spark

## MLfow

MLflow is an open source platform that helps developers reproduce and share experiments, 
manage models, and much more
- Tracking: Provides APIs to record parameters, metrics, code versions, models, and artifacts such as plots, and text
- Projects: A standardized format to package your data science projects and their dependencies to run on other platforms
- Models: A standardized format to package models to deploy to diverse execution environments. It provides a consistent API for loading and applying models, regardless of the algorithm or library used to build the model.
- Registry: A repository to keep track of model lineage, model versions, stage transitions, and annotations.
