In [1]:
import kfp
from kfp import dsl
from kfp.components import (
    InputPath,
    InputTextFile,
    OutputPath,
    OutputTextFile,
    func_to_container_op,
)

import pandas as pd
from typing import NamedTuple

import sys

sys.path.insert(0, "..")
from constants import NAMESPACE, HOST, NAMESPACE
from utils import get_session_cookie, get_or_create_experiment, get_or_create_pipeline

In [2]:
# Where all the runs belong to the pipeline reside in
EXPERIMENT_NAME = "mle-3-intrusion-detection-training"

## Define pipeline components

In [3]:
import constants

# The first component to download data, train-test split
# and then dump all the data for downstream components to use
def prepare_data(
    X_train_path: OutputPath("PKL"),
    y_train_path: OutputPath("PKL"),
    mean_path:  OutputPath("MEAN"),
    stdev_path: OutputPath("STDEV"),
) -> str:
    import numpy as np
    import joblib
    import random
    import logging
    import os

    import mlflow
    mlflow.set_tracking_uri(constants.TRACKING_URI) # Define which tracking server to use

    from alibi_detect import datasets
    from alibi_detect.utils.data import create_outlier_batch

    # Load the dataset from alibi_detect
    intrusions = datasets.fetch_kdd()

    # Set seed to ensure reproducibility
    def seed_everything(seed: int):
        random.seed(seed)
        os.environ["PYTHONHASHSEED"] = str(seed)
        np.random.seed(seed)

    seed_everything(42)

    # Create X_train and y_train
    logging.info("Creating data...")
    n_samples = 4000
    normal_batch = create_outlier_batch(
        intrusions.data, intrusions.target, n_samples=n_samples, perc_outlier=0
    )
    X_train, y_train = normal_batch.data.astype(float), normal_batch.target

    # Preprocess X_train and y_train
    mean, stdev = X_train.mean(axis=0), X_train.std(axis=0)
    X_train = (X_train - mean) / stdev

    # Dump data to pkl for downstream components to use
    logging.info("Dumping data...")
    joblib.dump(X_train, X_train_path)
    joblib.dump(y_train, y_train_path)
    joblib.dump(mean, mean_path)
    joblib.dump(stdev, stdev_path)

    # Log all artifacts to MLFlow
    # There are many ways to log. from log_param, log_metric, 
    # to log_artifact and log_artifacts, please refer to https://mlflow.org/docs/latest/tracking/tracking-api.html#logging-functions
    with mlflow.start_run() as run:
        mlflow.log_artifact(X_train_path)
        mlflow.log_artifact(y_train_path)
        mlflow.log_artifact(mean_path)
        mlflow.log_artifact(stdev_path)

    # Get the run_id to pass to the following step
    # so that 2 steps can log into the same run
    run_id = run.info.run_id

    return run_id

# Instead of using create_component_from_func,
# you can use this instead
prepare_data_op = func_to_container_op(
    func=prepare_data,
    base_image="python:3.9",
    packages_to_install=[
        "alibi-detect[tensorflow]==0.11.1",
        "cloudpickle==2.1.0",
        "kfp==1.8.22",
        "urllib3==1.26.15",
        "requests-toolbelt==0.10.1",  # To fix ImportError: cannot import name 'appengine' from 'urllib3.contrib'
        "mlflow==2.9.2"
    ],
    modules_to_capture=[
        "constants",
    ],
    use_code_pickling=True,
)

In [4]:
import intrusion_detection_model

# The 2nd component receives outputs from the 1st component
# and train
def train(
    run_id: str,
    X_train_path: InputPath("PKL"),
    y_train_path: InputPath("PKL"),
    vae_output_path: OutputPath("VAE"),
) -> NamedTuple('DummyOutputs', [('run_id', str), ('n_features', int)]):
    import joblib
    
    from alibi_detect.models.tensorflow import elbo
    
    import mlflow
    mlflow.set_tracking_uri(constants.TRACKING_URI) # Define which tracking server to use
    
    # Load data from the previous step
    X_train = joblib.load(X_train_path)
    y_train = joblib.load(y_train_path)

    # Train a VAE model
    n_features = X_train.shape[1]
    latent_dim = 2
    vae_trainer = intrusion_detection_model.Trainer("vae", n_features, latent_dim)

    # Pay attention here, we use the output run_id 
    # of the last step before logging
    with mlflow.start_run(run_id=run_id):
        # Define some params for training
        perc_outlier = 5
        
        vae_trainer.train(
            X_train,
            perc_outlier=perc_outlier,
            loss_fn=elbo,
            cov_elbo=dict(sim=0.01),
            epochs=1,
            verbose=True,
        )
        # We want to log some params to MLFlow
        mlflow.log_param("perc_outlier", 5)
    
        # Save the model for prediction
        vae_trainer.save_model(vae_output_path)

        # We also want to save the model to MLFlow,
        # unluckily, MLFLow does not support saving a VAE model directly in https://mlflow.org/docs/latest/models.html#built-in-model-flavors.
        # Therefore, you can use mlflow.pyfunc.log_model for your custom model, or simple mlflow.log_artifacts as follows
        mlflow.log_artifacts(vae_output_path)

        # and create a fake pyfunc model, so that you can register the model
        # , if you don't do it, you can not use the `register model` function of mlflow
        class MyModel(mlflow.pyfunc.PythonModel):
            def predict(self, context, model_input, params=None):
                return None
                
        mlflow.pyfunc.log_model(artifact_path="model", python_model=MyModel())

        # Continue to pass run_id to the following step
        return run_id, X_train.shape[1]

train_op = func_to_container_op(
    func=train,
    base_image="python:3.9",
    packages_to_install=[
        "alibi-detect[tensorflow]==0.11.1",
        "alibi==0.9.1",
        "scikit-learn==1.0.2",
        "joblib==1.3.2",
        "cloudpickle==2.1.0",
        "pandas==1.3.5",
        "kfp==1.8.22",
        "urllib3==1.26.15",
        "requests-toolbelt==0.10.1",  # To fix ImportError: cannot import name 'appengine' from 'urllib3.contrib'
        "mlflow==2.9.2"
    ],
    modules_to_capture=[
        "intrusion_detection_model",
        "constants"
    ],  # https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.components.html#kfp.components.func_to_container_op
    use_code_pickling=True,
)

2023-12-27 12:29:35.890597: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-12-27 12:29:36.009766: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-12-27 12:29:36.517598: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64
2023-12-27 12:29:36.517666: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot o

In [5]:
# The 3rd component receives outputs from the 2nd component
# in combination with prediction data from the 1st component
# to evaluate the model
def evaluate(
    run_id: str,
    n_features: int,
    mean_path: InputPath("MEAN"),
    stdev_path: InputPath("STDEV"),
    vae_output_path: InputPath("VAE"),
) -> NamedTuple("Outputs", [("mlpipeline_metrics", "Metrics"),]):
    from alibi_detect import datasets
    from alibi_detect.utils.data import create_outlier_batch

    import joblib
    import json

    import mlflow
    mlflow.set_tracking_uri(constants.TRACKING_URI) # Define which tracking server to use

    # Load the dataset from alibi_detect
    intrusions = datasets.fetch_kdd()
    
    # Generate an evaluation batch, and create evaluation data
    perc_outlier = 5
    n_samples = 10000

    # Create a batch of outlier for testing purposes
    outlier_batch = create_outlier_batch(
        intrusions.data,
        intrusions.target,
        n_samples=n_samples,
        perc_outlier=perc_outlier,
    )

    # Get features and target
    X_test, y_test = outlier_batch.data.astype(float), outlier_batch.target

    # Normalize based on mean and stdev from the training set
    mean = joblib.load(mean_path)
    stdev = joblib.load(stdev_path)
    X_test = (X_test - mean) / stdev

    # Again, specify hyper-params to construct our network
    latent_dim = 2

    vae_trainer = intrusion_detection_model.Trainer("vae", n_features, latent_dim)
    vae_trainer.load_model(vae_output_path)
    vae_preds = vae_trainer.predict(
        X_test,
        outlier_type="instance",  # use 'feature' or 'instance' level
        return_instance_score=True,  # Score used to determine outliers
    )

    # evaluate on the test data
    f1_score_value = vae_trainer.evaluate(
        y_test, vae_preds["data"]["is_outlier"]
    )
    metrics = {
        "metrics": [
            {
                "name": "f1_score",  # The name of the metric. Visualized as the column name in the runs table.
                "numberValue": f1_score_value,  # The value of the metric. Must be a numeric value.
                "format": "RAW",  # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
            }
        ]
    }
    with mlflow.start_run(run_id=run_id):
        mlflow.log_metric("f1_score", f1_score_value)
        
    return [json.dumps(metrics)]


evaluate_op = func_to_container_op(
    func=evaluate,
    base_image="python:3.9",
    packages_to_install=[
        "alibi-detect[tensorflow]==0.11.1",
        "alibi==0.9.1",
        "cloudpickle==2.1.0",
        "scikit-learn==1.0.2",
        "joblib==1.3.2",
        "pandas==1.3.5",
        "kfp==1.8.22",
        "urllib3==1.26.15",
        "requests-toolbelt==0.10.1",  # To fix ImportError: cannot import name 'appengine' from 'urllib3.contrib'
        "mlflow==2.9.2"
    ],
    modules_to_capture=[
        "intrusion_detection_model",
        "constants"
    ],  # https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.components.html#kfp.components.func_to_container_op
    use_code_pickling=True,
)

## Define some pipelines

In [6]:
@dsl.pipeline(name="Intrusion detection training", description="Intrusion detection.")
def intrusion_detection_pipeline():
    prepare_data_task = prepare_data_op()
    train_task = train_op(
        run_id=prepare_data_task.outputs["Output"],
        x_train=prepare_data_task.outputs["X_train"],
        y_train=prepare_data_task.outputs["y_train"],
    )
    print(train_task.outputs)
    evaluate_task = evaluate_op(
        run_id=train_task.outputs["run_id"],
        n_features=train_task.outputs["n_features"],
        mean=prepare_data_task.outputs["mean"],
        stdev=prepare_data_task.outputs["stdev"],
        vae_output=train_task.outputs["vae_output"],
    )

## Run the pipelines

In [7]:
# Get the token to authenticate to the `ml-pipeline` service
session_cookie = get_session_cookie()

# Initialize the client
client = kfp.Client(
    host=f"{HOST}/pipeline",
    cookies=f"authservice_session={session_cookie}",
    namespace=NAMESPACE,
)

In [8]:
client.create_run_from_pipeline_func(
    intrusion_detection_pipeline,
    arguments={},
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
)

{'vae_output': {{pipelineparam:op=train;name=vae_output}}, 'run_id': {{pipelineparam:op=train;name=run_id}}, 'n_features': {{pipelineparam:op=train;name=n_features}}}


RunPipelineResult(run_id=4b61d015-20cb-44e9-ab21-aba189f4abc4)