In [1]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

In [2]:
def prepare_data(data_path: str = '/data',
                 source_url: str = 'https://raw.githubusercontent.com/TripathiAshutosh/mlflow/main/banking.csv'):
    """Load the banking dataset, drop incomplete rows, and save it to the shared volume.

    Runs as a KFP lightweight component, so every import lives inside the body.
    The cleaned frame is written (without the index) to ``<data_path>/final_df.csv``.

    Args:
        data_path: Directory where the pipeline mounts the shared PV. Defaults to
            ``/data``, the DATA_PATH this notebook passes to the pipeline.
        source_url: CSV location handed to ``pandas.read_csv`` (URL or local path).
    """
    import os
    import pandas as pd

    print("---- Inside prepare_data component ----")
    df = pd.read_csv(source_url)
    # Drop every row that has at least one missing value.
    df = df.dropna()

    # Absolute path so the file lands on the mounted PV regardless of the
    # container's working directory (a relative 'data/...' only works from '/').
    output_path = os.path.join(data_path, 'final_df.csv')
    df.to_csv(output_path, index=False)
    print(f"\n ---- data csv is saved to PV location {output_path} ----")
    

In [3]:
def train_test_split(data_path: str = '/data'):
    """Split the cleaned dataset into stratified train/test sets on the shared volume.

    Reads ``<data_path>/final_df.csv`` (written by ``prepare_data``) and separates the
    ``y`` target column from the features. It then performs a 70/30 split stratified
    on the target with a fixed seed (47). The four pieces are saved as
    ``X_train.npy``, ``X_test.npy``, ``y_train.npy`` and ``y_test.npy`` in ``data_path``
    and then printed to the component log.

    Args:
        data_path: Directory where the pipeline mounts the shared PV.
    """
    import os
    import numpy as np
    import pandas as pd
    # Aliased so it does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as split

    print("---- Inside train_test_split component ----")
    final_data = pd.read_csv(os.path.join(data_path, 'final_df.csv'))

    target_column = 'y'
    X = final_data.loc[:, final_data.columns != target_column]
    y = final_data.loc[:, final_data.columns == target_column]

    X_train, X_test, y_train, y_test = split(X, y, test_size=0.3, stratify=y, random_state=47)

    splits = {'X_train': X_train, 'X_test': X_test, 'y_train': y_train, 'y_test': y_test}

    # NOTE: if any feature column holds strings, np.save stores an object (pickled)
    # array, and readers must use np.load(..., allow_pickle=True).
    for name, values in splits.items():
        np.save(os.path.join(data_path, f'{name}.npy'), values)

    for name, values in splits.items():
        print(f"\n---- {name} ----")
        print("\n")
        print(values)

In [4]:
def training_basic_classifier(data_path: str = '/data'):
    """Fit a LogisticRegression on a small hard-coded demo frame and pickle it to the shared volume.

    NOTE(review): although it runs after ``train_test_split``, this step does not load the
    ``X_train.npy`` / ``y_train.npy`` files that step saves. It trains on a five-row
    in-memory example with dummy labels. TODO: switch to the real split once an encoding
    for its categorical feature columns is in place.

    Feature encoding applied to the demo frame:
      * ``job`` is mapped to fixed integer codes (unmapped values would become NaN,
        which LogisticRegression rejects).
      * ``marital`` is one-hot encoded into one 0/1 column per category, then dropped.

    Args:
        data_path: Directory where the pipeline mounts the shared PV; the fitted model
            is written to ``<data_path>/model.pkl``.
    """
    import os
    import pickle

    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.preprocessing import OneHotEncoder

    print("---- Inside training_basic_classifier component ----")

    # Five-row demo frame standing in for real training data.
    X_train = pd.DataFrame({
        'age': [29, 32, 19, 52, 41],
        'job': ['admin.', 'admin.', 'student', 'management', 'blue-collar'],
        'marital': ['single', 'married', 'single', 'married', 'married'],
    })

    # Map 'job' to integer codes; anything outside the table becomes NaN.
    job_mapping = {
        'admin.': 0,
        'student': 1,
        'management': 2,
        'blue-collar': 3,
    }

    def encode_job(job_string):
        return job_mapping.get(job_string, np.nan)

    X_train['job'] = X_train['job'].apply(encode_job)

    # One-hot encode 'marital' (indicator columns are named after the learned
    # categories) and replace the original string column with them.
    marital_encoder = OneHotEncoder()
    marital_onehot = marital_encoder.fit_transform(X_train['marital'].values.reshape(-1, 1)).toarray()
    marital_onehot = pd.DataFrame(marital_onehot, columns=marital_encoder.categories_[0])
    X_train = pd.concat([X_train, marital_onehot], axis=1).drop('marital', axis=1)

    print(X_train)

    y_train = pd.DataFrame({'target': [0, 1, 0, 1, 1]})  # dummy labels for demonstration

    classifier = LogisticRegression(max_iter=500)
    classifier.fit(X_train, y_train.values.ravel())

    model_path = os.path.join(data_path, 'model.pkl')
    with open(model_path, 'wb') as f:
        pickle.dump(classifier, f)

    print(f"\nTraining basic classifier on data and saved to PV location {model_path}")


In [5]:
# Wrap prepare_data as a KFP lightweight component: its source runs inside a
# python:3.7 container after the pinned packages below are pip-installed.
create_step_prepare_data = comp.create_component_from_func(
    prepare_data,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0'],
    base_image='python:3.7',
)

In [6]:
# Wrap train_test_split as a KFP lightweight component; scikit-learn is needed
# in addition to pandas/numpy for the stratified split.
create_step_train_test_split = comp.create_component_from_func(
    train_test_split,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==0.24.2'],
    base_image='python:3.7',
)

In [7]:
# Wrap training_basic_classifier as a KFP lightweight component with the same
# pinned pandas/numpy/scikit-learn stack as the split step.
create_step_training_basic_classifier = comp.create_component_from_func(
    training_basic_classifier,
    packages_to_install=['pandas==1.2.4', 'numpy==1.21.0', 'scikit-learn==0.24.2'],
    base_image='python:3.7',
)

In [8]:
# Define the pipeline
@dsl.pipeline(
    name='Basic MLOPS classifier Kubeflow Demo Pipeline',
    description='A sample pipeline that trains a basic classifier on the banking dataset'
)
def basic_classifier_pipeline(data_path: str):
    """Chain prepare_data -> train_test_split -> training_basic_classifier on one shared PVC.

    Args:
        data_path: Mount path of the shared volume inside every step's container.
            The step functions write under a ``/data`` location, so this should stay
            ``/data`` (the DATA_PATH this notebook passes) — NOTE(review): confirm
            before changing it.
    """
    # One PVC shared by all steps. ReadWriteOnce suffices because the .after()
    # chain below makes the steps run strictly one after another.
    vop = dsl.VolumeOp(
        name="t-vol-1",
        resource_name="t-vol-1",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    prepare_data_task = create_step_prepare_data().add_pvolumes({data_path: vop.volume})
    # Named split_task so it does not shadow the module-level train_test_split function.
    split_task = (create_step_train_test_split()
                  .add_pvolumes({data_path: vop.volume})
                  .after(prepare_data_task))
    classifier_training = (create_step_training_basic_classifier()
                           .add_pvolumes({data_path: vop.volume})
                           .after(split_task))

    # Disable caching ("P0D" = zero allowed staleness) so every run re-executes
    # each step instead of reusing a previous run's results.
    for task in (prepare_data_task, split_task, classifier_training):
        task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    
    
    

In [9]:
# Compile the pipeline definition into a YAML package file on local disk.
kfp.compiler.Compiler().compile(
    basic_classifier_pipeline,
    'basic_classifier_pipeline_adil.yaml',
)


In [10]:
client = kfp.Client(host=None)  # host left unset: the SDK picks its default endpoint


In [11]:
DATA_PATH = '/data'

import datetime

# Capture the date once so the printed value and the experiment name always
# agree (two separate now() calls could straddle midnight).
today = datetime.datetime.now().date()
print(today)

pipeline_func = basic_classifier_pipeline
experiment_name = f'mlops_task_classifier_exp_{today}'
run_name = f'{pipeline_func.__name__} run'

arguments = {"data_path": DATA_PATH}

# Keep a compiled archive named after the experiment (e.g. for manual upload);
# create_run_from_pipeline_func below compiles the pipeline again on its own.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)

2023-07-10
