In [None]:
# Address of the Kubeflow Pipelines API that the client connects to.
kfp_endpoint = "http://localhost:8080"

# Container image passed as the base image when components are created.
base_image = "python:3.8"

In [None]:


import kfp
from kfp.components import InputPath, OutputPath
from kfp.components import func_to_container_op

In [None]:
# KFP API client bound to the endpoint configured in `kfp_endpoint`.
client = kfp.Client(host=kfp_endpoint)

In [None]:
# Packages installed into the container of components built with
# `create_component_train`.
packages_to_install = [
    "pandas",
    "scikit-learn",
    "matplotlib",
    "numpy",
    "joblib",
    "optuna",
]


def create_component_get(x):
    """Turn function ``x`` into a container op with pandas and scikit-learn installed."""
    return func_to_container_op(
        x,
        base_image=base_image,
        packages_to_install=["pandas", "scikit-learn"],
    )


def create_component_train(x):
    """Turn function ``x`` into a container op with all of ``packages_to_install``."""
    return func_to_container_op(
        x,
        base_image=base_image,
        packages_to_install=packages_to_install,
    )

# Creating Data

In [None]:
@create_component_get
def get_noise_date(output_path: OutputPath(str)):
    """Write a table with one random ``noise`` value per breast-cancer sample.

    The CSV has three columns, in this order: ``noise`` (a value from
    ``random.random()``), ``target`` (the dataset label) and ``id`` (the row
    position, starting at 0).

    Args:
        output_path: Location where the CSV is written.
    """
    from random import random
    from sklearn.datasets import load_breast_cancer
    import pandas as pd

    data = load_breast_cancer()
    # Only the labels are needed here; the feature matrix is not loaded into
    # a DataFrame just to count rows.
    n_rows = len(data.target)

    # NOTE(review): the noise is drawn without a seed, so every run produces
    # different values.
    df_fake = pd.DataFrame([random() for _ in range(n_rows)], columns=["noise"])
    df_fake["target"] = data.target
    # The default index is 0..n_rows-1, the same ids the feature table gets
    # when it is built from the same dataset.
    df_fake["id"] = df_fake.index.values

    df_fake.to_csv(output_path, index=False)


@create_component_get
def get_data(output_text_path: OutputPath(str)):
    """Export the scikit-learn breast-cancer dataset as a CSV file.

    The file holds one column per feature, followed by ``target`` and an
    ``id`` column that mirrors the row index.

    Args:
        output_text_path: Location where the CSV is written.
    """
    import logging

    import pandas as pd
    from sklearn.datasets import load_breast_cancer

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    log.info('Reading data from source')
    dataset = load_breast_cancer()

    frame = pd.DataFrame(dataset.data, columns=dataset.feature_names)
    frame['target'] = dataset.target
    log.info(f'Saving processed data to {output_text_path}')
    frame['id'] = frame.index.values

    frame.to_csv(output_text_path, index=False)

In [None]:
# NOTE(review): this cell defines get_data a second time. The body matches the
# definition in the preceding cell, and this later definition is the one the
# name ends up bound to. One of the two cells can be removed.
@create_component_get
def get_data(output_text_path: OutputPath(str)):
    """Write the scikit-learn breast-cancer dataset to a CSV file.

    The frame contains the feature columns plus ``target`` and an ``id``
    column taken from the row index.

    Args:
        output_text_path: Location where the CSV is written.
    """
    import pandas as pd
    from sklearn.datasets import load_breast_cancer
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    logger.info(f'Reading data from source')

    data = load_breast_cancer()
    df = pd.DataFrame(data.data, columns=data.feature_names)
    df['target'] = data.target
    logger.info(f'Saving processed data to {output_text_path}')
    # Row position doubles as the record id.
    df["id"] = df.index.values

    df.to_csv(output_text_path, index=False)

# Join Data

In [None]:
@create_component_get
def join_data(data_path: InputPath(), noise_path: InputPath(), output_path: OutputPath(str)):
    """Combine the dataset CSV and the noise CSV into a single table.

    Rows are matched with an inner join on both ``id`` and ``target``, so each
    of those columns appears only once in the result. The ``id`` column is
    dropped before the table is written.

    Args:
        data_path: CSV read as the right-hand side of the join.
        noise_path: CSV read as the left-hand side of the join.
        output_path: Location where the joined CSV is written.
    """
    import pandas as pd

    features = pd.read_csv(data_path)
    noise = pd.read_csv(noise_path)

    # Noise columns come first because the noise table is the left operand.
    joined = noise.merge(features, how='inner', on=['id', 'target'])
    joined.drop(columns='id').to_csv(output_path, index=False)

# Split the Data

In [None]:
@create_component_get
def split_data(
    text_path: InputPath(),
    output_train: OutputPath(str),
    output_val: OutputPath(str),
    output_test: OutputPath(str),
):
    """Split a CSV table into train, validation and test CSV files.

    30% of the rows are held out and then halved into validation and test
    sets. Both splits use ``random_state=42``. In every output file the
    ``target`` column is placed after the feature columns.

    Args:
        text_path: CSV to split; must contain a ``target`` column.
        output_train: Location of the training CSV.
        output_val: Location of the validation CSV.
        output_test: Location of the test CSV.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    table = pd.read_csv(text_path)
    labels = table['target']
    features = table.drop(columns=['target'])

    feat_train, feat_rest, lab_train, lab_rest = train_test_split(
        features, labels, test_size=0.3, random_state=42
    )
    feat_val, feat_test, lab_val, lab_test = train_test_split(
        feat_rest, lab_rest, test_size=0.5, random_state=42
    )

    partitions = (
        (feat_train, lab_train, output_train),
        (feat_val, lab_val, output_val),
        (feat_test, lab_test, output_test),
    )
    for feats, labs, destination in partitions:
        pd.concat([feats, labs], axis=1).to_csv(destination, index=False)

# Preprocess Data

In [None]:
@create_component_get
def preprocess_data(input_train: InputPath(), scaled_path: OutputPath(str), scaler_path: OutputPath(str)):
    """Fit a ``StandardScaler`` on the training features and persist the results.

    Two artifacts are written: the standardised training table (features plus
    the untouched ``target`` column) as CSV, and the fitted scaler as a joblib
    file.

    Args:
        input_train: Training CSV; must contain a ``target`` column.
        scaled_path: Location of the scaled training CSV.
        scaler_path: Location of the serialized scaler.
    """
    # NOTE(review): joblib is not in this component's package list; presumably
    # it is installed as a dependency of scikit-learn. Confirm.
    import joblib
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(input_train)
    X = df.drop(columns=['target'])
    y = df['target']

    scaler = StandardScaler()
    # fit_transform returns a bare array. Rebuild the frame with X's own index
    # so the column-wise concat with y stays row-aligned.
    X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=X.columns, index=X.index)
    df_scaled = pd.concat([X_scaled, y], axis=1)
    df_scaled.to_csv(scaled_path, index=False)

    joblib.dump(scaler, scaler_path)

# Train the Model

In [None]:
@create_component_train
def train_model(input_train: InputPath(), input_val: InputPath(), scaler_path:InputPath(), model_path: OutputPath(str), num_of_trial: int = 100):
    """Tune, fit and save a classification pipeline.

    An Optuna study searches logistic-regression and random-forest
    hyperparameters, scoring every trial by accuracy on the validation set.
    The best configuration is then fitted inside a ``Pipeline`` together with
    the scaler and written to ``model_path`` with joblib.

    Args:
        input_train: Training CSV; must contain a ``target`` column.
        input_val: Validation CSV; must contain a ``target`` column.
        scaler_path: joblib file holding the fitted scaler for the features.
        model_path: Location where the fitted pipeline is saved.
        num_of_trial: Number of Optuna trials to run.
    """
    import logging

    import joblib
    import optuna
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    from sklearn.pipeline import Pipeline

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Both estimators are stochastic. A fixed seed lets the final refit
    # reproduce the model that earned the winning trial's score.
    random_state = 42

    def build_model(params):
        """Create the estimator described by a parameter dict with a 'model' key."""
        if params['model'] == 'LogisticRegression':
            return LogisticRegression(
                penalty='elasticnet',
                C=params['C'],
                solver='saga',
                l1_ratio=params['l1_ratio'],
                random_state=random_state,
            )
        return RandomForestClassifier(
            n_estimators=params['n_estimators'],
            max_depth=params['max_depth'],
            min_samples_split=params['min_samples_split'],
            random_state=random_state,
        )

    logger.info('Reading data from source')

    df_train = pd.read_csv(input_train)
    X_train = df_train.drop(columns=['target'])
    y_train = df_train['target']

    df_val = pd.read_csv(input_val)
    X_val = df_val.drop(columns=['target'])
    y_val = df_val['target']

    logger.info('Reading Scaler')

    scaler = joblib.load(scaler_path)

    # Scale once up front so every trial reuses the same arrays.
    X_train_scaled = scaler.transform(X_train)
    X_val_scaled = scaler.transform(X_val)

    def objective(trial):
        """Sample one configuration and return its validation accuracy."""
        params = {
            'model': trial.suggest_categorical('model', ['LogisticRegression', 'RandomForest'])
        }

        if params['model'] == 'LogisticRegression':
            params['C'] = trial.suggest_float('C', 1e-4, 1e2, log=True)
            params['l1_ratio'] = trial.suggest_float('l1_ratio', 0, 1)
        else:
            params['n_estimators'] = trial.suggest_int('n_estimators', 10, 100)
            params['max_depth'] = trial.suggest_int('max_depth', 2, 8)
            params['min_samples_split'] = trial.suggest_int('min_samples_split', 2, 10)

        model = build_model(params)
        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_val_scaled)
        accuracy = accuracy_score(y_val, y_pred)

        logger.info("*"*30)
        logger.info(f'{model}')
        logger.info("*"*30)
        logger.info(f"accuracy {accuracy}")

        return accuracy

    logger.info('Hiperparameter tunning')

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=num_of_trial)

    # Rebuild the winner through the same builder the trials used, so the
    # search and the final model cannot drift apart.
    best_trial = study.best_trial
    best_model = build_model(best_trial.params)

    final_pipeline = Pipeline([
        ('scaler', scaler),
        ('model', best_model)
    ])

    # The pipeline takes the unscaled features; its scaler step is fitted
    # again as part of this call.
    final_pipeline.fit(X_train, y_train)

    # Evaluate the fitted pipeline on the validation data.
    y_val_pred = final_pipeline.predict(X_val)
    val_accuracy = accuracy_score(y_val, y_val_pred)

    logger.info(f"Best model parameters: {best_trial.params}")
    logger.info(f"Validation accuracy: {val_accuracy}")

    logger.info(f"Saving model at: {model_path}")

    joblib.dump(final_pipeline, model_path)



# Define the Pipeline

In [None]:
def training_pipeline():
    """Wire the components into the training workflow.

    The dataset and the noise table are joined, the result is split into
    train/validation/test files, a scaler is fitted on the training split,
    and the model is trained from the train and validation splits plus that
    scaler.
    """
    dataset_op = get_data()
    noise_op = get_noise_date()
    joined_op = join_data(dataset_op.output, noise_op.output)
    splits = split_data(joined_op.output).outputs
    scaler_op = preprocess_data(splits["output_train"])
    train_model(
        splits["output_train"],
        splits["output_val"],
        scaler_op.outputs["scaler"],
    )

# Execute a single run of the Pipeline

In [None]:
# Submit one run through the notebook's existing `client` rather than
# constructing a second client for the same endpoint.
client.create_run_from_pipeline_func(training_pipeline, arguments={})

# Compile the Pipeline

In [None]:
# Compile the pipeline function into a package file on disk.
pipeline_path = "training_pipeline.yaml"
kfp.compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path=pipeline_path,
)

# Upload the Pipeline

In [None]:
# Upload the compiled package to the KFP server under a readable name.
pipeline = client.upload_pipeline(
    pipeline_name="Training Pipeline",
    pipeline_package_path=pipeline_path,
)