# Under the hood - using the individual training functions

We will now explore the functions used by `do_datasciencing` and see how they can be put together into a manual pipeline in which some of them can be replaced by custom user functions.

In [0]:
import datasciencefunctions as ds
import datasciencefunctions.classification as dsclass

from datasciencefunctions import MlModel

#0. Load and prepare data

#### 
* We will load the adult databricks example dataset. 
* It contains categorical, ordinal and numeric (continuous) predictors representing demographic info of US adults and a target denoting whether their income exceeded USD 50 000. 
* We want to train a model to predict whether the person's income exceeds USD 50 000. 
(you can read more details in the readme below).

In [0]:
# Show the README shipped with the adult dataset so that the description of
# its columns is visible directly in the notebook.
with open("/dbfs/databricks-datasets/adult/README.md") as readme_file:
    # read() returns the whole file as a single string, which is exactly
    # what joining the result of readlines() would produce
    readme_text = readme_file.read()

print(readme_text)

In [0]:
# Explicit schema for the adult dataset: numeric columns are read as DOUBLE,
# everything else (including the raw `income` target) as STRING.
schema = """
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING
"""

# Fixed seed so that the tutorial works with the same subset of rows on every run.
SAMPLE_SEED = 42

df_adult = (
    spark
    .read
    .format("csv")
    .schema(schema)
    # NOTE(review): if adult.data has no header row, header=True makes Spark
    # skip the first record - confirm against the file
    .option("header", True)
    .option("path", "dbfs:/databricks-datasets/adult/adult.data")
    .load()
    # only take a sample of the dataset for tutorial purposes
    .sample(fraction=0.35, seed=SAMPLE_SEED)
)

df_adult.printSchema()

When running binary classification, datasciencefunctions expects you to specify a label (or target) column with values 1 and 0.

We will create a classification target column called "income_above_50K" with value 1 if the person's income exceeds USD 50K and 0 otherwise.

In [0]:
import pyspark.sql.functions as F

# Condition marking the positive class; the compared literal is " >50K"
# (with a leading space), exactly as it is matched against the raw column.
income_exceeds_50k = F.col("income") == " >50K"

# Binary label: 1 when the condition holds, 0 in every other case.
label_column = F.when(income_exceeds_50k, 1).otherwise(0)

df_adult_ml = (
    df_adult
    .withColumn("income_above_50K", label_column)
    # the raw income column is perfectly correlated with the label, so it
    # must not stay among the predictors
    .drop("income")
)

In [0]:
# Preview the first 10 rows of the prepared dataset, including the new
# income_above_50K label column.
display(df_adult_ml.limit(10))

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income_above_50K
53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,0
49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,0
34.0,Private,245487.0,7th-8th,4.0,Married-civ-spouse,Transport-moving,Husband,Amer-Indian-Eskimo,Male,0.0,0.0,45.0,Mexico,0
25.0,Self-emp-not-inc,176756.0,HS-grad,9.0,Never-married,Farming-fishing,Own-child,White,Male,0.0,0.0,35.0,United-States,0
38.0,Private,28887.0,11th,7.0,Married-civ-spouse,Sales,Husband,White,Male,0.0,0.0,50.0,United-States,0
43.0,Self-emp-not-inc,292175.0,Masters,14.0,Divorced,Exec-managerial,Unmarried,White,Female,0.0,0.0,45.0,United-States,1
54.0,Private,302146.0,HS-grad,9.0,Separated,Other-service,Unmarried,Black,Female,0.0,0.0,20.0,United-States,0
35.0,Federal-gov,76845.0,9th,5.0,Married-civ-spouse,Farming-fishing,Husband,Black,Male,0.0,0.0,40.0,United-States,0
59.0,Private,109015.0,HS-grad,9.0,Divorced,Tech-support,Unmarried,White,Female,0.0,0.0,40.0,United-States,0
19.0,Private,168294.0,HS-grad,9.0,Never-married,Craft-repair,Own-child,White,Male,0.0,0.0,40.0,United-States,0


In [0]:
import mlflow
from datasciencefunctions.utils import current_dbx_notebook_path

# The experiment name is derived from the path of this notebook; for real work
# you will probably want to point it to the shared experiment of your team.
experiment_name = current_dbx_notebook_path(dbutils) + "_test_experiment"
mlflow.set_experiment(experiment_name)

# 1. The gist of `do_datasciencing`

The function `do_datasciencing` does more or less what the following code does. The actual code is somewhat more complex (you can check it [here](nothing here)) but the important stuff is all in the following cell.

While the example in this notebook is shown in PySpark, **all the functions from the datasciencefunctions package used here can also be used when training scikit-learn models**, often with no changes at all.

We will go through each part of the code separately to understand the entire process.

In [0]:
from datasciencefunctions.data_processing import train_test_split
from datasciencefunctions.data_processing import fit_transformation_pipeline
from datasciencefunctions.data_processing import apply_transformation_pipeline
from datasciencefunctions.classification import fit_classification_model
from datasciencefunctions.classification import get_model_summary
from datasciencefunctions.classification import log_model_summary

# --- data preparation ---
# the training functions used below work with a target column named "label"
df = df_adult_ml.withColumnRenamed("income_above_50K", "label")
model_type = ds.MlModel.spark_GLM_binomial

# categorical predictors
cat_list = ["workclass", "education", "marital_status", "race", "sex", "native_country"]
# numeric predictors
num_list = ["age", "capital_gain", "capital_loss", "hours_per_week"]

# --- disable MLFlow auto-tracking (specific to Databricks Spark) ---
spark.conf.set("spark.databricks.mlflow.trackMLlib.enabled", "false")

# --- train-test split ---
train_data, test_data = train_test_split(df)

# --- fit the transformation pipeline on the training data ---
pipeline = fit_transformation_pipeline(
    train_data,
    cat_cols=cat_list,
    num_cols=num_list,
    skip_cols=["label"],
    # scaling="standard",  # other scaling options can be specified
)

# --- apply the fitted pipeline to both datasets ---
train_data, test_data = apply_transformation_pipeline(pipeline, train_data, test_data)

# --- (only in PySpark) caching the data!!! ---
# each dataset is cached and then counted so that it is actually evaluated
for split_df in (train_data, test_data):
    split_df.cache()
    split_df.count()

# --- fit the model ---
model = fit_classification_model(
    model_type,
    train_data,
    max_evals=5,  # make it faster, you'll want to set some more reasonable value
    param_space_search="hyperopt",
)

# --- evaluate on the test data ---
summary = get_model_summary(model, pipeline, test_data)

# any custom metrics, params, artifacts, etc. go here
# (they must be added *before* logging if you want them in MLflow)

# --- log to the MLflow experiment set above ---
with mlflow.start_run():
    log_model_summary(summary)

# 2. Individual parts

Let us now look at the code in detail and show how we can modify a part of the pipeline if we want to use different functions for some parts of the training pipeline.

First we just define our machine learning dataframe and specify which model we want to use (in the next chapter we will see how to register new models to `MlModel`). The functions used in `do_datasciencing` do not support custom names for the labels (by design) so first we need to rename our `income_above_50K` column to `label`. Note that this was not a problem in `do_datasciencing` since it does the renaming for you as long as you specify what the name of the label column is.

In [0]:
# The individual training functions work with a target column named "label",
# so the income_above_50K column is renamed first.
df = df_adult_ml.withColumnRenamed("income_above_50K", "label")

# Model to be trained.
model_type = ds.MlModel.spark_GLM_binomial

# Categorical predictors.
cat_list = ["workclass", "education", "marital_status", "race", "sex", "native_country"]

# Numeric predictors.
num_list = ["age", "capital_gain", "capital_loss", "hours_per_week"]

### Disabling automatic MLFlow tracking

We want to log models to MLFlow, but we do not want this done at every step of the way, with every single model tried during hyperparameter optimization being logged. In `do_datasciencing` this is handled automatically; outside of it you want to use the following command (unless you really want to log every step of the training).

`NOTE`: Works for Spark models in Databricks. MLflow in versions 1.13 onwards provides means to disable automatic tracking for all supported frameworks with a single line of code.

In [0]:
# Switch off the automatic MLflow tracking of Spark MLlib in Databricks so that
# not every model tried during hyperparameter optimization gets logged.
spark.conf.set("spark.databricks.mlflow.trackMLlib.enabled", "false")

### Train-test split

Next we want to split our data into a training and testing set. We will first show how that is done with the `ds.data_processing.train_test_split` function and then use a custom function to illustrate the point that every single part of the functions used in the pipeline can be replaced by custom functions individually - you can still use the rest for the parts of the pipeline you do not want to write on your own.

#### The standard `datasciencefunctions` way:

In [0]:
# Split the machine learning dataframe into a training and a testing part
# using the splitting function provided by datasciencefunctions.
train_data, test_data = train_test_split(df)

#### A customized train-test split

Let's say we wrote the following split function:

In [0]:
def my_custom_train_test_split(df, split_column, split_threshold):
    """
    Split a dataframe into a training and a testing part by thresholding a column.

    Rows whose value in `split_column` is lower than or equal to
    `split_threshold` form the training part, rows with a strictly greater
    value form the testing part.

    NOTE(review): rows with a null value in `split_column` presumably end up
    in neither part - confirm if the column can contain nulls.

    :param df: dataframe to be split
    :param split_column: name of the column the split is based on
    :param split_threshold: value separating the training and testing rows
    :return: tuple (training dataframe, testing dataframe)
    """
    split_value = F.col(split_column)

    return (
        df.filter(split_value <= split_threshold),
        df.filter(split_value > split_threshold),
    )

We can then replace `ds.data_processing.train_test_split` with `my_custom_train_test_split` and keep the rest of the pipeline unchanged:

In [0]:
# For the purposes of this tutorial we need to create a dummy split column,
# which in this case consists of samples from a standard normal distribution.
# A fixed seed keeps the resulting split reproducible between runs.
# NOTE(review): my_split_column stays in the dataframes passed on to the
# transformation pipeline - confirm how columns not listed in cat_cols /
# num_cols are treated, or add it to skip_cols.
df = df.withColumn("my_split_column", F.randn(seed=42))

# We split the training and testing data using the new function:
# values <= 1 go to the training data, values > 1 to the testing data
train_data, test_data = my_custom_train_test_split(df, "my_split_column", 1)

## Fitting the transformation pipeline

The model data (the features and the label) need to be vectorized so that a model can be trained on them. The `ds.data_processing` module contains the function `fit_transformation_pipeline`, which fits a pipeline that does just that. All it needs is a machine learning dataset and lists of the categorical and numerical columns. You also want to add the label column to the `skip_cols` parameter, otherwise the resulting transformation pipeline would vectorize the label as another feature.

Of course you can replace the transformation pipeline with a custom-made one.

In [0]:
# Fit the transformation pipeline on the training data only.
pipeline = fit_transformation_pipeline(
    train_data,
    skip_cols=["label"],  # keep the label from being vectorized as another feature
    cat_cols=cat_list,  # categorical predictors
    num_cols=num_list,  # numeric predictors
)

## Applying the transformation pipeline

A fitted transformation pipeline needs to be applied to the training and testing datasets to actually vectorize their features. The function `apply_transformation_pipeline` from the `ds.data_processing` module does that; the training and testing datasets are transformed separately (this makes sense, for example, when we use scaling).

Once again, you can replace the function with a custom one or just transform the datasets manually.

In [0]:
# Apply the fitted pipeline to both datasets; the names train_data and
# test_data are rebound to the transformed dataframes.
train_data, test_data = apply_transformation_pipeline(
    pipeline,
    train_data,
    test_data,
)

## (only in PySpark): caching the data!!! 

It is generally good practice to cache data where appropriate to save time by computing transformations only once. When training ML models, however, it is ***necessary*** to do so. If you neither split the train and test data together in some way nor store the results, the two dataframes will be computed separately. Since the split is random, some rows might then end up in both the training dataset and the testing dataset, creating a ***leak!***

In [0]:
# Mark both datasets for caching ...
train_data.cache()
test_data.cache()
# ... and count them to force their evaluation; the counts themselves are
# not needed, so they are assigned to `_` and discarded.
_, _ = train_data.count(), test_data.count()

## Fitting a classification model

With the data in a vectorized form and cached, we can fit the classification model itself. When this is done through `datasciencefunctions`, we apply the `fit_classification_model` function from the `ds.classification` module. By default, this function uses hyperopt cross-validation hyperparameter optimization to fit the model, and it can be customized through optional parameters, just as we have seen in the previous tutorial when we used the `params_fit_classification` parameter of `do_datasciencing` (which only passes the optional arguments down to `fit_classification_model`). In the final part of the tutorial, we will see how to create custom hyperparameter spaces.

Once again, `fit_classification_model` can of course be replaced by a custom fitting function.

In [0]:
# Fit the model of the chosen type on the transformed (and cached) training data.
model = fit_classification_model(
    model_type,
    train_data,
    param_space_search="hyperopt",  # search the hyperparameter space with hyperopt
    max_evals=5,  # make it faster, you'll want to set some more reasonable value
)

## Obtaining a model summary

To get the model summary that we have seen in the previous parts of the tutorial, we use the `get_model_summary` function from the `ds.classification` module. It has exactly the same format and content as the summary from `do_datasciencing` (since it is obtained in the same way). 

You can get the content of the output of `get_model_summary` in other ways, but it will require putting together quite a lot of functions. However, since the summary is just a nested dictionary, you can add new elements to it (for example if you want to add extra metrics) or change its content as you would with any other nested dictionary.

In [0]:
# Build the model summary (a nested dictionary) from the fitted model,
# the transformation pipeline and the testing data.
summary = get_model_summary(model, pipeline, test_data)

# Add custom metrics, params, artifacts, models, etc.
You can provide additional entries to the summary. Format defined in documentation should be followed (see `get_summary_layout` and `log_model_summary`).

In [0]:
# Example of a custom metric added among the binary classification metrics.
summary["metrics"]["classification"]["binary"]["foo"] = 0.123
# Example of a custom parameter.
summary["params"]["bar"] = "baz"

## Logging the summary/model to MLFlow

Finally, you will usually want to log the finished model summary to an MLFlow experiment. The function `log_model_summary` from the `ds.classification` module will help you with that, logging its standard content into an MLFlow experiment.

In [0]:
# note that the experiment has already been set above so MLflow knows where to log it
# the summary (including the custom entries added to it) is logged within a new MLflow run
with mlflow.start_run():
    log_model_summary(summary)

# Bonus: All models in one fell swoop
### An example loop in PySpark using `datasciencefunctions` to train and log all classification model types at once

If you want to try a lot of model architectures on your data quickly and with minimal work, you can use `datasciencefunctions` to create a loop like the one below to train and log all relevant models (and their summaries). Most of the following code is like the gist of `do_datasciencing` example above, except that instead of training a single model, we train and log all of them.

In [0]:
# data preparation
# (no single model_type is chosen here - the loop below goes through all of them)
df = df_adult_ml.withColumnRenamed("income_above_50K", "label")
cat_list = [
    "workclass",
    "education",
    "marital_status",
    "race",
    "sex",
    "native_country",
]

num_list = [
    "age",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
]

# disabling automatic logging to MLFlow
spark.conf.set("spark.databricks.mlflow.trackMLlib.enabled", "false")

# train-test split
train_data, test_data = train_test_split(df)

# fitting the transformation pipeline
pipeline = fit_transformation_pipeline(
    train_data,
    skip_cols=["label"],
    cat_cols=cat_list,
    num_cols=num_list,
)

# applying the transformation pipeline
train_data, test_data = apply_transformation_pipeline(
    pipeline,
    train_data,
    test_data,
)

# (only in PySpark) caching the data!!!
train_data.cache()
test_data.cache()
_, _ = train_data.count(), test_data.count()

# 1) train one model per Spark classification model type
models = dict()

for model_type in MlModel:
    if model_type.framework != "spark" or model_type.task_type != "classification":
        continue  # we only want Spark classification models, since we're working with the Spark stack now
    models[model_type] = fit_classification_model(
        model_type,
        train_data,
        param_space_search="hyperopt",
        max_evals=1,  # make it faster, you'll want to set some more reasonable value
    )

# 2) compute the summary of every trained model
# `models` only contains the model types selected above, so no further
# filtering is needed from here on
summaries = {
    trained_type: get_model_summary(trained_model, pipeline, test_data)
    for trained_type, trained_model in models.items()
}

# 3) log every summary in its own MLflow run
for model_summary in summaries.values():
    with mlflow.start_run():
        log_model_summary(model_summary)

# Where to now?

[Back to the introductory notebook](classification.ipynb)

[To the next chapter](04_custom_mlmodels_hyperparameter_spaces.ipynb)