# 0.) Imports and Constants

In [1]:
import json
import kfp
from kfp.components import InputPath, OutputPath
import kfp.dsl as dsl
from kfp.dsl import PipelineConf, data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource
import os
from pydoc import importfile
import requests
from tensorflow import keras
from typing import List


%load_ext lab_black

In [2]:
BASE_IMAGE = "quay.io/ibm/kubeflow-notebook-image-ppc64le:latest"

COMPONENT_CATALOG_FOLDER = f"{os.getenv('HOME')}/components"
COMPONENT_CATALOG_GIT = "https://github.com/lehrig/kubeflow-ppc64le-components.git"
COMPONENT_CATALOG_RELEASE = "main"

load_dataframe_via_trino_comp = kfp.components.load_component_from_file(
    "component.yaml"
)

ARGUMENTS = {
    "blackboard": "artefacts",
    "model_name": "fraud-detection",
    "cluster_configuration_secret": os.getenv(
        "CLUSTER_CONFIGURATION_SECRET", default=""
    ),
    "training_gpus": os.getenv("TRAINING_GPUS", default="1"),
    "training_node_selector": os.getenv("TRAINING_NODE_SELECTOR", default=""),
}
MODEL_NAME = ARGUMENTS["model_name"]

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
    NAMESPACE = f.read()

ARGUMENTS

{'blackboard': 'artefacts',
 'model_name': 'fraud-detection',
 'cluster_configuration_secret': '',
 'training_gpus': '1',
 'training_node_selector': ''}

# 1.) Load Catalog with Reusable KF components

In [3]:
!git clone --branch $COMPONENT_CATALOG_RELEASE $COMPONENT_CATALOG_GIT $COMPONENT_CATALOG_FOLDER

fatal: destination path '/home/jovyan/components' already exists and is not an empty directory.


In [4]:
CATALOG = importfile(f"{COMPONENT_CATALOG_FOLDER}/catalog.py")

# 2.) Create custom components

## 2.1) Component: Preprocess data (dataset loading, rebalancing & splitting)

In [5]:
def preprocess_dataset(
    dataframe: InputPath(str),
    validation_dataset_dir: OutputPath(str),
    train_dataset_dir: OutputPath(str),
) -> List[str]:

    from imblearn.over_sampling import RandomOverSampler
    import math
    import numpy as np
    import os
    import pandas as pd

    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import (
        LabelEncoder,
        OneHotEncoder,
        FunctionTransformer,
        MinMaxScaler,
        LabelBinarizer,
    )
    from sklearn_pandas import DataFrameMapper

    def save_to_dir(x, y, directory):
        if not os.path.exists(directory):
            os.makedirs(directory)
        np.savez(os.path.join(directory, "data.npz"), x=x, y=y)

    def split_dataset(n, df):
        test = df.iloc[:n, :]
        train = df.iloc[n:, :]
        return test, train

    def merge_splits(frauds, non_frauds, split):
        print(
            f"{split} ratio fraud ({len(frauds)}) / non-fraud ({len(non_frauds)}):",
            len(frauds) / len(non_frauds),
        )
        df = pd.concat([frauds, non_frauds])
        df.sort_values("year_month_day_time", inplace=True)

        x, y = df.drop(["is fraud?"], axis=1), df["is fraud?"]
        min_ind = math.floor(len(x) / 128)
        x, y = x[-min_ind * 128 :], y[-min_ind * 128 :]
        y = y.astype("int")
        return x, y

    def timeEncoder(X):
        X_hm = X["time"].str.split(":", expand=True)
        d = pd.to_datetime(
            dict(
                year=X["year"],
                month=X["month"],
                day=X["day"],
                hour=X_hm[0],
                minute=X_hm[1],
            )
        ).astype(int)
        return pd.DataFrame(d)

    def amtEncoder(X):
        amt = (
            X.apply(lambda x: x[1:])
            .astype(float)
            .map(lambda amt: max(1, amt))
            .map(math.log)
        )
        return pd.DataFrame(amt)

    def decimalEncoder(X, length=5):
        dnew = pd.DataFrame()
        for i in range(length):
            dnew[i] = np.mod(X, 10)
            X = np.floor_divide(X, 10)
        return dnew

    def fraudEncoder(X):
        return np.where(X == "Yes", 1, 0).astype(int)

    # df_nf = pd.read_csv(f"{os.getenv('HOME')}/card_transactions_non-frauds.csv")
    # df_f = pd.read_csv(f"{os.getenv('HOME')}/card_transactions_frauds.csv")
    # tdf = pd.concat([df_nf, df_f])
    print("read in raw data")
    tdf = pd.read_feather(dataframe)
    tdf.columns = map(str.lower, tdf.columns)
    tdf["merchant name"] = tdf["merchant name"].astype(str)
    tdf.drop(["mcc", "zip", "merchant state"], axis=1, inplace=True)
    tdf.sort_values(by=["user", "card"], inplace=True)
    tdf.reset_index(inplace=True, drop=True)

    mapper = DataFrameMapper(
        [
            ("is fraud?", FunctionTransformer(fraudEncoder)),
            (
                "merchant name",
                [LabelEncoder(), FunctionTransformer(decimalEncoder), OneHotEncoder()],
            ),
            (
                "merchant city",
                [LabelEncoder(), FunctionTransformer(decimalEncoder), OneHotEncoder()],
            ),
            (["use chip"], [SimpleImputer(strategy="constant"), LabelBinarizer()]),
            (["errors?"], [SimpleImputer(strategy="constant"), LabelBinarizer()]),
            (
                ["year", "month", "day", "time"],
                [FunctionTransformer(timeEncoder), MinMaxScaler()],
            ),
            ("amount", [FunctionTransformer(amtEncoder), MinMaxScaler()]),
        ],
        input_df=True,
        df_out=True,
    )
    print("fit and transform dataframe")
    mapper.fit(tdf)
    tdf = mapper.transform(tdf)

    dataset = tdf
    dataset = dataset.sample(frac=1)  # shuffle randomly

    frauds = dataset[dataset["is fraud?"] == 1]
    non_frauds = dataset[dataset["is fraud?"] == 0]
    ratio = len(frauds) / len(non_frauds)
    print(
        f"{len(frauds)} Frauds ({len(frauds)/len(dataset)*100}%) and {len(non_frauds)} Non-Frauds ({len(non_frauds)/len(dataset)*100}%) - ratio: {ratio})."
    )

    test_ratio = 0.1
    n_test_frauds = int(test_ratio * len(frauds))
    n_test_non_frauds = int(test_ratio * len(non_frauds))
    n_train_frauds = len(frauds) - n_test_frauds
    n_train_non_frauds = len(non_frauds) - n_test_non_frauds
    # n_frauds = int(0.001 * len(dataset))
    # n_non_frauds = int(len(dataset) * 0.2 - n_frauds)

    print(f"Frauds in test split: {n_test_frauds}")
    test_frauds, train_frauds = split_dataset(n_test_frauds, frauds)

    print(f"Non-Frauds in test split: {n_test_non_frauds}")
    test_non_frauds, train_non_frauds = split_dataset(n_test_non_frauds, non_frauds)

    x_train, y_train = merge_splits(train_frauds, train_non_frauds, "Train")
    x_test, y_test = merge_splits(test_frauds, test_non_frauds, "Test")
    print(
        f"Using the following y-label: {y_train.name} and x-features: {x_train.columns}"
    )

    over_sampler = RandomOverSampler(random_state=37, sampling_strategy=0.1)
    train_input, train_target = over_sampler.fit_resample(x_train, y_train)
    # train_input, train_target = x_train, y_train # use this if you don't want to oversample
    print(
        sum(train_target == 0),
        "negative &",
        sum(train_target == 1),
        "positive training samples (after upsampling)",
    )
    print(
        sum(y_test == 0),
        "negative &",
        sum(y_test == 1),
        "positive test samples",
    )
    train = pd.concat([pd.DataFrame(train_target), pd.DataFrame(train_input)], axis=1)
    train.columns = dataset.columns
    train.sort_values("year_month_day_time", inplace=True)
    train_input, train_target = train.drop(["is fraud?"], axis=1), train["is fraud?"]

    train_target = train_target.to_numpy().reshape(len(train_target), 1)
    y_test = y_test.to_numpy().reshape(len(y_test), 1)

    save_to_dir(train_input.to_numpy(), train_target, train_dataset_dir)
    save_to_dir(x_test.to_numpy(), y_test, validation_dataset_dir)

    print(f"Pre-processed train dataset saved. Contents of '{train_dataset_dir}':")
    print(os.listdir("/".join(str(train_dataset_dir).split("/")[:-1])))
    print(f"Pre-processed test dataset saved. Contents of '{validation_dataset_dir}':")
    print(os.listdir("/".join(str(validation_dataset_dir).split("/")[:-1])))

    print(train_input.columns)
    return list(train_input.columns)


preprocess_dataset_comp = kfp.components.create_component_from_func(
    func=preprocess_dataset,
    base_image=BASE_IMAGE,
    packages_to_install=["imbalanced-learn", "scikit-learn", "sklearn-pandas"],
)

## 2.2) Specification: model training & evaluation

In [6]:
def train_model(
    model_dir: OutputPath(str),
    train_dataset_dir: InputPath(str),
    validation_dataset_dir: InputPath(str),
    epochs: int = 10,
    seqlen: int = 7,
):
    import numpy as np
    import os
    from tensorflow import keras
    from tensorflow.keras.callbacks import (
        EarlyStopping,
        ModelCheckpoint,
        ReduceLROnPlateau,
        TensorBoard,
    )
    from tensorflow.keras.layers import Input, LSTM, Dense
    from tensorflow.keras.metrics import (
        TruePositives,
        FalsePositives,
        FalseNegatives,
        TrueNegatives,
    )

    def load_dataset(path):
        data = np.load(os.path.join(path, "data.npz"), allow_pickle=True)
        x, y = data["x"], data["y"]
        x = np.asarray(x).astype(np.float32)
        y = np.asarray(y).astype(np.int_)
        dataset = keras.preprocessing.timeseries_dataset_from_array(
            x, y, sequence_length=seqlen, batch_size=128
        )
        return dataset

    if not os.path.exists(model_dir):
        os.makedirs(model_dir)

    train_dataset = load_dataset(train_dataset_dir)
    test_dataset = load_dataset(validation_dataset_dir)

    for batch in train_dataset.take(1):
        input_d, targets = batch
    print("Input shape:", input_d.numpy().shape, "Target shape:", targets.numpy().shape)

    input_shape = (input_d.shape[1], input_d.shape[2])
    inputs = Input(shape=input_shape)
    lstm_in = LSTM(100, batch_size=7, return_sequences=True)(inputs)
    lstm_out = LSTM(100, batch_size=7)(lstm_in)
    outputs = Dense(1, activation="sigmoid")(lstm_out)
    model = keras.Model(inputs=inputs, outputs=outputs)

    metrics = [
        "accuracy",
        TruePositives(name="tp"),
        FalsePositives(name="fp"),
        FalseNegatives(name="fn"),
        TrueNegatives(name="tn"),
    ]
    # loss = keras.losses.BinaryFocalCrossentropy(apply_class_balancing=True)
    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=metrics)
    print(model.summary())

    print("Initializing training callbacks...")
    callbacks = [
        EarlyStopping(monitor="loss", patience=20, verbose=0, mode="min"),
        ModelCheckpoint(
            f"{model_dir}/best_model.keras",
            monitor="loss",
            save_best_only=True,
            save_weights_only=True,
            mode="min",
        ),
        ReduceLROnPlateau(
            monitor="loss",
            factor=0.1,
            patience=7,
            verbose=1,
            min_delta=0.0001,
            mode="min",
        ),
        TensorBoard(
            log_dir=os.environ["TENSORBOARD_S3_ADDRESS"],
            histogram_freq=1,
        ),
    ]

    model.fit(
        train_dataset,
        epochs=epochs,
        verbose=3,
        callbacks=callbacks,
    )

    results = model.evaluate(test_dataset)
    print("Evaluation Loss, Accuracy, TP, FP, FN, TN:", results)
    TP, FP, FN, TN = results[2:]
    if TP != 0:
        PR = TP / (FP + TP)
        RE = TP / (FN + TP)
        print("F1 Measure:", 2 * (PR * RE / (PR + RE)))

    model.save(model_dir)


train_specification = kfp.components.func_to_component_text(func=train_model)

## 2.3) Component: Prediction on Test Set for Confusion Matrix

In [7]:
def predict(
    model_dir: InputPath(str),
    test_dataset_dir: InputPath(str),
    predictions_dir: OutputPath(str),
    seq_len: int = 7,
):

    from tensorflow import keras
    import numpy as np
    import os

    data = np.load(os.path.join(test_dataset_dir, "data.npz"), allow_pickle=True)
    x, y = data["x"], data["y"]
    x = np.asarray(x).astype(np.float32)
    y = np.asarray(y).astype(np.int_)
    test_dataset = keras.preprocessing.timeseries_dataset_from_array(
        x, y, sequence_length=seq_len, batch_size=128
    )
    y = np.concatenate([b for a, b in test_dataset], axis=0)
    model = keras.models.load_model(model_dir)
    preds = model.predict(test_dataset)
    preds = [str(int(pred[0] > 0.5)) + "\n" for pred in preds]
    y = [str(x[0].item()) + "\n" for x in y]

    if not os.path.exists(predictions_dir):
        os.makedirs(predictions_dir)
    with open(os.path.join(predictions_dir, "ytrue.txt"), "w") as f:
        f.writelines(y)
    with open(os.path.join(predictions_dir, "ypred.txt"), "w") as f:
        f.writelines(preds)


predict_comp = kfp.components.create_component_from_func(
    func=predict, base_image=BASE_IMAGE
)

## 2.4) Component: Deploy to Linux

In [8]:
def deploy_to_aix(
    model_version: int = 1,
):
    import requests

    print("Updating model at AIX...")
    response = requests.get(
        f"http://p114oracle.pbm.ihost.com:3000/update?model_version={model_version}"
    )
    print(response.text)


deploy_to_aix_comp = kfp.components.create_component_from_func(
    func=deploy_to_aix, base_image=BASE_IMAGE
)

# 3.) Create the actual pipeline by combining the components

In [9]:
@dsl.pipeline(
    name="Fraud detection",
    description="An example pipeline that tries to predict fraudulent credit card transactions",
)
def fraud_pipeline(
    blackboard: str,
    model_name: str,
    cluster_configuration_secret: str,
    training_gpus: int,
    training_node_selector: str,
):
    create_blackboard = dsl.VolumeOp(
        name="Create Artefacts Blackboard",
        resource_name=blackboard,
        modes=dsl.VOLUME_MODE_RWO,
        size="4Gi",
        set_owner_reference=True,
    )

    load_dataframe_via_trino_task = load_dataframe_via_trino_comp(
        query="SELECT * FROM  jtopen.demo.fraud",
        columns=None,
        columns_query="SHOW COLUMNS FROM jtopen.demo.fraud",
    )
    load_dataframe_via_trino_task.after(create_blackboard)

    CATALOG.create_dataset_quality_report(
        dataset_dir=load_dataframe_via_trino_task.outputs["dataframe"],
        dataset_type="df/feather",
    )

    preprocess_dataset_task = preprocess_dataset_comp(
        dataframe=load_dataframe_via_trino_task.outputs["dataframe"]
    )

    monitor_training_task = CATALOG.monitor_training_comp()

    train_parameters = {
        "train_dataset_dir": "train_dataset_dir",
        "validation_dataset_dir": "validation_dataset_dir",
        "model_dir": "model_dir",
        "epochs": "2",
        "seqlen": "4",
    }

    train_model_task = CATALOG.train_model_comp(
        preprocess_dataset_task.outputs["train_dataset_dir"],
        preprocess_dataset_task.outputs["validation_dataset_dir"],
        train_specification,
        train_parameters,
        model_name=model_name,
        gpus=training_gpus,
        node_selector=training_node_selector,
        tensorboard_s3_address=monitor_training_task.outputs["tensorboard_s3_address"],
        cluster_configuration_secret=cluster_configuration_secret,
    )

    # plot_confusion_matrix_task = CATALOG.plot_confusion_matrix_comp(
    #    input_columns=preprocess_dataset_task.outputs["output"],
    #    label_columns={"is fraud": [0, 1]},
    #    test_dataset_dir=preprocess_dataset_task.outputs["validation_dataset_dir"],
    #    model_dir=train_model_task.outputs["model_dir"],
    #    seq_len=int(train_parameters["seqlen"]),
    # )

    predict_task = predict_comp(
        test_dataset_dir=preprocess_dataset_task.outputs["validation_dataset_dir"],
        model_dir=train_model_task.outputs["model_dir"],
    )

    CATALOG.plot_confusion_matrix_predictions_comp(
        predictions_dir=predict_task.outputs["predictions_dir"]
    )

    convert_model_to_onnx_task = CATALOG.convert_model_to_onnx_comp(
        train_model_task.outputs["model_dir"]
    )

    upload_model_task = CATALOG.upload_model_comp(
        file_dir=convert_model_to_onnx_task.outputs["onnx_model_dir"],
        project_name=model_name,
    )

    CATALOG.deploy_model_with_kserve_comp(
        project_name=model_name,
        model_version=upload_model_task.outputs["model_version"],
    )


# deploy_to_aix_comp(upload_model_task.outputs["model_version"])

# 4.) Run the pipeline within an experiment
reate a pipeline run, using a pipeline configuration that:

- enables data passing via persistent volumes (faster than the default MinIO-based passing)
- disables caching (which currently is not supported for data passing via volumes)

In [10]:
def disable_cache_transformer(op):
    if isinstance(op, dsl.ContainerOp):
        op.execution_options.caching_strategy.max_cache_staleness = "P0D"
    else:
        op.add_pod_annotation(
            name="pipelines.kubeflow.org/max_cache_staleness", value="P0D"
        )
    return op


pipeline_conf = PipelineConf()
# pipeline_conf.add_op_transformer(disable_cache_transformer)
# pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
#     volume=V1Volume(
#         name=ARGUMENTS["blackboard"],
#         persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
#             "{{workflow.name}}-%s" % ARGUMENTS["blackboard"]
#         ),
#     ),
#     path_prefix=f'{ARGUMENTS["blackboard"]}/',
# )

kfp.Client().create_run_from_pipeline_func(
    fraud_pipeline,
    arguments=ARGUMENTS,
    namespace=NAMESPACE,
    pipeline_conf=pipeline_conf,
)

RunPipelineResult(run_id=ea815019-3542-40fb-ae17-f22026d511c4)

# 5.) Inference 

In [11]:
HOST = f"{MODEL_NAME}-predictor-default.{NAMESPACE}"
HEADERS = {"Host": HOST}
MODEL_ENDPOINT = f"http://{MODEL_NAME}-predictor-default/v2/models/model"

res_svc = requests.get(MODEL_ENDPOINT, headers=HEADERS)
response_svc = json.loads(res_svc.text)
response_svc

{'name': 'model',
 'versions': ['53'],
 'platform': 'onnxruntime_onnx',
 'inputs': [{'name': 'input_1', 'datatype': 'FP32', 'shape': [-1, 4, 103]}],
 'outputs': [{'name': 'dense', 'datatype': 'FP32', 'shape': [-1, 1]}]}

In [12]:
def get_data_table():
    import pandas as pd
    from trino.dbapi import Connection

    with Connection(
        host="trino.trino",
        port="8080",
        user="anybody",
        catalog="jtopen",
        schema="demo",
    ) as conn:
        link = conn.cursor()
        link.execute("SELECT * FROM fraud LIMIT 20")
        return pd.DataFrame(link.fetchall())


vdf = get_data_table()
print(f"Retrieved {len(vdf)} rows")

Retrieved 20 rows


In [13]:
x, y = vdf.drop([0], axis=1).to_numpy(), vdf[0].to_numpy().reshape(len(vdf), 1)
dataset = keras.preprocessing.timeseries_dataset_from_array(
    x, y, sequence_length=response_svc["inputs"][0]["shape"][1], batch_size=128
)

PREDICT_ENDPOINT = MODEL_ENDPOINT + "/infer"

for batch in dataset.take(10):
    input_d, output_d = batch[0], batch[1]
    for in_x, out_y in zip(input_d, output_d):
        payload = {
            "inputs": [
                {
                    "name": response_svc["inputs"][0]["name"],
                    "shape": [
                        1,
                        4,
                        111,
                    ],  # has to match response_svc["inputs"][0]["shape"] (except for 1. dimension)
                    "datatype": response_svc["inputs"][0]["datatype"],
                    "data": in_x.numpy().tolist(),
                }
            ]
        }
        res = requests.post(PREDICT_ENDPOINT, headers=HEADERS, data=json.dumps(payload))
        response = json.loads(res.text)
        print(response["outputs"])
        pred = response["outputs"][0]["data"][0]
        print(
            f"Actual ({out_y.numpy()[0]}) vs. Prediction ({round(pred, 3)} => {int(round(pred, 0))})"
        )

2023-09-21 18:45:46.960170: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2023-09-21 18:45:46.960218: W tensorflow/stream_executor/cuda/cuda_driver.cc:263] failed call to cuInit: UNKNOWN ERROR (303)
2023-09-21 18:45:46.960286: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:163] no NVIDIA GPU device is present: /dev/nvidia0 does not exist


ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type decimal.Decimal).