In [16]:
import datetime

import kfp
import kfp.components as comp
import kfp.dsl as dsl
import requests
from kfp.components import create_component_from_func, InputPath, OutputPath



In [2]:
import logging

def prepare_data(output_csv: OutputPath(), csv_url: str) -> None:
    """Read the raw CSV from ``csv_url`` and write it unchanged to ``output_csv``.

    Runs as a KFP lightweight component: only this function's source is shipped
    to the container, so every module it uses must be imported inside the body.

    Args:
        output_csv: Path (supplied by KFP) where the CSV is written, without index.
        csv_url: Location of the input CSV; passed straight to ``pandas.read_csv``.
    """
    # Module-level notebook imports (e.g. the `import logging` cell) are NOT
    # available inside the component container, so import here.
    import logging

    import pandas as pd

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    df = pd.read_csv(csv_url)
    logger.info("Inside prepare data component")
    # Only emitted if the log level is lowered to DEBUG (basicConfig sets INFO).
    logger.debug(df.head())
    with open(output_csv, "w") as f:
        df.to_csv(f, index=False)


In [3]:
def train_test_split(input_iris_csv: InputPath(),
                     train_csv: OutputPath(),
                     test_csv: OutputPath(),
                     target_column: str = 'class',
                    ) -> None:
    """Split the prepared CSV into stratified 80/20 train and test CSVs.

    Args:
        input_iris_csv: Path to the CSV written by the prepare-data step.
        train_csv: Output path for the training split (80% of rows, no index).
        test_csv: Output path for the test split (20% of rows, no index).
        target_column: Column whose class proportions are preserved in both
            splits. Defaults to ``'class'``, the column this step used to
            hard-code, so existing pipelines behave the same.
    """
    import pandas as pd
    # Aliased so the sklearn helper does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    print("---- Inside train_test_split component ----")
    with open(input_iris_csv) as f:
        df = pd.read_csv(f)

    # Fixed random_state keeps the split reproducible across pipeline runs.
    train_df, test_df = sk_train_test_split(
        df, test_size=0.2, stratify=df[target_column], random_state=47)
    print("\n train_df \n", train_df)
    print("\n test_df \n", test_df)

    with open(train_csv, "w") as f:
        train_df.to_csv(f, index=False)
    with open(test_csv, "w") as f:
        test_df.to_csv(f, index=False)


In [4]:
def training_basic_classifier(train_csv: InputPath(),
                              trained_model: OutputPath(),
                              target_column:str,
                             ) -> None:
    """Fit a logistic-regression classifier on the training CSV and pickle it.

    Args:
        train_csv: Path to the training split CSV.
        trained_model: Output path where the fitted
            ``LogisticRegression(max_iter=500)`` is written with ``pickle``.
        target_column: Name of the label column; every other column is a feature.
    """
    import pickle

    import pandas as pd
    from sklearn.linear_model import LogisticRegression

    print("------- Inside training_basic_classifier component -------")

    with open(train_csv) as f:
        train_df = pd.read_csv(f)

    # Features: every column except the target.
    X_train = train_df.loc[:, train_df.columns != target_column]
    # Labels as a 1-D Series. A one-column DataFrame makes sklearn emit a
    # DataConversionWarning, and a missing column would silently yield an empty
    # frame; indexing by name raises a clear KeyError instead.
    y_train = train_df[target_column]

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train, y_train)

    with open(trained_model, 'wb') as f:
        pickle.dump(classifier, f)

    print("\nLogistic regression classifier is trained on iris data .....")


In [5]:
def predict_on_test_data(trained_model: InputPath(),
                         test_csv: InputPath(),
                         y_pred_csv: InputPath() if False else OutputPath(),
                         target_column: str) -> None:
    """Predict labels for the test split and write them as a one-column CSV.

    Args:
        trained_model: Path to the pickled classifier from the training step.
        test_csv: Path to the test split CSV (features plus the target column).
        y_pred_csv: Output path for the predictions CSV (single column, no index).
        target_column: Label column, excluded from the features passed to the model.
    """
    import pickle

    import pandas as pd

    print("---- Inside predict_on_test_data component ----")

    with open(test_csv) as f:
        test_df = pd.read_csv(f)

    # NOTE: pickle.load can execute arbitrary code; acceptable only because the
    # artifact is produced by this pipeline's own training step.
    with open(trained_model, 'rb') as f:
        logistic_reg_model = pickle.load(f)

    X_test = test_df.loc[:, test_df.columns != target_column]

    y_pred = logistic_reg_model.predict(X_test)
    y_pred_df = pd.DataFrame(y_pred)
    print(y_pred_df)

    # to_csv writes text; writing through a binary handle only works on newer
    # pandas. newline='' is what pandas recommends for text file objects.
    with open(y_pred_csv, 'w', newline='') as f:
        y_pred_df.to_csv(f, index=False)


In [6]:
def get_metrics(trained_model: InputPath(),
               test_csv: InputPath(),
               y_pred_csv: InputPath(),
               target_column: str) -> None:
    """Print accuracy, precision, recall and a classification report.

    Args:
        trained_model: Pickled model from the training step. Not read here; kept
            so the component interface (and pipeline wiring) stays unchanged.
        test_csv: Path to the test split CSV containing the true labels.
        y_pred_csv: Path to the single-column predictions CSV.
        target_column: Name of the true-label column in ``test_csv``.
    """
    import pandas as pd
    from sklearn import metrics
    from sklearn.metrics import accuracy_score, precision_score, recall_score

    print("---- Inside get_metrics component ----")
    with open(test_csv) as f:
        test_df = pd.read_csv(f)

    with open(y_pred_csv) as f:
        y_pred_df = pd.read_csv(f)

    # Compare flat 1-D label arrays rather than (n, 1) column vectors.
    y_pred = y_pred_df.iloc[:, 0].to_numpy()
    y_test = test_df[target_column].to_numpy()

    acc = accuracy_score(y_test, y_pred)
    # NOTE(review): for single-label multiclass data, micro-averaged precision
    # and recall both equal accuracy; 'macro' may be more informative.
    prec = precision_score(y_test, y_pred, average='micro')
    recall = recall_score(y_test, y_pred, average='micro')

    print(metrics.classification_report(y_test, y_pred))

    print("\nModel Metrics:", {'accuracy': round(acc, 2), 'precision': round(prec, 2), 'recall': round(recall, 2)})


In [7]:
# Wrap prepare_data as a lightweight component; it only needs pandas (+ numpy).
create_step_prepare_data = kfp.components.create_component_from_func(
    prepare_data,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0'],
    base_image='python:3.7',
)

In [8]:
# train_test_split imports sklearn.model_selection inside the component, so the
# container needs scikit-learn too. 1.0.2 is the last scikit-learn release that
# supports Python 3.7 (the base image); keep it identical across all steps.
create_step_train_test_split = kfp.components.create_component_from_func(
    func=train_test_split,
    base_image='python:3.7',
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.0.2'],
)

In [9]:
# Training imports sklearn.linear_model, so scikit-learn must be installed.
# Pin the same version as the prediction step, which unpickles this model.
create_step_training_basic_classifier = kfp.components.create_component_from_func(
    func=training_basic_classifier,
    base_image='python:3.7',
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.0.2'],
)

In [10]:
# Unpickling the trained LogisticRegression requires scikit-learn importable in
# this container, at the same version used by the training step.
create_step_predict_on_test_data = kfp.components.create_component_from_func(
    func=predict_on_test_data,
    base_image='python:3.7',
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.0.2'],
)

In [11]:
# get_metrics imports sklearn.metrics, so scikit-learn must be installed.
create_step_get_metrics = kfp.components.create_component_from_func(
    func=get_metrics,
    base_image='python:3.7',
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==1.0.2'],
)

In [12]:
# Pipeline definition: wires the five components together.
# (All KFP objects used here come from the first notebook cell.)

@dsl.pipeline(
   name='IRIS classifier csv and model passing',
   description='A sample pipeline that performs IRIS classifier task with csv passing between components'
)
def iris_classifier_pipeline(target_column:str, csv_url:str):
    # Pipeline parameters: `csv_url` feeds the data-prep step; `target_column`
    # names the label column for training, prediction and evaluation.
    prep_op = create_step_prepare_data(csv_url=csv_url)
    split_op = create_step_train_test_split(prep_op.outputs['output_csv'])

    train_op = create_step_training_basic_classifier(
        split_op.outputs['train_csv'],
        target_column=target_column,
    )
    predict_op = create_step_predict_on_test_data(
        train_op.outputs['trained_model'],
        split_op.outputs['test_csv'],
        target_column=target_column,
    )
    metrics_op = create_step_get_metrics(
        train_op.outputs['trained_model'],
        split_op.outputs['test_csv'],
        predict_op.outputs['y_pred_csv'],
        target_column=target_column,
    )

    # "P0D" is an ISO-8601 zero-day duration: no cached result is fresh enough
    # to reuse, so every step re-executes on each run.
    for op in (prep_op, split_op, train_op, predict_op, metrics_op):
        op.execution_options.caching_strategy.max_cache_staleness = "P0D"


In [13]:

# Compile the pipeline function into a package file that can be uploaded to KFP.
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=iris_classifier_pipeline,
    package_path='IRIS_Classifier_pipeline2.yaml',
)


In [14]:
# Client for the Kubeflow Pipelines API, used below to submit the run.
# No host is given, so the SDK resolves the endpoint itself — presumably the
# in-cluster service when run from a Kubeflow notebook (TODO confirm).
client = kfp.Client()

In [15]:
# Run arguments; keys must match iris_classifier_pipeline's parameters.
target_column = 'class'
csv_url = 'https://raw.githubusercontent.com/TripathiAshutosh/dataset/main/iris.csv'

# Take the date once so the printed date and the experiment name always agree
# (two separate now() calls could straddle midnight).
run_date = datetime.datetime.now().date()
print(run_date)

pipeline_func = iris_classifier_pipeline
experiment_name = 'iris_classifier_exp' + "_" + str(run_date)
run_name = pipeline_func.__name__ + ' run'
# NOTE(review): `namespace` is not passed to create_run_from_pipeline_func below,
# so the client default is used. Per the KFP SDK docs, leave it unset on a
# single-user deployment; on multi-user, pass the user's profile namespace.
namespace = "kubeflow"

arguments = {"target_column": target_column, "csv_url": csv_url}

# Standalone package named after the experiment; create_run_from_pipeline_func
# compiles the function again on its own for the actual submission.
kfp.compiler.Compiler().compile(pipeline_func, '{}.zip'.format(experiment_name))

run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)


2023-09-10
