In [None]:
#!python -m pip install --user --upgrade pip

In [None]:
import kfp
import kfp.components as comp
from kfp import dsl

### component that obtains data

In [None]:
def obtain_data_op():
    """Build the 'Obtain Data' pipeline step.

    Returns:
        dsl.ContainerOp: a step that runs the
        'mojolaoluwa/get-data-component:v.0.1' image with no command-line
        arguments and exposes the container file '/obtain_data/data' as the
        output named 'data'.
    """
    exposed_files = {'data': '/obtain_data/data'}
    step = dsl.ContainerOp(
        name='Obtain Data',
        image='mojolaoluwa/get-data-component:v.0.1',
        arguments=[],
        file_outputs=exposed_files,
    )
    return step

### component that does preprocessing

In [None]:
def preprocess_op(data):
    """Build the 'Preprocess Data' pipeline step.

    Args:
        data: value placed after the '--data' flag on the container's
            command line.

    Returns:
        dsl.ContainerOp: a step that runs the
        'mojolaoluwa/preprocess-component:v.0.1' image and exposes four file
        outputs ('X_train', 'X_test', 'y_train', 'y_test'), each read from
        '/preprocess_data/<name>.npy'.
    """
    split_names = ('X_train', 'X_test', 'y_train', 'y_test')
    # Every output lives under /preprocess_data/ and is named after its key.
    exposed_files = {
        split: f'/preprocess_data/{split}.npy' for split in split_names
    }
    cli_args = ['--data', data]
    return dsl.ContainerOp(
        name='Preprocess Data',
        image='mojolaoluwa/preprocess-component:v.0.1',
        arguments=cli_args,
        file_outputs=exposed_files,
    )

### component for training the model

In [None]:
def train_op(X_train, y_train):
    """Build the 'Train Model' pipeline step.

    Args:
        X_train: value placed after the '--X_train' flag.
        y_train: value placed after the '--y_train' flag.

    Returns:
        dsl.ContainerOp: a step that runs the
        'mojolaoluwa/train-tensorflow:v.0.1' image and exposes the container
        file '/train_data/classifier.h5' as the output named 'model'.
    """
    cli_args = ['--X_train', X_train, '--y_train', y_train]
    exposed_files = {'model': '/train_data/classifier.h5'}
    return dsl.ContainerOp(
        name='Train Model',
        image='mojolaoluwa/train-tensorflow:v.0.1',
        arguments=cli_args,
        file_outputs=exposed_files,
    )

### components for predicting on the test data

In [None]:
def predict_op(X_test, y_test, model,
               image='mojolaoluwa/predict-tensorflow:v.0.1'):
    """Build the 'Predict Model' pipeline step.

    The original definition left ``image=`` without a value, which is a
    syntax error and made the whole cell unusable.

    Args:
        X_test: value placed after the '--X_test' flag.
        y_test: value placed after the '--y_test' flag.
        model: value placed after the '--model' flag.
        image: container image to run for this step.
            NOTE(review): the default is a guess that follows the naming of
            the train image ('mojolaoluwa/train-tensorflow:v.0.1'); confirm
            the real image name/tag before running the pipeline.

    Returns:
        dsl.ContainerOp: a step that exposes the container file
        '/predict_data/results' as the output named 'results'.
    """
    return dsl.ContainerOp(
        name='Predict Model',
        image=image,
        arguments=[
            '--X_test', X_test,
            '--y_test', y_test,
            '--model', model
        ],
        file_outputs={
            'results': '/predict_data/results'
        }
    )

### Defining pipeline and including its components

In [None]:
# Pipeline wiring: obtain data -> preprocess -> train -> predict.
# The pipeline takes no parameters; each step consumes file outputs of the
# steps before it, and is additionally ordered after its predecessor.
@dsl.pipeline(
    name='Churn modelling pipeline',
    description='An ML reusable pipeline that performs customer segmentation to determine customers with high risk of leaving a bank .'
)
def churn_reuseable_tensorflow_pipeline():
    fetch_step = obtain_data_op()

    # Preprocessing reads the 'data' output of the fetch step.
    raw_data = dsl.InputArgumentPath(fetch_step.outputs['data'])
    prep_step = preprocess_op(raw_data)
    prep_step.after(fetch_step)

    # Training uses only the training split.
    train_features = dsl.InputArgumentPath(prep_step.outputs['X_train'])
    train_labels = dsl.InputArgumentPath(prep_step.outputs['y_train'])
    fit_step = train_op(train_features, train_labels)
    fit_step.after(prep_step)

    # Prediction uses the test split plus the model produced by training.
    test_features = dsl.InputArgumentPath(prep_step.outputs['X_test'])
    test_labels = dsl.InputArgumentPath(prep_step.outputs['y_test'])
    fitted_model = dsl.InputArgumentPath(fit_step.outputs['model'])
    score_step = predict_op(test_features, test_labels, fitted_model)
    score_step.after(fit_step)

In [None]:
# Compile the pipeline into a package file named '<experiment_name>.zip'.
experiment_name = 'churn_analysis_tensorflow_pipeline'
kfp.compiler.Compiler().compile(
    churn_reuseable_tensorflow_pipeline,
    f'{experiment_name}.zip',
)

### running the pipeline

In [None]:
# Connect to the Kubeflow Pipelines API with default settings and submit a
# run of the pipeline; the pipeline takes no arguments.
client = kfp.Client()
client.create_run_from_pipeline_func(
    churn_reuseable_tensorflow_pipeline,
    arguments={},
)

In [None]:
# Path of the directory intended for outputs. This cell only records the
# path; it does not create the directory.
output_dir = '/home/jovyan/trial/'

In [None]:
# create data-fetching function

def get_data(data_path):
    """Download the churn CSV and pickle it to ``{data_path}/clean_data``.

    All imports are done inside the function body, and pandas is installed
    with pip at call time before it is imported.

    Args:
        data_path: directory in which the pickled DataFrame is written.

    Returns:
        None. Prints 'Done!' once the file has been written.
    """
    import pickle
    import subprocess
    import sys

    # Install the pinned pandas version before importing it.
    pip_command = [sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4']
    subprocess.run(pip_command)
    import pandas as pd

    source_url = "https://raw.githubusercontent.com/AdeloreSimiloluwa/Artificial-Neural-Network/master/data/Churn_Modelling.csv"
    churn_frame = pd.read_csv(source_url)

    # Serialize the downloaded frame for the next step.
    target_file = f'{data_path}/clean_data'
    with open(target_file, 'wb') as sink:
        pickle.dump(churn_frame, sink)

    print('Done!')
    return None

In [None]:
# create preprocessing function

def preprocess(data_path):
    """Encode, split and scale the pickled data, then pickle the splits.

    Reads ``{data_path}/clean_data``, drops the 'RowNumber', 'CustomerId' and
    'Surname' columns, treats the last remaining column as the target and all
    others as features, label-encodes 'Gender', one-hot encodes 'Geography',
    makes an 80/20 train/test split (random_state=42), standardizes the
    features using statistics fitted on the training split, and writes the
    tuple ``(X_train, X_test, y_train, y_test)`` to ``{data_path}/split_data``.

    Args:
        data_path: directory holding 'clean_data' and receiving 'split_data'.

    Returns:
        None. Prints 'Done!' once the file has been written.
    """
    import pickle
    import subprocess
    import sys

    # Pinned packages are installed at call time: scikit-learn, then pandas.
    for requirement in ('scikit-learn==0.22', 'pandas==0.23.4'):
        subprocess.run([sys.executable, '-m', 'pip', 'install', requirement])

    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler

    # Load the frame written by the previous step.
    with open(f'{data_path}/clean_data', 'rb') as source:
        data = pickle.load(source)

    # Remove columns that are not used as features.
    data = data.drop(columns=['RowNumber','CustomerId','Surname'], axis=1)

    # Features are every column but the last; the last column is the target.
    X = data.iloc[:, :-1]
    y = data.iloc[:, -1:]

    # Categorical encoding: integer labels for Gender, one-hot for Geography.
    gender_encoder = LabelEncoder()
    geography_encoder = OneHotEncoder()
    X['Gender'] = gender_encoder.fit_transform(X['Gender'])
    geo_matrix = geography_encoder.fit_transform(X[['Geography']]).toarray()
    geo_df = pd.DataFrame(geo_matrix)
    geo_df.columns = geography_encoder.get_feature_names(['Geography'])

    # Attach the one-hot columns and discard the original text column.
    X = X.join(geo_df)
    X = X.drop(columns=['Geography'], axis=1)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)

    # Scale with statistics learned from the training split only.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Serialize the four splits for the next step.
    with open(f'{data_path}/split_data', 'wb') as sink:
        pickle.dump((X_train, X_test, y_train, y_test), sink)

    print('Done!')
    return None

In [None]:
def train(data_path, model):
    """Train the classifier and save it to ``{data_path}/{model}``.

    Reads the tuple ``(X_train, X_test, y_train, y_test)`` from
    ``{data_path}/split_data`` and fits a three-layer dense network
    (16 -> 8 -> 1 units, sigmoid output) on the training split.

    Fixes over the previous version: ``tf.keras`` does not accept the Keras 1
    keywords ``init`` and ``nb_epoch``; they are replaced by
    ``kernel_initializer`` and ``epochs``.

    Args:
        data_path: directory holding 'split_data' and receiving the model.
        model: file name of the saved model, relative to ``data_path``.

    Returns:
        None. Prints 'Done!' once the model has been saved.
    """
    import pickle
    import subprocess
    import sys

    # pandas is needed to unpickle the targets; both pins are installed at
    # call time, before the data is loaded.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense

    # Deserialize the splits written by the preprocessing step.
    # NOTE(review): pickle.load executes arbitrary code from the file; this
    # assumes 'split_data' was produced by this pipeline's own preprocess step.
    with open(f'{data_path}/split_data', 'rb') as f:
        X_train, _, y_train, _ = pickle.load(f)

    # Classifier with one input/hidden layer pair and a sigmoid output unit.
    # input_dim=12 must match the number of feature columns in X_train.
    classifier = Sequential()
    classifier.add(Dense(units=16, kernel_initializer='uniform',
                         activation='relu', input_dim=12))
    classifier.add(Dense(units=8, kernel_initializer='uniform',
                         activation='relu'))
    classifier.add(Dense(units=1, kernel_initializer='uniform',
                         activation='sigmoid'))

    # Compile with the Adam optimizer and binary cross-entropy loss.
    classifier.compile(optimizer='adam', loss='binary_crossentropy',
                       metrics=['accuracy'])

    classifier.fit(X_train, y_train, batch_size=10, epochs=150)

    # Save next to the data so later steps sharing data_path can load it.
    classifier.save(f'{data_path}/{model}')

    print('Done!')
    return None

In [None]:
 def predict(data_path, model):
     """Evaluate the saved classifier on the test split and record results.

     Fixes over the previous version:
       * ``confusion_matrix`` was used without being imported.
       * The model is loaded from ``{data_path}/{model}``, the location
         ``train`` saves it to, instead of the bare ``model`` name.
       * The metrics are written to ``{data_path}/results.txt``, the file the
         pipeline's final step prints; previously nothing was written.

     Args:
         data_path: directory holding 'split_data' and the saved model.
         model: file name of the saved model, relative to ``data_path``.

     Returns:
         None. Prints the test accuracy and loss.
     """
     import pickle
     import subprocess
     import sys

     # pandas is needed to unpickle the targets and scikit-learn provides
     # confusion_matrix; both pins are installed at call time.
     subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
     subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
     from sklearn.metrics import confusion_matrix
     from tensorflow.keras.models import load_model

     # Deserialize the splits written by the preprocessing step.
     # NOTE(review): pickle.load executes arbitrary code from the file; this
     # assumes 'split_data' was produced by this pipeline's own preprocess step.
     with open(f'{data_path}/split_data', 'rb') as f:
         _, X_test, _, y_test = pickle.load(f)

     classifier = load_model(f'{data_path}/{model}')

     # Evaluate the model and print the results.
     test_loss, test_acc = classifier.evaluate(X_test, y_test, verbose=0)
     print('Test accuracy:', test_acc)
     print('Test loss:', test_loss)

     # Threshold the predicted probabilities at 0.5 for the confusion matrix.
     y_pred = classifier.predict(X_test) > 0.5
     cm = confusion_matrix(y_test, y_pred)

     # Persist the metrics where the pipeline's print step looks for them.
     with open(f'{data_path}/results.txt', 'w') as results_file:
         results_file.write(f'Test accuracy: {test_acc}\n')
         results_file.write(f'Test loss: {test_loss}\n')
         results_file.write(f'Confusion matrix:\n{cm}\n')

In [None]:
# Create lightweight components from the Python functions defined above.
# `comp` is `kfp.components` (imported in the notebook's import cell).
# Note: `preprocess_op`, `train_op` and `predict_op` rebind names that earlier
# cells defined as ContainerOp factories; after this cell those names refer to
# the function-based components below.
get_data_op = comp.func_to_container_op(get_data)
# "python:3.7" is the image reference; the previous value "python 3.7"
# contained a space and is not a valid image name.
preprocess_op = comp.func_to_container_op(preprocess, base_image="python:3.7")
train_op = comp.func_to_container_op(train, base_image="tensorflow/tensorflow:latest-gpu-py3")
predict_op = comp.func_to_container_op(predict, base_image="tensorflow/tensorflow:latest-gpu-py3")

In [None]:
# define pipeline
@dsl.pipeline(name='Churn modelling pipeline',
   description='An ML reusable pipeline that performs customer segmentation to determine customers with high risk of leaving a bank .')
def road_safety_pipeline(data_path: str, model: str = 'churn_classifier.h5'):
    """Chain get-data, preprocess, train and predict over one shared volume.

    Fixes over the previous version: ``model`` was referenced without being a
    parameter, and the train and predict steps both called ``preprocess_op``
    instead of ``train_op`` and ``predict_op``.

    Args:
        data_path: mount path of the shared volume inside every step; also
            passed to each component as its working directory.
        model: file name of the saved model, relative to ``data_path``.
    """
    # Define volume to share data between components.
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO)

    # Each step mounts the volume as left by the previous step, which also
    # orders the steps one after another.
    churn_get_data_container = get_data_op(data_path).add_pvolumes(
        {data_path: vop.volume})

    churn_preprocess_container = preprocess_op(data_path).add_pvolumes(
        {data_path: churn_get_data_container.pvolume})

    churn_train_container = train_op(data_path, model).add_pvolumes(
        {data_path: churn_preprocess_container.pvolume})

    churn_predict_container = predict_op(data_path, model).add_pvolumes(
        {data_path: churn_train_container.pvolume})

    # Print the result of the prediction by reading results.txt from the
    # shared volume.
    churn_result_container = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: churn_predict_container.pvolume},
        arguments=['cat', f'{data_path}/results.txt']
    )

In [None]:
# Value passed to the pipeline as its `data_path` argument.
DATA_PATH = "/mnt"
# Value passed to the pipeline as its `model` argument.
MODEL_PATH = "churn_classifier.h5"

In [None]:
# The volume-based pipeline defined above is named `road_safety_pipeline`;
# the previous reference to `churn_pipeline` pointed at a name that is not
# defined anywhere in the visible notebook.
pipeline_func = road_safety_pipeline

experiment_name = 'churn_kubeflow'
run_name = pipeline_func.__name__ + ' run'

# Pipeline arguments, keyed by pipeline parameter name.
arguments = {"data_path": DATA_PATH,
             "model": MODEL_PATH}

# Compile the pipeline into a package file named '<experiment_name>.zip'.
kfp.compiler.Compiler().compile(pipeline_func,
  '{}.zip'.format(experiment_name))

# Submit pipeline directly from pipeline function.
# `client` is the kfp.Client created in an earlier cell.
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name=experiment_name,
                                                  run_name=run_name,
                                                  arguments=arguments)