In [308]:
# !pip install kfp

In [309]:
pip install kfp[kubernetes] --pre

Note: you may need to restart the kernel to use updated packages.


In [310]:
!pip show kfp

Name: kfp
Version: 2.7.0
Summary: Kubeflow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: 
Location: c:\users\bhavl\appdata\local\programs\python\python310\lib\site-packages
Requires: click, docstring-parser, google-api-core, google-auth, google-cloud-storage, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, PyYAML, requests-toolbelt, tabulate, urllib3
Required-by: kfp-kubernetes


In [311]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
import requests
from kfp import compiler
from kfp import kubernetes
from kfp.dsl import pipeline

In [312]:
@dsl.component(base_image='python:3.10', packages_to_install=['pandas'])
def prepare_data(
    data_url: str = 'https://raw.githubusercontent.com/TripathiAshutosh/dataset/main/iris.csv',
):
    """Download the raw CSV, drop incomplete rows and save it to the shared volume.

    Writes ``/data/final_data.csv`` (without the index column). ``/data`` must match
    the PVC ``mount_path`` configured in the pipeline.

    Args:
        data_url: Location of the raw CSV. Defaults to the Iris dataset the rest of
            the pipeline expects (target column ``class``).
    """
    # KFP lightweight components run only this function's source inside the
    # container, so imports and constants must be defined here, not at module level.
    import os

    import pandas as pd

    # Absolute path so the file lands on the mounted volume regardless of the
    # container's working directory (a relative 'data/...' path depends on it).
    data_dir = '/data'
    os.makedirs(data_dir, exist_ok=True)

    print("Inside data ")
    iris_raw = pd.read_csv(data_url)
    iris_clean = iris_raw.dropna()

    iris_clean.to_csv(os.path.join(data_dir, 'final_data.csv'), index=False)
    print("data saved in csv format")
    
    

In [313]:
@dsl.component(base_image='python:3.10', packages_to_install=['pandas', 'numpy', 'scikit-learn'])
def train_test_split():
    """Split ``/data/final_data.csv`` into stratified 70/30 train/test numpy arrays.

    Reads the CSV written by ``prepare_data`` and saves ``X_train.npy``,
    ``X_test.npy``, ``y_train.npy`` and ``y_test.npy`` to ``/data``. Labels are
    saved as 1-D arrays so downstream estimators and metrics accept them without
    reshaping.
    """
    import os

    import numpy as np
    import pandas as pd
    # Aliased so the sklearn helper does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    data_dir = '/data'
    target_column = 'class'

    print("seperating data into test train")
    final_data = pd.read_csv(os.path.join(data_dir, 'final_data.csv'))
    X = final_data.drop(columns=[target_column])
    # Select the target as a Series (1-D) rather than a one-column DataFrame (2-D);
    # a column vector makes LogisticRegression.fit emit DataConversionWarning.
    y = final_data[target_column]
    X_train, X_test, y_train, y_test = sk_train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=47
    )

    splits = (
        ('X_train', X_train),
        ('X_test', X_test),
        ('y_train', y_train),
        ('y_test', y_test),
    )
    for split_name, split_values in splits:
        np.save(os.path.join(data_dir, f'{split_name}.npy'), split_values)

    print("training data\n")
    print(X_train)

    print("test data\n")
    print(X_test)
    

In [314]:
@dsl.component(base_image='python:3.10', packages_to_install=['pandas', 'numpy', 'scikit-learn'])
def train_basic_classifier():
    """Fit a logistic-regression classifier on the training split.

    Reads ``/data/X_train.npy`` and ``/data/y_train.npy`` and pickles the fitted
    model to ``/data/model.pkl``.
    """
    import os
    import pickle

    import numpy as np
    from sklearn.linear_model import LogisticRegression

    data_dir = '/data'
    X_train = np.load(os.path.join(data_dir, 'X_train.npy'), allow_pickle=True)
    # allow_pickle: labels may be stored as an object array (e.g. string class names).
    # ravel() accepts both 1-D labels and (n, 1) column vectors, so fit() gets the
    # 1-D y it expects.
    y_train = np.load(os.path.join(data_dir, 'y_train.npy'), allow_pickle=True).ravel()

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train, y_train)

    with open(os.path.join(data_dir, 'model.pkl'), 'wb') as f:
        pickle.dump(classifier, f)
    print("LOGISTIC REGRESSION TRAINED")
    
    
    
    

In [315]:
@dsl.component(base_image='python:3.10', packages_to_install=['pandas', 'numpy', 'scikit-learn'])
def test_on_data():
    """Predict class labels for the test split and save them to ``/data/y_pred.npy``."""
    import os
    import pickle

    import numpy as np

    data_dir = '/data'
    # model.pkl is written by train_basic_classifier earlier in this pipeline.
    # pickle.load can execute arbitrary code, so never point this at an untrusted file.
    with open(os.path.join(data_dir, 'model.pkl'), 'rb') as f:
        l_model = pickle.load(f)
    X_test = np.load(os.path.join(data_dir, 'X_test.npy'), allow_pickle=True)
    y_pred = l_model.predict(X_test)

    np.save(os.path.join(data_dir, 'y_pred.npy'), y_pred)

    print("Prediction Completed")
    print(y_pred)


In [316]:
@dsl.component(base_image='python:3.10', packages_to_install=['pandas', 'numpy', 'scikit-learn'])
def predict_prob_on_test_data():
    """Compute class probabilities for the test split.

    Loads the pickled model and ``X_test.npy`` from ``/data`` and saves the
    ``predict_proba`` output to ``/data/y_pred_prob.npy``.
    """
    import os
    import pickle

    import numpy as np

    data_dir = '/data'
    print("Inside predict_prob_on_test_data component")
    # model.pkl is written by train_basic_classifier earlier in this pipeline.
    # pickle.load can execute arbitrary code, so never point this at an untrusted file.
    with open(os.path.join(data_dir, 'model.pkl'), 'rb') as f:
        logistic_reg_model = pickle.load(f)
    X_test = np.load(os.path.join(data_dir, 'X_test.npy'), allow_pickle=True)
    y_pred_prob = logistic_reg_model.predict_proba(X_test)
    np.save(os.path.join(data_dir, 'y_pred_prob.npy'), y_pred_prob)

    print("\nPredicted Probabilities")
    print("\n")
    print(y_pred_prob)

In [317]:
@dsl.component(base_image='python:3.10', packages_to_install=['pandas', 'numpy', 'scikit-learn'])
def get_metrics():
    """Print accuracy, precision, recall and log loss for the test split.

    Reads ``y_test.npy``, ``y_pred.npy`` and ``y_pred_prob.npy`` from ``/data``.
    Dependencies come from ``packages_to_install``; no runtime ``pip install`` is
    performed.
    """
    import os

    import numpy as np
    from sklearn.metrics import accuracy_score, log_loss, precision_score, recall_score

    data_dir = '/data'

    def _load(name):
        # allow_pickle: label arrays may be stored as object arrays (e.g. strings).
        return np.load(os.path.join(data_dir, f'{name}.npy'), allow_pickle=True)

    # ravel() accepts both 1-D labels and (n, 1) column vectors.
    y_test = _load('y_test').ravel()
    y_pred = _load('y_pred')
    y_pred_prob = _load('y_pred_prob')

    acc = accuracy_score(y_test, y_pred)
    # NOTE: for single-label multiclass data, micro-averaged precision and recall
    # both equal accuracy; consider 'macro' or 'weighted' for per-class insight.
    prec = precision_score(y_test, y_pred, average='micro')
    recall = recall_score(y_test, y_pred, average='micro')
    entropy = log_loss(y_test, y_pred_prob)
    print("\n Model Metrics:", {'accuracy': round(acc, 2), 'precision': round(prec, 2), 'recall': round(recall, 2), 'entropy': round(entropy, 2)})

        
    

In [318]:
@pipeline(name='iris-pipeline', description='Pipeline to prepare Iris dataset.')
def iris_pipeline(data_path: str = ''):
    """Prepare, split, train, predict and evaluate, sharing files through one PVC.

    Every step reads and writes files under ``/data``, so each task mounts the same
    PVC there. Tasks are ordered explicitly with ``.after`` because no data flows
    through KFP inputs/outputs. Caching is disabled for every step because file
    side effects on the volume are invisible to KFP's cache key.

    Args:
        data_path: Currently unused by any step; kept (now optional) for
            backward compatibility with existing run configurations.
    """
    pvc = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteMany'],
        size='1Gi',
        storage_class_name='standard',
    )
    pvc_name = pvc.outputs['name']

    prepare_iris_task = prepare_data()
    split_data = train_test_split().after(prepare_iris_task)
    classifier_training = train_basic_classifier().after(split_data)
    test = test_on_data().after(classifier_training)
    proba = predict_prob_on_test_data().after(test)
    evaluate = get_metrics().after(proba)

    # Identical volume/caching configuration for every step.
    for task in (prepare_iris_task, split_data, classifier_training, test, proba, evaluate):
        task.set_caching_options(False)
        kubernetes.mount_pvc(
            task,
            pvc_name=pvc_name,
            mount_path='/data',
        )

In [319]:
# pvc1 = kubernetes.CreatePVC(
#         # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
#         pvc_name_suffix='-my-pvc',
#         access_modes=['ReadWriteOnce'],
#         size='5Gi',
#         storage_class_name='standard',
#     )

In [320]:
# Compile the pipeline definition to the YAML package at PIPELINE_PACKAGE_PATH.
# `compiler` is imported explicitly rather than reached through `kfp.compiler`,
# which only resolves because another kfp submodule happens to import it.
PIPELINE_PACKAGE_PATH = './iris_pipeline.yaml'
compiler.Compiler().compile(
    pipeline_func=iris_pipeline,
    package_path=PIPELINE_PACKAGE_PATH,
)