# Training a Machine Learning Model Externally and Deploying it to Db2 for In-Database Scoring

In this notebook, we will train a customer segmentation model using the K-Means algorithm model externally in IBM Cloud Pak for Data. The model will then be deployed to Db2 as a joblib file. We will then use Db2's Python UDFs to execute the inferencing pipeline and return predictions to the user.

Contents:
<br>
**I. Model Training** - Train a customer segmentation model in Watson Studio and create deployment assets (e.g. trained model, column metadata, PCA components, etc.)<br>
**II. Model Deployment** - Using Python UDFs, deploy the inferencing pipeline (i.e. data processing and model scoring) to the database for in-database scoring<br>

# I. Training a Customer Segmentation Model with scikit-learn

## Introduction

In this section we will train our customer segmentation model. Segmentation can help you to identify and understand customer subgroups and how they differ from one another. Customer segmentation has many beneficial uses. In our case for example, it can be used to assign the best-fit agent, identify opportunities for leveraging agent skills, or even to create hosting opportunities to improve client engagement.<br><br>
We'll prepare the data so it's in a wide format ready for segmentation and then apply some transformation and clustering techniques. Our data contains demographic, behavioural, summary product and communication based data. Since there are currently no predefined segments we'll need to discover segments within the underlying data structure.

**Sample Materials, provided under license. <br>
Licensed Materials - Property of IBM. <br>
© Copyright IBM Corp. 2019, 2020. All Rights Reserved. <br>
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp. <br>**

In [1]:
# # Ensure sklearn is the correct version
!pip install scikit-learn==0.23.2

Collecting scikit-learn==0.23.2
  Downloading scikit_learn-0.23.2-cp37-cp37m-manylinux1_x86_64.whl (6.8 MB)
[K     |████████████████████████████████| 6.8 MB 8.3 MB/s eta 0:00:01
Installing collected packages: scikit-learn
  Attempting uninstall: scikit-learn
    Found existing installation: scikit-learn 0.23.1
    Uninstalling scikit-learn-0.23.1:
      Successfully uninstalled scikit-learn-0.23.1
Successfully installed scikit-learn-0.23.2


In [2]:
# Import Libraries
import pandas as pd
import numpy as np
import json
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from pandas.api.types import is_string_dtype
from pandas.api.types import is_numeric_dtype
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.decomposition import PCA
import sys
import joblib
import seaborn as sns

from project_lib import Project
project = Project()


from sklearn.preprocessing import StandardScaler


if '/project_data/data_asset/' not in sys.path:
    sys.path.insert(0, '/project_data/data_asset/')
    
from customer_segmentation_prep import *

%matplotlib inline

In [3]:
#Establishing Connection to Database

import pandas as pd
import ibm_db_dbi
import ibm_db
from project_lib import Project 

# Define connection string and connect to database
project = Project.access()
LocalDB2_credentials = project.get_connection(name = "CSSDB3")

bluedb_connection = ibm_db.connect("DATABASE={};HOSTNAME={};PORT={};PROTOCOL=TCPIP;UID={};PWD={}".format(LocalDB2_credentials['database'],
                                                                                                         LocalDB2_credentials['host'],
                                                                                                         LocalDB2_credentials['port'],
                                                                                                         LocalDB2_credentials['username'],
                                                                                                         LocalDB2_credentials['password']),"","")
dbi_bluedb_connection = ibm_db_dbi.Connection(bluedb_connection)

## User Inputs and Data Prep

### User Inputs
**effective_date :**  This is the date that the segmentation is computed. All input data should be before this date.<br>
**train_or_score :**  Specify whether we are prepping the data for training or scoring. Should always be 'train' in this notebook.<br>

**granularity_key :** Specifies the customer ID column.<br>
**customer_start_date :** Column with the start of the summary month of customer data.<br> 
**customer_end_date :** As above, but last day of the summary month.<br>
**status_attribute :** Column which indicates whether the customer is active or inactive and is used to define churn. Churned customers are removed from the dataset.<br>
**status_flag_active :** The name of the variable in the status_attribute that indicates that the customer has churned, in this case it is 'Inactive'.<br>
**date_customer_joined :** Specifies the column where the customer join date is recorded. This variable is used to calculate customer tenure.<br>

**columns_required :** A list of default columns required, includes ID column and date columns.<br>
**default_attributes :** A list of the variables that we would like to use for the segmentation.<br>
**risk_tolerance_list :** A list of the risk categories for the customer's accounts. 'High', 'Low' etc.<br> 
**investment_objective_list :** A list of the investment objective categories for the customer's accounts. 'Security', 'Income' etc.<br>

The last three user input variables are used for data cleaning.<br>
**std_multiplier :** This variable is used to identify outlier values. This number is multiplied by the variable standard deviation. Any value above this is defined as an outlier and the value is capped at this number multiplied by the standard deviation.<br>
**max_num_cat_cardinality :** This variable defines the maximum cardinality for categorical variables. Any categorical variable with more categories than this maximum is removed from the dataset.<br> 
**nulls_threshold :** This threshold is used to identify columns with many null values. Any column with percentage of nulls greater than this threshold will be removed from the dataset.<br>

The user can use the default inputs as listed below or can choose their own. The user inputs will be stored and the same inputs will be applied automatically at scoring time. 


### Data Prep
See `project_data/data_asset/customer_segmentation_prep.py` for details of data preparation.

The script generates the dataset that is used for clustering. We take a wide form dataset with customer details, filter to include only columns that are relevant, complete data cleaning and produce a dataframe suitable for clustering. 

### Data Cleaning
•	Any customer who attrited in the dataset is removed. Only active customers are used for clustering.<br>
•	We take the most recent record for each customer.<br>
•	Any columns in the dataset that have a single constant value are removed.<br>
•	Any column with more than 10% null values is removed.<br>
•	High cardinality categorical columns are removed.<br>
•	Numerical outliers are cleaned. <br>
•	Remaining missing values are filled with 'Unknown' for categorical and the average of the column for numerical. 

In [4]:
# User input variables
effective_date = '2018-09-30'  # date at which the prediction was computed 
train_or_score = 'train'

granularity_key='CUSTOMER_CUSTOMER_ID'
customer_start_date='CUSTOMER_SUMMARY_START_DATE'
customer_end_date='CUSTOMER_SUMMARY_END_DATE'
status_attribute='CUSTOMER_STATUS'
status_flag_active='Active'
date_customer_joined='CUSTOMER_RELATIONSHIP_START_DATE'

columns_required=['CUSTOMER_CUSTOMER_ID', 'CUSTOMER_STATUS', 'CUSTOMER_SUMMARY_START_DATE', 'CUSTOMER_SUMMARY_END_DATE',
                    'CUSTOMER_EFFECTIVE_DATE',  'CUSTOMER_SYSTEM_LOAD_TIMESTAMP']

default_attributes=['CUSTOMER_GENDER', 'CUSTOMER_AGE_RANGE', 'CUSTOMER_EDUCATION_LEVEL',
                            'CUSTOMER_EMPLOYMENT_STATUS', 'CUSTOMER_MARITAL_STATUS', 
                            'CUSTOMER_URBAN_CODE', 'CUSTOMER_ANNUAL_INCOME', 'CUSTOMER_RELATIONSHIP_START_DATE', 
                            'CUSTOMER_SUMMARY_RETURN_LAST_QUARTER', 
                            'CUSTOMER_SUMMARY_NUMBER_OF_EMAILS',
                            'CUSTOMER_SUMMARY_NUMBER_OF_LOGINS',
                    'CUSTOMER_SUMMARY_AMOUNT_OF_MANAGEMENT_FEES',
                           'CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY', 'CUSTOMER_CREDIT_AUTHORITY_LEVEL', 'CUSTOMER_CUSTOMER_BEHAVIOR', 'CUSTOMER_IMPORTANCE_LEVEL_CODE',
                           'CUSTOMER_MARKET_GROUP',
                           'CUSTOMER_PURSUIT']
risk_tolerance_list = []
investment_objective_list = []

std_multiplier=5
max_num_cat_cardinality=15
nulls_threshold=0.1

## Load Customer Segmentation Data

For this project we will be loading our training data from the database table called **DSE.CUST_SEG_DATA_TRAIN**. This table was created from the csv file **customer_full_summary_latest.csv**. The file is located in the `/project_data/data_asset/` directory.

In [5]:
# Read in training data from Db2
sql = 'SELECT * FROM DSE.CUST_SEG_DATA_TRAIN'
input_df = pd.read_sql(sql, dbi_bluedb_connection)

# Convert Db2 datatypes to Python datatypes - important for datetime columns!
original_csv_dtypes = joblib.load('/project_data/data_asset/original_csv_dtypes.pkl') 
df_raw = input_df.astype(original_csv_dtypes)

data_prep = CustomerSegmentationPrep(train_or_score=train_or_score, effective_date=effective_date, granularity_key=granularity_key, customer_start_date=customer_start_date, customer_end_date=customer_end_date,
                                        status_attribute=status_attribute, status_flag_active=status_flag_active, date_customer_joined=date_customer_joined, columns_required=columns_required, default_attributes=default_attributes,
                                        risk_tolerance_list=risk_tolerance_list, investment_objective_list=investment_objective_list, std_multiplier=std_multiplier, max_num_cat_cardinality=max_num_cat_cardinality, nulls_threshold=nulls_threshold)
df_prepped = data_prep.prep_data(df_raw, train_or_score)


Before removing inactive customers we have 1000 customers
After removing inactive customers we have 838 customers
Before cleaning, we had 22 columns.
After cleaning, we have 19 columns.
Add a column for customer tenure
Prepped data has 838 rows and 17 columns.
Prep has data for 838 customers


In [6]:
# Preview prepped data
df_prepped.head()

Unnamed: 0,CUSTOMER_CUSTOMER_ID,CUSTOMER_PURSUIT,CUSTOMER_CREDIT_AUTHORITY_LEVEL,CUSTOMER_EMPLOYMENT_STATUS,CUSTOMER_MARKET_GROUP,CUSTOMER_AGE_RANGE,CUSTOMER_EDUCATION_LEVEL,CUSTOMER_MARITAL_STATUS,CUSTOMER_CUSTOMER_BEHAVIOR,CUSTOMER_SUMMARY_AMOUNT_OF_MANAGEMENT_FEES,CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY,CUSTOMER_URBAN_CODE,CUSTOMER_EFFECTIVE_DATE,CUSTOMER_GENDER,CUSTOMER_ANNUAL_INCOME,CUSTOMER_IMPORTANCE_LEVEL_CODE,CUSTOMER_TENURE_IN_MONTHS
0,1000,Capital Acquisition,Medium,Employed,Accumulating,30 to 40,College,Married,Moderate,1757.13,Recreation,City,2018-01-02,Male,325000.0,Low priority,8
1,1001,Retirement Planning,Very High,Selfemployed,Gifting,65 and over,Professional,Divorced,Aggressive,17935.79,Uncategorized,Urban,2017-11-29,Female,280000.0,Normal priority,10
2,1002,Increase Net Worth,Very Low,Homemaker,Accumulating,55 to 65,PhD,Married,Growth,1221.06,Travel,Urban,2017-08-28,Female,130000.0,High priority,13
3,1003,Increase Net Worth,Very Low,Homemaker,Accumulating,65 and over,PhD,Married,Growth,1176.59,Travel,Urban,2018-01-17,Female,120000.0,High priority,8
4,1004,Estate Planning,Medium,Employed,Accumulating,40 to 55,College,Married,Moderate,14452.36,Food,City,2018-01-03,Male,350000.0,Low priority,8


Now that the data is prepared we need to continue with a few more data preparation steps before we can do clustering. First is to simply remove the columns `CUSTOMER_CUSTOMER_ID` and `CUSTOMER_EFFECTIVE_DATE` since they're not needed for segmentation.

In [7]:
# Drop columns not needed for segmentation
df_prepped.drop(['CUSTOMER_CUSTOMER_ID', 'CUSTOMER_EFFECTIVE_DATE'], axis=1, inplace=True)

### Dummy Variables

Next, since our data contains mixed data types, categorical and numeric, we need to convert those categorical features to numeric by creating binary dummy variables. Once we create the dummy variables from the categorical features we'll drop the original categorical features.

In [8]:
# Create lists of the numeric and categorical features
numeric_cols = list(df_prepped.select_dtypes(include=[np.number]).columns)
categorical_cols = list(df_prepped.select_dtypes(include=[object]).columns)

# Copy of the prepped dataframe before any transformations are carried out
prepped_data_pre_transform = df_prepped.copy()

# Create dummy variables for categorical features and drop original
for col in categorical_cols:
    df_prepped = pd.concat([df_prepped, pd.get_dummies(df_prepped[col], prefix=col, drop_first=True)], axis=1)
    df_prepped.drop(col, axis=1, inplace=True)

### Standardize Data

The last step for our data preparation is to standardize the numeric variables. Standardizing numeric values prior to clustering is common practice especially when dealing with features of varying scales (number of children vs summary of assets amount). This helps to improve the cluster quality as well as cluster algorithm accuracy and performance.

In [9]:
scale_features = df_prepped[numeric_cols]

scaler = StandardScaler()
scale_features = scaler.fit_transform(scale_features.values)
df_prepped[numeric_cols] = scale_features

In [10]:
# Preview prepped data with standardized numeric values
df_prepped.head()

Unnamed: 0,CUSTOMER_SUMMARY_AMOUNT_OF_MANAGEMENT_FEES,CUSTOMER_ANNUAL_INCOME,CUSTOMER_TENURE_IN_MONTHS,CUSTOMER_PURSUIT_Education Planning,CUSTOMER_PURSUIT_Estate Planning,CUSTOMER_PURSUIT_Increase Net Worth,CUSTOMER_PURSUIT_Philanthropy,CUSTOMER_PURSUIT_Retirement Planning,CUSTOMER_CREDIT_AUTHORITY_LEVEL_Low,CUSTOMER_CREDIT_AUTHORITY_LEVEL_Medium,...,CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY_Recreation,CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY_Savings,CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY_Travel,CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY_Uncategorized,CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY_Utilities,CUSTOMER_URBAN_CODE_Rural,CUSTOMER_URBAN_CODE_Urban,CUSTOMER_GENDER_Male,CUSTOMER_IMPORTANCE_LEVEL_CODE_Low priority,CUSTOMER_IMPORTANCE_LEVEL_CODE_Normal priority
0,-0.850833,0.932255,-0.897529,0,0,0,0,0,0,1,...,1,0,0,0,0,0,0,1,1,0
1,2.167705,0.5736,-0.701125,0,0,0,0,1,0,0,...,0,0,0,1,0,0,1,0,0,1
2,-0.950851,-0.621916,-0.406519,0,0,1,0,0,0,0,...,0,0,1,0,0,0,1,0,0,0
3,-0.959148,-0.701617,-0.897529,0,0,1,0,0,0,0,...,0,0,1,0,0,0,1,0,0,0
4,1.517783,1.131508,-0.897529,0,1,0,0,0,0,1,...,0,0,0,0,0,0,0,1,1,0


## Principal Component Analysis (PCA)

Now that our data is clean with some transformations we're going to do one last transformation. We'll use principal components analysis (PCA) to reduce the dimensionality of our data and reduce clustering computation. Also, performing PCA prior to clustering can often help with performance if there are underlying linear relationships with the data. This isn't always the case and is dependent on your data, but it did hold true with our customer dataset.

Below we perform PCA on our dataset and select 14 components. You can see the newly transformed and reduced data set below. This will be the data set that we pass to our clustering algorithm.

In [11]:
# PCA
pca = PCA(n_components=14)
pca_fitted = pca.fit(df_prepped)
df_pca = pca_fitted.transform(df_prepped)

# Preview transformed data
pd.DataFrame(df_pca).head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13
0,-1.633069,0.313649,-0.425015,0.899856,-0.658682,1.346514,0.145198,0.20364,-0.129012,-0.038811,0.131175,0.132386,-0.084905,-0.270533
1,2.41277,1.449158,-1.563411,0.826072,-0.414704,-0.496564,0.013733,0.963539,-0.196875,0.22556,-0.688968,-0.154542,0.177696,0.054096
2,0.275778,-0.993911,1.886191,0.385648,0.002758,0.249282,0.753729,0.430153,0.490053,0.69681,-0.194723,-0.643173,-0.127439,-0.249291
3,0.530589,-1.085195,1.67239,0.85531,-0.271357,0.266506,0.281015,-0.008495,0.418908,0.740401,-0.531247,-0.803264,-0.146244,-0.657398
4,-0.680556,1.766475,-1.074686,1.096721,1.025809,0.680437,-0.147261,-0.195352,0.256169,0.172991,-0.220285,0.446379,0.019043,0.361261


From our PCA, we can view how much variance is explained for each additional principal component. The below cumulative sum shows that the first 14 components explains 90.6% of the data.

In [12]:
# Percent variance explained array
print(np.cumsum(pca.explained_variance_ratio_))

[0.21764406 0.38447951 0.52871062 0.64044143 0.70645547 0.74986159
 0.78734266 0.81547564 0.83739013 0.85529122 0.87230691 0.88615574
 0.89701825 0.90684308]


## K-means Clustering

We can now begin clustering. For clustering we'll use the k-means clustering algorithm. K-means clustering simply partitions the data into k clusters where each observation or client belongs to the cluster with the nearest mean. When using k-means you have to specify the number of clusters beforehand, but often times that number is unknown. We'll loop through the k-means algorithm using a range of number of clusters and determine cluster number by using the metric called silhouette coefficient. The silhouette coefficient indicates how similar the observations within its own cluster are compared to other observations in different clusters. Range for silhouette coefficient are from -1 to 1 where 1 represents objects within its own cluster are well paired.

In [13]:
# Specify max number of clusters for iteration
max_number_of_clusters = 15

# Loop through K-means and view silhouette coefficient to determine number of clusters
for i in range(2, max_number_of_clusters+1):
    kmeans_mdl = KMeans(n_clusters=i, random_state=1234)
    kmeans_mdl.fit(df_pca)
    labels = kmeans_mdl.labels_
    silhouette_coef = metrics.silhouette_score(df_pca, labels, metric='euclidean')
    print('Silhouette coefficient for ' + str(i) + ' clusters:' + str(silhouette_coef))

Silhouette coefficient for 2 clusters:0.22429690157253118
Silhouette coefficient for 3 clusters:0.23397827199300802
Silhouette coefficient for 4 clusters:0.27508986404202296
Silhouette coefficient for 5 clusters:0.28634452973423685
Silhouette coefficient for 6 clusters:0.30174119156635326
Silhouette coefficient for 7 clusters:0.2941987400417092
Silhouette coefficient for 8 clusters:0.2836505349160446
Silhouette coefficient for 9 clusters:0.2666771048718334
Silhouette coefficient for 10 clusters:0.2637364333328408
Silhouette coefficient for 11 clusters:0.261700168501964
Silhouette coefficient for 12 clusters:0.25686977767653274
Silhouette coefficient for 13 clusters:0.25829634023463766
Silhouette coefficient for 14 clusters:0.25242050363080915
Silhouette coefficient for 15 clusters:0.2528373458169747


For the above exercise we see that number of clusters should be 6 or 7 based on the silhouette coefficient. For this exercise we selected 7 clusters.

We then fit the k-means algorithm to our data using the specified 7 clusters. Then we add those cluster assignments back to the PCA dataframe so we can visualize the cluster assignments on a 2-dimensional plot.

In [14]:
# K-means with 7 clusters based on silhouette coefficient
num_clusters = 7
kmeans_mdl = KMeans(n_clusters=num_clusters, random_state=1234)
fitted_model = kmeans_mdl.fit(df_pca)


# Save Deployment Assets

### Save Data Transformation Assets to Shared Filesystem
We finally save the deployment assets to a filesystem shared by the Db2 container and our Jupyter notebook.

In [15]:
# store the columns to be standardised, means and standard deviations in the training metadata 
with open('/db2whjoblib/training_data_metadata.json', 'r') as f:
    training_metadata = json.load(f)

training_metadata['cols_to_standardise'] = numeric_cols
training_metadata['scaler_means'] = list(scaler.mean_)
training_metadata['scaler_standard_dev'] = list(np.sqrt(scaler.var_))

# store the names of the columns used for training
training_metadata['cols_used_for_training'] = list(df_prepped.columns)

with open('/db2whjoblib/training_data_metadata.json', 'w') as f:
    json.dump(training_metadata, f)
    
# Save out PCA
joblib.dump(pca_fitted, '/db2whjoblib/pca.txt')

['/db2whjoblib/pca.txt']

### Storing Kmeans Model as joblib

In [16]:
from joblib import dump

#save model to db2 mounted directory
dump(fitted_model,'/db2whjoblib/kmeans_mdl.joblib')

['/db2whjoblib/kmeans_mdl.joblib']

# II. Deploying the Trained Model and Inferencing Pipeline to Db2 for In-Database Scoring

In this section, we will deploy our trained model to Db2 and use it to make inferences in-database.

This section contains the following steps:
1. Connect to Db2 Database - Connect to Db2
2. Initalization of Environment - Installing Python packages on the Db2 platform and uploading deployment assets to the file system
3. Writing the UDF - Write the UDF that will process and score the data
4. UDF Function Deployment - Deploy the UDF function
5. Make Predictions - Call the UDF and return the query results

## 1. Connect to Db2 Database

Make sure you have a DB2 Warehouse instance and create a connection asset named "CSSDB3" 

In [17]:
#Establishing Connection to Database

import pandas as pd
import ibm_db_dbi
import ibm_db
from project_lib import Project 

# Define connection string and connect to database
project = Project.access()
LocalDB2_credentials = project.get_connection(name = "CSSDB3")

bluedb_connection = ibm_db.connect("DATABASE={};HOSTNAME={};PORT={};PROTOCOL=TCPIP;UID={};PWD={}".format(LocalDB2_credentials['database'],
                                                                                                         LocalDB2_credentials['host'],
                                                                                                         LocalDB2_credentials['port'],
                                                                                                         LocalDB2_credentials['username'],
                                                                                                         LocalDB2_credentials['password']),"","")
dbi_bluedb_connection = ibm_db_dbi.Connection(bluedb_connection)

## 2. Initialization of Environment


### Placing the necessary files and Python packages in the shared filesystem

**Note:** Only run cells in section 2 if running the notebook for the first time

In [19]:
# # Only run this cell if running the notebook for the first time!

# # copy original csv datatypes to pickle file to db2mount directory 
# !cp /project_data/data_asset/original_csv_dtypes.pkl /db2whjoblib/original_csv_dtypes.pkl


# # copy data preparation file to db2mount directory 
# !cp /project_data/data_asset/customer_segmentation_prep.py /db2whjoblib/customer_segmentation_prep.py

In [20]:
# Only run this cell if running the notebook for the first time!

# !pip install --target=/db2whjoblib/pandas pandas

# !pip install --target=/db2whjoblib/dateutil python-dateutil

# !pip install --target=/db2whjoblib/scikit-learn scikit-learn==0.23.2

## 3. Writing the UDF

1. Load in necessary data processing packages
2. Define the default parameters for data-prep object instantiation
3. Load in the files containing the data processing parameters (datatypes,normalization, pca) 
4. Read, process, score the data

In [21]:
%%writefile /db2joblib/full_pipeline_routine.py

#1. Defining and importing necessary packages 
import nzae
import json
from joblib import dump, load
import sys
import pandas as pd
# This is the transformation file
sys.path.insert(0, '/mnt/backup/joblib/')
import customer_segmentation_prep as transformer


class full_pipeline(nzae.Ae):
    def _runUdtf(self):
        
        #2.Defining parameters to instantiate "data prep" object 
        #-------------USER INPUT FIELDS-------------------#
        # User input variables
        effective_date = '2018-09-30'  # date at which the prediction was computed 
        train_or_score = 'score'
        granularity_key='CUSTOMER_CUSTOMER_ID'
        customer_start_date='CUSTOMER_SUMMARY_START_DATE'
        customer_end_date='CUSTOMER_SUMMARY_END_DATE'
        status_attribute='CUSTOMER_STATUS'
        status_flag_active='Active'
        date_customer_joined='CUSTOMER_RELATIONSHIP_START_DATE'

        columns_required=['CUSTOMER_CUSTOMER_ID', 'CUSTOMER_STATUS', 'CUSTOMER_SUMMARY_START_DATE', 'CUSTOMER_SUMMARY_END_DATE',
                            'CUSTOMER_EFFECTIVE_DATE',  'CUSTOMER_SYSTEM_LOAD_TIMESTAMP']

        default_attributes=['CUSTOMER_GENDER', 'CUSTOMER_AGE_RANGE', 'CUSTOMER_EDUCATION_LEVEL','CUSTOMER_EMPLOYMENT_STATUS', 'CUSTOMER_MARITAL_STATUS', 
                            'CUSTOMER_URBAN_CODE', 'CUSTOMER_ANNUAL_INCOME', 'CUSTOMER_RELATIONSHIP_START_DATE', 'CUSTOMER_SUMMARY_RETURN_LAST_QUARTER', 
                            'CUSTOMER_SUMMARY_NUMBER_OF_EMAILS','CUSTOMER_SUMMARY_NUMBER_OF_LOGINS','CUSTOMER_SUMMARY_AMOUNT_OF_MANAGEMENT_FEES',
                            'CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY', 'CUSTOMER_CREDIT_AUTHORITY_LEVEL', 'CUSTOMER_CUSTOMER_BEHAVIOR', 
                            'CUSTOMER_IMPORTANCE_LEVEL_CODE','CUSTOMER_MARKET_GROUP','CUSTOMER_PURSUIT']
        
        df_columns = columns_required + default_attributes
        
        risk_tolerance_list = []
        investment_objective_list = []
        std_multiplier=5
        max_num_cat_cardinality=15
        nulls_threshold=0.1
        #-------------USER INPUT FIELDS-------------------#
        
        #3. Loading pca, object type, and normalization parameters
        original_csv_dtypes = load('/mnt/backup/joblib/original_csv_dtypes.pkl')   #loading pkl file with datatypes 
        metadata_json = json.load( open('/mnt/backup/joblib/training_data_metadata.json','r')) #loading json file training metadata 
        pca = load('/mnt/backup/joblib/pca.txt') #loading pca file training metadata 
        
        cols_to_standardise = metadata_json['cols_to_standardise']
        scaler_means = metadata_json['scaler_means']
        scaler_standard_dev = metadata_json['scaler_standard_dev']
        cols_used_for_training = metadata_json['cols_used_for_training']
        
        self.model = load('/mnt/backup/joblib/kmeans_mdl.joblib')
        
        scoring_prep = transformer.CustomerSegmentationPrep('score', granularity_key=granularity_key,
                                    customer_start_date=customer_start_date,
                                    customer_end_date=customer_end_date,
                                    status_attribute=status_attribute,
                                    status_flag_active=status_flag_active,
                                    date_customer_joined=date_customer_joined,
                                    columns_required=columns_required,
                                    default_attributes=default_attributes,
                                    risk_tolerance_list=risk_tolerance_list,
                                    investment_objective_list=investment_objective_list,
                                    effective_date=effective_date,   
                                    std_multiplier=std_multiplier,
                                    max_num_cat_cardinality=max_num_cat_cardinality,
                                    nulls_threshold=nulls_threshold)
        
        #4. Read in, process, and score the DB2 data in a loop (batch scoring)
        batchsize = 10000
        rownum = 0
        row_list = []
            
        for row in self: 

            # Collect rows into batches
            if self.isDone():
                if rownum==0:
                    break
                # handle (last) paritally filled batch
                batchsize = rownum
            else:
                row_list.append(row)
                rownum = rownum+1         
            
            # Convert batches into DataFrames
            if rownum==batchsize:
                input_df = pd.DataFrame(row_list, columns=df_columns)
            

                #Defining the input fields back to their original python datatypes
                input_df = input_df.astype(original_csv_dtypes)


                #Execute transformation script
                prepped_data = scoring_prep.prep_data(input_df, 'score')
                ids = list(prepped_data['CUSTOMER_CUSTOMER_ID'])

                if (prepped_data is not None):
            
                    # Perform feature scaling 
                    for i in range(0, len(cols_to_standardise)):
                        current_col = cols_to_standardise[i]
                        prepped_data[current_col] = (prepped_data[current_col] - scaler_means[i]) / scaler_standard_dev[i]   

                    # if a column does not exist in scoring but is in training, add the column to scoring dataset
                    for col in cols_used_for_training:
                        if col not in list(prepped_data.columns):
                            prepped_data[col] = 0
                            
                    # if a column exists in scoring but not in training, delete it from scoring dataset
                    for col in list(prepped_data.columns):
                        if col not in cols_used_for_training:
                            prepped_data.drop(col, axis=1, inplace=True)

                    prepped_data = prepped_data[cols_used_for_training]

                    prepped_data = pd.DataFrame(pca.transform(prepped_data), columns=['pc_1','pc_2','pc_3','pc_4','pc_5','pc_6','pc_7','pc_8',
                                                                                      'pc_9','pc_10','pc_11','pc_12','pc_13','pc_14'])
                    
                    prediction = self.model.predict(prepped_data) 
                    
                    for x in range(len(prediction)):
                        self.output(float(ids[x]),float(prediction[x]))
                    
                # prepare for next batch
                row_list = []
                rownum = 0
                
        self.done()

        
full_pipeline.run()

Overwriting /db2joblib/full_pipeline_routine.py


Enable read/write access only if runnning for **first time**

In [22]:
# # Providing permissions to execute UDF - only run this cell if running notebook for first time
# !chmod -R 777 /db2joblib/*

## 4. UDF Function Deployment

### Writing and concatenating deployment strings

In [23]:
# Define input column names
columns_required=['CUSTOMER_CUSTOMER_ID', 'CUSTOMER_STATUS', 'CUSTOMER_SUMMARY_START_DATE', 'CUSTOMER_SUMMARY_END_DATE','CUSTOMER_EFFECTIVE_DATE',  
                  'CUSTOMER_SYSTEM_LOAD_TIMESTAMP']

default_attributes=['CUSTOMER_GENDER', 'CUSTOMER_AGE_RANGE', 'CUSTOMER_EDUCATION_LEVEL','CUSTOMER_EMPLOYMENT_STATUS', 'CUSTOMER_MARITAL_STATUS', 
                    'CUSTOMER_URBAN_CODE', 'CUSTOMER_ANNUAL_INCOME', 'CUSTOMER_RELATIONSHIP_START_DATE', 'CUSTOMER_SUMMARY_RETURN_LAST_QUARTER', 
                    'CUSTOMER_SUMMARY_NUMBER_OF_EMAILS','CUSTOMER_SUMMARY_NUMBER_OF_LOGINS','CUSTOMER_SUMMARY_AMOUNT_OF_MANAGEMENT_FEES',
                    'CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY', 'CUSTOMER_CREDIT_AUTHORITY_LEVEL', 'CUSTOMER_CUSTOMER_BEHAVIOR', 
                    'CUSTOMER_IMPORTANCE_LEVEL_CODE','CUSTOMER_MARKET_GROUP','CUSTOMER_PURSUIT']

In [24]:
# Query Db2 for input column datatypes - used for creating the UDF input datatypes string
sql = """SELECT NAME, COLTYPE,LENGTH FROM SYSIBM.SYSCOLUMNS WHERE TBCREATOR='DSE' AND TBNAME='CUST_SEG_DATA_TEST'"""
dtypes_df = pd.read_sql(sql, dbi_bluedb_connection)

# filter based on input columns we use (24 out of ~300 available)
dtypes_df=dtypes_df[dtypes_df['NAME'].isin(columns_required+default_attributes)]

In [25]:
# Create Db2 datatype mapping for UDF input columns
mapping = [str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip()+'('+str(dtypes_df['LENGTH'][dtypes_df['NAME']==x].values[0])+')' 
           if str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip()=="VARCHAR" 
           else str(dtypes_df['COLTYPE'][dtypes_df['NAME']==x].values[0]).strip() 
           for x in (columns_required + default_attributes)]

# Create a string from the mapping. This is used in the deployment statement
input_dtypes_string = ', '.join([x for x in mapping ])

### Function Deployment 


In [26]:
# Deploy UDF function
create_udf_function_full_pipeline = """
CREATE OR REPLACE FUNCTION full_pipeline({0}) returns table (ID double, PREDICTION double) language PYTHON  parameter style \
NPSGENERIC  FENCED  NOT THREADSAFE  NO FINAL CALL  DISALLOW PARALLEL  NO DBINFO  DETERMINISTIC \
NO EXTERNAL ACTION RETURNS NULL ON NULL INPUT  NO SQL external name '/mnt/backup/joblib/full_pipeline_routine.py'""".format(input_dtypes_string)
ibm_db.exec_immediate(bluedb_connection,create_udf_function_full_pipeline)

print(create_udf_function_full_pipeline)


CREATE OR REPLACE FUNCTION full_pipeline(DOUBLE, VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90), DOUBLE, VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90), DOUBLE, VARCHAR(90), DOUBLE, DOUBLE, DOUBLE, VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90), VARCHAR(90)) returns table (ID double, PREDICTION double) language PYTHON  parameter style NPSGENERIC  FENCED  NOT THREADSAFE  NO FINAL CALL  DISALLOW PARALLEL  NO DBINFO  DETERMINISTIC NO EXTERNAL ACTION RETURNS NULL ON NULL INPUT  NO SQL external name '/mnt/backup/joblib/full_pipeline_routine.py'


## 5. Make Predictions


In [27]:
## Writing column selection string for the input. This is used in the SQL selection queries. 
columns_selection_sql= ', '.join(str(x) for x in columns_required + default_attributes)
# Add "i." to input columns for use in the SQL statement
input_columns = ', '.join('i.{}'.format(x) for x in columns_selection_sql.split(","))

In [28]:
# Drop existing results table
sql_drop = 'DROP TABLE DSE.CUST_SEG_PREDICTIONS'
stmt_drop = ibm_db.prepare(bluedb_connection, sql_drop)

ibm_db.execute(stmt_drop)

True

In [29]:
# Create table for storing predictions
sql = 'CREATE TABLE DSE.CUST_SEG_PREDICTIONS(CUST_ID DOUBLE, PREDICTION DOUBLE);'

stmt = ibm_db.prepare(bluedb_connection, sql)

ibm_db.execute(stmt)

True

In [30]:
execute_udf_sql = 'INSERT INTO DSE.CUST_SEG_PREDICTIONS(CUST_ID, PREDICTION) select f.* from DSE.CUST_SEG_DATA_TEST i, table(full_pipeline({})) f'.format(input_columns)
print(execute_udf_sql)

INSERT INTO DSE.CUST_SEG_PREDICTIONS(CUST_ID, PREDICTION) select f.* from DSE.CUST_SEG_DATA_TEST i, table(full_pipeline(i.CUSTOMER_CUSTOMER_ID, i. CUSTOMER_STATUS, i. CUSTOMER_SUMMARY_START_DATE, i. CUSTOMER_SUMMARY_END_DATE, i. CUSTOMER_EFFECTIVE_DATE, i. CUSTOMER_SYSTEM_LOAD_TIMESTAMP, i. CUSTOMER_GENDER, i. CUSTOMER_AGE_RANGE, i. CUSTOMER_EDUCATION_LEVEL, i. CUSTOMER_EMPLOYMENT_STATUS, i. CUSTOMER_MARITAL_STATUS, i. CUSTOMER_URBAN_CODE, i. CUSTOMER_ANNUAL_INCOME, i. CUSTOMER_RELATIONSHIP_START_DATE, i. CUSTOMER_SUMMARY_RETURN_LAST_QUARTER, i. CUSTOMER_SUMMARY_NUMBER_OF_EMAILS, i. CUSTOMER_SUMMARY_NUMBER_OF_LOGINS, i. CUSTOMER_SUMMARY_AMOUNT_OF_MANAGEMENT_FEES, i. CUSTOMER_SUMMARY_TOP_SPENDING_CATEGORY, i. CUSTOMER_CREDIT_AUTHORITY_LEVEL, i. CUSTOMER_CUSTOMER_BEHAVIOR, i. CUSTOMER_IMPORTANCE_LEVEL_CODE, i. CUSTOMER_MARKET_GROUP, i. CUSTOMER_PURSUIT)) f


In [31]:
stmt = ibm_db.prepare(bluedb_connection, execute_udf_sql)
ibm_db.execute(stmt)

True

In [32]:
sql = 'SELECT * FROM DSE.CUST_SEG_PREDICTIONS ORDER BY CUST_ID'
df = pd.read_sql(sql, dbi_bluedb_connection)
df.head(10)

Unnamed: 0,CUST_ID,PREDICTION
0,0.0,4.0
1,1.0,4.0
2,2.0,6.0
3,3.0,4.0
4,4.0,6.0
5,5.0,5.0
6,6.0,4.0
7,7.0,6.0
8,8.0,4.0
9,9.0,5.0
