In [1]:
!pip show kfp

Name: kfp
Version: 2.7.0
Summary: Kubeflow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: 
Location: /Applications/anaconda3/lib/python3.11/site-packages
Requires: click, docstring-parser, google-api-core, google-auth, google-cloud-storage, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, PyYAML, requests-toolbelt, tabulate, urllib3
Required-by: 


In [2]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

In [3]:
@dsl.component()
def prepare_data(data_path:str):
    import pandas as pd
    df = pd.read_csv('iris.csv')
    df = df.dropna()
    df.to_csv(f'data/final_df.csv', index=False)
    print("\n ---- data csv is saved to PV location /data/final_df.csv ----")

  return component_factory.create_component_from_func(


In [4]:
@dsl.component
def train_test_split(data_path:str):
    import pandas as pd
    from sklearn.model_selection import train_test_split
    import numpy as np
    print("---- Inside train_test_split component ----")
    final_data = pd.read_csv(f'data/final_df.csv')
    X = final_data.drop(columns='class')
    y = final_data['class']
    X_train,X_test,y_train,y_test = train_test_split(X,y,test_size=0.3,random_state=47)
    np.save(f'data/X_train.npy', X_train)
    np.save(f'data/X_train.npy', X_test)
    np.save(f'data/X_train.npy', y_train)
    np.save(f'data/X_train.npy', y_test)
    print("\n---- X_train ----")
    print("\n")
    print(X_train)
    
    print("\n---- X_test ----")
    print("\n")
    print(X_test)
    
    print("\n---- y_train ----")
    print("\n")
    print(y_train)
    
    print("\n---- y_test ----")
    print("\n")
    print(y_test)

In [5]:
@dsl.component()
def train_basic_classifier(data_path:str):
    from sklearn.linear_model import LogisticRegression
    import numpy as np
    import pandas as pd

    print("---- Inside training_basic_classifier component ----")

    X_train = np.load(f'data/X_train.npy')
    y_train = np.load(f'data/y_train.npy')

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train,y_train)

    import pickle
    with open(f'data/model.pkl','wb') as f:
        pickle.dump(classifier,f)

    print("\n logistic regression classifier is trained on iris data and saved to PV location /data/model.pkl ----")

In [7]:
@dsl.component()
def predict_on_test_data(data_path:str):
    import pandas as pd
    import numpy as np
    import pickle
    print("---- Inside predict_on_test_data component ----")
    with open(f'data/model.pkl','rb') as f:
        logistic_reg_model = pickle.load(f)

    X_test = np.load(f'data/X_test')
    y_pred = logistic_reg_model.predict(X_test)
    np.save(f'data/y_pred.npy', y_pred)

    print("\n---- Predicted classes ----")
    print("\n")
    print(y_pred)

In [32]:
@dsl.component()
def predict_prob_on_test_data(data_path:str):
    import pandas as pd
    import numpy as np
    import pickle
    print("---- Inside predict_prob_on_test_data component ----")
    with open(f'data/model.pkl','rb') as f:
        logistic_reg_model = pickle.load(f)
    X_test = np.load(f'data/X_test.npy',allow_pickle=True)
    y_pred_prob = logistic_reg_model.predict_proba(X_test)
    np.save(f'data/y_pred_prob.npy', y_pred_prob)
    
    print("\n---- Predicted Probabilities ----")
    print("\n")
    print(y_pred_prob)

In [34]:
@dsl.component()
def get_metrics(data_path:str):
    import pandas as pd
    import numpy as np
    from sklearn.metrics import accuracy_score,precision_score,recall_score,log_loss,classification_report
    print("---- Inside get_metrics component ----")
    y_test = np.load(f'data/y_test.npy')
    y_pred = np.load(f'data/y_pred.npy')
    y_pred_prob = np.load(f'data/y_pred_prob.npy')
    acc = accuracy_score(y_pred,y_test)
    prec = precision_score(y_pred,y_test)
    recall = recall_score(y_pred,y_test)
    entropy = log_loss(y_test, y_pred_prob)
    print(classification_report(y_pred,y_test))
    print({'Accuracy':round(acc,2),'precision': round(prec, 2), 'recall': round(recall, 2), 'entropy': round(entropy, 2)})

# Kubeflow pipeline creation work start from here

In [15]:
def create_prepare_data_pipeline(data_path:str):
    return dsl.pipeline_component_from_func(prepare_data)

In [16]:
def create_train_test_split_pipeline(data_path:str):
    return dsl.pipeline_component_from_func(train_test_split)
def create_train_basic_classifier_pipeline(data_path:str):
    return dsl.pipeline_component_from_func(train_basic_classifier)
def create_predict_on_test_data_pipeline(data_path:str):
    return dsl.pipeline_component_from_func(predict_on_test_data)
def create_predict_prob_on_test_data_pipeline(data_path:str):
    return dsl.pipeline_component_from_func(predict_prob_on_test_data)
def create_get_metrics_pipeline(data_path:str):
    return dsl.pipeline_component_from_func(get_metrics)

In [17]:
# Define the pipeline
@dsl.pipeline(
   name='IRIS classifier Kubeflow Demo Pipeline',
   description='A sample pipeline that performs IRIS classifier task'
)
# Define parameters to be fed into pipeline
def iris_classifier_pipeline(data_path: str):
    # Create steps and define their execution order
    prepare_data_task = create_prepare_data_pipeline(data_path)
    train_test_split_task = create_train_test_split_pipeline(data_path).after(prepare_data_task)
    train_basic_classifier_task = create_train_basic_classifier_pipeline(data_path).after(train_test_split_task)
    predict_on_test_data_task = create_predict_on_test_data_pipeline(data_path).after(train_basic_classifier_task)
    predict_prob_on_test_data_task = create_predict_prob_on_test_data_pipeline(data_path).after(train_basic_classifier_task)
    get_metrics_task = create_get_metrics_pipeline(data_path).after(predict_on_test_data_task)
    
    prepare_data_task >> train_test_split_task >> train_basic_classifier_task >> predict_on_test_data_task >> predict_prob_on_test_data_task >> get_metrics_task

    
    
    

AttributeError: module 'kfp.dsl' has no attribute 'pipeline_component_from_func'