# ML Pipelines

We are going to focus on preparing a data set: cleaning the data, creating new features (fields that will later serve in training the model), and then selecting a curated set of features based on how promising they look.

Lab based on book: Data Analysis with Python and PySpark, Jonathan Rioux

⚠: This is not a class about machine learning! For more about ML, look at Real-World Machine Learning by Henrik Brink, Joseph W. Richards, and Mark Fetherolf (Manning, 2016).

## 1. Reading, exploring, and preparing our machine learning data set

We will start with the ingestion and exploration of our machine learning data set. More specifically, we’ll review the content of our data frame, look for inconsistencies, and prepare our data for feature engineering. For our ML model, I chose a data set of 20,057 dishes described by 680 columns characterizing the ingredient list, the nutritional content, and the category of the dish.

Dataset source: https://www.kaggle.com/datasets/hugodarwood/epirecipes

👍 **Our goal here is to predict if this dish is a dessert**

#### 1.1 Import and clean

Import the dataset into the cluster:

- Click Data in the sidebar.
- Click the DBFS button at the top of the page.
- Click the Upload button at the top of the page.
- On the Upload Data to DBFS dialog, optionally select a target directory or enter a new one.
- In the Files box, drag and drop or use the file browser to select the local file to upload.

In [0]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [0]:
# File location and type
file_location = "/FileStore/epi_r.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
food = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(food)

In [0]:
print(food.count(), len(food.columns))

In [0]:
food.printSchema()

Some of the columns contain undesirable characters, such as a # (`#cakeweek`), or a space (`30 days of groceries`), or some invalid characters (`bon app��tit`)!

**Having a consistent column naming scheme will make subsequent code easier to write, read, and maintain in the long run.**

We will remove anything that isn’t a letter or a number, standardize the spaces and other separators to use the underscore (`_`) character, and replace the ampersand (`&`) with its English equivalent, `and`.

To apply our function `sanitize_column_name`, we use `toDF()`: when used to rename the columns of a data frame, it takes as parameters N strings, where N is the number of columns in our data frame. Since we can access the columns of our data frame via `food.columns`, a quick list comprehension takes care of renaming everything. We also unpack the list into distinct arguments using the star operator.

In [0]:
def sanitize_column_name(name):
    """Drops unwanted characters from the column name.
    We replace spaces, dashes and slashes with underscore, and only keep alphanumeric characters."""
    answer = name

    for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
        answer = answer.replace(i, j)

    return "".join(
        [
            char
            for char in answer
            if char.isalpha() or char.isdigit() or char == "_"
        ]
    )


food = food.toDF(*[sanitize_column_name(name) for name in food.columns])
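
As a quick sanity check of the renaming rules, here is a plain-Python sketch that runs without Spark. It restates the same logic in a compact form so the snippet is self-contained. The first two sample names are quoted in the text above; the third one is a hypothetical name, made up to exercise the `/` and `&` rules.

```python
def sanitize_column_name(name):
    """Same rules as the notebook function, restated so this snippet runs on its own."""
    for old, new in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
        name = name.replace(old, new)
    return "".join(c for c in name if c.isalpha() or c.isdigit() or c == "_")


# Two names quoted in the text, plus one hypothetical name with "/" and "&".
samples = ["#cakeweek", "30 days of groceries", "soup/stew & chili"]
cleaned = [sanitize_column_name(s) for s in samples]
print(cleaned)
```

Note that the replacements run before the filter, so separators become underscores instead of being dropped.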

In [0]:
food.printSchema()

#### 1.2 Explore and create features

Exploring data for machine learning is similar to exploring data when performing a transformation, in the sense that we manipulate the data to uncover inconsistencies, patterns, or gaps.

In [0]:
display(food)

Identifying your variables as categorical (with the proper subtype) or continuous has a
direct impact on the data preparation and, down the road, the performance of your ML
model. Looking at our summary data, it seems that we have a lot of potentially binary columns.
In the case of the clove column, the minimum and three quartile values are all
zero. To verify this, we’ll group the entire data frame and collect a set of distinct values.
If we have only two values for a given column, binary it is!

In [0]:
#Is this column binary?
#This CMD may take up to 1 min to run.

import pandas as pd

pd.set_option("display.max_rows", 1000)

is_binary = food.agg(
    *[
        (F.size(F.collect_set(x)) == 2).alias(x) 
        #collect_set() will create a set of the distinct values as an array, and size() returns the length of the array. Two distinct values means that it’s probably binary.
        for x in food.columns
    ]
).toPandas()

is_binary.unstack()
#unstack un-pivots a pandas DataFrame, making a wide data frame easier to analyze in the terminal.
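
The "two distinct values" test can be sketched in plain Python on a tiny, made-up data set. This is only an illustration of the idea, not how Spark executes it. Nulls (here `None`) are left out of the distinct set, which matches the behavior of `collect_set()`. All column values below are hypothetical.

```python
# Hypothetical mini data set: column name -> values (None stands for null).
columns = {
    "clove": [0.0, 0.0, 1.0, None],
    "rating": [4.375, 3.75, 5.0, 0.0],
    "cakeweek": [0.0, 1.0, 1188.0, 0.0],
}


def looks_binary(values):
    """True when the column has exactly two distinct non-null values."""
    return len({v for v in values if v is not None}) == 2


is_binary_local = {name: looks_binary(values) for name, values in columns.items()}
print(is_binary_local)
```

A column that should be binary but contains a stray third value fails the test, which is how suspicious columns surface.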

#### 1.3 Data mishaps and feature set

Some columns are not
consistent compared to other related (binary) columns. We are going to explore the content of
the suspicious columns, address the gaps, and continue our exploration. We aim for a
more consistent, more robust feature set that will lead to a better ML model.

In [0]:
food.agg(*[F.collect_set(x) for x in ("cakeweek", "wasteless")]).show(1, False)

In [0]:
food.where("cakeweek > 1.0 or wasteless > 1.0").select("title", "rating", "wasteless", "cakeweek", food.columns[-1]).show()

Our data set had a bunch of quotation marks along with some commas that confused PySpark’s parser. Since
only a small number of records are affected, I did not bother with realigning the data
and deleted them outright. I keep the null values as well.

In [0]:
food = food.where(
    (
        #"if cakeweek and wasteless are both either 0.0, 1.0, or null."
        F.col("cakeweek").isin([0.0, 1.0])
        | F.col("cakeweek").isNull()
    )
    & (
        F.col("wasteless").isin([0.0, 1.0])
        | F.col("wasteless").isNull()
    )
)

In [0]:
#we expect 3 fewer records:
print(food.count(), len(food.columns))

Now that we have identified two binary-in-hiding feature columns, we can identify our feature set and our target variable. The target (or label) is the column containing
the value we want to predict. In our case, the column is aptly named `dessert`.

Let's create all-caps variables containing the four main sets of columns I
care about:
- The identifiers, which are the column(s) that contain the information unique to
each record
- The targets, which are the column(s) (most often one) that contain the value we
wish to predict
- The continuous columns, containing continuous features
- The binary columns, containing binary features

In [0]:
IDENTIFIERS = ["title"]

CONTINUOUS_COLUMNS = [
    "rating",
    "calories",
    "protein",
    "fat",
    "sodium",
]

TARGET_COLUMN = ["dessert"]

BINARY_COLUMNS = [
    x
    for x in food.columns
    if x not in CONTINUOUS_COLUMNS
    and x not in TARGET_COLUMN
    and x not in IDENTIFIERS
]

#### 1.4 Find and delete useless records and impute binary features

I will remove two types of records:
- Those where all the features are null
- Those where the target is null

Furthermore, we will impute, meaning that we will provide a default value for, our
binary features. Since each of them is 0/1, where zero is False and one is True, we
equate null to False and fill zero as a default value (not ideal, but reasonable).

In [0]:
#FIRST: remove records with only null values

food = food.dropna(
    how="all",
    subset=[x for x in food.columns if x not in IDENTIFIERS],
)

food = food.dropna(subset=TARGET_COLUMN)

print(food.count(), len(food.columns))

(We lost 5 records; that's OK!)

In [0]:
#SECOND: impute a default value (0.0) to all binary columns

food = food.fillna(0.0, subset=BINARY_COLUMNS)

print(food.where(F.col(BINARY_COLUMNS[0]).isNull()).count())

#### 1.5 Cleaning continuous variables (and extreme values)

We are going to:
- cast the variables and delete wrong values
- review the distribution of numerical columns to account for
extreme or unrealistic values.

⚠ The next steps are not a blueprint to be applied regardless
of the situation/dataset.

If we go back to the schema in section 1.1, because of some data misalignment, PySpark
inferred the type of the rating and calories columns as strings, where they should
have been numerical.

We are going to create a UDF (user-defined function) that takes a string column and returns
True if the value is a floating-point number (or a null, since PySpark allows null values
in a Double column) and False otherwise.

More about UDF: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.udf.html

The function returns True right off the bat if the value is null (or an empty string, since `not value` is true for both). If the value is non-null, it tries to cast the value as a Python float. If that fails, it returns False.

👍TIP: If you want to negate a whole expression in a filter() method, PySpark provides the ~ operator.

In [0]:
#FIRST: cast

from typing import Optional

@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
    if not value:
        return True
    try:
        _ = float(value) #We used the underscore to tell the code to perform the work, but to not care about the result.
    except ValueError:
        return False
    return True


food.where(~is_a_number(F.col("rating"))).select(*CONTINUOUS_COLUMNS).show()
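
Since the body of the UDF is ordinary Python, its logic can be checked locally before running it on the cluster. The sketch below copies the body without the Spark decorator. The name `is_a_number_py` and the sample inputs are assumptions made for this illustration.

```python
from typing import Optional


def is_a_number_py(value: Optional[str]) -> bool:
    """Plain-Python copy of the UDF body (no Spark decorator)."""
    if not value:  # None and the empty string both end up here
        return True
    try:
        float(value)
    except ValueError:
        return False
    return True


# Hypothetical inputs covering null, empty, numeric, and text values.
checks = {v: is_a_number_py(v) for v in [None, "", "4.375", "1e3", "Cucumber"]}
print(checks)
```

Scientific notation such as `"1e3"` passes, because Python's `float()` accepts it.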

We have a single remaining rogue record that we remove in the next CMD before casting the columns as a double. Our continuous feature columns are now all numerical.

In [0]:
for column in ["rating", "calories"]:
    food = food.where(is_a_number(F.col(column)))
    food = food.withColumn(column, F.col(column).cast(T.DoubleType()))

print(food.count(), len(food.columns)) #we should lose just one record!

We need to use our judgment for the best course of
action to address this data quality issue. I could filter the records once more, but this
time, I’ll cap the values to the 99th percentile, avoiding extreme (and potentially
wrong) values.

In [0]:
#SECOND: Look for extreme values
food.select(*CONTINUOUS_COLUMNS).summary(
"mean",
"stddev",
"min",
"1%",
"5%",
"50%",
"95%",
"99%",
"max",
).show()

To make things easier, we are going to **hardcode** the maximum acceptable values for each column, and
then apply those maximums iteratively to our food data frame.

In [0]:
maximum = {
    "calories": 3203.0,
    "protein": 173.0,
    "fat": 207.0,
    "sodium": 5661.0,
}

for k, v in maximum.items():
    food = food.withColumn(
        k,
        F.when(F.isnull(F.col(k)), F.col(k)).otherwise( 
            F.least(F.col(k), F.lit(v))
        ),
    )

In [0]:
food.display()
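
The capping rule used in the loop above can be written as a one-line plain-Python function. The `when(isnull(...))` guard matters because `F.least()` skips nulls: without it, a null would be replaced by the cap instead of staying null. The sketch below mirrors that behavior on made-up values; the function name `cap` is hypothetical.

```python
def cap(value, maximum):
    """Keep nulls (None) untouched; otherwise clip the value at `maximum`."""
    return value if value is None else min(value, maximum)


calories = [426.0, None, 30111218.0, 3203.0]  # made-up sample values
capped = [cap(v, 3203.0) for v in calories]
print(capped)
```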

#### 1.6 Remove rare binary features

We are going to remove features that are either too rare or too frequent. Binary features with only a few zeroes or ones are not
helpful in classifying a recipe as a dessert: if every recipe (or no recipe) has a certain
feature as true, then that feature does not discriminate properly, meaning that our
model has no use for it.

Below, we compute the sum of each
binary column; this gives us the number of 1.0 values, since the sum of the ones is equal to their count.

For this model, let's use 10 as the threshold.

In [0]:
inst_sum_of_binary_columns = [
    F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS
]

sum_of_binary_columns = (
    food.select(*inst_sum_of_binary_columns).head().asDict()  # Since a row is just like a Python dictionary, I can bring the row back to the driver and process it locally.
)

num_rows = food.count()
too_rare_features = [
    k
    for k, v in sum_of_binary_columns.items()
    if v < 10 or v > (num_rows - 10)
]

print('count of variables to remove: ', len(too_rare_features))

print('\n\n\nvariables to remove: \n',too_rare_features)

#Rather than deleting the columns from the data frame, I just remove them from my BINARY_COLUMNS list.
BINARY_COLUMNS = list(set(BINARY_COLUMNS) - set(too_rare_features))

print('\n\n\n\nvariable kept: \n',BINARY_COLUMNS)

(We removed 167 features that are either too rare or too frequent.)
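
The selection rule is easy to verify on a handful of numbers. The sketch below uses hypothetical column sums for an imaginary 1,000-row data set; only the threshold of 10 comes from the text.

```python
# Hypothetical column sums (number of 1.0 values) for a 1,000-row data set.
sums = {"vegan": 120, "cakeweek": 3, "recipe_has_title": 998, "peanut_free": 500}
n_rows, threshold = 1000, 10

too_rare_or_frequent = [
    name
    for name, ones in sums.items()
    if ones < threshold or ones > n_rows - threshold
]
kept = [name for name in sums if name not in too_rare_or_frequent]
print(too_rare_or_frequent, kept)
```

Filtering with a list comprehension, rather than a set difference, keeps the remaining columns in their original order, which can make later debugging easier.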

## 2. Feature Engineering

Now we are going into two important steps of model building: feature creation (also called
feature engineering) and refinement.

Our goal:
- Creating a few custom features using our continuous feature columns
- Measuring correlation over original and generated continuous features

#### 2.1 Custom features

In PySpark, creating
new features is done simply by creating columns with the information you want;
this means you can create simple or highly sophisticated features.

Just as an example, we’ll take the `protein` and `fat` columns representing
the quantity (in grams) of protein and fat in the recipe, respectively. With the information
in those two columns, I create two features representing the percentage of calories
attributed to each macronutrient.

⚠ PAY ATTENTION TO MULTICOLLINEARITY! When
using a model that has a linear component, such as linear regression or
logistic regression, features that are linear combinations of other features can cause problems with your model’s accuracy.

In [0]:
food = food.withColumn(
    "protein_ratio", F.col("protein") * 4 / F.col("calories")
).withColumn(
    "fat_ratio", F.col("fat") * 9 / F.col("calories")
)  # There are 4 kcal per gram of protein and 9 kcal per gram of fat.

food = food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])

CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]
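
To make the arithmetic concrete, here is a small worked example in plain Python. The helper name `macro_ratio` and the sample dish are hypothetical. The fallback to 0.0 imitates the `fillna(0.0)` step above, under the assumption that a missing or zero calorie count leaves the ratio undefined.

```python
def macro_ratio(grams, kcal_per_gram, calories):
    """Share of calories coming from one macronutrient; 0.0 when undefined."""
    if grams is None or not calories:  # null inputs or zero calories
        return 0.0
    return grams * kcal_per_gram / calories


protein_ratio = macro_ratio(30.0, 4, 400.0)  # 30 g of protein in a 400 kcal dish
fat_ratio = macro_ratio(20.0, 9, 400.0)      # 20 g of fat in the same dish
undefined = macro_ratio(5.0, 4, None)        # calories unknown
print(protein_ratio, fat_ratio, undefined)
```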

#### 2.2 Feature correlation

Looking at the correlation
between our continuous variables may help us improve our model accuracy and explainability.

In this section we are going to address:
1. How PySpark computes the correlation between variables and provides the results in a matrix using Vector and Matrix objects
2. How we can extract values from them.
3. The correlation between our continuous variables, and a decision about their inclusion in our first model

For computing correlation between variables, PySpark provides the `Correlation` object.
Correlation has a single method, `corr`, that computes the correlation between features
in a `Vector`. Vectors are like PySpark arrays but with a special representation optimized
for ML work.
We are going to use the `VectorAssembler` transformer on the food data frame to create a new column,
continuous_features, that contains a Vector of all our continuous features.
A transformer is a preconfigured object that, as its name indicates, transforms a
data frame. Independently, it looks like unnecessary complexity, but it shines when
applied within a pipeline.

In [0]:
from pyspark.ml.feature import VectorAssembler

continuous_features = VectorAssembler(
    inputCols=CONTINUOUS_COLUMNS, outputCol="continuous_features"
)

vector_food = food.select(CONTINUOUS_COLUMNS)
for x in CONTINUOUS_COLUMNS:
    vector_food = vector_food.where(~F.isnull(F.col(x))) 

vector_variable = continuous_features.transform(vector_food)

vector_variable.select("continuous_features").show(3, False)

#Correlation will not work well if you blend categorical and/or binary features together.

In [0]:
vector_variable.select("continuous_features").printSchema()

Now, we are going to apply the `Correlation.corr()` function on the continuous feature
vector and export the correlation matrix into an easily interpretable pandas DataFrame.
PySpark returns the correlation matrix in a `DenseMatrix` column type, which
is like a two-dimensional vector. In order to extract the values in an **easy-to-read format**:
1. We extract a single record as a Row using head().
2. A Row is like an ordered dictionary, so we can access the first (and only) field
containing our correlation matrix by indexing it with `[0]`.
3. A DenseMatrix can be converted into a pandas-compatible array by using the
toArray() method on the matrix.
4. We can directly create a pandas DataFrame from our NumPy array. Inputting
our column names as an index (in this case, they’ll play the role of “row names”)
makes our correlation matrix very readable.

In [0]:
from pyspark.ml.stat import Correlation

#The corr method takes a data frame and a Vector column reference as a parameter and generates a single-row, single column data frame containing the correlation matrix.
correlation = Correlation.corr(
    vector_variable, "continuous_features"
)

correlation.printSchema()

#DenseMatrix is not easily accessible by itself.

In [0]:
correlation_array = correlation.head()[0].toArray()

correlation_pd = pd.DataFrame(
    correlation_array,  
    index=CONTINUOUS_COLUMNS,  
    columns=CONTINUOUS_COLUMNS, 
)

print(correlation_pd.iloc[:, :6])

**There is no absolute threshold for keeping or removing correlated variables.**

We see high correlation between `sodium`,
`calories`, `protein`, and `fat`. Surprisingly, we see little correlation between our custom
features and the columns that contributed to their creation.
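
For readers who want to see what a single cell of the correlation matrix measures, here is a minimal plain-Python sketch of the Pearson correlation coefficient (the default method of `Correlation.corr()`). It is an illustration of the formula, not Spark's implementation, and all values are made up.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


fat = [1.0, 2.0, 3.0, 4.0]           # made-up values
calories = [10.0, 20.0, 30.0, 40.0]  # exactly 10 * fat
rating = [4.0, 1.0, 3.0, 2.0]        # no clear linear link with fat

r_linear = pearson(fat, calories)
r_weak = pearson(fat, rating)
print(round(r_linear, 3), round(r_weak, 3))
```

A value close to 1 or -1 signals a strong linear relationship; a value near 0 signals a weak one.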

## 3. Feature Preparation

This section provides an overview of transformers and estimators in the context of feature
preparation. We use transformers and estimators as an abstraction over common
operations in machine learning modeling. We explore two relevant examples of transformers
and estimators:
-  Null imputation, where we provide a value to replace null occurrences in a column
(e.g., the mean)
-  Scaling features, where we normalize the values of a column, so they are on a
more logical scale (e.g., between zero and one)


The best way to think about a `transformer` is by translating its behavior into a
`function`. Below we compare a `VectorAssembler` to a `function` assemble_vector()
that performs the same work, which is to create a Vector named after the
argument to outputCol, which contains all the values in the columns passed to
inputCols. Don’t focus on the actual work here, but more on the mechanism
of application.

![image](files/tables/transformer.jpg)

The transformer object has a two-staged process.
- First, when instantiating the
transformer, we provide the parameters necessary for its application, but not the data
frame on which it’ll be applied. This echoes the separation of data and instructions we
saw in previous labs.
- Then, we use the instantiated transformer’s transform() method on
the data frame to get a transformed data frame.


This separation of instructions and data is key in creating serializable ML pipelines,
which leads to easier ML experiments and model portability.
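
The contrast between the two styles can be sketched in plain Python, with rows stored as dictionaries instead of a Spark data frame. `ToyAssembler` and this version of `assemble_vector` are hypothetical stand-ins written for illustration; they are not the PySpark classes.

```python
# Rows are plain dicts here; in PySpark they would live in a data frame.
rows = [{"fat": 7.0, "sodium": 559.0}, {"fat": 23.0, "sodium": 1439.0}]


def assemble_vector(rows, input_cols, output_col):
    """Function style: data and parameters are passed together."""
    return [{**r, output_col: [r[c] for c in input_cols]} for r in rows]


class ToyAssembler:
    """Transformer style: parameters first, data later via transform()."""

    def __init__(self, *, input_cols, output_col):
        self.input_cols = input_cols
        self.output_col = output_col

    def transform(self, rows):
        return assemble_vector(rows, self.input_cols, self.output_col)


assembler = ToyAssembler(input_cols=["fat", "sodium"], output_col="features")
first_vector = assembler.transform(rows)[0]["features"]
same_result = assembler.transform(rows) == assemble_vector(
    rows, ["fat", "sodium"], "features"
)
print(first_vector, same_result)
```

Both styles produce the same result. The difference is that the configured `assembler` object exists before any data is seen, so it can be stored, reused, or placed in a pipeline.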

#### 3.1 Imputer estimator

In this section, we cover the Imputer estimator and introduce the concept of an estimator.
Estimators are the main abstraction used by Spark for any data-dependent transformation,
including ML models, so they are pervasive in any ML code using PySpark.

We want our Imputer to impute the mean value to every record in the
calories, protein, fat, and sodium columns when the record is null.

More information in section Imputer: https://spark.apache.org/docs/latest/ml-features

In [0]:
from pyspark.ml.feature import Imputer

OLD_COLS = ["calories", "protein", "fat", "sodium"]
NEW_COLS = ["calories_i", "protein_i", "fat_i", "sodium_i"]

imputer = Imputer(
    strategy="mean",  
    inputCols=OLD_COLS,  
    outputCols=NEW_COLS,  
)

imputer_model = imputer.fit(food)

CONTINUOUS_COLUMNS = (
    list(set(CONTINUOUS_COLUMNS) - set(OLD_COLS)) + NEW_COLS  
)

In [0]:
#Let's check!

food_imputed = imputer_model.transform(food)

food_imputed.where("calories is null").select("calories", "calories_i").show(5, False)

#### 3.2 Scaling features

This section covers variable scaling using the MinMaxScaler estimator. Scaling variables
means performing a mathematical transformation on the variables so that they
are all on the same numeric scale.

To choose the right scaling algorithm, we need to look at our variables as a whole.
Since we have so many binary variables, it is convenient to have every variable be
between zero and one. Our protein_ratio and fat_ratio are ratios between zero
and one too!

In [0]:
from pyspark.ml.feature import MinMaxScaler

CONTINUOUS_NB = [x for x in CONTINUOUS_COLUMNS if "ratio" not in x]

continuous_assembler = VectorAssembler(
    inputCols=CONTINUOUS_NB, outputCol="continuous"
)

food_features = continuous_assembler.transform(food_imputed)

continuous_scaler = MinMaxScaler(
    inputCol="continuous",
    outputCol="continuous_scaled",
)

food_features = continuous_scaler.fit(food_features).transform(
    food_features
)

food_features.select("continuous_scaled").show(3, False)


👍 TIP check the pyspark.ml.feature module for other scalers. https://spark.apache.org/docs/2.3.1/api/python/pyspark.ml.html
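
The min-max transformation itself is a simple linear rescaling. Here is a minimal sketch in plain Python for a single column, assuming the default target range of zero to one. The function name and sample values are made up, and the handling of a constant column follows the rule described in the Spark documentation for `MinMaxScaler`.

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Rescale values linearly so the smallest maps to new_min and the largest to new_max."""
    lo, hi = min(values), max(values)
    if hi == lo:  # constant column: map everything to the middle of the range
        return [0.5 * (new_min + new_max)] * len(values)
    return [(v - lo) / (hi - lo) * (new_max - new_min) + new_min for v in values]


sodium = [0.0, 1415.25, 5661.0]  # made-up values, already capped
scaled = min_max_scale(sodium)
print(scaled)
```

Because the minimum and maximum come from the data, a single extreme value compresses every other value toward zero, which is one reason capping comes first.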

## 4. Finally, ML Pipeline

We may say an ML pipeline is
an ordered list of transformers and estimators.

#### 4.1 Transformers and estimators


TRANSFORMERS:

Taking `VectorAssembler` as our example: its sole purpose, through its `transform()` method, is to take
the values in `inputCols` (assembled values) and return a single column, named
`outputCol`, that contains a vector of all the assembled values.
A transformer has a set of explicit parameters (called
Params in the Spark language) that drive its behavior.
Some parameters have a default value in case you
don’t define a value yourself (e.g., handleInvalid).

The most important method of a
transformer is the `transform( )`
method. This method takes a data
frame as an input and returns
a transformed data frame.

Example: `VectorAssembler` is a transformer. Params: inputCols, outputCol, handleInvalid

If you look at the signature for VectorAssembler, you’ll see an asterisk at the beginning
of the parameters list:

`` class pyspark.ml.feature.VectorAssembler(*, inputCols=None,
outputCol=None, handleInvalid='error')`` 

In Python, every parameter after the asterisk (*) is called a keyword-only argument,
meaning that we need to mention the keyword. For instance, we couldn’t do
`VectorAssembler("input_column", "output_column")`. For more: https://peps.python.org/pep-3102/
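
Keyword-only arguments are a plain Python feature, so the behavior can be demonstrated without Spark. `make_assembler` below is a toy function that only borrows the style of the signature; it is not part of PySpark.

```python
def make_assembler(*, inputCols=None, outputCol=None):
    """Toy stand-in with the same keyword-only style of signature."""
    return {"inputCols": inputCols, "outputCol": outputCol}


ok = make_assembler(inputCols=["fat"], outputCol="features")

try:
    make_assembler(["fat"], "features")  # positional call
    positional_error = None
except TypeError as err:
    positional_error = str(err)

print(ok)
print(positional_error)
```

The positional call fails with a `TypeError` before the function body runs, which prevents accidentally swapping input and output columns.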

In [0]:
print(continuous_assembler.outputCol)

In [0]:
print(continuous_assembler.getOutputCol())

print('\n', continuous_assembler.explainParam("outputCol"))

print('\n', continuous_assembler.explainParams())

ESTIMATOR:

Where a transformer transforms an
input data frame into an output data frame, an estimator is fitted on an input data
frame and returns an output transformer.

We focus on estimator usage through the `fit()`
method (versus `transform()` for the transformer), which is really the only notable
difference for the end user. The `fit()` method takes a data
frame as an input and returns a parametrized
transformer as an output.

Just like a transformer, an estimator has a set
of explicit parameters (called Params in the
Spark language) that drive its behavior. Some
parameters have a default value in case you
don’t define a value yourself (e.g., min/max).

Example: `MinMaxScaler` is an estimator. Params: `min`, `max`, `inputCol`, `outputCol`.

This fit()/transform() approach applies for estimators that are far more complex
than MinMaxScaler. Case in point: ML models are actually implemented as estimators
in Spark.
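
The `fit()`/`transform()` contract can be sketched with two small plain-Python classes. `ToyMeanImputer` and `ToyMeanImputerModel` are hypothetical names used for illustration; they imitate the idea of mean imputation on a single list of values and are not the PySpark `Imputer`.

```python
class ToyMeanImputerModel:
    """The fitted 'transformer': it only carries what fit() learned."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [self.mean if v is None else v for v in values]


class ToyMeanImputer:
    """The 'estimator': fit() looks at the data and returns a model."""

    def fit(self, values):
        known = [v for v in values if v is not None]
        return ToyMeanImputerModel(sum(known) / len(known))


train_fat = [10.0, None, 30.0]              # made-up training values
model = ToyMeanImputer().fit(train_fat)     # learns mean = 20.0
imputed_train = model.transform(train_fat)
imputed_new = model.transform([None, 5.0])  # reuses the learned mean on new data
print(model.mean, imputed_train, imputed_new)
```

The important point is the last call: new data is transformed with the mean learned from the training data, not with a mean recomputed on the new data.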

#### 4.2 Building a complete ML pipeline

In this section, we introduce the `Pipeline` object, an estimator with a special purpose:
running other transformers and estimators.

Pipelines build on transformers and estimators
to make training, evaluating, and optimizing ML models much clearer and
more explicit.

ML pipelines are implemented through the Pipeline class, which
is a specialized version of the estimator. The Pipeline estimator has only one
Param, called stages, which takes a list of transformers and estimators.

In [0]:
#Just as a matter of completeness, we are going to repeat the estimators and transformers here, to consolidate the code 

from pyspark.ml import Pipeline
import pyspark.ml.feature as MF

imputer = MF.Imputer(  
    strategy="mean",
    inputCols=["calories", "protein", "fat", "sodium"],
    outputCols=["calories_i", "protein_i", "fat_i", "sodium_i"],
)

continuous_assembler = MF.VectorAssembler(  
    inputCols=["rating", "calories_i", "protein_i", "fat_i", "sodium_i"],
    outputCol="continuous",
)

continuous_scaler = MF.MinMaxScaler(  
    inputCol="continuous",
    outputCol="continuous_scaled",
)


#The food_pipeline pipeline contains three stages, encoded in the stages Param
food_pipeline = Pipeline(  
    stages=[imputer, continuous_assembler, continuous_scaler]
)

In practical terms, since the pipeline is an estimator, it has a `fit()` method that generates
a PipelineModel. Under the hood, the pipeline applies each stage in order, calling
the appropriate method depending on if the stage is a transformer (`transform()`)
or an estimator (`fit()`). By wrapping all of our individual stages into a pipeline, we
only have one method to call, `fit()`, knowing that PySpark will do the right thing to
yield a PipelineModel.

If the stage is
a transformer, it gets applied on the data
and then passed as a stage in the
PipelineModel. If the stage is an estimator,
it gets fitted on the data and the resulting
model gets passed as a stage in the
PipelineModel.
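
The fitting logic described above can be sketched in a few lines of plain Python. Everything here (`AddOne`, `Center`, `ToyPipeline`, and so on) is a hypothetical toy operating on lists of numbers; it shows the order of operations only and is not how Spark implements `Pipeline`.

```python
class AddOne:
    """Toy transformer: no fitting needed."""

    def transform(self, values):
        return [v + 1 for v in values]


class CenterModel:
    """Toy fitted model: subtracts the mean learned during fit()."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]


class Center:
    """Toy estimator: fit() learns the mean and returns a CenterModel."""

    def fit(self, values):
        return CenterModel(sum(values) / len(values))


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted first; transformers are used as they are.
            model = stage.fit(values) if hasattr(stage, "fit") else stage
            fitted.append(model)
            values = model.transform(values)  # feed the next stage
        return ToyPipelineModel(fitted)


pipeline_model = ToyPipeline([AddOne(), Center()]).fit([1.0, 2.0, 3.0])
output = pipeline_model.transform([10.0])
print(pipeline_model.stages[1].mean, output)
```

The estimator is fitted on the output of the previous stage (here `[2.0, 3.0, 4.0]`), so the learned mean is 3.0 rather than 2.0.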

##### 4.2.1 Final dataset (vector column type)

This section will cover the assembly into a final feature vector, the last stage before
sending our data for training.

PySpark requires all the data fed into a machine learning
estimator, as well as some other estimators like the MinMaxScaler, to be in a single vector
column.

REMEMBER: We already know how to assemble data into a vector: use the `VectorAssembler`.

We will assemble all of our BINARY_COLUMNS, the _ratio columns, and the continuous_scaled
vector column from our pipeline. PySpark will do the right thing when assembling
vector columns in another vector: rather than getting nested vectors, the assembly
step will flatten everything into a single, ready-to-use vector.

In [0]:
preml_assembler = MF.VectorAssembler(
    inputCols=BINARY_COLUMNS 
    + ["continuous_scaled"]
    + ["protein_ratio", "fat_ratio"],
    outputCol="features",
)

food_pipeline.setStages(
    [imputer, continuous_assembler, continuous_scaler, preml_assembler]
)

food_pipeline_model = food_pipeline.fit(food)  # food_pipeline_model becomes a PipelineModel
food_features = food_pipeline_model.transform(food) 

Our data frame is ready for machine learning! We have a number of records, each with
- A target (or label) column, dessert, containing a binary input (1.0 if the recipe
is a dessert, 0.0 otherwise)
- A vector of features, called features, containing all the information we want to
train our machine learning model with

In [0]:
food_features.select("title", "dessert", "features").show(5, truncate=30)

We provide 513 distinct features (see the 513 at the beginning of the features column value) with a large number of zeroes. This is
called a sparse feature set. When storing vectors, PySpark has two choices for representing
vectors:
-  A dense representation, where a Vector in PySpark is simply a NumPy (a high-performance
multidimensional array library for Python) single-dimensional
array object
-  A sparse representation, where a Vector in PySpark is an optimized sparse vector
compatible with the SciPy (a scientific computing library in Python)
scipy.sparse matrix.

For more: https://www.youtube.com/watch?v=oGwEv82ifrE
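
The sparse form shown in the `features` column reads as (size, indices of the non-zero entries, values at those indices). The plain-Python sketch below converts between the two forms on a made-up six-element vector; the function names are hypothetical and the code only illustrates the idea behind the representation.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


def to_dense(size, indices, values):
    """Rebuild the full list from the sparse triple."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


dense = [0.0, 0.0, 1.0, 0.0, 0.25, 0.0]  # made-up feature vector
sparse = to_sparse(dense)
round_trip = to_dense(*sparse)
print(sparse)
```

With 513 features that are mostly zero, storing only the non-zero positions saves a lot of space.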


PySpark allows for a metadata dictionary to be attached to a
column, let's have a look:

In [0]:
print(food_features.schema["features"])

In [0]:
print(food_features.schema["features"].metadata)

# Since they originate from a VectorAssembler, PySpark gives scaled variables a generic name, but you can retrieve their name from the original vector column (here continuous) as needed.

##### 4.2.2 Training the model (using a logistic regression)

It is time to add an ML model to our Pipeline!

ATTENTION: in the real world, you need to choose the correct model to apply to the business problem
you are trying to solve.

In our case, because our target is binary (0.0 or 1.0), we restrict ourselves to a classification algorithm. The logistic regression algorithm, despite its name, is a classification algorithm that
belongs to the family of generalized linear models.

Before integrating our logistic regression into our pipeline, we need to create the `estimator`.
This `estimator` is called `LogisticRegression` and comes from the `pyspark.ml.classification`
module. The API documentation page for the LogisticRegression: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features", labelCol="dessert", predictionCol="prediction"
)

food_pipeline.setStages(
    [
        imputer,
        continuous_assembler,
        continuous_scaler,
        preml_assembler,
        lr,
    ]
)

#We just set three Params:
# - featuresCol: the column containing our features vector
# - labelCol: the column containing our label (or target)
# - predictionCol: the column that will contain the predictions of our model

Below, we `fit()` our pipeline. Before doing so, we need to split our data set into two portions using `randomSplit()`: one for training, which
we feed to our pipeline, and one for testing, which is what we use to evaluate our
model fit.

But before fitting our pipeline, we cache() the training data frame. We do this because ML uses the data frame
repeatedly, so caching in memory provides an increase in speed if your cluster **has
enough memory**.

*Although PySpark will use the same seed, which should guarantee
that the split will be consistent across runs, there are some cases where PySpark
will break that consistency. If you want to be 100% certain about your splits,
split your data frame, write each one to disk, and then read them from the
disk location.*

In [0]:
#This CMD may take up to 10min

train, test = food.randomSplit([0.7, 0.3], 42) #42 is a seed

train.cache()

food_pipeline_model = food_pipeline.fit(train)
results = food_pipeline_model.transform(test)

In [0]:
results.select("prediction", "rawPrediction", "probability").show(3, False)

#### 4.3 Evaluate and optimize

In this section, we review our model's results and tune its training.

##### 4.3.1 Assessing model accuracy: Confusion matrix and evaluator object

In [0]:
#This CMD may take up to 1 min

results.groupby("dessert").pivot("prediction").count().show()

#The confusion matrix shows that our data set has a lot more non-desserts than desserts. In the classification world this is called an imbalanced data set

In Spark 3.1, we now have access to a new `LogisticRegressionSummary` object that avoids the trip to
the RDD world.

We need to first extract our fitted model
from the pipeline model. For this, we can use the stages attribute of `food_pipeline_model`
and access just the last item. From that model, called `lr_model` in the CMD below, we call `evaluate()` on the results data set. `evaluate()` will error out if any prediction
columns already exist, so I simply give the relevant ones (dessert, features) to
it. It’s a small price to pay to avoid computing the metrics by hand. Note that PySpark
does not know which label we consider positive and negative. Because of this, the precision
and recall are accessible through `precisionByLabel` and `recallByLabel`,
which both return lists of precision/recall for each label in order.

In [0]:
lr_model = food_pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select("title", "dessert", "features"))

# LogisticRegressionTrainingSummary

print(f"Model precision: {metrics.precisionByLabel[1]}") 
print(f"Model recall: {metrics.recallByLabel[1]}")
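
As a reminder of what the two metrics mean, here is the arithmetic on a hypothetical confusion matrix, with dessert (1.0) taken as the positive label. The counts are made up for illustration and are not the results of the model above.

```python
# Hypothetical confusion-matrix counts, with dessert (1.0) as the positive label.
true_positive, false_positive = 90, 10
false_negative, true_negative = 30, 870

# Precision: of the dishes predicted as desserts, how many really are desserts?
precision = true_positive / (true_positive + false_positive)
# Recall: of the real desserts, how many did the model find?
recall = true_positive / (true_positive + false_negative)
print(precision, recall)
```

On an imbalanced data set, these two numbers are usually more informative than raw accuracy, since predicting "not a dessert" everywhere would already be right most of the time.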

The receiver operating characteristic (ROC) curve is another common tool used when evaluating binary classification
models.

The area under the ROC curve is obtained through the BinaryClassificationEvaluator object.
Below we instantiate the said object, asking explicitly for the areaUnderROC metric.

In [0]:
#As homework, you may try build this ROC curve using matplotlib

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="dessert",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)

accuracy = evaluator.evaluate(results)
print(f"Area under ROC = {accuracy} ")
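
One way to build intuition for this number: the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in pure Python. It illustrates the definition only and is not meant to mirror Spark's implementation; the labels and scores are made up.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    positives = [s for label, s in zip(labels, scores) if label == 1]
    negatives = [s for label, s in zip(labels, scores) if label == 0]
    if not positives or not negatives:
        raise ValueError("Need at least one positive and one negative example")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical labels (1 = dessert) and model scores.
labels = [1, 1, 0, 0, 1, 0]
scores = [0.9, 0.8, 0.7, 0.3, 0.6, 0.2]

auc = area_under_roc(labels, scores)
print(f"Area under ROC = {auc:.3f}")
```

A value of 1.0 means every positive outranks every negative, while 0.5 is what random scoring would give.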

##### 4.3.2 Optimizing hyperparameters with cross-validation

By fine-tuning some aspects of the model training (how Spark builds the
fitted model), we can hope to yield better model accuracy. For this, we use a technique
called cross-validation. Cross-validation resamples the data set into training and
testing sets to assess the ability of the model to generalize over new data.
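
As a rough illustration of the resampling idea, the following pure-Python sketch splits row indices into K folds and uses each fold once as the validation set. The function name and the tiny data size are hypothetical; Spark performs its own random fold assignment internally.

```python
import random


def k_fold_indices(n_rows, num_folds, seed):
    """Return a list of (training, validation) index lists, one per fold."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)  # reproducible shuffle

    # Deal the shuffled indices into num_folds roughly equal folds.
    folds = [indices[i::num_folds] for i in range(num_folds)]

    splits = []
    for i in range(num_folds):
        validation = folds[i]
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        splits.append((training, validation))
    return splits


splits = k_fold_indices(n_rows=9, num_folds=3, seed=13)
for training, validation in splits:
    print(f"train={sorted(training)} validate={sorted(validation)}")
```

Every row ends up in exactly one validation set, so each candidate model is scored on data it was not trained on.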

To build the set of hyperparameters we wish to evaluate our model against, we use the `ParamGridBuilder`, which assists in creating a list of Param Maps.

In [0]:
from pyspark.ml.tuning import ParamGridBuilder

grid_search = (
    ParamGridBuilder() 
    .addGrid(lr.elasticNetParam, [0.0, 1.0]) 
    .build()
)

print(grid_search)

The output may be messy, so here it is reformatted for easier reading:

    [
        {Param(parent='LogisticRegression_14302c005814',
               name='elasticNetParam',
               doc='...'): 0.0},
        {Param(parent='LogisticRegression_14302c005814',
               name='elasticNetParam',
               doc='...'): 1.0}
    ]
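
Conceptually, the grid is the Cartesian product of the values supplied for each parameter. The pure-Python sketch below mimics that behavior with plain dictionaries. The helper `build_grid` and the `regParam` values are hypothetical additions for illustration; the lab's grid only varies `elasticNetParam`.

```python
from itertools import product


def build_grid(param_values):
    """Return one dict per combination of the supplied parameter values."""
    names = list(param_values)
    value_lists = [param_values[name] for name in names]
    return [dict(zip(names, combo)) for combo in product(*value_lists)]


# Two values for each of two parameters yields 2 x 2 = 4 combinations.
grid = build_grid(
    {
        "elasticNetParam": [0.0, 1.0],
        "regParam": [0.01, 0.1],  # hypothetical values, not used in the lab
    }
)

for param_map in grid:
    print(param_map)
```

Since every combination is trained once per fold during cross-validation, each value added to the grid multiplies the total training time.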

Now onto cross-validation. PySpark provides out-of-the-box K-fold cross-validation through the `CrossValidator` class.

In [0]:
#ATTENTION: This command may take 1h or more. Only run the cell if you have patience or can do something else in the meantime. 

from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(
    estimator=food_pipeline,
    estimatorParamMaps=grid_search,
    evaluator=evaluator,
    numFolds=3,
    seed=13,
    collectSubModels=True,
)

# cv_model = cv.fit(train)

# print(cv_model.avgMetrics)

In [0]:
# pipeline_food_model = cv_model.bestModel

#### 4.4 Extracting the coefficients

This section covers the extraction of our model features and their coefficients. We use
those coefficients to get a sense of the most important features of the model and plan
some improvements for a second iteration.

In [0]:
# import pandas as pd

# feature_names = ["(Intercept)"] + [ x["name"]
#     for x in (
#         food_features
#         .schema["features"]
#         .metadata["ml_attr"]["attrs"]["numeric"]
#     )
# ]

# feature_coefficients = [lr_model.intercept] + list(
#     lr_model.coefficients.values
# )


# coefficients = pd.DataFrame(
#     feature_coefficients, index=feature_names, columns=["coef"]
# )

# coefficients["abs_coef"] = coefficients["coef"].abs()

# print(coefficients.sort_values(["abs_coef"]))

A coefficient close to zero, like those of kirsch, lemon, and food_processor, means that the feature contributes little to the model's predictions. On the flip side, a large positive or negative coefficient, like those of cauliflower, horseradish, and quick_and_healthy, means that the feature is highly predictive.
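
To read a coefficient more concretely: in logistic regression, exponentiating a coefficient gives the factor by which the odds of the positive class are multiplied when the feature increases by one unit. The sketch below ranks a few coefficients by absolute value and prints that factor. The feature names and values are entirely hypothetical and do not come from the fitted model.

```python
import math

# Hypothetical coefficients, for illustration only.
hypothetical_coefficients = {
    "feature_a": 0.02,   # close to zero: barely moves the odds
    "feature_b": -3.1,   # strongly negative: pushes toward "not dessert"
    "feature_c": 2.4,    # strongly positive: pushes toward "dessert"
}

# Rank features from most to least influential by absolute coefficient.
ranked = sorted(
    hypothetical_coefficients.items(),
    key=lambda item: abs(item[1]),
    reverse=True,
)

for name, coef in ranked:
    odds_multiplier = math.exp(coef)
    print(f"{name}: coef={coef:+.2f}, odds multiplier={odds_multiplier:.2f}")
```

Comparing magnitudes this way is only meaningful when the features are on comparable scales, such as binary flags or scaled continuous columns.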

# Exercise

We are going to create a model to predict whether a flight is delayed by more than 15 minutes (```ARR_DEL15```) using other attributes, such as airport code, carrier, and various weather conditions.

Before starting, you must download the dataset `flight_weather.csv`, which you will find on Moodle. Since this is a tabular dataset, you can go to ``Catalog``, then ``tables``, and create the table there (using the UI option is fine).

> Note: For more accurate classification, consider the LightGBM classifier in the SynapseML library (formerly MMLSpark).<br>
> Here I use the built-in decision tree classifier in MLlib.

*This exercise was based on https://github.com/tsmatz/azure-databricks-exercise*

#### 1.1 Import and clean

In [0]:
#IMPORT DATASET

# File location and type
file_location = "/FileStore/flight_weather.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

In this dataset,

`ARR_DEL15` : 1 when the flight is delayed over 15 minutes, 0 otherwise.

`XXXOrigin` : Weather conditions at the departure airport.

`XXXDest` : Weather conditions at the destination airport.

In [0]:
# Code here: NUMBER OF ROWS/COLUMNS

In [0]:
# Code here: PRINT SCHEMA

#### 1.2 Explore the features

You may want to explore the features using the charts available in the table display above.

#### 1.3 Data mishaps and feature set

Mark a flight as "delayed over 15 minutes" if it was canceled.

In [0]:
# CODE HERE

Remove flights that were diverted.

In [0]:
# CODE HERE

In [0]:
from pyspark.sql.types import IntegerType

# Cast the weather columns and the label to integers.
df = df \
    .withColumns(
        {
            "RelativeHumidityOrigin": df["RelativeHumidityOrigin"].cast(IntegerType()),
            "AltimeterOrigin": df["AltimeterOrigin"].cast(IntegerType()),
            "DryBulbCelsiusOrigin": df["DryBulbCelsiusOrigin"].cast(IntegerType()),
            "WindSpeedOrigin": df["WindSpeedOrigin"].cast(IntegerType()),
            "VisibilityOrigin": df["VisibilityOrigin"].cast(IntegerType()),
            "DewPointCelsiusOrigin": df["DewPointCelsiusOrigin"].cast(IntegerType()),
            "RelativeHumidityDest": df["RelativeHumidityDest"].cast(IntegerType()),
            "AltimeterDest": df["AltimeterDest"].cast(IntegerType()),
            "DryBulbCelsiusDest": df["DryBulbCelsiusDest"].cast(IntegerType()),
            "WindSpeedDest": df["WindSpeedDest"].cast(IntegerType()),
            "VisibilityDest": df["VisibilityDest"].cast(IntegerType()),
            "DewPointCelsiusDest": df["DewPointCelsiusDest"].cast(IntegerType()),
            "ARR_DEL15": df["ARR_DEL15"].cast(IntegerType())
        }
    )
 

#### 1.4 Find and delete useless records and impute binary features

Narrow to required columns.

In [0]:
df = df.select(
  "ARR_DEL15",
  "MONTH",
  "DAY_OF_WEEK",
  "UNIQUE_CARRIER",
  "ORIGIN",
  "DEST",
  "CRS_DEP_TIME",
  "CRS_ARR_TIME",
  "RelativeHumidityOrigin",
  "AltimeterOrigin",
  "DryBulbCelsiusOrigin",
  "WindSpeedOrigin",
  "VisibilityOrigin",
  "DewPointCelsiusOrigin",
  "RelativeHumidityDest",
  "AltimeterDest",
  "DryBulbCelsiusDest",
  "WindSpeedDest",
  "VisibilityDest",
  "DewPointCelsiusDest")

Drop rows that have a null value in any column.

In [0]:
# Code here: DROPNA

In [0]:
# Code here: SHOW RESULTS AFTER DROPNA

#### 1.5 Cleaning continuous variables (and extreme values)

Look for extreme values

In [0]:
summ = df.select("ARR_DEL15", "AltimeterDest", "WindSpeedDest", "AltimeterOrigin", "VisibilityDest", "WindSpeedOrigin", "VisibilityOrigin", "DryBulbCelsiusDest", "DewPointCelsiusDest", "DryBulbCelsiusOrigin", "RelativeHumidityDest", "DewPointCelsiusOrigin", "RelativeHumidityOrigin").summary(
"mean",
"stddev",
"min",
"1%",
"5%",
"50%",
"95%",
"99%",
"max",
)

display(summ)

#### 4.2.1 Final dataset (vector column type)

In [0]:
# Split data into train data and test data

Convert categorical values to index values (0, 1, ...) for the following columns.

- Carrier code (```UNIQUE_CARRIER```)
- Airport code in departure (```ORIGIN```)
- Airport code in destination (```DEST```)
- Flag (0 or 1) for delay over 15 minutes (```ARR_DEL15```)

In [0]:
from pyspark.ml.feature import StringIndexer
uniqueCarrierIndexer = StringIndexer(inputCol="UNIQUE_CARRIER", outputCol="Indexed_UNIQUE_CARRIER").fit(df)
originIndexer = StringIndexer(inputCol="ORIGIN", outputCol="Indexed_ORIGIN").fit(df)
destIndexer = StringIndexer(inputCol="DEST", outputCol="Indexed_DEST").fit(df)
arrDel15Indexer = StringIndexer(inputCol="ARR_DEL15", outputCol="Indexed_ARR_DEL15").fit(df)
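
For intuition about what the fitted indexers produce, here is a pure-Python sketch of frequency-based indexing: the most frequent label gets index 0.0, the next one 1.0, and so on. This mirrors `StringIndexer`'s default descending-frequency ordering; the alphabetical tie-breaking and the sample carrier codes are assumptions made for this illustration.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically to break ties (assumed).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Hypothetical carrier codes, not read from flight_weather.csv.
carriers = ["AA", "DL", "AA", "UA", "DL", "AA"]

mapping = fit_string_index(carriers)
indexed = [mapping[code] for code in carriers]

print(mapping)
print(indexed)
```

The resulting indices are category identifiers, not quantities, so their numeric order carries no meaning by itself.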

In [0]:
# Assemble feature columns
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
  inputCols = [
    "MONTH",
    "DAY_OF_WEEK",
    "Indexed_UNIQUE_CARRIER",
    "Indexed_ORIGIN",
    "Indexed_DEST",
    "CRS_DEP_TIME",
    "CRS_ARR_TIME",
    "RelativeHumidityOrigin",
    "AltimeterOrigin",
    "DryBulbCelsiusOrigin",
    "WindSpeedOrigin",
    "VisibilityOrigin",
    "DewPointCelsiusOrigin",
    "RelativeHumidityDest",
    "AltimeterDest",
    "DryBulbCelsiusDest",
    "WindSpeedDest",
    "VisibilityDest",
    "DewPointCelsiusDest"],
  outputCol = "features")

#### 4.2.2 Training the model

In [0]:
# Instantiate classifier (Decision tree)

In [0]:
# Create pipeline and Train

In [0]:
# Predict with evaluation data

#### 4.3.1 Assessing model accuracy

In [0]:
# Evaluate results

In [0]:
# Store pipeline