# Importing Libraries

In [86]:
import kfp
from kfp import dsl
import kfp.components as components

from kfp import components, dsl, Client


# Preparing Datasets

In [87]:
def prepare_dataset():
    """Load the Iris dataset and write its features and targets to CSV files.

    Writes ``assets/x_iris.csv`` (four named feature columns) and
    ``assets/y_iris.csv`` (a single ``Targets`` column), both without the
    index column. The ``assets/`` folder is created when it does not exist.

    Imports are kept inside the function because it is wrapped as a Kubeflow
    component elsewhere in this notebook and therefore has to be self-contained.
    """
    import os

    import pandas as pd
    from sklearn import datasets

    print('Preparing Datasets')
    iris = datasets.load_iris()

    X = pd.DataFrame(iris.data)
    X.columns = ['Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width']
    y = pd.DataFrame(iris.target)
    y.columns = ['Targets']

    saved_folder = 'assets/'
    # to_csv does not create missing parent directories, so make sure the
    # output folder exists before writing (no-op when it is already there).
    os.makedirs(saved_folder, exist_ok=True)

    # Build both file paths from the single folder setting above.
    x_csv_path = os.path.join(saved_folder, 'x_iris.csv')
    y_csv_path = os.path.join(saved_folder, 'y_iris.csv')
    X.to_csv(x_csv_path, index=False)
    y.to_csv(y_csv_path, index=False)

    print(f'Data saved succesfully onto {saved_folder}')


# Train Test Split

In [88]:
def train_test_split():
    """Split the saved Iris CSV files into train and test arrays.

    Reads ``assets/x_iris.csv`` and ``assets/y_iris.csv``, performs a
    stratified split that holds out 20% of the rows with a fixed seed, and
    writes ``x_train``, ``x_test``, ``y_train`` and ``y_test`` as ``.npy``
    files under ``assets/``. The target frames are flattened to 1-D first.
    """
    import numpy as np
    import pandas as pd
    # Import the module rather than the function so the sklearn helper does
    # not share a name with this function.
    from sklearn import model_selection

    print('Train test split')
    features = pd.read_csv('assets/x_iris.csv')
    targets = pd.read_csv('assets/y_iris.csv')

    x_train, x_test, y_train, y_test = model_selection.train_test_split(
        features,
        targets,
        test_size=0.2,
        stratify=targets,
        random_state=42,
    )

    # Output file -> array, in the order the files are written.
    outputs = {
        'assets/x_train.npy': x_train,
        'assets/x_test.npy': x_test,
        'assets/y_train.npy': np.array(y_train).reshape(-1),
        'assets/y_test.npy': np.array(y_test).reshape(-1),
    }
    for target_path, values in outputs.items():
        np.save(target_path, values)

    print('X and Y data are saved')
    

# Training Classifier

In [89]:
def training_basic_classifier():
    """Fit a random forest on the saved training split and pickle the model.

    Reads ``assets/x_train.npy`` and ``assets/y_train.npy`` and writes the
    fitted estimator to ``assets/rfc.pkl``.
    """
    import pickle

    import numpy as np
    from sklearn.ensemble import RandomForestClassifier

    print('Training Classifier')

    # The split step stores plain numeric arrays, so the default
    # allow_pickle=False is sufficient (and safer) when loading them.
    x_train = np.load('assets/x_train.npy')
    y_train = np.load('assets/y_train.npy')

    # Fixed seed (same value the train/test split uses) so that reruns
    # produce the same forest and therefore comparable metrics.
    classifier = RandomForestClassifier(random_state=42)
    classifier.fit(x_train, y_train)

    with open('assets/rfc.pkl', 'wb') as f:
        pickle.dump(classifier, f)

    print('Random Forest Classifier is trained and the model is saved')

# Prediction

In [90]:
def predict_on_test_data():
    """Predict class labels for the saved test features.

    Loads the pickled model from ``assets/rfc.pkl`` and the features from
    ``assets/x_test.npy``, then writes the output of the model's ``predict``
    method to ``assets/y_pred.npy``.
    """
    import pickle

    import numpy as np

    print('Predicting outcome')
    # NOTE: pickle.load can execute code embedded in the file; only point this
    # at the model file produced by the training step, never at untrusted data.
    with open('assets/rfc.pkl', 'rb') as f:
        rfc = pickle.load(f)

    # Numeric array written by the split step: no pickle support needed.
    x_test = np.load('assets/x_test.npy')
    y_pred = rfc.predict(x_test)

    np.save('assets/y_pred.npy', y_pred)

    print('Y predicted value has been saved')
    

# Predict Probabilities

In [91]:
def predict_proba():
    """Predict class probabilities for the saved test features.

    Loads the pickled model from ``assets/rfc.pkl`` and the features from
    ``assets/x_test.npy``, then writes the output of the model's
    ``predict_proba`` method to ``assets/y_pred_proba.npy``.
    """
    import pickle

    import numpy as np

    print('Predicting Probabilities')
    # NOTE: pickle.load can execute code embedded in the file; only point this
    # at the model file produced by the training step, never at untrusted data.
    with open('assets/rfc.pkl', 'rb') as f:
        rfc = pickle.load(f)

    # Numeric array written by the split step: no pickle support needed.
    x_test = np.load('assets/x_test.npy')
    y_pred_proba = rfc.predict_proba(x_test)

    np.save('assets/y_pred_proba.npy', y_pred_proba)
    print('Predicted Probabilitiy')

# Get Metrics

In [92]:
def get_metrics():
    """Print evaluation metrics for the saved test-set predictions.

    Reads ``assets/y_test.npy``, ``assets/y_pred.npy`` and
    ``assets/y_pred_proba.npy`` and prints accuracy, micro-averaged precision
    and recall, and the log loss, each rounded to two decimals.
    """
    import numpy as np
    from sklearn.metrics import accuracy_score, log_loss, precision_score, recall_score

    print('Metrics')
    # All three files hold numeric arrays written by earlier steps, so the
    # default allow_pickle=False is sufficient when loading them.
    y_test = np.load('assets/y_test.npy')
    y_pred = np.load('assets/y_pred.npy')
    y_pred_proba = np.load('assets/y_pred_proba.npy')

    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, average='micro')
    rec = recall_score(y_test, y_pred, average='micro')
    # Log loss is computed from the predicted probabilities, not the labels.
    entropy = log_loss(y_test, y_pred_proba)

    print(f'Model Metrics: \nAccuracy: {round(acc, 2)}\nPrecision: {round(prec, 2)}\nRecall: {round(rec, 2)}\nEntropy: {round(entropy, 2)}')
    

In [93]:
# Local smoke test: uncomment to run every stage in pipeline order without Kubeflow.
# prepare_dataset()
# train_test_split()
# training_basic_classifier()
# predict_on_test_data()
# predict_proba()
# get_metrics()

# Kubeflow pipeline creation

In [94]:
# Component factory for the dataset-preparation step: wraps prepare_dataset so
# it runs on the python:3.10 image with the listed packages installed.
create_step_prepare_data = components.create_component_from_func(
    prepare_dataset,
    packages_to_install=['pandas', 'numpy', 'scikit-learn'],
    base_image='python:3.10',
)

In [95]:
# Component factory for the split step: wraps train_test_split so it runs on
# the python:3.10 image with the listed packages installed.
create_step_train_test_split = components.create_component_from_func(
    train_test_split,
    packages_to_install=['pandas', 'numpy', 'scikit-learn'],
    base_image='python:3.10',
)

In [96]:
# Component factory for the training step: wraps training_basic_classifier so
# it runs on the python:3.10 image with the listed packages installed.
create_step_training_basic_classifier = components.create_component_from_func(
    training_basic_classifier,
    packages_to_install=['pandas', 'numpy', 'scikit-learn'],
    base_image='python:3.10',
)

In [97]:
# Component factory for the label-prediction step: wraps predict_on_test_data
# so it runs on the python:3.10 image with the listed packages installed.
create_step_predict_on_test_data = components.create_component_from_func(
    predict_on_test_data,
    packages_to_install=['pandas', 'numpy', 'scikit-learn'],
    base_image='python:3.10',
)

In [98]:
# Component factory for the probability-prediction step: wraps predict_proba
# so it runs on the python:3.10 image with the listed packages installed.
create_step_predict_prob_on_test_data = components.create_component_from_func(
    predict_proba,
    packages_to_install=['pandas', 'numpy', 'scikit-learn'],
    base_image='python:3.10',
)

In [99]:
# Component factory for the evaluation step: wraps get_metrics so it runs on
# the python:3.10 image with the listed packages installed.
create_step_get_metrics = components.create_component_from_func(
    get_metrics,
    packages_to_install=['pandas', 'numpy', 'scikit-learn'],
    base_image='python:3.10',
)

# Pipeline

In [100]:
@dsl.pipeline(
    name='IRIS Classification using Random Forest Classifier',
    description='A Pipeline to perform classification task'
)
def iris_classifier_pipeline(data_path: str):
    """Chain the six Iris steps so they run one after another on a shared volume.

    A 1Gi ReadWriteOnce volume is requested once and mounted at ``data_path``
    in every step; the steps exchange their files through that mount.

    Args:
        data_path: Mount path of the shared volume inside each step's container.
    """
    vop = dsl.VolumeOp(name='t-vol', resource_name='t-vol', size='1Gi', modes=dsl.VOLUME_MODE_RWO)

    # Execution order: each step is scheduled after the one listed before it.
    step_factories = [
        create_step_prepare_data,
        create_step_train_test_split,
        create_step_training_basic_classifier,
        create_step_predict_on_test_data,
        create_step_predict_prob_on_test_data,
        create_step_get_metrics,
    ]

    previous_step = None
    for make_step in step_factories:
        # Every step mounts the volume created above. The original metrics step
        # referenced `vop.volumes`, which is not that volume.
        step = make_step().add_pvolumes({data_path: vop.volume})
        if previous_step is not None:
            step.after(previous_step)
        # Zero-length staleness window: always re-execute instead of reusing
        # a cached result from an earlier run.
        step.execution_options.caching_strategy.max_cache_staleness = 'P0D'
        previous_step = step

In [101]:
# Compile the pipeline function into a workflow package on disk.
kfp.compiler.Compiler().compile(
    iris_classifier_pipeline,
    'IRIS_classifier_pipeline.yaml',
)

# Client

In [102]:
# Kubeflow Pipelines API client created with the SDK's default connection settings.
client = Client()

In [103]:
import datetime

# Passed to the pipeline as `data_path`.
DATA = '/assets'

run_date = datetime.datetime.now().date()
print(run_date)

pipeline_func = iris_classifier_pipeline
# Experiment names carry the submission date, e.g. iris_classifier_exp_2023-06-11.
experiment_name = "_".join(('iris_classifier_exp', str(run_date)))
run_name = pipeline_func.__name__ + ' run'
namespace = "kubeflow"

arguments = {"data_path": DATA}

# Keep a compiled copy of the pipeline next to the notebook, named after the experiment.
kfp.compiler.Compiler().compile(pipeline_func, '{}.zip'.format(experiment_name))

# Submit the pipeline function as a run under the dated experiment.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
)

2023-06-11
