In [None]:
# 
# Creating an ML pipeline for the machine-failure dataset
#
import kfp
from kfp import dsl
import kfp.components as components
from typing import NamedTuple
from datetime import datetime


def get_data_batch() -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """
    Prepare the machine-failure dataset and publish a data-quality report.

    Downloads datasets/machine_failure/machine_failure_dataset.csv from the
    "kubeflow" MinIO bucket and one-hot encodes Machine_Type (dropping the
    first category). It splits the rows 80/20 with random_state=42 and
    uploads train_dataset.csv / test_dataset.csv back under the same prefix.
    An Evidently DataQualityPreset report computed on the full encoded frame
    is returned as inline HTML for the Kubeflow Pipelines UI.

    Every import lives inside the function because the function source is
    shipped to its container by create_component_from_func.

    Returns:
        Named tuple whose mlpipeline_ui_metadata field is a JSON string
        describing a single inline web-app output.
    """
    print("getting data")
    from collections import namedtuple
    import json

    import pandas as pd
    from minio import Minio
    from sklearn.model_selection import train_test_split

    minio_bucket = "kubeflow"
    remote_prefix = "datasets/machine_failure"
    target_col = 'Failure_Risk'
    feature_cols = ['Temperature', 'Vibration', 'Power_Usage', 'Humidity',
                    'Machine_Type_Lathe', 'Machine_Type_Mill']

    minio_client = Minio(
        "<minio_ep>",
        access_key="<minio_accK>",
        secret_key="<minio_secK>",
        secure=False,
    )

    ## get raw data from minio
    raw_path = "/tmp/machine_failure_dataset.csv"
    minio_client.fget_object(minio_bucket, f"{remote_prefix}/machine_failure_dataset.csv", raw_path)

    ## encode and reorder columns
    df = pd.read_csv(raw_path)
    # drop_first=True drops the alphabetically first Machine_Type category; the
    # remaining dummy columns must be Lathe and Mill or the selection below
    # raises KeyError.
    df_encoded = pd.get_dummies(df, columns=['Machine_Type'], drop_first=True, dtype=int)
    df_encoded = df_encoded[feature_cols + [target_col]]

    ## split data, save each split locally and store it in Minio
    train_dataset, test_dataset = train_test_split(df_encoded, test_size=0.2, random_state=42)
    for split_name, split_df in (("train_dataset", train_dataset), ("test_dataset", test_dataset)):
        local_path = f"/tmp/{split_name}.csv"
        split_df.to_csv(local_path, index=False)
        minio_client.fput_object(minio_bucket, f"{remote_prefix}/{split_name}.csv", local_path)

    ## evidently ai data-quality report (legacy Report API)
    from evidently import ColumnMapping
    from evidently.report import Report
    from evidently.metric_preset import DataQualityPreset

    column_mapping = ColumnMapping()
    column_mapping.target = target_col
    column_mapping.numerical_features = list(feature_cols)

    report = Report(metrics=[DataQualityPreset()])
    report.run(reference_data=None, current_data=df_encoded, column_mapping=column_mapping)

    metadata = {
        'outputs': [{
            'type': 'web-app',
            'storage': 'inline',
            'source': report.get_html(),
        }]
    }
    output = namedtuple('output', ['mlpipeline_ui_metadata'])
    return output(json.dumps(metadata))

def model_building(
     rand_iter, rand_cv
) -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata'),('mlpipeline_metrics', 'Metrics')]):
    """
    Tune, evaluate and export an XGBoost classifier for machine failure.

    Downloads the train/test splits from the "kubeflow" MinIO bucket and tunes
    an XGBClassifier with RandomizedSearchCV. It then evaluates the best
    estimator on the test split and uploads it to
    models/machine_failure/model.bst.

    Every import lives inside the function because the function source is
    shipped to its container by create_component_from_func.

    Args:
        rand_iter: Number of parameter settings sampled by the randomized search (passed through int()).
        rand_cv: Number of cross-validation folds (passed through int()).

    Returns:
        Named tuple with mlpipeline_ui_metadata (JSON: confusion-matrix viewer
        plus a markdown classification report) and mlpipeline_metrics (JSON:
        test accuracy).
    """
    from collections import namedtuple
    import json

    import numpy as np
    import pandas as pd
    from minio import Minio
    from scipy.stats import randint, uniform
    from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
    from sklearn.model_selection import RandomizedSearchCV
    from xgboost import XGBClassifier

    target_col = 'Failure_Risk'
    minio_bucket = "kubeflow"
    minio_client = Minio(
        "<minio_ep>",
        access_key="<minio_accK>",
        secret_key="<minio_secK>",
        secure=False,
    )

    def load_split(name):
        """Download datasets/machine_failure/<name>.csv and return (features, target)."""
        local_path = f"/tmp/{name}.csv"
        minio_client.fget_object(minio_bucket, f"datasets/machine_failure/{name}.csv", local_path)
        dataset = pd.read_csv(local_path)
        return dataset.drop(columns=[target_col]), dataset[target_col]

    train_input, train_tgt = load_split("train_dataset")
    test_input, test_tgt = load_split("test_dataset")

    rand_grid = {
        'n_estimators': randint(50, 500),
        'max_depth': randint(5, 100),
        'learning_rate': uniform(0.01, 0.29),     # samples [0.01, 0.30]
        'colsample_bytree': uniform(0.1, 0.9),    # samples [0.1, 1.0]
    }
    xg_random = RandomizedSearchCV(
        estimator=XGBClassifier(random_state=42),
        param_distributions=rand_grid,
        n_iter=int(rand_iter),
        cv=int(rand_cv),
        verbose=1,
        random_state=42,
        n_jobs=-1,
    )
    xg_random.fit(train_input, train_tgt)
    best_model = xg_random.best_estimator_
    test_pred = best_model.predict(test_input)

    test_accuracy = accuracy_score(test_tgt, test_pred)
    test_report = classification_report(test_tgt, test_pred)

    # Confusion matrix. Index it by every label seen in either the ground truth
    # or the predictions, and use the same list for the CSV rows and the UI
    # labels. Using np.unique(test_tgt) alone raised IndexError whenever the
    # model predicted a class absent from the test split.
    vocab = np.union1d(test_tgt, test_pred)
    cm = confusion_matrix(test_tgt, test_pred, labels=vocab)
    data = [
        (vocab[target_index], vocab[predicted_index], count)
        for target_index, target_row in enumerate(cm)
        for predicted_index, count in enumerate(target_row)
    ]
    df_cm = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    cm_csv = df_cm.to_csv(header=False, index=False)

    summary_md = f"# Model Overview\n## Model Summary\n\n```\n{test_report}\n```\n\n"

    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {'name': 'target', 'type': 'CATEGORY'},
                    {'name': 'predicted', 'type': 'CATEGORY'},
                    {'name': 'count', 'type': 'NUMBER'},
                ],
                # Column names must match the schema above.
                "target_col": "target",
                "predicted_col": "predicted",
                "source": cm_csv,
                "storage": "inline",
                # Strings, so they match the CSV cell text (and stay JSON-serializable
                # even though vocab holds numpy scalars).
                "labels": [str(label) for label in vocab],
            },
            {
                'storage': 'inline',
                'source': summary_md,
                'type': 'markdown',
            },
        ]
    }

    metrics = {
        'metrics': [{
            'name': 'accuracy',
            'numberValue': float(test_accuracy),
            'format': "PERCENTAGE",
        }]
    }

    ### Save best model to MinIO
    best_model.save_model("/tmp/model.bst")
    minio_client.fput_object(minio_bucket, "models/machine_failure/model.bst", "/tmp/model.bst")

    output = namedtuple('output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return output(json.dumps(metadata), json.dumps(metrics))

# Wrap the plain Python functions above as KFP v1 lightweight components. Each
# function's source runs inside base_image after packages_to_install have been
# pip-installed.
# evidently is pinned to the 0.4 series: get_data_batch uses the legacy
# `evidently.report` / `evidently.metric_preset` / top-level `ColumnMapping`
# API, which evidently 0.7 moved under `evidently.legacy`.
comp_get_data_batch = components.create_component_from_func(
    get_data_batch,
    base_image="python:3.10.0",
    packages_to_install=['scikit-learn', 'minio', 'pandas', 'evidently~=0.4.0'],
)
comp_model_building = components.create_component_from_func(
    model_building,
    base_image="kubeflownotebookswg/jupyter-tensorflow-full:v1.7.0",
    packages_to_install=['xgboost==1.6.0'],
)

@dsl.pipeline(
    name='machine-failure-pipeline',
    description='example pipeline for machine failure dataset'
)
def output_test(rand_iter, rand_cv):
    """
    Machine-failure pipeline: prepare data, tune/train XGBoost, deploy on Seldon.

    This body runs when the pipeline is compiled, so the timestamp suffix is
    fixed per compilation. It gives the SeldonDeployment and its ResourceOp
    a unique name.
    """
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    bucket = "kubeflow"

    # Training waits for the data-preparation step to upload its splits.
    prepare = comp_get_data_batch()
    train = comp_model_building(rand_iter, rand_cv)
    train.after(prepare)

    # Seldon graph node serving the model.bst uploaded by model_building.
    graph = {
        "name": "classifier",
        "implementation": "XGBOOST_SERVER",
        "modelUri": f"s3://{bucket}/models/machine_failure/",
        "envSecretRefName": "seldon-init-container-secret",
    }
    predictor = {
        "name": "machine-failure-predictor",
        "replicas": 1,
        "graph": graph,
    }
    manifest = {
        "apiVersion": "machinelearning.seldon.io/v1",
        "kind": "SeldonDeployment",
        "metadata": {
            "name": f"machine-failure-{stamp}",
            "namespace": "kubeflow-user-example-com",
        },
        "spec": {
            "protocol": "seldon",
            "predictors": [predictor],
        },
    }

    deploy = dsl.ResourceOp(
        name=f'seldon-deployment-{stamp}',
        k8s_resource=manifest,
        action="create",
        attribute_outputs={"name": "{.metadata.name}"},
    )
    deploy.after(train)
    


if __name__ == "__main__":
    # Fill in before running. The gateway is the Kubeflow ingress address
    # (e.g. 172.0.0.1), and the cookie is a valid authservice session value.
    # cert/tls.crt must contain the gateway's TLS certificate.
    kubeflow_gateway_endpoint = "<kubeflow-gateway-endpoint>"
    authservice_session_cookie = "<authservice_session_cookie>"

    client = kfp.Client(
        host=f"https://{kubeflow_gateway_endpoint}/pipeline",
        cookies=f"authservice_session={authservice_session_cookie}",
        ssl_ca_cert="cert/tls.crt",
    )

    run_args = dict(rand_iter=1000, rand_cv=5)
    client.create_run_from_pipeline_func(
        output_test,
        arguments=run_args,
        experiment_name="machine-failure",
    )