# Kubeflow pipeline implementation

## Here you can find the implementation of a simple Kubeflow pipeline on a classification dataset

### Make sure you have installed Kubeflow and the kfp package with all required dependencies

In [239]:
#Importing required libraries for the kubeflow pipeline
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

### Checking the version of the installation of kfp

In [240]:
#Details of kfp library
!pip show kfp

Name: kfp
Version: 1.8.21
Summary: KubeFlow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: 

## Defining the required components for the pipeline

### Begin with the components that are necessary for the pipeline as per the requirements
#### The first step loads the dataset. In this example we read it from a given link and save it locally. A few preprocessing steps can be added here, or a separate preprocessing component can be defined as needed.

In [241]:
#Importing the required dataset

def prepare_data():
    """Download the diabetes dataset, drop incomplete rows and save it as CSV.

    The cleaned data is written to ``data/final_df.csv``, a path relative to
    the current working directory (which is printed for debugging).

    Returns:
        The cleaned dataset (rows containing missing values removed).
    """
    # Imports are kept inside the function so it stays self-contained when it
    # is wrapped into a pipeline component with create_component_from_func.
    import os

    import pandas as pd

    cwd = os.getcwd()
    print("Current working directory:", cwd)

    print("---- Inside prepare_data component ----")
    # Load dataset
    dataset = pd.read_csv("https://raw.githubusercontent.com/Varbcgi/Diabetes_prediction/main/diabetes_binary.csv")
    dataset = dataset.dropna()

    # Create the output directory if it is missing so the function also works
    # outside the pipeline; this is a no-op when the directory already exists.
    os.makedirs('data', exist_ok=True)
    dataset.to_csv('data/final_df.csv', index=False)
    print("\n ---- data csv is saved to PV location /data/final_df.csv ----")
    return dataset


Location: c:\users\varun.bhoj\anaconda3\lib\site-packages
Requires: absl-py, click, cloudpickle, Deprecated, docstring-parser, fire, google-api-core, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, uritemplate, urllib3
Required-by: 


### Splitting component
#### The dependent and independent variables are split and saved as numpy files; these individual files can then be used by the individual components

In [242]:
def train_test_split():
    """Split the prepared dataset into train and test sets and save them.

    Reads ``data/final_df.csv``, separates the ``Diabetes_binary`` target from
    the remaining columns, performs a stratified 70/30 split with a fixed
    random state, and writes the four parts to ``data/X_train.npy``,
    ``data/X_test.npy``, ``data/y_train.npy`` and ``data/y_test.npy``.

    Returns:
        The tuple ``(X_train, X_test, y_train, y_test)``.
    """
    import numpy as np
    import pandas as pd
    # Imported once, under an alias, so the scikit-learn helper is not
    # confused with this function, which carries the same name.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    print("---- Inside train_test_split component ----")
    dataset = pd.read_csv('data/final_df.csv')

    # Separating the dependent variable from the independent variables
    y_diabetes = dataset['Diabetes_binary'].values
    X = dataset.drop(['Diabetes_binary'], axis=1)

    # Splitting the dataset into test and training sets; stratifying on the
    # target keeps the class proportions the same in both parts.
    X_train, X_test, y_train, y_test = sk_train_test_split(
        X,
        y_diabetes,
        test_size=0.30,
        random_state=44,
        stratify=y_diabetes,
    )

    np.save('data/X_train.npy', X_train)
    np.save('data/X_test.npy', X_test)
    np.save('data/y_train.npy', y_train)
    np.save('data/y_test.npy', y_test)
    return X_train, X_test, y_train, y_test

### Training component
#### The training_basic_classifier() function is responsible for training a basic classifier model. It takes care of the training process and saves the trained model as a pickle file (data/model.pkl).

In [243]:
def training_basic_classifier():
    """Fit a random forest on the saved training split and pickle it.

    Loads ``data/X_train.npy`` and ``data/y_train.npy``, trains a
    ``RandomForestClassifier`` (115 trees, at most 10 features per split,
    depth limited to 10) and writes the fitted model to ``data/model.pkl``.
    Nothing is returned.
    """
    import pickle

    import numpy as np
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    print("---- Inside training_basic_classifier component ----")

    train_features = np.load('data/X_train.npy', allow_pickle=True)
    train_labels = np.load('data/y_train.npy', allow_pickle=True)

    forest = RandomForestClassifier(
        n_estimators=115,
        max_features=10,
        max_depth=10,
    )
    forest.fit(train_features, train_labels)

    # Persist the fitted model so the prediction steps can load it.
    with open('data/model.pkl', 'wb') as model_file:
        pickle.dump(forest, model_file)

    print("\n ----RandomForestClassifier regression classifier is trained on Diabetes data and saved to PV location /data/model.pkl ----")
    

### Predict component
#### Here you can define a function which takes in the pickle file and predicts on test data

In [244]:
def predict_on_test_data():
    """Predict class labels for the test split with the pickled model.

    Loads the model from ``data/model.pkl`` and the features from
    ``data/X_test.npy``, saves the predicted classes to ``data/y_pred.npy``
    and prints them. Nothing is returned.
    """
    import pickle

    import numpy as np
    import pandas as pd

    print("---- Inside predict_on_test_data component ----")

    # NOTE(review): unpickling executes arbitrary code; this assumes
    # data/model.pkl was written by the training step - confirm.
    with open('data/model.pkl', 'rb') as model_file:
        model = pickle.load(model_file)

    test_features = np.load('data/X_test.npy', allow_pickle=True)
    predicted_classes = model.predict(test_features)
    np.save('data/y_pred.npy', predicted_classes)

    print("\n---- Predicted classes ----")
    print("\n")
    print(predicted_classes)

### Prediction probabilities
#### This function generates the probabilities of the target classes for the test samples. The resulting probabilities are saved to data/y_pred_prob.npy, from where they can be used for further analysis or decision-making.

In [245]:
def predict_prob_on_test_data():
    """Predict class probabilities for the test split with the pickled model.

    Loads the model from ``data/model.pkl`` and the features from
    ``data/X_test.npy``, saves the output of ``predict_proba`` to
    ``data/y_pred_prob.npy`` and prints it. Nothing is returned.
    """
    import pickle

    import numpy as np
    import pandas as pd

    print("---- Inside predict_prob_on_test_data component ----")

    # NOTE(review): unpickling executes arbitrary code; this assumes
    # data/model.pkl was written by the training step - confirm.
    with open('data/model.pkl', 'rb') as model_file:
        classifier = pickle.load(model_file)

    test_features = np.load('data/X_test.npy', allow_pickle=True)
    class_probabilities = classifier.predict_proba(test_features)
    np.save('data/y_pred_prob.npy', class_probabilities)

    print("\n---- Predicted Probabilities ----")
    print("\n")
    print(class_probabilities)

### Metrics component
#### The get_metrics() function is used to evaluate a trained model. It calculates and prints a set of metrics (accuracy, precision, recall, log loss and a classification report) that provide insights into the model's performance and accuracy.

In [2]:
def get_metrics():
    """Compute and print evaluation metrics for the saved predictions.

    Loads ``data/y_test.npy``, ``data/y_pred.npy`` and
    ``data/y_pred_prob.npy``, then prints a classification report followed by
    a dictionary with accuracy, precision, recall and log loss, each rounded
    to two decimals. Nothing is returned.
    """
    import numpy as np
    from sklearn import metrics
    from sklearn.metrics import accuracy_score, precision_score, recall_score, log_loss

    print("---- Inside get_metrics component ----")
    # Each array is loaded once and reused for every metric below.
    y_test = np.load('data/y_test.npy', allow_pickle=True)
    y_pred = np.load('data/y_pred.npy', allow_pickle=True)
    y_pred_prob = np.load('data/y_pred_prob.npy', allow_pickle=True)

    acc = accuracy_score(y_test, y_pred)
    # NOTE(review): with average='micro' on single-label data, precision and
    # recall come out equal to accuracy - confirm whether 'binary' or 'macro'
    # was intended. The per-class values appear in the report printed below.
    prec = precision_score(y_test, y_pred, average='micro')
    recall = recall_score(y_test, y_pred, average='micro')
    entropy = log_loss(y_test, y_pred_prob)

    print(metrics.classification_report(y_test, y_pred))

    print("\n Model Metrics:", {'accuracy': round(acc, 2), 'precision': round(prec, 2), 'recall': round(recall, 2), 'entropy': round(entropy, 2)})

## Creation of components

### Components of pipeline
#### Components are created using the create_component_from_func method from the kfp.components module. It wraps the prepare_data function and specifies the base image as python:3.9 and the packages to install as per the requirement of that particular component

In [247]:
# Component factory for the data-loading step: wraps prepare_data so it runs
# on the python:3.9 image with the listed packages installed.
create_step_prepare_data = comp.create_component_from_func(
    func=prepare_data,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0'],
    base_image='python:3.9',
)

In [248]:
# Component factory for the splitting step: wraps train_test_split so it runs
# on the python:3.9 image with the listed packages installed.
create_step_train_test_split = comp.create_component_from_func(
    func=train_test_split,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.2.1'],
    base_image='python:3.9',
)

In [249]:
# Component factory for the training step: wraps training_basic_classifier so
# it runs on the python:3.9 image with the listed packages installed.
create_step_training_basic_classifier = comp.create_component_from_func(
    func=training_basic_classifier,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.2.1'],
    base_image='python:3.9',
)

In [250]:
# Component factory for the class-prediction step: wraps predict_on_test_data
# so it runs on the python:3.9 image with the listed packages installed.
create_step_predict_on_test_data = comp.create_component_from_func(
    func=predict_on_test_data,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.2.1'],
    base_image='python:3.9',
)

In [251]:
# Component factory for the probability-prediction step: wraps
# predict_prob_on_test_data so it runs on the python:3.9 image with the listed
# packages installed.
create_step_predict_prob_on_test_data = comp.create_component_from_func(
    func=predict_prob_on_test_data,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.2.1'],
    base_image='python:3.9',
)

In [252]:
# Component factory for the evaluation step: wraps get_metrics so it runs on
# the python:3.9 image with the listed packages installed.
create_step_get_metrics = comp.create_component_from_func(
    func=get_metrics,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.2.1'],
    base_image='python:3.9',
)

## Defining the pipeline

### Now we define a pipeline as per our requirements
#### In this example we use a VolumeOp. The VolumeOp is typically used to define and manage persistent storage resources within a Kubeflow Pipeline. In this specific case, it creates a volume with a specific size and access mode, which can be used by subsequent steps in the pipeline to store or access data.
#### size="1Gi" specifies the size of the requested storage for the volume. In this case, it is set to 1 gigabyte (1Gi).
#### modes=dsl.VOLUME_MODE_RWO sets the access mode for the volume. In this case, it is set to ReadWriteOnce (RWO), which means the volume can be mounted as read-write by a single node.

In [253]:
# Define the pipeline
@dsl.pipeline(
    name='Diabetes-classifier',
    description='A sample pipeline that performs diabetes classifier task',
)
def diabetes_classifier_pipeline(data_path: str):
    # data_path is the pipeline parameter used as the mount path of the shared
    # volume in every step.

    # Volume shared by all steps: 1Gi, ReadWriteOnce.
    shared_volume_op = dsl.VolumeOp(
        name="t-vol",
        resource_name="t-vol",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    def on_shared_volume(step):
        # Mount the shared volume at data_path on the given step.
        return step.add_pvolumes({data_path: shared_volume_op.volume})

    # The steps run strictly one after another, each chained with .after()
    # to the step before it.
    load_step = on_shared_volume(create_step_prepare_data())
    split_step = on_shared_volume(create_step_train_test_split()).after(load_step)
    training_step = on_shared_volume(create_step_training_basic_classifier()).after(split_step)
    class_prediction_step = on_shared_volume(create_step_predict_on_test_data()).after(training_step)
    probability_step = on_shared_volume(create_step_predict_prob_on_test_data()).after(class_prediction_step)
    metrics_step = on_shared_volume(create_step_get_metrics()).after(probability_step)

    

## Compilation

### The kfp.compiler.Compiler().compile() 
#### This function is used to compile a Kubeflow Pipeline defined as a Python function into a YAML representation that can be executed by a Kubeflow Pipelines environment.

In [254]:

# Compile the pipeline function into diabetes_classifier_pipeline.yaml.
kfp.compiler.Compiler().compile(diabetes_classifier_pipeline, 'diabetes_classifier_pipeline.yaml')


#### The kfp.Client() class is used to create a client object that interacts with a Kubeflow Pipelines deployment. The client provides methods to interact with the Kubeflow Pipelines service, such as submitting and managing pipeline runs, retrieving pipeline metadata, and accessing experiment information.

In [255]:
# Client created with no arguments.
client = kfp.Client()
# Alternative (kept for reference): connect to an explicit host and namespace.
# Do not commit a session cookie to the notebook; load it at runtime instead,
# e.g. from an environment variable.
# session_cookie = os.environ["KFP_SESSION_COOKIE"]
# HOST = "http://localhost:8080"
# namespace = "kubeflow"
# client = kfp.Client(
#     host=f"{HOST}/pipeline",
#     #cookies=f"authservice_session={session_cookie}",
#     namespace=namespace,
# )

In [256]:
# Value passed to the pipeline's data_path parameter, which the pipeline uses
# as the mount path of the shared volume.
DATA_PATH = '/data'

import datetime
print(datetime.date.today())

pipeline_func = diabetes_classifier_pipeline
# The experiment name carries today's date as a suffix.
experiment_name = f'diabetes_classifier_exp_v5_{datetime.date.today()}'
run_name = f'{pipeline_func.__name__} run'
namespace = "kubeflow"

arguments = dict(data_path=DATA_PATH)

# Compile the pipeline into a zip package named after the experiment.
kfp.compiler.Compiler().compile(pipeline_func, experiment_name + '.zip')

# Submit a run of the pipeline function with the arguments defined above.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
)

2023-05-16
