<center><h1>PART 1: TRAIN A MODEL ON DATABRICKS NOTEBOOK</h1></center>
<br>
In this notebook, we are going to cover the end-to-end development of a non-trivial machine learning use case in Databricks using scikit-learn.

### ABOUT THE MODEL & DATA

Using data from Taarifa and the Tanzanian Ministry of Water, we will predict which pumps are functional, which need some repairs, and which don't work at all. The labels encompass three classes, and the training data is based on a number of variables describing what kind of pump is operating, when it was installed, and how it is managed. A good understanding of which waterpoints will fail can improve maintenance operations and ensure that clean, potable water is available to communities across Tanzania. This competition is hosted on [Driven Data](https://www.drivendata.org/competitions/7/pump-it-up-data-mining-the-water-table/).

In [2]:
from __future__ import print_function
import os
import numpy as np
import pandas as pd
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
import category_encoders as ce
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from IPython.display import display

### ABOUT THE DATABRICKS ENVIRONMENT

Databricks clusters can be permanent or ephemeral. To store data, however, the best practice is to use the Databricks File System (DBFS), a persistent distributed storage layer that sits on top of Azure Blob Storage. Let's do some housekeeping and create a user folder that can be used to store things like pickled objects.

For local disk storage during development we will use the Databricks FileStore folder. `/FileStore` is a special folder within DBFS where you can save files and also download files to your local machine via a browser.  
Use the Databricks Data menu/UI to upload `pumps_data.csv` and `new_pumps_data.csv` to the `/FileStore/tables/pumps` directory.  

```
/FileStore
  ├── tables                     -> Databricks by default stores data here
  │   └──pumps                   -> we will create this project specific folder
  │      ├── new_pumps_data.csv  -> scoring dataset with no labels
  │      └── pumps_data.csv      -> training dataset with labels
  └── users/jason/pumps          -> we will create this folder as our project root folder
      ├───models                 -> store all pickle files
      │    ├──  local            -> pickle files created by training locally in the notebook
      │    ├──  rf.pkl           -> Random Forest estimator trained on AMLS
      │    ├──  le.pkl           -> Preprocessing transformer trained on AMLS
      │    ├──  ohc.pkl          -> Preprocessing transformer trained on AMLS
      │    ├──  y_le.pkl         -> Preprocessing transformer trained on AMLS
      └── scripts                -> scripts such as train.py and score.py for AMLS
```

Let's create a `Config` class to hold all the pertinent configurations and storage locations.

In [4]:
class Config(object):

    # define DBFS paths for sub-directories
    PROJECT_DIR = '/FileStore/users/jason/pumps/'   # to be used largely by dbutils only
    # note pure Python does not understand 'dbfs:/' so you need to use '/dbfs' in specifying the FileStore folder
    MODELS_DIR = '/dbfs'+PROJECT_DIR+'models/local/'
    SCRIPTS_DIR = '/dbfs'+PROJECT_DIR+'scripts/'
    
    # set location for uploading data
    # default location for data is /FileStore/tables but we will use a pumps sub-directory
    DATA_DIR = '/dbfs/FileStore/tables/pumps/'  
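
The two path styles used in `Config` are easy to mix up, so here is a minimal sketch of deriving both from one DBFS path. The helpers `to_local_path` and `to_dbfs_uri` are hypothetical names introduced for illustration only; they are not part of `dbutils`.

```python
def to_local_path(dbfs_path):
    """Path as plain Python (pandas, open, joblib) sees it through the /dbfs mount."""
    return '/dbfs/' + dbfs_path.lstrip('/')


def to_dbfs_uri(dbfs_path):
    """The same location written in the 'dbfs:/' URI form."""
    return 'dbfs:/' + dbfs_path.lstrip('/')


# same project root as in Config.PROJECT_DIR
project_dir = '/FileStore/users/jason/pumps/'

print(to_local_path(project_dir + 'models/local/'))
print(to_dbfs_uri(project_dir))
```

Keeping a single source path and deriving the other forms from it avoids the two definitions drifting apart when the project folder is renamed.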

In [5]:
# create the project directories in FileStore if they do not already exist
dbutils.fs.mkdirs(Config.PROJECT_DIR+'models/local/')
dbutils.fs.mkdirs(Config.PROJECT_DIR+'scripts')
# verify
dbutils.fs.ls(Config.PROJECT_DIR)

### IMPORT DATA

Let's start by defining some useful helper functions.

In [8]:
def print_nans(df):

    print('Checking for NANs:............................')
    mis_val = df.isnull().sum()
    mis_val_percent = 100 * df.isnull().sum() / len(df)
    mis_val_table = pd.concat([mis_val, mis_val_percent], axis=1)
    mis_val_table_ren_columns = mis_val_table.rename(
        columns={0: 'Missing Values', 1: '% of Total Values'})
    mis_val_table_ren_columns = mis_val_table_ren_columns[
        mis_val_table_ren_columns.iloc[:, 1] != 0].sort_values(
        '% of Total Values', ascending=False).round(1)
    print("Your selected dataframe has " +
          str(df.shape[1]) +
          " columns and " +
          str(len(df)) +
          " rows \n" "There are " +
          str(mis_val_table_ren_columns.shape[0]) +
          " columns that have missing values.")
    print('..............................................')
    return mis_val_table_ren_columns


def data_frame_imputer(df):
    fill = pd.Series([df[c].value_counts().index[0]
                      if df[c].dtype == np.dtype('O') else df[c].mean() for c in df],
                     index=df.columns)
    return df.fillna(fill)


def replace_with_grouped_mean(df, value, column, to_groupby):

    invalid_mask = (df[column] == value)

    # get the mean without the invalid value
    means_by_group = (df[~invalid_mask].groupby(to_groupby)[column].mean())

    # get an array of the means for all of the data
    means_array = means_by_group[df[to_groupby].values].values

    # assign the invalid values to means
    df.loc[invalid_mask, column] = means_array[invalid_mask]

    return df


def log_transformer(df, base, c=1):

    if base == 'e' or base == np.e:
        log = np.log

    elif base == '10' or base == 10:
        log = np.log10

    else:
        def log(x): return np.log(x) / np.log(base)

    # add the offset c before taking the log so that zero values stay finite
    return df.apply(lambda x: log(x + c))


def stratified_split(x, y, test_size):

    from sklearn.model_selection import StratifiedShuffleSplit

    # note: with n_splits=10 the loop below overwrites its result on every
    # iteration, so only the last of the 10 shuffled splits is returned
    sss = StratifiedShuffleSplit(n_splits=10, test_size=test_size, random_state=5)
    for train_index, test_index in sss.split(x, y):
        data_train, data_test = x.iloc[train_index], x.iloc[test_index]
        label_train, label_test = y.iloc[train_index], y.iloc[test_index]
    return data_train, data_test, label_train, label_test
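
To make the grouped-mean replacement easier to follow, here is a small self-contained sketch on made-up data. It is not the helper above but a variant written with `groupby(...).transform`; the function name `grouped_mean_fill` and the toy values are assumptions for illustration.

```python
import pandas as pd

# toy data (made up): 0 marks an invalid population value
toy = pd.DataFrame({
    'region': ['A', 'A', 'A', 'B', 'B'],
    'population': [10.0, 0.0, 30.0, 0.0, 8.0],
})


def grouped_mean_fill(df, value, column, to_groupby):
    """Replace `value` in `column` with the mean of the valid rows of the same group."""
    out = df.copy()                              # leave the caller's frame untouched
    invalid = out[column] == value
    valid_only = out[column].mask(invalid)       # NaN wherever the value is invalid
    # per-row group mean, computed from valid rows only (NaNs are skipped)
    group_means = valid_only.groupby(out[to_groupby]).transform('mean')
    out.loc[invalid, column] = group_means[invalid]
    return out


filled = grouped_mean_fill(toy, value=0, column='population', to_groupby='region')
print(filled)
```

Region A's invalid row receives the mean of 10 and 30, and region B's invalid row receives 8, the only valid value in that group.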


In [9]:
def create_dataframe(x):
    """
    Imports the pumps csv data file directly from DBFS.

    :param x: full DBFS path to file
              e.g: '/dbfs/FileStore/tables/pumps/pumps_data.csv'
    :return: two dataframes that split data and labels
    """
    # import raw data
    raw = pd.read_csv(x, index_col=0)
    labels = pd.DataFrame(raw['status_group'])
    data = raw.drop('status_group', axis=1)
    
    return data, labels

In [10]:
# check if data was uploaded successfully
data, labels = create_dataframe(x=Config.DATA_DIR+'pumps_data.csv')
data.head(2), labels.head(2)

### CLEAN DATA

In this step we'll clean the data, impute NANs etc. by invoking those helper functions.

In [12]:
def clean_data(x, y):
    """
    Takes the pumps data and label dataframe and cleans it

    :param x: the pumps dataframe
    :param y: the pumps labels dataframe
    :return:  stratified splits for train and test
    """
  
    useful_columns = ['amount_tsh',
                      'gps_height',
                      'longitude',
                      'latitude',
                      'region',
                      'population',
                      'construction_year',
                      'extraction_type_class',
                      'management_group',
                      'quality_group',
                      'source_type',
                      'waterpoint_type']

    # subset to columns we care about; copy so that the assignments below
    # modify this frame rather than a view of the caller's dataframe
    x = x[useful_columns].copy()

    # for column construction_year, values < 1000 are probably bad
    invalid_rows = x['construction_year'] < 1000
    valid_mean = int(x.construction_year[~invalid_rows].mean())
    x.loc[invalid_rows, "construction_year"] = valid_mean

    # in some columns 0 is an invalid value
    x = replace_with_grouped_mean(df=x, value=0, column='longitude', to_groupby='region')
    x = replace_with_grouped_mean(df=x, value=0, column='population', to_groupby='region')

    # in latitude, -2e-8 is treated as the invalid value
    x = replace_with_grouped_mean(df=x, value=-2e-8, column='latitude', to_groupby='region')

    # replace zero amount_tsh values with the regional mean
    x = replace_with_grouped_mean(df=x, value=0, column='amount_tsh', to_groupby='region')

    # impute any remaining NaNs
    x = data_frame_imputer(df=x)

    # print nans in the dataframe if any
    print_nans(x)

    # log transform numerical columns
    num_cols = ['amount_tsh', 'population']
    x[num_cols] = log_transformer(df=x[num_cols], base='e', c=1)

    # do train/test split
    x_train, x_test, y_train, y_test = stratified_split(x=x, y=y, test_size=0.2)

    return x_train, x_test, y_train, y_test
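
The point of the stratified split is that train and test keep the same class mix. A minimal sketch on made-up labels, assuming a single split (`n_splits=1`) is enough for the check; the class names and proportions are invented for illustration.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedShuffleSplit

# toy labels (made up): a 70% / 20% / 10% class mix
y = pd.Series(['class_a'] * 70 + ['class_b'] * 20 + ['class_c'] * 10)
x = pd.DataFrame({'feature': np.arange(len(y))})

sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=5)
train_idx, test_idx = next(sss.split(x, y))

# class shares in each partition should be (nearly) identical
train_share = y.iloc[train_idx].value_counts(normalize=True)
test_share = y.iloc[test_idx].value_counts(normalize=True)
print(pd.DataFrame({'train': train_share, 'test': test_share}))
```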

Before we proceed let's check our work

In [14]:
x_train, x_test, y_train, y_test = clean_data(data, labels)
display(x_train.head(2), y_train.head(2), x_test.head(2), y_test.head(2))


### PREPROCESS DATA

In this step we'll label-encode and one-hot encode all categorical columns.

In [16]:
def train_pre_processing(x, y):

    """
    Preprocesses the pumps train datasets by applying label
    and one-hot encoding

    :param x: the pumps x_train dataset
    :param y: the pumps y_train dataset
    :return: encoded datasets and the fitted transformers
    """

    # transform categorical variables with encoders
    le_cols = ['region']
    ohc_cols = ['extraction_type_class',
                'management_group',
                'quality_group',
                'source_type',
                'waterpoint_type']

    # define encoders include label encoding for the actual labels
    # using handle_unknown='ignore' will leave out new unseen values so keep
    # monitoring your data for changes
    
    le = ce.OrdinalEncoder(cols=le_cols,
                           return_df=True,
                           handle_unknown='ignore')

    ohc = ce.OneHotEncoder(cols=ohc_cols,
                           return_df=True,
                           use_cat_names=False,
                           handle_unknown='ignore')

    y_le = ce.OrdinalEncoder(return_df=True,
                             handle_unknown='ignore')

    print("x_train shape: ", x.shape)
    print("y_train shape: ", y.shape)
    # apply the encoders
    print("Running label and one-hot encoding on the train data...")
    x = le.fit_transform(x)
    x = ohc.fit_transform(x)
    y = y_le.fit_transform(y)
    # fit_transform() fits each encoder in place, so le, ohc and y_le
    # already hold the fitted state and need no reassignment
    print("Final x_train shape: ", x.shape)
    print("Final y_train shape: ", y.shape)
    print("done.")
    
    return x, y, le, ohc, y_le


def test_pre_processing(x, y, le, ohc, y_le):

    """
    Preprocesses the pumps test datasets by applying label
    and one-hot encoding

    :param x: the x_test dataset
    :param y: the y_test dataset
    :param le: the label encoder fitted from the train_pre_processing() function
    :param ohc: the one-hot encoder fitted from the train_pre_processing() function
    :param y_le: the y label encoder fitted from the train_pre_processing() function
    :return: encoded x_test and y_test
    """
  
    print("x_test shape: ", x.shape)
    print("y_test shape: ", y.shape)
    print("Running label and one-hot encoding on the test data...")
    x = le.transform(x)
    x = ohc.transform(x)
    y = y_le.transform(y)
    print("New x_test shape: ", x.shape)
    print("New y_test shape: ", y.shape)
    print("done.")

    return x, y
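
The key idea in the two functions above is that the encoders are fitted on the training data only and then reused on the test data, so both end up with the same columns. The sketch below illustrates that idea with plain pandas as a stand-in, not with `category_encoders`; the helper `one_hot` and the toy values are assumptions for illustration.

```python
import pandas as pd

# toy data (made up); 'dam' never appears in the training set
train = pd.DataFrame({'source_type': ['spring', 'river', 'spring']})
test = pd.DataFrame({'source_type': ['river', 'dam']})

# "fit": remember the categories seen during training
categories = sorted(train['source_type'].unique())


def one_hot(df, column, categories):
    """One-hot encode `column` against a fixed list of known categories."""
    as_cat = pd.Series(pd.Categorical(df[column], categories=categories),
                       index=df.index)
    # values outside `categories` become NaN and therefore an all-zero row
    return pd.get_dummies(as_cat, prefix=column).astype(int)


train_enc = one_hot(train, 'source_type', categories)
test_enc = one_hot(test, 'source_type', categories)
print(train_enc)
print(test_enc)
```

Because the category list is fixed at training time, an unseen value cannot add a new column at scoring time; it simply yields a row of zeros, which is why monitoring for new values still matters.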



Once again, let's check our work to ensure all categoricals are properly encoded

In [18]:
# pre-process training data
x_train, y_train, le, ohc, y_le = train_pre_processing(x=x_train, y=y_train)
# pre-process test data
x_test, y_test = test_pre_processing(x=x_test, y=y_test, le=le, ohc=ohc, y_le=y_le)
display(x_train.head(2), y_train.head(2), x_test.head(2), y_test.head(2))

### TRAIN MODEL

Now we are going to create one function that covers the whole flow, from importing the CSV through cleaning and preprocessing to model training and evaluation.

In [20]:
def train_and_evaluate(x, n_estimators=100, criterion='entropy', class_weight='balanced_subsample'):
    """
    A full pipeline that cleans, preprocesses data and then fits a
    random forest classifier

    :param x              : full DBFS path to file
                            e.g: '/dbfs/FileStore/tables/pumps/pumps_data.csv'
    :param n_estimators   : random forest parameter for the number of trees
    :param criterion      : random forest parameter for the node splitting criterion
    :param class_weight   : random forest parameter for how classes are weighted
    :return               : all estimator/transformer objects and the accuracy metric
    """

    # ingest and process data
    data, labels = create_dataframe(x=x)
    x_train, x_test, y_train, y_test = clean_data(x=data, y=labels)
    x_train, y_train, le, ohc, y_le = train_pre_processing(x=x_train, y=y_train)
    x_test, y_test = test_pre_processing(x=x_test, y=y_test, le=le, ohc=ohc, y_le=y_le)
    
    # train classifier
    print("training classifier...")
    rf = RandomForestClassifier(n_estimators=n_estimators,
                                criterion=criterion,
                                class_weight=class_weight)
    rf.fit(x_train, np.ravel(y_train))
    print(" classifier has been trained")
    
    # evaluate on test set
    test_pred = rf.predict(x_test)
    accuracy = accuracy_score(y_test, test_pred)
    print("Test Accuracy: ", accuracy)
    
    # we need to return the transformers also so that it gets captured in the global scope
    # this will allow us to pass these objects to the save_model() function
    return rf, accuracy, le, ohc, y_le
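
The classifier above is created without a fixed seed, so the reported accuracy can vary slightly between runs. As a sketch under the assumption that reproducible runs are wanted, the example below passes `random_state` to `RandomForestClassifier` on synthetic data; the dataset, the helper `fit_and_score`, and the seed value are made up for illustration.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# synthetic three-class data standing in for the pumps features
X, y = make_classification(n_samples=300, n_features=8, n_informative=4,
                           n_classes=3, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=0)


def fit_and_score(seed):
    """Fit a forest with a fixed seed and return its test predictions."""
    rf = RandomForestClassifier(n_estimators=25, criterion='entropy',
                                class_weight='balanced_subsample',
                                random_state=seed)
    rf.fit(X_train, y_train)
    return rf.predict(X_test)


pred_a = fit_and_score(seed=42)
pred_b = fit_and_score(seed=42)
accuracy = accuracy_score(y_test, pred_a)
print("identical predictions:", np.array_equal(pred_a, pred_b))
print("accuracy:", accuracy)
```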
  
    

Let's run the entire flow using the `train_and_evaluate()` function

In [22]:
rf, accuracy, le, ohc, y_le = train_and_evaluate(Config.DATA_DIR+'pumps_data.csv')

At this point, we're going to set up a function to export our pickled estimator and transformers to DBFS.

In [24]:
def save_model(estimator, dbfspath, file_name):

    """
    :param estimator : estimator or transformer currently in memory to pickle
    :param dbfspath  : path to DBFS directory to store file as expected by dbutils
                       e.g: /FileStore/users/jason/pumps/models/
    :param file_name : name of the pickled file
    """
    
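    # dump pickle to the driver's local filesystem; a relative file name lands in
    # the notebook's working directory, which is assumed to be /databricks/driver
    joblib.dump(estimator, file_name)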
    # move the pickled object from the local folder on the cluster to DBFS
    dbutils.fs.mv('file:/databricks/driver/'+file_name, dbfspath)
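
The dump-then-move pattern can be tried outside Databricks as well. In the sketch below, two temporary directories stand in for the driver's working directory and the DBFS models folder, and `shutil.move` stands in for `dbutils.fs.mv`; the dictionary is just a placeholder for a fitted estimator.

```python
import os
import shutil
import tempfile

import joblib

work_dir = tempfile.mkdtemp()     # stand-in for the driver's working directory
target_dir = tempfile.mkdtemp()   # stand-in for the DBFS models folder

# placeholder object; in the notebook this would be rf, le, ohc or y_le
obj = {'n_estimators': 100, 'criterion': 'entropy'}

# 1) pickle to the local directory, 2) move the file to its final location
local_file = os.path.join(work_dir, 'demo.pkl')
joblib.dump(obj, local_file)
final_file = os.path.join(target_dir, 'demo.pkl')
shutil.move(local_file, final_file)

# 3) load it back to confirm the round trip
restored = joblib.load(final_file)
print(restored == obj)
```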

Let's test out the exporting function

In [26]:
save_model(rf, Config.PROJECT_DIR+'models/local/', file_name='rf.pkl')
save_model(le, Config.PROJECT_DIR+'models/local/', file_name='le.pkl')
save_model(ohc, Config.PROJECT_DIR+'models/local/', file_name='ohc.pkl')
save_model(y_le, Config.PROJECT_DIR+'models/local/', file_name='y_le.pkl')

The pickled objects should now appear in DBFS

In [28]:
dbutils.fs.ls(Config.PROJECT_DIR+'models/local')

### SCORE NEW DATA

The `process_data()` function cleans and preprocesses any new data and outputs it in the format that the random forest model expects.

In [30]:
def process_data(x, le, ohc):
    """
    Gets new data ready for scoring

    :param x: new data in the form of a dataframe
    :param le: the pumps pickled label encoder transformer
    :param ohc: the pumps pickled one-hot encoding transformer
    :return:  dataframe ready for prediction
    """
  
    useful_columns = ['amount_tsh',
                      'gps_height',
                      'longitude',
                      'latitude',
                      'region',
                      'population',
                      'construction_year',
                      'extraction_type_class',
                      'management_group',
                      'quality_group',
                      'source_type',
                      'waterpoint_type']

    # subset to columns we care about; copy so that the assignments below
    # modify this frame rather than a view of the caller's dataframe
    x = x[useful_columns].copy()

    # for column construction_year, values < 1000 are probably bad
    invalid_rows = x['construction_year'] < 1000
    valid_mean = int(x.construction_year[~invalid_rows].mean())
    x.loc[invalid_rows, "construction_year"] = valid_mean

    # in some columns 0 is an invalid value
    x = replace_with_grouped_mean(df=x, value=0, column='longitude', to_groupby='region')
    x = replace_with_grouped_mean(df=x, value=0, column='population', to_groupby='region')

    # in latitude, -2e-8 is treated as the invalid value
    x = replace_with_grouped_mean(df=x, value=-2e-8, column='latitude', to_groupby='region')

    # replace zero amount_tsh values with the regional mean
    x = replace_with_grouped_mean(df=x, value=0, column='amount_tsh', to_groupby='region')

    # impute any remaining NaNs
    x = data_frame_imputer(df=x)

    # print nans in the dataframe if any
    print_nans(x)

    # log transform numerical columns
    num_cols = ['amount_tsh', 'population']
    x[num_cols] = log_transformer(df=x[num_cols], base='e', c=1)
    
    print("data shape: ", x.shape)
    print("Running label and one-hot encoding on the new data...")
    x = le.transform(x)
    x = ohc.transform(x)
    print("Processed data shape: ", x.shape)
    print("done.")
    
    return x

In [31]:
# load pickled models & transformers
rf = joblib.load(Config.MODELS_DIR+'rf.pkl')
le = joblib.load(Config.MODELS_DIR+'le.pkl')
ohc = joblib.load(Config.MODELS_DIR+'ohc.pkl')

In [32]:
# get the data ready for prediction
df = pd.read_csv(Config.DATA_DIR+'new_pumps_data.csv', index_col=0)
df = process_data(df, le, ohc)

In [33]:
# make prediction
predictions = rf.predict(df)
print(predictions)
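
Because the labels were ordinal-encoded before training, the printed predictions are integer codes rather than status names. The sketch below shows one way to turn codes back into names. The mapping and the predictions are hypothetical placeholders; the real mapping has to be recovered from the fitted `y_le` encoder, which was saved as `y_le.pkl`.

```python
import numpy as np
import pandas as pd

# hypothetical code-to-label table; read the real one from the fitted y_le encoder
code_to_label = {
    1: 'functional',
    2: 'non functional',
    3: 'functional needs repair',
}

demo_predictions = np.array([1, 3, 2, 1])   # made-up model output

# map each integer code to its readable status name
named = pd.Series(demo_predictions).map(code_to_label)
print(named.tolist())
```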