In [None]:
#!python -m pip install --user --upgrade pip

In [None]:
#!pip3 install kfp --upgrade --user

In [1]:
# Directory that receives the artifacts written when the step functions
# below are run locally from this notebook.
import os

output_dir = "/home/jovyan/data/"
# The step functions open files inside this directory but never create it,
# so make sure it exists before any of them runs.
os.makedirs(output_dir, exist_ok=True)

In [2]:
def data_download_n_class_declr(data_path):
    """Download the credit-card dataset and serialize it with the preprocessing objects.

    Everything the step needs (package installation, imports and the
    ``ColumnSelector`` class) is defined inside the function body; the
    function is later wrapped with ``func_to_container_op`` in this notebook.

    Args:
        data_path: Directory that receives ``columnSelector.gz.dill``,
            ``scaler.gz.dill`` and ``data.gz.dill``. The directory is not
            created here.
    """
    import sys, subprocess

    # Install the dependencies into the interpreter that runs this step.
    # Return codes are not checked; if a package ends up unavailable the
    # imports below fail.
    for package in ("pandas", "numpy", "dill", "scikit-learn==0.22"):
        subprocess.run([sys.executable, "-m", "pip", "install", package])

    import dill
    import gzip
    import pandas as pd
    import numpy as np

    import random as python_random

    from sklearn.preprocessing import StandardScaler
    from sklearn.base import BaseEstimator, TransformerMixin

    import warnings
    warnings.filterwarnings('ignore')

    # setting random seed for result reproducibility
    np.random.seed(1)
    python_random.seed(12)

    # Data Download
    credit_card_df = pd.read_csv('https://raw.github.com/HamoyeHQ/g01-fraud-detection/master/data/credit_card_dataset.zip')

    print('=== DOWNLOAD DATA SUCCESSFUL ===')

    # CREATING THE COLUMN SELECTOR CLASS

    # 27 most important features according to our EDA (V1..V28 without V25)
    cols = ['V'+str(i) for i in range(1, 29) if i != 25]

    class ColumnSelector(BaseEstimator, TransformerMixin):
        """Transformer that keeps only the configured ``V*`` feature columns."""

        def __init__(self, cols=cols):
            self.cols = cols

        def fit(self, X, y=None):
            # Stateless transformer: nothing to learn.
            return self

        def transform(self, X):
            """Return the selected columns as a 2-D numpy array.

            Raises:
                TypeError: if ``X`` is not a DataFrame, Series or ndarray.
            """
            if isinstance(X, pd.DataFrame):
                return np.array(X[self.cols])

            if isinstance(X, pd.Series):
                # A single record: promote it to one row.
                return np.array(X[self.cols]).reshape(1, -1)

            if isinstance(X, np.ndarray):
                # The positional index is the number embedded in the column
                # name ('V7' -> 7).
                # NOTE(review): this assumes the array keeps the original
                # column order with one column before V1 - confirm.
                self.cols_ind = [int(col[1:]) for col in self.cols]
                if len(X.shape) == 1: # if one dimensional array
                    return X[self.cols_ind].reshape(1, -1)
                # Two-dimensional input. This return used to be nested after
                # the 1-D return and was unreachable, so 2-D arrays yielded None.
                return X[:, self.cols_ind]

            raise TypeError('expected input type to be any of pd.Series, pd.DataFrame or np.ndarray but got {}'.format(type(X)))

    print('=== CREATED COLUMN SELECTOR ===')

    cols_select = ColumnSelector()
    scaler = StandardScaler()

    print('=== SERIALIZING CLASSESS, AND DATA ===')

    # File stem -> object; each one is written as a gzip-compressed dill file.
    artifacts = {
        "columnSelector": cols_select,
        "scaler": scaler,
        "data": credit_card_df,
    }
    for stem, obj in artifacts.items():
        with gzip.open(f"{data_path}/{stem}.gz.dill", "wb") as f:
            dill.dump(obj, f)

    print('=== DONE ===')

In [3]:
# Run the data download / class declaration step locally; its artifacts are
# written to output_dir.
data_download_n_class_declr(output_dir)

=== DOWNLOAD DATA SUCCESSFUL ===
=== CREATED COLUMN SELECTOR ===
=== SERIALIZING CLASSESS, AND DATA ===
=== DONE ===


In [4]:
def fraud_detection_model(data_path):
    """Fit the data preparation pipeline, the XGBoost model and its calibrator.

    Reads ``columnSelector.gz.dill``, ``scaler.gz.dill`` and ``data.gz.dill``
    from ``data_path`` and writes ``data_prep_pipe.gz.dill``,
    ``calibrator.gz.dill``, ``features.gz.dill`` and ``target.gz.dill`` to
    the same directory.

    Args:
        data_path: Directory holding the input artifacts and receiving the
            outputs.
    """
    import sys, subprocess

    # Install the dependencies into the interpreter that runs this step
    # (return codes are not checked).
    for package in ("dill", "scikit-learn==0.22", "numpy", "pandas", "xgboost"):
        subprocess.run([sys.executable, "-m", "pip", "install", package])

    import random as python_random

    import numpy as np
    from xgboost import XGBClassifier
    from sklearn.pipeline import Pipeline
    from sklearn.calibration import CalibratedClassifierCV

    python_random.seed(12)
    np.random.seed(1)

    import dill
    import gzip

    print('=== DE-SERIALIZING CLASSESS, AND DATA ===')

    with gzip.open(f"{data_path}/columnSelector.gz.dill", "rb") as f:
        columnselector = dill.load(f)

    with gzip.open(f"{data_path}/scaler.gz.dill", "rb") as f:
        scaler = dill.load(f)

    with gzip.open(f"{data_path}/data.gz.dill", "rb") as f:
        data = dill.load(f)

    print('=== DONE ===')

    print('=== CREATING DATA PREPARATION PIPELINE ===')
    # data preparation pipeline: column selection followed by scaling
    data_prep = Pipeline([('columns', columnselector), ('scaler', scaler)])

    print('=== DONE ===')

    print('=== CREATING FEATURES AND TARGETS ===')

    # pop() removes 'Class' from the frame, so what is left are the features.
    y = data.pop('Class')
    X = data

    print('=== DONE ===')

    print('=== FITTING DATA PREPARATION PIPELINE TO DATA ===')

    # fitting and transforming the data
    X_prep = data_prep.fit_transform(X, y)

    print('=== DONE ===')

    print('=== CREATING FRAUD PREDICTING MODEL ===')

    model = XGBClassifier(random_state=1)

    admin_cost = 2.5

    # Rows with a truthy 'Class' are weighted by their transaction amount,
    # all other rows by the fixed admin cost. Vectorized equivalent of
    # looking every row up with .iloc in a Python loop.
    is_fraud = y.values.astype(bool)
    sample_weights = np.where(is_fraud, X['Amount'].values, admin_cost)

    print('=== FITTING DATA TO FRAUD PREDICTING MODEL ===')

    model.fit(X_prep, y, sample_weight=sample_weights)

    print('=== DONE ===')

    print('=== CREATE AND FIT CALIBRATED CLASSIFIER TO PREPARED DATA ===')
    # NOTE(review): with cv='prefit' the calibrator is fitted on the same rows
    # the model was trained on - confirm this is intended.
    calibration = CalibratedClassifierCV(model, method='isotonic', cv='prefit')
    calibration.fit(X_prep, y)
    print('=== DONE===')

    print('=== SERIALIZING FUNCTIONS, FEATURES, TARGET, AND MODEL ===')

    # File stem -> object: the fitted data prep pipeline, the fitted
    # calibrated classifier, the features and the targets.
    artifacts = {
        "data_prep_pipe": data_prep,
        "calibrator": calibration,
        "features": X,
        "target": y,
    }
    for stem, obj in artifacts.items():
        with gzip.open(f"{data_path}/{stem}.gz.dill", 'wb') as f:
            dill.dump(obj, f)

    print('=== DONE ===')

In [5]:
# Run the model fitting step locally against the artifacts in output_dir.
fraud_detection_model(output_dir)

=== DE-SERIALIZING CLASSESS, AND DATA ===
=== DONE ===
=== CREATING DATA PREPARATION PIPELINE ===
=== DONE ===
=== CREATING FEATURES AND TARGETS ===
=== DONE ===
=== FITTING DATA PREPARATION PIPELINE TO DATA ===
=== DONE ===
=== CREATING FRAUD PREDICTING MODEL ===
=== FITTING DATA TO FRAUD PREDICTING MODEL ===
=== DONE ===
=== CREATE AND FIT CALIBRATED CLASSIFIER TO PREPARED DATA ===
=== DONE===
=== SERIALIZING FUNCTIONS, FEATURES, TARGET, AND MODEL ===
=== DONE ===


In [8]:
def predictions(data_path):
    """Predict on a held-out split and serialize predictions with the test data.

    Reads ``data_prep_pipe.gz.dill``, ``calibrator.gz.dill``,
    ``features.gz.dill`` and ``target.gz.dill`` from ``data_path`` and writes
    ``predictions.gz.dill``, ``testTargets.gz.dill`` and
    ``testFeatures.gz.dill`` to the same directory.

    Args:
        data_path: Directory holding the input artifacts and receiving the
            outputs.
    """
    import sys, subprocess

    # Install the dependencies into the interpreter that runs this step
    # (return codes are not checked).
    for package in ("pandas", "dill", "scikit-learn==0.22", "xgboost", "numpy"):
        subprocess.run([sys.executable, "-m", "pip", "install", package])

    import random as python_random
    import numpy as np
    from sklearn.model_selection import train_test_split

    python_random.seed(12)
    np.random.seed(1)

    import dill
    import gzip

    print('=== DE-SERIALIZING FUNCTIONS, FEATURES, TARGET, AND MODEL ===')

    # loading in useful objects
    with gzip.open(f"{data_path}/data_prep_pipe.gz.dill", 'rb') as f:
        data_prep = dill.load(f)

    with gzip.open(f"{data_path}/calibrator.gz.dill", 'rb') as f:
        calibrator = dill.load(f)

    with gzip.open(f"{data_path}/features.gz.dill", "rb") as f:
        X = dill.load(f)

    with gzip.open(f"{data_path}/target.gz.dill", "rb") as f:
        y = dill.load(f)

    print('=== DONE ===')

    print('=== DATA SPLITTING ===')

    # Only the test part of the split is used by this step.
    # NOTE(review): fraud_detection_model fits the pipeline and the model on
    # the full feature set, which includes these test rows - confirm that
    # evaluating on them is intended.
    _, X_test, _, y_test = train_test_split(X, y, test_size=0.25, stratify=y, random_state=1)

    print('=== DONE ===')

    print('=== TEST DATA PREPARATION ===')

    Xt = data_prep.transform(X_test)

    print('=== DONE ===')

    print('=== MODEL PREDICTION ===')

    # Raw class predictions from the calibrated classifier (not
    # probabilities). The previous inner helper shadowed this function's own
    # name and carried a permanently disabled predict_proba branch.
    test_predictions = calibrator.predict(Xt)

    print('=== DONE ===')

    print('=== SERIALIZING PREDICTIONS ===')

    with gzip.open(f"{data_path}/predictions.gz.dill", "wb") as f:
        dill.dump(test_predictions, f)

    with gzip.open(f"{data_path}/testTargets.gz.dill", "wb") as f:
        dill.dump(y_test, f)

    with gzip.open(f"{data_path}/testFeatures.gz.dill", "wb") as f:
        dill.dump(X_test, f)

    print(' === DONE ===')

In [9]:
# Run the prediction step locally against the artifacts in output_dir.
predictions(output_dir)

=== DE-SERIALIZING FUNCTIONS, FEATURES, TARGET, AND MODEL ===
=== DONE ===
=== DATA SPLITTING ===
=== DONE ===
=== TEST DATA PREPARATION ===
=== DONE ===
=== MODEL PREDICTION ===
=== DONE ===
=== SERIALIZING PREDICTIONS ===
 === DONE ===


In [12]:
def prediction_summary(data_path):
    """Score the serialized predictions and write a summary to ``results.txt``.

    Reads ``predictions.gz.dill``, ``testTargets.gz.dill`` and
    ``testFeatures.gz.dill`` from ``data_path``, prints the F1 score and the
    cost saving for the whole test set and for its first row, and writes the
    same figures to ``results.txt`` in ``data_path``.

    Args:
        data_path: Directory holding the input artifacts and receiving
            ``results.txt``.
    """
    import sys, subprocess

    # Install the dependencies into the interpreter that runs this step
    # (return codes are not checked).
    for package in ("pandas", "dill", "scikit-learn==0.22", "numpy"):
        subprocess.run([sys.executable, "-m", "pip", "install", package])

    import random as python_random

    from sklearn.metrics import f1_score
    import numpy as np
    import pandas as pd

    python_random.seed(12)
    np.random.seed(1)

    import dill
    import gzip

    print('=== DE-SERIALIZING PREDICTIONS, TEST FEATURES, AND TEST TARGETS ===')

    with gzip.open(f"{data_path}/predictions.gz.dill", 'rb') as f:
        pred = dill.load(f)

    with gzip.open(f"{data_path}/testTargets.gz.dill", 'rb') as f:
        y_test = dill.load(f)

    with gzip.open(f"{data_path}/testFeatures.gz.dill", "rb") as f:
        x_test = dill.load(f)

    print('=== DONE ===')

    # defining a function to calculate cost savings
    def cost_saving(ytrue, ypred, amount, threshold=0.5, admin_cost=2.5, epsilon=1e-7):
        """Return 1 - cost / max_cost for the given predictions.

        cost is admin_cost per false positive plus the amounts of the missed
        positives; max_cost is the total amount of all positives. epsilon
        keeps the division defined when there are no positives. threshold is
        accepted but not used.
        """
        # np.asarray lets a single scalar prediction be flattened as well.
        ypred = np.asarray(ypred).flatten()
        fp = np.sum((ytrue == 0) & (ypred == 1))
        cost = np.sum(fp*admin_cost) + np.sum((amount[(ytrue == 1) & (ypred == 0)]))
        max_cost = np.sum((amount[(ytrue == 1)]))
        savings = 1 - (cost/(max_cost+epsilon))

        return savings

    print('=== MULTI-INPUT TESTING ===')

    is_fraud = (pred >= 0.5).astype(np.int64)
    pred_df = pd.DataFrame({'Class': is_fraud, 'Fraud_Probabilty': pred})

    # Select the amount by name instead of assuming it is the last column.
    amount = x_test['Amount']

    multi_f1 = f1_score(y_test, is_fraud)
    multi_saving = cost_saving(y_test, is_fraud, amount)

    print('f1_score is {}'.format(multi_f1))
    print('cost saving is {}'.format(multi_saving))

    print(pred_df.head())

    print('=== DONE ===')

    print('=== SINGLE-INPUT TESTING ===')

    # Use positional access everywhere: the test rows keep their original,
    # shuffled index labels, so y_test[0] would look up the label 0 rather
    # than the first test row (and fail if that label is not in the split).
    first_target = y_test.iloc[0]
    first_amount = np.array([amount.iloc[0]])

    is_fraud2 = (pred[0] >= 0.5).astype(np.int64)

    pred_df2 = pd.DataFrame({'Class': is_fraud2, 'Fraud_Probabilty': pred[0]}, index=[0])

    single_f1 = f1_score([first_target], [is_fraud2])
    single_saving = cost_saving(first_target, is_fraud2, first_amount)

    print('f1_score is {}'.format(single_f1))
    print('cost saving is {}'.format(single_saving))

    print(pred_df2)

    print('=== DONE ===')

    print('=== SERIALZING RESULTS ===')
    # write predictions to results.txt, reusing the metrics computed above
    with open(f'{data_path}/results.txt','w') as result:
        result.write(
            f'MULTI-INPUT TESTING:\n F1 SCORE: {multi_f1} \n COST SAVING: {multi_saving} \n {pred_df.head()} \n \n '
            f'SINGLE-INPUT TESTING:\n F1 SCORE: {single_f1} \n COST SAVING: {single_saving} \n {pred_df2.head()}'
        )

    print('=== DONE ===')

In [13]:
# Run the summary step locally; it also writes results.txt into output_dir.
prediction_summary(output_dir)

=== DE-SERIALIZING PREDICTIONS, TEST FEATURES, AND TEST TARGETS ===
=== DONE ===
=== MULTI-INPUT TESTING ===
f1_score is 0.8897959183673471
cost saving is 0.9624808839300956
   Class  Fraud_Probabilty
0      0                 0
1      0                 0
2      0                 0
3      0                 0
4      0                 0
=== DONE ===
=== SINGLE-INPUT TESTING ===
f1_score is 0.0
cost saving is 1.0
   Class  Fraud_Probabilty
0      0                 0
=== DONE ===
=== SERIALZING RESULTS ===
=== DONE ===


### Creating Components

In [14]:
import kfp
from kfp import dsl
import kfp.components as comp

In [15]:
# Show where the dsl-compile executable is installed (prints nothing if it
# is not on PATH).
!which dsl-compile

In [16]:
# Wrap each step function as a Kubeflow Pipelines component; all four use
# the same base image.
_BASE_IMAGE = "python:3.7"

data_download_n_class_declr_op = comp.func_to_container_op(
    data_download_n_class_declr,
    base_image=_BASE_IMAGE,
)
fraud_detection_model_op = comp.func_to_container_op(
    fraud_detection_model,
    base_image=_BASE_IMAGE,
)
predictions_op = comp.func_to_container_op(
    predictions,
    base_image=_BASE_IMAGE,
)
prediction_summary_op = comp.func_to_container_op(
    prediction_summary,
    base_image=_BASE_IMAGE,
)

### CREATE PIPELINE

In [17]:
# Create the client used to communicate with the Pipelines API server.
client = kfp.Client()

In [18]:
@dsl.pipeline(
    name="Fraud Detection",
    description="Fraud Detection Pipeline",
)
def fraud_detection(data_path: str):
    """Chain the four step components on one volume and print the results file.

    Each step mounts the volume handed on by the previous step at
    ``data_path`` and receives ``data_path`` as its only argument.

    Args:
        data_path: Mount point of the volume inside every step's container.
    """
    # Volume that carries the artifacts from one step to the next.
    data_volume = dsl.VolumeOp(
        name="data_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # Step 1: data download and class declaration.
    download_step = (
        data_download_n_class_declr_op(data_path)
        .add_pvolumes({data_path: data_volume.volume})
    )

    # Step 2: model fitting.
    model_step = (
        fraud_detection_model_op(data_path)
        .add_pvolumes({data_path: download_step.pvolume})
    )

    # Step 3: predictions.
    predict_step = (
        predictions_op(data_path)
        .add_pvolumes({data_path: model_step.pvolume})
    )

    # Step 4: prediction summary.
    summary_step = (
        prediction_summary_op(data_path)
        .add_pvolumes({data_path: predict_step.pvolume})
    )

    # Final step: print results.txt, the file written by the summary step.
    print_step = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: summary_step.pvolume},
        arguments=['cat', f'{data_path}/results.txt'],
    )

In [19]:
# Value passed to the pipeline as its data_path argument.
DATA_PATH = "/mnt"

pipeline_func = fraud_detection

experiment_name = 'fraud_detection_kubeflow'
run_name = f"{pipeline_func.__name__} run"

arguments = {"data_path": DATA_PATH}

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)