# Model tuning and selection in PySpark
  
In this last chapter, you'll apply what you've learned to create a model that predicts which flights will be delayed.
  
```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   
      /_/
```


## Resources
  
**Notebook Syntax**
  
<span style='color:#7393B3'>NOTE:</span>  
- Denotes additional information deemed to be *contextually* important
- Colored in blue, HEX #7393B3
  
<span style='color:#E74C3C'>WARNING:</span>  
- Significant information that is *functionally* critical  
- Colored in red, HEX #E74C3C
  
---
  
**Links**
  
[NumPy Documentation](https://numpy.org/doc/stable/user/index.html#user)  
[Pandas Documentation](https://pandas.pydata.org/docs/user_guide/index.html#user-guide)  
[Matplotlib Documentation](https://matplotlib.org/stable/index.html)  
[Seaborn Documentation](https://seaborn.pydata.org)  
[Apache Spark Documentation](https://spark.apache.org/docs/latest/api/python/index.html)  
  
---
  
**Notable Functions**
  
<table>
  <tr>
    <th>Index</th>
    <th>Operator</th>
    <th>Use</th>
  </tr>
  <tr>
    <td>1</td>
    <td>pyspark.SparkContext()</td>
    <td>Create a new SparkContext instance, the entry point to using Spark functionality.</td>
  </tr>
  <tr>
    <td>2</td>
    <td>pyspark.SparkContext().version</td>
    <td>Retrieve the version of the SparkContext.</td>
  </tr>
  <tr>
    <td>3</td>
    <td>pyspark.SparkContext().stop()</td>
    <td>Stop the SparkContext, releasing associated resources.</td>
  </tr>
  <tr>
    <td>4</td>
    <td>pyspark.sql.SparkSession</td>
    <td>Create a new SparkSession instance, offering an entry point for DataFrame and SQL functionality.</td>
  </tr>
  <tr>
    <td>5</td>
    <td>pyspark.sql.SparkSession.builder.getOrCreate()</td>
    <td>Retrieve an existing SparkSession or create a new one if none exists.</td>
  </tr>
  <tr>
    <td>6</td>
    <td>pyspark.sql.SparkSession.builder.appName</td>
    <td>Set the application name for the SparkSession.</td>
  </tr>
  <tr>
    <td>7</td>
    <td>pyspark.sql.SparkSession.builder.getOrCreate</td>
    <td>Get an existing SparkSession or create a new one if none exists.</td>
  </tr>
  <tr>
    <td>8</td>
    <td>pyspark.sql.SparkSession.read</td>
    <td>Create a DataFrameReader for reading data in various formats.</td>
  </tr>
  <tr>
    <td>9</td>
    <td>pyspark.sql.SparkSession.read.format</td>
    <td>Specify the input data format when reading data using the DataFrameReader.</td>
  </tr>
  <tr>
    <td>10</td>
    <td>pyspark.sql.SparkSession.read.format.option('inferSchema', 'True')</td>
    <td>Specify options, such as inferring schema from data, when reading data using the DataFrameReader.</td>
  </tr>
  <tr>
    <td>11</td>
    <td>pyspark.sql.SparkSession.read.format.option('header', 'True')</td>
    <td>Specify options, such as reading headers from data, when reading data using the DataFrameReader.</td>
  </tr>
  <tr>
    <td>12</td>
    <td>pyspark.sql.SparkSession.read.format.load()</td>
    <td>Load data into a DataFrame based on specified options using the DataFrameReader.</td>
  </tr>
  <tr>
    <td>13</td>
    <td>pyspark.sql.DataFrame.createOrReplaceTempView</td>
    <td>Create or replace a temporary view of a DataFrame.</td>
  </tr>
  <tr>
    <td>14</td>
    <td>pyspark.sql.SparkSession.catalog.listTables()</td>
    <td>List the tables available in the catalog.</td>
  </tr>
  <tr>
    <td>15</td>
    <td>pyspark.sql.SparkSession.sql()</td>
    <td>Execute a SQL query and return the result as a DataFrame.</td>
  </tr>
  <tr>
    <td>16</td>
    <td>pyspark.sql.SparkSession.sql().toPandas()</td>
    <td>Convert the result of a SQL query to a Pandas DataFrame.</td>
  </tr>
  <tr>
    <td>17</td>
    <td>pyspark.sql.SparkSession.sql().toPandas().head()</td>
    <td>Retrieve the first few rows of a Pandas DataFrame obtained from a SQL query result.</td>
  </tr>
  <tr>
    <td>18</td>
    <td>pyspark.sql.SparkSession.createDataFrame()</td>
    <td>Create a DataFrame from a list or RDD.</td>
  </tr>
  <tr>
    <td>19</td>
    <td>pyspark.sql.SparkSession.read.csv()</td>
    <td>Read data from a CSV file and load it into a DataFrame.</td>
  </tr>
  <tr>
    <td>20</td>
    <td>pyspark.sql.SparkSession.table</td>
    <td>Create a DataFrame representing a table in the catalog.</td>
  </tr>
  <tr>
    <td>21</td>
    <td>pyspark.sql.DataFrame.filter</td>
    <td>Filter rows of a DataFrame based on a condition.</td>
  </tr>
  <tr>
    <td>22</td>
    <td>pyspark.sql.DataFrame.select</td>
    <td>Select columns from a DataFrame.</td>
  </tr>
  <tr>
    <td>23</td>
    <td>pyspark.sql.DataFrame.selectExpr</td>
    <td>Select columns using SQL expressions from a DataFrame.</td>
  </tr>
  <tr>
    <td>24</td>
    <td>pyspark.sql.DataFrame.printSchema</td>
    <td>Print the schema of a DataFrame.</td>
  </tr>
  <tr>
    <td>25</td>
    <td>pyspark.sql.DataFrame.withColumn</td>
    <td>Add or replace a column in a DataFrame.</td>
  </tr>
  <tr>
    <td>26</td>
    <td>pyspark.sql.types.IntegerType</td>
    <td>Create an IntegerType column type for use in DataFrame schema.</td>
  </tr>
  <tr>
    <td>27</td>
    <td>pyspark.sql.functions.col</td>
    <td>Reference a column in a DataFrame based on its name.</td>
  </tr>
  <tr>
    <td>28</td>
    <td>pyspark.sql.DataFrame.groupBy</td>
    <td>Group rows in a DataFrame based on specified columns.</td>
  </tr>
  <tr>
    <td>29</td>
    <td>pyspark.sql.GroupedData.min</td>
    <td>Compute the minimum value of specified columns for grouped rows.</td>
  </tr>
  <tr>
    <td>30</td>
    <td>pyspark.sql.GroupedData.max</td>
    <td>Compute the maximum value of specified columns for grouped rows.</td>
  </tr>
  <tr>
    <td>31</td>
    <td>pyspark.sql.GroupedData.avg</td>
    <td>Compute the average value of specified columns for grouped rows.</td>
  </tr>
  <tr>
    <td>32</td>
    <td>pyspark.sql.GroupedData.sum</td>
    <td>Compute the sum of specified columns for grouped rows.</td>
  </tr>
  <tr>
    <td>33</td>
    <td>pyspark.sql.GroupedData.count</td>
    <td>Compute the count of rows for grouped columns.</td>
  </tr>
  <tr>
    <td>34</td>
    <td>pyspark.sql.functions.stddev</td>
    <td>Compute the standard deviation of specified columns in a DataFrame.</td>
  </tr>
  <tr>
    <td>35</td>
    <td>pyspark.sql.DataFrame.withColumnRenamed</td>
    <td>Rename a column in a DataFrame.</td>
  </tr>
  <tr>
    <td>36</td>
    <td>pyspark.sql.DataFrame.join</td>
    <td>Join two DataFrames based on specified columns.</td>
  </tr>
  <tr>
    <td>37</td>
    <td>pyspark.ml.feature.StringIndexer</td>
    <td>Convert categorical strings to numerical indices using StringIndexer.</td>
  </tr>
  <tr>
    <td>38</td>
    <td>pyspark.ml.feature.OneHotEncoder</td>
    <td>Encode categorical indices as one-hot vectors using OneHotEncoder.</td>
  </tr>
  <tr>
    <td>39</td>
    <td>pyspark.ml.feature.VectorAssembler</td>
    <td>Combine multiple columns into a single feature vector using VectorAssembler.</td>
  </tr>
  <tr>
    <td>40</td>
    <td>pyspark.ml.Pipeline</td>
    <td>Construct a ML pipeline by assembling a sequence of transformers and an estimator.</td>
  </tr>
  <tr>
    <td>41</td>
    <td>pyspark.sql.DataFrame.randomSplit</td>
    <td>Randomly split a DataFrame into training and testing datasets.</td>
  </tr>
</table>

  
---
  
**Language and Library Information**  
  
Python 3.11.0  
  
Name: numpy  
Version: 1.24.3  
Summary: Fundamental package for array computing in Python  
  
Name: pandas  
Version: 2.0.3  
Summary: Powerful data structures for data analysis, time series, and statistics  
  
Name: matplotlib  
Version: 3.7.2  
Summary: Python plotting package  
  
Name: seaborn  
Version: 0.12.2  
Summary: Statistical data visualization  
  
Name: pyspark  
Version: 3.4.1  
Summary: Apache Spark Python API  
  
---
  
**Miscellaneous Notes**
  
<span style='color:#7393B3'>NOTE:</span>  
  
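`python3.11 -m IPython` : Starts an interactive IPython shell under Python 3.11 in the terminal.
  
`nohup ./relo_csv_D2S.sh > ./output/relo_csv_D2S.log &` : Runs the CSV data pipeline in the background, immune to hangups, and writes its output to a log file.  
  
`print(inspect.getsourcelines(test))` : Prints the source lines of a user-defined function (here `test`), along with its starting line number.  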
  
<span style='color:#7393B3'>NOTE:</span>  
  
Snippet to plot all built-in matplotlib styles :
  
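```python
import math

import matplotlib.pyplot as plt
import numpy as np

# Sample curve drawn once per style
x = np.arange(-2, 8, .1)
y = 0.1 * x ** 3 - x ** 2 + 3 * x + 2

fig = plt.figure(dpi=100, figsize=(10, 20), tight_layout=True)
available = ['default'] + plt.style.available
n_rows = math.ceil(len(available) / 3)  # three subplots per row

for i, style in enumerate(available):
    # Apply each style only while its subplot is being drawn
    with plt.style.context(style):
        ax = fig.add_subplot(n_rows, 3, i + 1)
        ax.plot(x, y)
        ax.set_title(style)
```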
  

In [1]:
import numpy as np                  # Numerical Python:         Arrays and linear algebra
import pandas as pd                 # Panel Datasets:           Dataset manipulation
import matplotlib.pyplot as plt     # MATLAB Plotting Library:  Visualizations
import seaborn as sns               # Seaborn:                  Visualizations
import pyspark                      # Apache Spark:             Cluster Computing

# Setting a standard figure size
plt.rcParams['figure.figsize'] = (8, 8)

# Set the maximum number of columns to be displayed
pd.set_option('display.max_columns', 50)

### What is logistic regression?
  
The model you'll be fitting in this chapter is called a logistic regression. This model is very similar to a linear regression, but instead of predicting a numeric variable, it predicts the probability (between 0 and 1) of an event.
  
To use this as a classification algorithm, all you have to do is assign a cutoff point to these probabilities. If the predicted probability is above the cutoff point, you classify that observation as a 'yes' (in this case, the flight being late); if it's below, you classify it as a 'no'!
  
You'll tune this model by testing different values for several hyperparameters. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance. For this course it's not necessary to understand the mathematics behind all of these values; what's important is that you'll try out a few different choices and pick the best one.
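
To make the cutoff idea concrete, here is a minimal sketch in plain Python. It is not how PySpark implements logistic regression internally; the weights, intercept, and flight features are made-up values for illustration only.

```python
import math


def predict_probability(features, weights, intercept):
    """Logistic regression score: the sigmoid of a weighted sum of the features."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))


def classify(probability, cutoff=0.5):
    """Return 1 ('late') when the probability is above the cutoff, else 0."""
    return 1 if probability > cutoff else 0


# Hypothetical coefficients and one hypothetical flight (illustration only)
weights = [0.8, -0.3]
intercept = -0.5
flight = [2.0, 1.0]

p = predict_probability(flight, weights, intercept)
label = classify(p, cutoff=0.5)
print(round(p, 3), label)
```

Raising the cutoff makes the classifier more reluctant to say 'late'; lowering it does the opposite.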
  
---
  
Why do you supply hyperparameters?
  
Possible Answers
  
- [ ] They explain information about the data.
- [x] They improve model performance.
- [ ] They improve model fitting speed.
  
Great job! You supply hyperparameters to optimize your model.

### Create the modeler
  
The `Estimator` you'll be using is a `LogisticRegression` from the `pyspark.ml.classification` submodule.
  
---
  
1. Import the `LogisticRegression` class from `pyspark.ml.classification`.
2. Create a `LogisticRegression` called `lr` by calling `LogisticRegression()` with no arguments.

In [2]:
from pyspark.sql import SparkSession

# Creating spark session
spark = (
    SparkSession.builder.appName('flights').getOrCreate()
)




In [3]:
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

Great work! That's the first step to any modeling in PySpark.

### Cross validation
  
In the next few exercises you'll be tuning your logistic regression model using a procedure called k-fold cross validation. This is a method of estimating the model's performance on unseen data (like your `test` DataFrame).
  
It works by splitting the training data into a few different partitions. The exact number is up to you, but in this course you'll be using PySpark's default value of three. Once the data is split up, one of the partitions is set aside, and the model is fit to the others. Then the error is measured against the held out partition. This is repeated for each of the partitions, so that every block of data is held out and used as a test set exactly once. Then the error on each of the partitions is averaged. This is called the cross validation error of the model, and is a good estimate of the actual error on the held out data.
  
You'll be using cross validation to choose the hyperparameters by creating a grid of the possible pairs of values for the two hyperparameters, `elasticNetParam=` and `regParam=`, and using the cross validation error to compare all the different models so you can choose the best one!
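
The fold procedure described above can be sketched in plain Python. This is only an illustration under simplifying assumptions: rows are assigned to folds by position rather than randomly (PySpark's `CrossValidator` assigns them randomly), and the "model" is a toy majority-class predictor rather than a logistic regression. All function names here are hypothetical.

```python
def k_fold_indices(n_rows, k=3):
    """Split row indices 0..n_rows-1 into k roughly equal folds (by stride)."""
    return [list(range(i, n_rows, k)) for i in range(k)]


def cross_validation_error(rows, labels, fit, error, k=3):
    """Hold out each fold once, fit on the rest, and average the fold errors."""
    fold_errors = []
    for held_out in k_fold_indices(len(rows), k):
        held = set(held_out)
        train_idx = [i for i in range(len(rows)) if i not in held]
        model = fit([rows[i] for i in train_idx], [labels[i] for i in train_idx])
        fold_errors.append(
            error(model, [rows[i] for i in held_out], [labels[i] for i in held_out])
        )
    return sum(fold_errors) / k


def fit_majority(train_rows, train_labels):
    """Toy 'model': always predict the most common training label."""
    return max(set(train_labels), key=train_labels.count)


def misclassification_rate(model, test_rows, test_labels):
    """Fraction of held-out labels that differ from the constant prediction."""
    return sum(1 for y in test_labels if y != model) / len(test_labels)


# Made-up data: nine rows, three of them 'late'
rows = list(range(9))
labels = [0, 0, 0, 0, 0, 0, 1, 1, 1]

cv_error = cross_validation_error(rows, labels, fit_majority, misclassification_rate, k=3)
print(cv_error)
```

Every row lands in exactly one held-out fold, so each observation is used for testing once and for fitting `k - 1` times.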
  
---
  
What does cross validation allow you to estimate?
  
Possible Answers
  
- [x] The model's error on held out data.
- [ ] The model's error on data used for fitting.
- [ ] The time it will take to fit the model.
  
Exactly! The cross validation error is an estimate of the model's error on the test set.

### Create the evaluator
  
The first thing you need when doing cross validation for model selection is a way to compare different models. Luckily, the `pyspark.ml.evaluation` submodule has classes for evaluating different kinds of models. Your model is a binary classification model, so you'll be using the `BinaryClassificationEvaluator` from the `pyspark.ml.evaluation` module.
  
This evaluator calculates the area under the ROC curve. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a single number. You'll learn more about this towards the end of the chapter!
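
To see the two kinds of errors in action, here is a small plain-Python sketch that counts them at a few cutoffs. The labels and probabilities are made up for illustration, and `confusion_counts` is a hypothetical helper, not part of PySpark.

```python
def confusion_counts(labels, probabilities, cutoff=0.5):
    """Count true/false positives and negatives at a given cutoff."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for label, p in zip(labels, probabilities):
        predicted = 1 if p > cutoff else 0
        if predicted == 1 and label == 1:
            counts["tp"] += 1
        elif predicted == 1 and label == 0:
            counts["fp"] += 1  # predicted late, actually on time
        elif predicted == 0 and label == 0:
            counts["tn"] += 1
        else:
            counts["fn"] += 1  # predicted on time, actually late
    return counts


# Made-up labels (1 = late) and predicted probabilities
labels = [1, 1, 0, 0, 1, 0]
probabilities = [0.9, 0.4, 0.6, 0.2, 0.7, 0.1]

for cutoff in (0.3, 0.5, 0.65):
    print(cutoff, confusion_counts(labels, probabilities, cutoff))
```

Lowering the cutoff trades false negatives for false positives, and the ROC curve traces that trade-off across all cutoffs.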
  
---
  
1. Import the submodule `pyspark.ml.evaluation` as `evals`.
2. Create evaluator by calling `evals.BinaryClassificationEvaluator()` with the argument `metricName="areaUnderROC"`.

In [4]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName='areaUnderROC')

Perfect! Now you can compare models using the metric output by your `evaluator`!

### Make a grid
  
Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule `pyspark.ml.tuning` includes a class called `ParamGridBuilder` that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).

You'll need to use the `.addGrid()` and `.build()` methods to create a grid that you can use for cross validation. The `.addGrid()` method takes a model parameter (an attribute of the model `Estimator`, `lr`, that you created a few exercises ago) and a list of values that you want to try. The `.build()` method takes no arguments; it just returns the grid that you'll use later.
  
---
  
1. Import the submodule `pyspark.ml.tuning` under the alias `tune`.
2. Call the class constructor `ParamGridBuilder()` with no arguments. Save this as `grid`.
3. Call the `.addGrid()` method on `grid` with `lr.regParam` as the first argument and `np.arange(0, .1, .01)` as the second argument. This second argument is a call to a function from the `numpy` module (imported as `np`) that creates an array of numbers from 0 up to, but not including, 0.1, incrementing by 0.01. Overwrite `grid` with the result.
4. Update `grid` again by calling the `.addGrid()` method a second time to create a grid for `lr.elasticNetParam` that includes only the values `[0, 1]`.
5. Call the `.build()` method on `grid` and overwrite it with the output.

In [5]:
from pyspark.ml import tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0,1])

# Build the grid
grid = grid.build()

Awesome! That's the last ingredient in your cross validation recipe!
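
If you're wondering what a built grid amounts to, it is the set of every combination of the values you added. A rough plain-Python equivalent, using string keys where PySpark uses `Param` objects, might look like the sketch below; the ordering of the real grid may differ.

```python
from itertools import product

# Same candidate values as the exercise: ten regParam values, two elasticNetParam values
reg_param_values = [round(i * 0.01, 2) for i in range(10)]  # 0.0, 0.01, ..., 0.09
elastic_net_values = [0, 1]

# One dictionary per combination, similar in spirit to the list that .build() returns
param_grid = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_param_values, elastic_net_values)
]

print(len(param_grid))
print(param_grid[0], param_grid[-1])
```

Ten values times two values gives 20 candidate models, which is why grid search gets expensive quickly as you add hyperparameters.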

### Make the validator
  
The submodule `pyspark.ml.tuning` also has a class called `CrossValidator` for performing cross validation. This `Estimator` takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.
  
The submodule `pyspark.ml.tuning` has already been imported as `tune`. You'll create the `CrossValidator` by passing it the logistic regression `Estimator` `lr`, the parameter grid, and the evaluator you created in the previous exercises.
  
---
  
1. Create a `CrossValidator` by calling `tune.CrossValidator()` with the arguments:
- `estimator=lr`
- `estimatorParamMaps=grid`
- `evaluator=evaluator`
2. Name this object `cv`.

In [6]:
# Create the CrossValidator
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator
)

Great job! You're almost a machine learning pro!

### Fit the model(s)
  
You're finally ready to fit the models and select the best one!
  
Unfortunately, cross validation is a very computationally intensive procedure. Fitting all the models would take too long.
  
To do this locally you would use the code:
  
```python
# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel
```
  
Remember, the training data is called `training` and you're using `lr` to fit a logistic regression model. Cross validation selected the parameter values `regParam=0` and `elasticNetParam=0` as being the best. These are the default values, so you don't need to do anything else with `lr` before fitting the model.
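
If you do run the cross validation yourself, you may want to see how every candidate scored, not just the winner. The helper below is a hypothetical sketch: it pairs each parameter map with its average metric and sorts them best-first. The toy grid and AUC values are made up so the snippet runs without Spark.

```python
def rank_param_maps(param_maps, avg_metrics, larger_is_better=True):
    """Pair each parameter map with its average metric and sort best-first."""
    rows = []
    for param_map, metric in zip(param_maps, avg_metrics):
        # PySpark keys are Param objects with a .name attribute; plain strings also work
        readable = {
            getattr(param, "name", str(param)): value
            for param, value in param_map.items()
        }
        rows.append((metric, readable))
    return sorted(rows, key=lambda row: row[0], reverse=larger_is_better)


# Made-up stand-ins for a parameter grid and its cross validation AUC values
toy_grid = [
    {"regParam": 0.0, "elasticNetParam": 0},
    {"regParam": 0.05, "elasticNetParam": 1},
    {"regParam": 0.09, "elasticNetParam": 0},
]
toy_metrics = [0.70, 0.66, 0.68]

ranked = rank_param_maps(toy_grid, toy_metrics)
for metric, params in ranked:
    print(metric, params)
```

With a fitted `CrossValidatorModel` such as `models`, the call would presumably be `rank_param_maps(grid, models.avgMetrics)`, since `avgMetrics` holds one averaged score per entry of the parameter grid.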
  
---
  
1. Create `best_lr` by calling `lr.fit()` on the training data.
2. Print `best_lr` to verify that it's an object of the `LogisticRegressionModel` class.

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Dataframe flights table
flights = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("../_datasets/flights_small.csv"))
flights.createOrReplaceTempView("flights")  # Created table 1

# Dataframe planes table
planes = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load('../_datasets/planes.csv'))
planes.createOrReplaceTempView("planes")    # Created table 2

# Rename year column to plane_year
planes = planes.withColumnRenamed('year', 'plane_year')

# Join the DataFrames
model_data = flights.join(planes, on='tailnum', how='leftouter')

# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast('integer'))
model_data = model_data.withColumn('air_time', model_data.air_time.cast('integer'))
model_data = model_data.withColumn('month', model_data.month.cast('integer'))
model_data = model_data.withColumn('plane_year', model_data.plane_year.cast('integer'))

# Create the column plane_age
model_data = model_data.withColumn('plane_age', model_data.year - model_data.plane_year)

# Create is_late
model_data = model_data.withColumn('is_late', model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn('label', model_data.is_late.cast('integer'))

# Remove missing values
model_data = model_data.filter(
    'arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL'
)

# Create StringIndexer
carr_indexer = StringIndexer(inputCol='carrier', outputCol='carrier_index')

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol='carrier_index', outputCol='carrier_fact')

# Create a StringIndexer
dest_indexer = StringIndexer(inputCol='dest', outputCol='dest_index')

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol='dest_index', outputCol='dest_fact')

# Make a VectorAssembler
vec_assembler = VectorAssembler(
    inputCols=['month', 'air_time', 'carrier_fact', 'dest_fact', 'plane_age'],
    outputCol='features')

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

                                                                                

In [8]:
# Call lr.fit()
best_lr = lr.fit(training)

# Print best_lr
print(best_lr)


LogisticRegressionModel: uid=LogisticRegression_6117bb848e78, numClasses=2, numFeatures=81


Wow! You fit your first Spark model!

### Evaluating binary classifiers
  
For this course we'll be using a common metric for binary classification algorithms called the AUC, or area under the curve. In this case, the curve is the ROC, or receiver operating characteristic curve. The details of what these things actually measure aren't important for this course. All you need to know is that for our purposes, the closer the AUC is to one (1), the better the model is!
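
For the curious, one way to build intuition for the AUC is that it equals the chance that a randomly chosen positive example gets a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python on made-up scores. It is an illustration of the idea, not the method `BinaryClassificationEvaluator` uses internally.

```python
def auc_by_pairs(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up examples (1 = late)
perfect = auc_by_pairs([1, 1, 0, 0], [0.9, 0.8, 0.3, 0.1])
mixed = auc_by_pairs([1, 0, 1, 0], [0.9, 0.8, 0.4, 0.1])
uninformative = auc_by_pairs([1, 0, 1, 0], [0.5, 0.5, 0.5, 0.5])

print(perfect, mixed, uninformative)
```

A model that ranks every late flight above every on-time flight scores 1, while a model that gives every flight the same score lands at 0.5.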
  
---
  
If you've created a perfect binary classification model, what would the AUC be?
  
Possible Answers
  
- [ ] -1
- [x] 1
- [ ] 0
- [ ] .5
  
Great job! An AUC of one represents a model that always perfectly classifies observations.

### Evaluate the model
  
Remember the test data that you set aside waaaaaay back in chapter 3? It's finally time to test your model on it! You can use the same evaluator you created for cross validation.
  
---
  
1. Use your model to generate predictions by applying `best_lr.transform()` to the test data. Save this as `test_results`.
2. Call `evaluator.evaluate()` on `test_results` to compute the AUC. Print the output.

In [9]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

                                                                                

0.7063483719812562


                                                                                

Congratulations! What do you think of the AUC? Your model isn't half bad! You went from knowing nothing about Spark to doing advanced machine learning. Great job on making it to the end of the course! The next steps are learning how to create large scale Spark clusters and manage and submit jobs so that you can use models in the real world. Remember, Spark is still being actively developed, so there are new features coming all the time!