# ML Pipelines

In [3]:
from pathlib import Path
# Base URI under which all data used by this notebook is stored.
home = "dbfs:/mnt/data"

# Location of the training CSV, built relative to `home`.
path_train_data = f"{home}/data/Automobile-Loan-Default/Train_Dataset.csv"

In [4]:
from pyspark.sql.types import StructField, StructType, StringType, LongType, DoubleType, DoubleType
# (column name, Spark type class) pairs, listed in the order the fields appear
# in the schema. Every field is declared nullable.
_schema_columns = [
    ("ID", StringType),
    ("Client_Income", DoubleType),
    ("Car_Owned", DoubleType),
    ("Bike_Owned", DoubleType),
    ("Active_Loan", DoubleType),
    ("House_Own", DoubleType),
    ("Child_Count", DoubleType),
    ("Credit_Amount", DoubleType),
    ("Loan_Annuity", DoubleType),
    ("Accompany_Client", StringType),
    ("Client_Income_Type", StringType),
    ("Client_Education", StringType),
    ("Client_Marital_Status", StringType),
    ("Client_Gender", StringType),
    ("Loan_Contract_Type", StringType),
    ("Client_Housing_Type", StringType),
    ("Population_Region_Relative", DoubleType),
    ("Age_Days", DoubleType),
    ("Employed_Days", DoubleType),
    ("Registration_Days", DoubleType),
    ("ID_Days", DoubleType),
    ("Own_House_Age", DoubleType),
    ("Mobile_Tag", DoubleType),
    ("Homephone_Tag", DoubleType),
    ("Workphone_Working", DoubleType),
    ("Client_Occupation", StringType),
    ("Client_Family_Members", DoubleType),
    ("Cleint_City_Rating", DoubleType),  # spelling kept exactly as in the schema
    ("Application_Process_Day", DoubleType),
    ("Application_Process_Hour", DoubleType),
    ("Client_Permanent_Match_Tag", StringType),
    ("Client_Contact_Work_Tag", StringType),
    ("Type_Organization", StringType),
    ("Score_Source_1", DoubleType),
    ("Score_Source_2", DoubleType),
    ("Score_Source_3", DoubleType),
    ("Social_Circle_Default", DoubleType),
    ("Phone_Change", DoubleType),
    ("Credit_Bureau", DoubleType),
    ("Default", DoubleType),
]

# Explicit schema passed to the CSV reader instead of relying on type inference.
my_schema = StructType(
    [StructField(field_name, field_type(), True) for field_name, field_type in _schema_columns]
)

In [5]:
# Feature subset used throughout this notebook.
selected_features = [
    'Client_Education', 'Employed_Days', 'Age_Days', 'Client_Income_Type',
    'Client_Gender', 'Car_Owned', 'ID_Days', 'Score_Source_2', 'Phone_Change',
]

# Name of the label column.
target_variable_name = "Default"

In [6]:
# Read the training CSV using the explicit schema; the "header" option is enabled.
df_train_data = (
    spark.read
    .schema(my_schema)
    .option("header", "true")
    .csv(path_train_data)
)

In [7]:
# Keep only the selected features plus the target column.
cols_to_get = [*selected_features, target_variable_name]
df_train_data = df_train_data.select(*cols_to_get)

# Length of the data

In [8]:
# Number of rows in the training DataFrame.
df_train_data.count()

121856

# Describe data

In [9]:
# Summary statistics, transposed so that each row is a column of the data
# and each column is a statistic.
df_train_data.describe().toPandas().set_index("summary").T

summary,count,mean,stddev,min,max
Client_Education,118194,,,Graduation,Secondary
Employed_Days,118190,67154.07061511127,138971.78295053402,0.0,365243.0
Age_Days,118239,16027.422948434947,4366.356503618858,7676.0,25201.0
Client_Income_Type,118139,,,Businessman,Unemployed
Client_Gender,119426,,,Female,XNA
Car_Owned,118258,0.3428774374672326,0.4746729459548691,0.0,1.0
ID_Days,115871,2987.471015180675,1511.8845759418805,0.0,7197.0
Score_Source_2,116154,0.5186100569859782,0.7402967506270233,5.0E-6,100.0
Phone_Change,118175,962.0740511952612,827.9477872022571,0.0,4185.0
Default,121839,0.0807951476949088,0.2725213778065368,0.0,1.0


## Main concepts in Pipelines

- DataFrame:
    - This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
- Transformer:
    - A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
- Estimator:
    - An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
- Pipeline:
    - A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
- Parameter:
    - A Param is a named parameter with self-contained documentation.
    - A ParamMap is a set of (parameter, value) pairs.
    - There are two main ways to pass parameters to an algorithm:
        - Set parameters for an instance. E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make lr.fit() use at most 10 iterations. This API resembles the API used in spark.mllib package.
        - Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.

## ML persistence: Saving and Loading Pipelines
- It is often worthwhile to save a model or a pipeline to disk for later use.
- ML persistence works across Scala, Java and Python.


In [10]:
from pyspark.ml.feature import Binarizer, StringIndexer, OneHotEncoderEstimator, StandardScaler
from pyspark.ml.feature import MinMaxScaler, VectorAssembler, Imputer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

## Utility functions used to detect variable type and rename columns

In [11]:
def variable_type(df):
    """Split a DataFrame's columns into string and non-string columns.

    Args:
        df: Any object exposing ``dtypes`` as an iterable of
            ``(column_name, dtype_name)`` pairs (e.g. a Spark DataFrame).

    Returns:
        A ``(char_vars, num_vars)`` tuple of lists of column names, each in the
        order the columns appear in ``df.dtypes``. ``char_vars`` holds the
        columns whose dtype name is exactly ``'string'``; every other column
        is placed in ``num_vars``.
    """
    char_vars = []
    num_vars = []
    for col_name, dtype_name in df.dtypes:
        # Compare for equality: ``x in ('string')`` would be a substring test
        # against the str 'string' (the parentheses do not create a tuple).
        if dtype_name == 'string':
            char_vars.append(col_name)
        else:
            num_vars.append(col_name)

    return char_vars, num_vars


In [37]:
# Alias of the training DataFrame; later configuration cells read `df.columns`.
df = df_train_data

In [13]:
# Predictor columns only: every column except the target.
features_list = [c for c in df_train_data.columns if c != target_variable_name]

# Detect variable types on the predictors only, so that the target column never
# ends up in `num_vars` (and therefore never in the assembled feature vector).
char_vars, num_vars = variable_type(df_train_data.select(*features_list))

# String-typed predictors; these are indexed and one-hot encoded later.
char_columns = list(char_vars)

## Configure an ML pipeline

In [14]:
# One StringIndexer per string column, writing to "<column>_index".
stringIndexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="keep")
    for name in char_columns
]

# Output columns of the indexers; these are the inputs of the one-hot encoder.
idx_out_cols = [indexer.getOutputCol() for indexer in stringIndexers]

# A single encoder handles all indexed columns; each output gets an "_OHE" suffix.
oneHotEncoderEstimator = OneHotEncoderEstimator(
    inputCols=idx_out_cols,
    outputCols=[indexed + "_OHE" for indexed in idx_out_cols],
)

# Output columns of the one-hot encoder; these are passed on to the vector assembler.
ohe_out_cols = oneHotEncoderEstimator.getOutputCols()

# Columns that are combined into a single vector:
#   - num_vars: columns that were not string-typed to begin with
#   - ohe_out_cols: string columns after string indexing and one-hot encoding
cols_vectorAssemblers = num_vars + ohe_out_cols

In [43]:
# Binarization is the process of thresholding numerical features to binary (0/1) features.
# NOTE(review): Age_Days ranges from 7676 to 25201 in the describe() output, so
# threshold=30 lies below every observed value; the fit/transform cells override
# the threshold to 20000 through their paramMap.
binarizer = Binarizer(threshold=30, inputCol='Age_Days', outputCol='Age_Days_binarized')

In [16]:
# Candidate columns to impute with the median. They are kept in a separate list
# because the remaining numeric columns use a different imputation strategy.
# NOTE(review): the target column 'Default' is part of this list, so missing
# labels are imputed as well — confirm that this is intended.
num_vars_medean = ['Car_Owned', 'Bike_Owned', 'Active_Loan', 'House_Own', 'Child_Count', 'Mobile_Tag',
                   'Homephone_Tag', 'Workphone_Working', 'Client_Family_Members', 'Cleint_City_Rating',
                   'Credit_Bureau', 'Application_Process_Day', 'Default']

# Keep only the candidates that are present in the DataFrame. Filtering the list
# preserves the order above; converting a set intersection to a list does not
# guarantee any particular order.
present_columns = set(df.columns)
num_vars_medean = [c for c in num_vars_medean if c in present_columns]

In [17]:
# The remaining numeric columns will be imputed using the mean strategy.
# Filtering `num_vars` in order keeps the result deterministic; a set difference
# converted to a list has no guaranteed order.
median_columns = set(num_vars_medean)
num_vars_mean = [c for c in num_vars if c not in median_columns]

In [18]:
# Show which columns ended up in the mean-imputation group.
print(num_vars_mean)

['ID_Days', 'Employed_Days', 'Score_Source_2', 'Age_Days', 'Phone_Change']


In [19]:
# Mean imputation for the "mean" group. Output column names equal the input
# column names, so the imputed values replace the original columns.
imputer_mean = Imputer(strategy="mean", inputCols=num_vars_mean, outputCols=num_vars_mean)

# Median imputation for the "median" group, configured the same way.
imputer_mod = Imputer(strategy="median", inputCols=num_vars_medean, outputCols=num_vars_medean)

In [20]:
# We need to scale the columns which have large values
cols_to_scale = ['Social_Circle_Default','Phone_Change','Own_House_Age','Score_Source_3',\
                 'Application_Process_Hour','Population_Region_Relative','Credit_Amount','Client_Income',\
                 'Employed_Days','Loan_Annuity','Age_Days','ID_Days','Score_Source_2','Registration_Days','Score_Source_1']

# Intersecting, as some columns may have been filtered out at the beginning by the feature selection approach of the previous notebook (10-a)
# NOTE(review): `cols_to_scale` is not referenced by any later cell shown here; the scalers are configured on the assembled 'features' vector instead.
cols_to_scale = list(set(cols_to_scale).intersection(set(df.columns)))

In [21]:
# Only one of the scalers below needs to be used in the pipeline.
# Both read the assembled 'features' column and write to 'features_scaled'.
standardScaler = StandardScaler(inputCol='features', outputCol="features_scaled")
minMaxScaler = MinMaxScaler(inputCol='features', outputCol="features_scaled")

In [22]:
# VectorAssembler is the transformer that combines a given list of columns into a single vector column.
# Inputs here are the non-string columns plus the one-hot encoded columns (cols_vectorAssemblers);
# the result is written to the 'features' column.
vectorAssembler = VectorAssembler(inputCols=cols_vectorAssemblers, outputCol='features', handleInvalid = "keep")

### Create a pipeline containing all the stages configured above

In [44]:
# Stage order: string indexing -> one-hot encoding -> imputation (mean, median)
# -> vector assembly -> min-max scaling -> binarization.
stages = [
    *stringIndexers,
    oneHotEncoderEstimator,
    imputer_mean,
    imputer_mod,
    vectorAssembler,
    minMaxScaler,
    binarizer,
]
pipeline = Pipeline(stages=stages)

In [45]:
# List the configured stages in execution order.
print(stages)

[StringIndexer_e7df86393e40, StringIndexer_5a4254ab0bb0, StringIndexer_489b8f7a3b0a, OneHotEncoderEstimator_cc8a1d602a2b, Imputer_1c16f29b54b3, Imputer_50c35a140b14, VectorAssembler_c16f1d3884bc, MinMaxScaler_cad3f78cf8f3, Binarizer_40d0dd5a4bbf]


## Fit Data Processing Pipeline

In [27]:
# 100-row subset of the training data.
# NOTE(review): the fit/transform cells below use `df_train_data`, not `df`.
df = df_train_data.limit(100)

In [28]:
# Display the column names and types of the training DataFrame.
df_train_data

DataFrame[Client_Education: string, Employed_Days: double, Age_Days: double, Client_Income_Type: string, Client_Gender: string, Car_Owned: double, ID_Days: double, Score_Source_2: double, Phone_Change: double, Default: double]

In [68]:
import yaml

def read_yaml(file_path):
    """Load a YAML file and return its parsed content.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        The Python object produced by ``yaml.safe_load`` for the file content.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Read as UTF-8 explicitly so that parsing does not depend on the
    # platform's default text encoding.
    with open(file_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

In [75]:
# Load the pipeline settings; the path is relative to the current working directory.
my_config = read_yaml("configs/config.yaml")

In [77]:
# Inspect the loaded configuration.
my_config

{'PIPELINE': {'FIT': {'imputer_strategy': 'mean',
   'standardScaler_withStd': True,
   'standardScaler_withMean': True},
  'TRANSFORM': {'imputer_strategy': 'mean',
   'standardScaler_withStd': True,
   'standardScaler_withMean': False}}}

In [81]:
# Example lookup of a single setting from the FIT section.
my_config['PIPELINE']['FIT']['standardScaler_withMean']

True

In [85]:
# Parameter overrides applied when fitting the pipeline, taken from the FIT
# section of the configuration.
# NOTE(review): the standardScaler entries target a stage that is not part of
# `stages` (the pipeline uses minMaxScaler), so they presumably have no effect
# on the fitted pipeline — confirm which scaler is intended.
fit_cfg = my_config['PIPELINE']['FIT']
paramMap = {
    binarizer.threshold: 20000,
    imputer_mean.strategy: fit_cfg['imputer_strategy'],
    standardScaler.withStd: fit_cfg['standardScaler_withStd'],
    standardScaler.withMean: fit_cfg['standardScaler_withMean'],
}
paramMap

{Param(parent='Binarizer_40d0dd5a4bbf', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 20000,
 Param(parent='Imputer_1c16f29b54b3', name='strategy', doc='strategy for imputation. If mean, then replace missing values using the mean value of the feature. If median, then replace missing values using the median value of the feature.'): 'mean',
 Param(parent='StandardScaler_2b12c1b1db3e', name='withStd', doc='Scale to unit standard deviation'): True,
 Param(parent='StandardScaler_2b12c1b1db3e', name='withMean', doc='Center data with mean'): True}

In [86]:
# Fit the pipeline on the full training data, applying the overrides in paramMap.
model_process_data = pipeline.fit(df_train_data, paramMap)

In [87]:
# Parameter overrides applied when transforming data with the fitted pipeline,
# taken from the TRANSFORM section of the configuration.
# NOTE(review): the standardScaler entries target a stage that is not part of
# `stages` (the pipeline uses minMaxScaler), so they presumably have no effect
# on the transformation — confirm which scaler is intended.
transform_cfg = my_config['PIPELINE']['TRANSFORM']
paramMap = {
    binarizer.threshold: 20000,
    imputer_mean.strategy: transform_cfg['imputer_strategy'],
    standardScaler.withStd: transform_cfg['standardScaler_withStd'],
    standardScaler.withMean: transform_cfg['standardScaler_withMean'],
}
paramMap

{Param(parent='Binarizer_40d0dd5a4bbf', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 20000,
 Param(parent='Imputer_1c16f29b54b3', name='strategy', doc='strategy for imputation. If mean, then replace missing values using the mean value of the feature. If median, then replace missing values using the median value of the feature.'): 'mean',
 Param(parent='StandardScaler_2b12c1b1db3e', name='withStd', doc='Scale to unit standard deviation'): True,
 Param(parent='StandardScaler_2b12c1b1db3e', name='withMean', doc='Center data with mean'): False}

In [88]:
# Apply the fitted pipeline to the training data, passing the current paramMap as overrides.
df_my = model_process_data.transform(df_train_data, paramMap)

In [89]:
# Compare the age column with its binarized version for ten rows.
df_my.limit(10).select("Age_Days", "Age_Days_binarized").toPandas()

Unnamed: 0,Age_Days,Age_Days_binarized
0,13957.0,0.0
1,14162.0,0.0
2,16790.0,0.0
3,23195.0,1.0
4,11366.0,0.0
5,13881.0,0.0
6,21323.0,1.0
7,22493.0,1.0
8,16027.422948,0.0
9,20507.0,1.0


### Save data processing pipeline to disk

In [30]:
# Persist the fitted pipeline model under `home`, overwriting any existing copy at that path.
model_process_data.write().overwrite().save(f"{home}/data/my_data_processing_pipeline")