# Import the Kubeflow Pipelines SDK

In [None]:
import kfp
import kfp.dsl as dsl

# Create the functions for the pipeline steps

In [None]:
def prepare_data():
    """Download the MNIST CSV, drop incomplete rows and store the result.

    Reads the dataset from a fixed GitHub URL, removes every row that
    contains a missing value, and writes the remaining rows (without the
    index column) to ``data/final_df.csv``, resolved relative to the current
    working directory.
    """
    import pandas as pd

    print("----- Inside prepare_data component -----")

    source_url = "https://raw.githubusercontent.com/prasertcbs/basic-dataset/master/MNIST.csv"
    output_path = 'data/final_df.csv'

    # Load the dataset and discard rows with missing values in one chain.
    cleaned = pd.read_csv(source_url).dropna()
    cleaned.to_csv(output_path, index=False)

    print("\n ----- data csv is saved to PV location /data/final_df.csv -----")

In [None]:
def train_test_split():
    """Split the prepared dataset into train and test parts and save them.

    Reads ``data/final_df.csv``, separates the ``target`` column from the
    remaining columns, performs a stratified 70/30 split with a fixed random
    seed, saves the four resulting pieces as ``.npy`` files under ``data/``
    and prints each of them.

    Raises:
        KeyError: If the dataset has no ``target`` column.
    """
    import pandas as pd
    import numpy as np
    # Aliased so the import does not shadow this function's own name.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    print("----- Inside train_test_split component -----")

    final_data = pd.read_csv('data/final_df.csv')
    target_column = 'target'

    # Without this check a missing column would silently produce a label
    # frame with zero columns and only fail later inside the split.
    if target_column not in final_data.columns:
        raise KeyError(
            f"Column '{target_column}' not found in data/final_df.csv; "
            f"available columns start with: {list(final_data.columns[:5])}"
        )

    is_target = final_data.columns == target_column
    X = final_data.loc[:, ~is_target]
    # Kept as a single-column frame so the saved label arrays keep their
    # original 2-D shape.
    y = final_data.loc[:, is_target]

    X_train, X_test, y_train, y_test = sk_train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=42
    )

    splits = {
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }

    # Persist every split first, then echo them to the step log.
    for split_name, split_values in splits.items():
        np.save(f'data/{split_name}.npy', split_values)

    for split_name, split_values in splits.items():
        print(f"\n----- {split_name} -----\n")
        print(split_values)

In [None]:
def train_model():
    """Fit a 10-nearest-neighbours classifier and pickle it.

    Loads ``data/X_train.npy`` and ``data/y_train.npy``, trains a
    ``KNeighborsClassifier`` with ``n_neighbors=10`` on them and writes the
    fitted model to ``data/model.pkl``.
    """
    import pickle

    import numpy as np
    from sklearn.neighbors import KNeighborsClassifier

    print("----- Inside train_model component -----")

    X_train = np.load('data/X_train.npy', allow_pickle=True)
    y_train = np.load('data/y_train.npy', allow_pickle=True)

    # Labels saved from a single-column frame arrive as an (n, 1) array;
    # flatten them so scikit-learn does not emit a DataConversionWarning.
    if y_train.ndim == 2 and y_train.shape[1] == 1:
        y_train = y_train.ravel()

    neigh = KNeighborsClassifier(n_neighbors=10)
    neigh.fit(X_train, y_train)

    with open('data/model.pkl', 'wb') as f:
        pickle.dump(neigh, f)

    print("\n K nearest neighbors is trained on MNIST and saved to PV location /data/model.pkl")

In [9]:
def predict_on_test_data():
    """Predict class labels for the test features and save them.

    Unpickles the model from ``data/model.pkl``, runs ``predict`` on the
    array stored in ``data/X_test.npy``, writes the predictions to
    ``data/y_pred.npy`` and prints them.
    """
    import pandas as pd
    import numpy as np
    import pickle

    print("----- Inside predict_on_test_data component -----")

    with open('data/model.pkl', 'rb') as model_file:
        classifier = pickle.load(model_file)

    test_features = np.load('data/X_test.npy', allow_pickle=True)
    predicted_labels = classifier.predict(test_features)
    np.save('data/y_pred.npy', predicted_labels)

    print("\n----- Predicted classes -----\n")
    print(predicted_labels)

In [None]:
def predict_prob_on_test_data():
    """Predict class probabilities for the test features and save them.

    Unpickles the model from ``data/model.pkl``, runs ``predict_proba`` on
    the array stored in ``data/X_test.npy``, writes the probabilities to
    ``data/y_pred_prob.npy`` and prints them.
    """
    import pandas as pd
    import numpy as np
    import pickle

    print("----- Inside predict_prob_on_test_data component -----")

    with open('data/model.pkl', 'rb') as model_file:
        classifier = pickle.load(model_file)

    test_features = np.load('data/X_test.npy', allow_pickle=True)
    class_probabilities = classifier.predict_proba(test_features)
    np.save('data/y_pred_prob.npy', class_probabilities)

    print("\n----- Predicted Probabilities -----\n")
    print(class_probabilities)

In [None]:
def get_metrics():
    """Evaluate the saved predictions against the test labels and print them.

    Loads ``data/y_test.npy``, ``data/y_pred.npy`` and
    ``data/y_pred_prob.npy``, computes accuracy, micro-averaged precision and
    recall, and log loss, then prints scikit-learn's classification report
    followed by a dictionary of the four metrics rounded to two decimals.
    """
    import numpy as np
    from sklearn.metrics import (
        accuracy_score,
        classification_report,
        log_loss,
        precision_score,
        recall_score,
    )

    print("----- Inside get_metrics component -----")

    # Each array is read once and reused for every metric below.
    y_test = np.load('data/y_test.npy', allow_pickle=True)
    y_pred = np.load('data/y_pred.npy', allow_pickle=True)
    y_pred_prob = np.load('data/y_pred_prob.npy', allow_pickle=True)

    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, average='micro')
    recall = recall_score(y_test, y_pred, average='micro')
    entropy = log_loss(y_test, y_pred_prob)

    print(classification_report(y_test, y_pred))

    print("\n Model Metrics:", {'accuracy': round(acc, 2), 'precision': round(prec, 2), 'recall': round(recall, 2), 'entropy': round(entropy, 2)})

# Create the Kubeflow pipeline components from the step functions

In [None]:
# Component factory for the data-preparation step: wraps prepare_data with
# the python:3.7 base image and the pinned package list.
create_step_prepare_data = kfp.components.create_component_from_func(
    func=prepare_data,
    base_image="python:3.7",
    packages_to_install=[
        "pandas==1.2.4",
        "numpy==1.21.0",
    ],
)

In [None]:
# Component factory for the splitting step: wraps train_test_split with the
# python:3.7 base image and the pinned package list.
create_step_train_test_split = kfp.components.create_component_from_func(
    func=train_test_split,
    base_image="python:3.7",
    packages_to_install=[
        "pandas==1.2.4",
        "numpy==1.21.0",
        "scikit-learn==0.24.2",
    ],
)

In [None]:
# Component factory for the training step: wraps train_model with the
# python:3.7 base image and the pinned package list.
create_step_train_model = kfp.components.create_component_from_func(
    func=train_model,
    base_image="python:3.7",
    packages_to_install=[
        "pandas==1.2.4",
        "numpy==1.21.0",
        "scikit-learn==0.24.2",
    ],
)

In [None]:
# Component factory for the class-prediction step: wraps
# predict_on_test_data with the python:3.7 base image and the pinned
# package list.
create_step_predict_on_test_data = kfp.components.create_component_from_func(
    func=predict_on_test_data,
    base_image="python:3.7",
    packages_to_install=[
        "pandas==1.2.4",
        "numpy==1.21.0",
        "scikit-learn==0.24.2",
    ],
)

In [None]:
# Component factory for the probability-prediction step: wraps
# predict_prob_on_test_data with the python:3.7 base image and the pinned
# package list.
create_step_predict_prob_on_test_data = kfp.components.create_component_from_func(
    func=predict_prob_on_test_data,
    base_image="python:3.7",
    packages_to_install=[
        "pandas==1.2.4",
        "numpy==1.21.0",
        "scikit-learn==0.24.2",
    ],
)

In [None]:
# Component factory for the evaluation step: wraps get_metrics with the
# python:3.7 base image and the pinned package list.
create_step_get_metrics = kfp.components.create_component_from_func(
    func=get_metrics,
    base_image="python:3.7",
    packages_to_install=[
        "pandas==1.2.4",
        "numpy==1.21.0",
        "scikit-learn==0.24.2",
    ],
)

# Define the pipeline

In [None]:
@dsl.pipeline(
    name='Testing MNIST K-Nearest-Neighbors with the pipeline in kubeflow',
    description='Test pipeline'
)
def mnist_test_pipeline(data_path: str):
    """Chain the six MNIST KNN steps sequentially on one shared volume.

    A 2Gi ReadWriteOnce volume is requested and attached to every step under
    ``data_path``. The steps run strictly one after another: prepare data,
    split, train, predict classes, predict probabilities, compute metrics.
    Each step's ``max_cache_staleness`` is set to ``"P0D"``.

    Args:
        data_path: Path under which the shared volume is attached in every
            step.
    """
    vop = dsl.VolumeOp(
        name="mnist-test-volume",
        # Hyphens only: Kubernetes object names may not contain underscores,
        # so a name such as "mnist-test_volume" is rejected for the claim.
        resource_name="mnist-test-volume",
        size="2Gi",
        modes=dsl.VOLUME_MODE_RWO)

    prepare_data_task = create_step_prepare_data().add_pvolumes({data_path: vop.volume})
    # Named split_task so the module-level train_test_split function is not
    # shadowed inside the pipeline body.
    split_task = create_step_train_test_split().add_pvolumes({data_path: vop.volume}).after(prepare_data_task)
    classifier_training = create_step_train_model().add_pvolumes({data_path: vop.volume}).after(split_task)
    log_predicted_data = create_step_predict_on_test_data().add_pvolumes({data_path: vop.volume}).after(classifier_training)
    log_predicted_probabilities = create_step_predict_prob_on_test_data().add_pvolumes({data_path: vop.volume}).after(log_predicted_data)
    logs_metrics_data = create_step_get_metrics().add_pvolumes({data_path: vop.volume}).after(log_predicted_probabilities)

    # Apply the same zero-duration cache staleness to every step.
    for task in (
        prepare_data_task,
        split_task,
        classifier_training,
        log_predicted_data,
        log_predicted_probabilities,
        logs_metrics_data,
    ):
        task.execution_options.caching_strategy.max_cache_staleness = "P0D"

In [None]:
# Compile the pipeline function into the package file
# mnist_test_package.yaml.
kfp.compiler.Compiler().compile(
    pipeline_func=mnist_test_pipeline,
    package_path="mnist_test_package.yaml",
)

In [None]:
# Create the Kubeflow Pipelines client. No host or credentials are passed
# here, so whatever defaults the SDK resolves are used.
client = kfp.Client()

In [None]:
import datetime

# Value passed as the pipeline's data_path argument.
DATA_PATH = '/data'

# Take the date once so the printed value and the experiment-name suffix
# cannot disagree (for example when the cell runs across midnight).
run_date = datetime.datetime.now().date()
print(run_date)

pipeline_func = mnist_test_pipeline
experiment_name = f'mnist_k_nearest_neighbors_test_experiment_{run_date}'
run_name = pipeline_func.__name__ + ' run'
# NOTE(review): namespace is assigned but not passed to the run call below;
# confirm whether the run should be created in this namespace explicitly.
namespace = "kubeflow-user-example-com"

arguments = {"data_path": DATA_PATH}

# Write a compiled copy of the pipeline named after the experiment.
kfp.compiler.Compiler().compile(pipeline_func,
    '{}.zip'.format(experiment_name))

# Submit the pipeline as a run in the dated experiment.
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name=experiment_name,
                                                  run_name=run_name,
                                                  arguments=arguments)