# Financial Fraud Detection - Interactive Kubeflow Pipeline

This notebook runs inside a Kubeflow Notebook Server and lets you:

1. Define the complete pipeline inline
2. Submit runs directly via KFP SDK (no YAML files)
3. Monitor progress in real-time
4. Query the trained model via Triton

**Run this from a Kubeflow Notebook Server in the `team-1` namespace.**

---
## Section 1: Setup & Imports

In [None]:
# Install required packages (run once)
!pip install -q kfp==2.10.1 kfp-kubernetes==1.4.0 boto3 requests

In [None]:
import json
import time
from typing import Dict

import kfp
from kfp import compiler, dsl
from kfp import kubernetes

print(f"KFP SDK version: {kfp.__version__}")

In [None]:
# Configuration
# Replace both placeholders before running the rest of the notebook: every
# bucket name and the training image URI below are derived from them.
AWS_REGION = "<YOUR REGION>"
AWS_ACCOUNT = "<YOUR ACCOUNT NUMBER>"

# S3 paths
S3_BUCKET = f"ml-on-containers-{AWS_ACCOUNT}"
MODEL_BUCKET = f"ml-on-containers-{AWS_ACCOUNT}-model-registry"
RAW_DATA_PATH = "data/TabFormer/raw/card_transaction.v1.csv"
SCRIPT_URL = "https://raw.githubusercontent.com/aws-samples/amazon-eks-machine-learning-with-terraform-and-kubeflow/main/examples/fraud-detection/workflows/src/workflows/components/preprocess_tabformer.py"

# Container images
RAPIDS_IMAGE = "rapidsai/base:25.12-cuda13-py3.12"
TRAINING_IMAGE = f"{AWS_ACCOUNT}.dkr.ecr.{AWS_REGION}.amazonaws.com/nvidia-training-repo:latest"

# PVC settings
DATA_PVC_SIZE = "100Gi"
MODEL_PVC_SIZE = "10Gi"
STORAGE_CLASS = "gp3"

# Kubeflow
KFP_NAMESPACE = "team-1"
TRITON_ENDPOINT = "http://triton-server-triton-inference-server.triton.svc.cluster.local:8005"

# Warn early (without aborting the cell) when a "<...>" placeholder is still in
# place; otherwise the mistake only surfaces once a pipeline step is running.
_unset_placeholders = [
    name
    for name, value in (("AWS_REGION", AWS_REGION), ("AWS_ACCOUNT", AWS_ACCOUNT))
    if value.startswith("<") and value.endswith(">")
]
if _unset_placeholders:
    print(f"WARNING: placeholder value still set for: {', '.join(_unset_placeholders)}")

print("Configuration loaded.")

---
## Section 2: Connect to Kubeflow Pipelines

In [None]:
# Connect to the KFP API (auto-discovers in-cluster endpoint)
client = kfp.Client()

# get_kfp_healthz() is the SDK's health probe: it calls the API server and
# raises if the server cannot be reached, so a successful return confirms the
# connection works before any run is submitted.
health = client.get_kfp_healthz()
print(f"KFP API healthy (multi-user: {getattr(health, 'multi_user', None)})")

# The SDK exposes no public accessor for the UI base URL; _get_url_prefix() is
# the helper it uses itself when rendering run links in a notebook.
print(f"UI: {client._get_url_prefix()}")

In [None]:
# List recent runs in our namespace
runs = client.list_runs(namespace=KFP_NAMESPACE, page_size=5)
# The loop variable is deliberately not called `run`: that name holds the
# submitted run in Section 5, and re-executing this cell must not overwrite it.
for recent_run in runs.runs or []:
    print(f"{recent_run.display_name}: {recent_run.state}")

---
## Section 3: Define Pipeline Components

We define 5 components that make up the pipeline:
1. **download_raw_data_to_pvc** - Fetch the raw CSV from S3 and the preprocessing script from GitHub
2. **run_cudf_preprocessing** - GPU-accelerated preprocessing with RAPIDS
3. **prepare_training_config** - Write training hyperparameters
4. **run_nvidia_training** - GNN+XGBoost training on GPU
5. **upload_model_to_s3** - Push trained model to S3 for Triton

In [None]:
@dsl.component(
    base_image="python:3.11",
    packages_to_install=["boto3", "botocore", "s3transfer", "jmespath", "python-dateutil", "urllib3", "six", "requests"],
)
def download_raw_data_to_pvc(
    s3_bucket: str,
    s3_region: str,
    raw_data_path: str,
    script_url: str,
    data_mount_path: str = "/data",
):
    """Download raw CSV from S3 and preprocessing script from GitHub to PVC.

    Args:
        s3_bucket: Bucket that holds the raw transaction CSV.
        s3_region: AWS region used to create the S3 client.
        raw_data_path: Object key of the raw CSV inside ``s3_bucket``.
        script_url: HTTP(S) URL of the preprocessing script to fetch.
        data_mount_path: Directory where the data volume is mounted. The CSV is
            written to ``<data_mount_path>/raw/card_transaction.v1.csv`` and the
            script to ``<data_mount_path>/preprocess_tabformer.py``.

    Raises:
        requests.HTTPError: If the script URL returns an error status.
        requests.Timeout: If the script download stalls.
    """
    # Imports live inside the function because the component body runs on its
    # own in the component container.
    import os
    import boto3
    import requests

    s3 = boto3.client("s3", region_name=s3_region)

    raw_dir = os.path.join(data_mount_path, "raw")
    os.makedirs(raw_dir, exist_ok=True)
    local_csv = os.path.join(raw_dir, "card_transaction.v1.csv")
    print(f"Downloading s3://{s3_bucket}/{raw_data_path} to {local_csv}")
    s3.download_file(s3_bucket, raw_data_path, local_csv)
    print(f"Downloaded {os.path.getsize(local_csv)} bytes")

    # NOTE(review): presumably opened up so later steps running as a different
    # user can read and write the volume - confirm against the RAPIDS image.
    os.chmod(data_mount_path, 0o777)
    os.chmod(raw_dir, 0o777)
    os.chmod(local_csv, 0o666)

    # Download preprocessing script from GitHub
    local_script = os.path.join(data_mount_path, "preprocess_tabformer.py")
    print(f"Downloading {script_url} to {local_script}")
    # (connect, read) timeouts: without them a stalled connection would hang
    # the pipeline step indefinitely.
    resp = requests.get(script_url, timeout=(10, 60))
    resp.raise_for_status()
    # Write the raw bytes so the script lands on disk exactly as served, and so
    # the size reported below really is a byte count.
    with open(local_script, "wb") as f:
        f.write(resp.content)
    os.chmod(local_script, 0o755)
    print(f"Downloaded preprocessing script ({len(resp.content)} bytes)")

print("Component 1/5 defined: download_raw_data_to_pvc")

In [None]:
@dsl.container_component
def run_cudf_preprocessing():
    """Container step that runs the preprocessing script inside the RAPIDS image.

    The steps are chained with ``&&`` into a single ``bash -c`` command line, so
    each one only runs if the previous one succeeded.
    """
    shell_steps = [
        "pip install category_encoders scikit-learn",
        "echo '=== Starting cuDF preprocessing ==='",
        "python /data/preprocess_tabformer.py /data",
        "echo '=== Preprocessing complete ==='",
        # Show a sample of the generated files in the step log.
        "find /data/gnn -type f | head -20",
    ]
    return dsl.ContainerSpec(
        image=RAPIDS_IMAGE,
        command=["/bin/bash", "-c"],
        args=[" && ".join(shell_steps)],
    )

print("Component 2/5 defined: run_cudf_preprocessing")

In [None]:
@dsl.component(base_image="python:3.11")
def prepare_training_config(
    data_mount_path: str = "/data",
    gnn_hidden_channels: int = 32,
    gnn_n_hops: int = 2,
    gnn_num_epochs: int = 8,
    xgb_max_depth: int = 6,
    xgb_num_boost_round: int = 512,
):
    """Serialize the training configuration to ``<data_mount_path>/config.json``.

    The tunable GNN and XGBoost hyperparameters come from the arguments; the
    remaining values and the ``paths`` section are fixed in this component.
    """
    import json
    import os

    gnn_params = {
        "hidden_channels": gnn_hidden_channels,
        "n_hops": gnn_n_hops,
        "layer": "SAGEConv",
        "dropout_prob": 0.1,
        "batch_size": 4096,
        "fan_out": 10,
        "num_epochs": gnn_num_epochs,
    }
    xgb_params = {
        "max_depth": xgb_max_depth,
        "learning_rate": 0.2,
        "num_parallel_tree": 3,
        "num_boost_round": xgb_num_boost_round,
        "gamma": 0.0,
    }
    model_entry = {
        "kind": "GNN_XGBoost",
        "gpu": "single",
        "hyperparameters": {"gnn": gnn_params, "xgb": xgb_params},
    }
    config = {
        "paths": {"data_dir": "/data/gnn", "output_dir": "/trained_models"},
        "models": [model_entry],
    }

    config_path = os.path.join(data_mount_path, "config.json")
    serialized = json.dumps(config, indent=2)
    with open(config_path, "w") as out_file:
        out_file.write(serialized)
    print(f"Config written to {config_path}")

print("Component 3/5 defined: prepare_training_config")

In [None]:
@dsl.container_component
def run_nvidia_training():
    """Container step that launches training from the training image.

    The steps are chained with ``&&`` into a single ``bash -c`` command line, so
    a failure in any step stops the rest.
    """
    shell_steps = [
        # Expose the config written on the data volume at the path used below.
        "ln -sf /data/config.json /app/config.json",
        "cat /app/config.json",
        "echo '=== GNN Data ==='",
        "ls -la /data/gnn/",
        "cd /app",
        "torchrun --standalone --nnodes=1 --nproc-per-node=1 main.py --config /app/config.json",
        "echo '=== Training Complete ==='",
        "ls -la /trained_models/",
    ]
    return dsl.ContainerSpec(
        image=TRAINING_IMAGE,
        command=["/bin/bash", "-c"],
        args=[" && ".join(shell_steps)],
    )

print("Component 4/5 defined: run_nvidia_training")

In [None]:
@dsl.component(
    base_image="python:3.11",
    packages_to_install=["boto3", "botocore", "s3transfer", "jmespath", "python-dateutil", "urllib3", "six"],
)
def upload_model_to_s3(
    model_bucket: str,
    model_mount_path: str = "/trained_models",
    s3_prefix: str = "model-repository",
    s3_region: str = "us-west-2",
) -> dict:
    """Upload trained model from PVC to S3.

    Args:
        model_bucket: Destination S3 bucket.
        model_mount_path: Directory where the model volume is mounted.
        s3_prefix: Key prefix for every uploaded object. Leading and trailing
            slashes are ignored; an empty prefix uploads to the bucket root.
        s3_region: AWS region used to create the S3 client.

    Returns:
        A dict with ``s3_uri`` (destination), ``files`` (number of objects
        uploaded) and ``bytes`` (total size uploaded).

    Raises:
        RuntimeError: If no files were found to upload, so an empty or missing
            model directory fails the step instead of reporting success.
    """
    import os
    from pathlib import Path
    import boto3

    s3 = boto3.client("s3", region_name=s3_region)
    # Prefer the "python_backend_model_repository" sub-directory when it exists;
    # otherwise upload everything under the mount path.
    model_repo = Path(model_mount_path) / "python_backend_model_repository"
    if not model_repo.exists():
        model_repo = Path(model_mount_path)

    # Normalise the prefix so "models/" or "" cannot produce keys with a double
    # or leading slash.
    prefix = s3_prefix.strip("/")

    uploaded = 0
    total_bytes = 0
    for root, dirs, files in os.walk(model_repo):
        for f in files:
            local_path = os.path.join(root, f)
            rel_path = os.path.relpath(local_path, model_repo)
            s3_key = f"{prefix}/{rel_path}" if prefix else rel_path
            size = os.path.getsize(local_path)
            print(f"Uploading {rel_path} ({size} bytes)")
            s3.upload_file(local_path, model_bucket, s3_key)
            uploaded += 1
            total_bytes += size

    s3_uri = f"s3://{model_bucket}/{prefix}"
    if uploaded == 0:
        # os.walk yields nothing for a missing or empty directory; surface that
        # instead of reporting a successful upload of zero files.
        raise RuntimeError(f"No model files found under {model_repo}; nothing uploaded to {s3_uri}")

    print(f"Uploaded {uploaded} files ({total_bytes} bytes) to {s3_uri}")
    return {"s3_uri": s3_uri, "files": uploaded, "bytes": total_bytes}

print("Component 5/5 defined: upload_model_to_s3")

---
## Section 4: Define the Pipeline

In [None]:
@dsl.pipeline(
    name="fraud-detection-cudf-pipeline",
    description="End-to-end fraud detection with RAPIDS/cuDF preprocessing.",
)
def fraud_detection_pipeline(
    s3_bucket: str = S3_BUCKET,
    model_bucket: str = MODEL_BUCKET,
    s3_region: str = AWS_REGION,
    raw_data_path: str = RAW_DATA_PATH,
    script_url: str = SCRIPT_URL,
    gnn_num_epochs: int = 8,
    xgb_num_boost_round: int = 512,
):
    """End-to-end fraud detection pipeline.

    Chains five steps in strict sequence: download -> preprocess -> config ->
    train -> upload. A data PVC is mounted at /data for the first four steps
    and a model PVC at /trained_models for train and upload. Both PVCs are
    deleted after the upload step.

    Parameter defaults are taken from the notebook's configuration constants at
    the moment this function is defined, so re-run this cell after changing
    them.

    Args:
        s3_bucket: Bucket holding the raw CSV (passed to the download step).
        model_bucket: Bucket that receives the trained model (upload step).
        s3_region: AWS region passed to the download and upload steps.
        raw_data_path: Object key of the raw CSV inside ``s3_bucket``.
        script_url: URL of the preprocessing script fetched by the download step.
        gnn_num_epochs: Forwarded to the training-config step.
        xgb_num_boost_round: Forwarded to the training-config step.
    """
    # Create PVCs
    data_pvc = kubernetes.CreatePVC(
        pvc_name_suffix="-fraud-data",
        access_modes=["ReadWriteOnce"],
        size=DATA_PVC_SIZE,
        storage_class_name=STORAGE_CLASS,
    )
    model_pvc = kubernetes.CreatePVC(
        pvc_name_suffix="-fraud-model",
        access_modes=["ReadWriteOnce"],
        size=MODEL_PVC_SIZE,
        storage_class_name=STORAGE_CLASS,
    )

    # Step 1: Download
    download_task = download_raw_data_to_pvc(
        s3_bucket=s3_bucket,
        s3_region=s3_region,
        raw_data_path=raw_data_path,
        script_url=script_url,
    )
    download_task.after(data_pvc)
    # Caching is disabled on every step: the steps communicate through files on
    # the PVCs rather than through pipeline outputs.
    download_task.set_caching_options(False)
    kubernetes.mount_pvc(download_task, pvc_name=data_pvc.outputs["name"], mount_path="/data")

    # Step 2: Preprocess (GPU)
    # The preprocessing container hard-codes /data, so the mount path here must
    # stay in sync with the command in run_cudf_preprocessing.
    preprocess_task = run_cudf_preprocessing()
    preprocess_task.after(download_task)
    kubernetes.mount_pvc(preprocess_task, pvc_name=data_pvc.outputs["name"], mount_path="/data")
    # NOTE(review): assumes GPU nodes carry the label nvidia.com/gpu=true - confirm
    # against the cluster's node labels.
    kubernetes.add_node_selector(preprocess_task, label_key="nvidia.com/gpu", label_value="true")
    preprocess_task.set_memory_request("16Gi").set_memory_limit("50Gi")
    preprocess_task.set_cpu_request("4").set_cpu_limit("8")
    preprocess_task.set_accelerator_limit(1)
    preprocess_task.set_accelerator_type("nvidia.com/gpu")
    preprocess_task.set_caching_options(False)

    # Step 3: Config
    config_task = prepare_training_config(
        gnn_num_epochs=gnn_num_epochs,
        xgb_num_boost_round=xgb_num_boost_round,
    )
    config_task.after(preprocess_task)
    kubernetes.mount_pvc(config_task, pvc_name=data_pvc.outputs["name"], mount_path="/data")
    config_task.set_caching_options(False)

    # Step 4: Train (GPU)
    # Reads /data (config + preprocessed data) and writes to /trained_models.
    train_task = run_nvidia_training()
    train_task.after(config_task)
    train_task.after(model_pvc)
    kubernetes.mount_pvc(train_task, pvc_name=data_pvc.outputs["name"], mount_path="/data")
    kubernetes.mount_pvc(train_task, pvc_name=model_pvc.outputs["name"], mount_path="/trained_models")
    kubernetes.add_node_selector(train_task, label_key="nvidia.com/gpu", label_value="true")
    train_task.set_memory_request("16Gi").set_memory_limit("32Gi")
    train_task.set_cpu_request("4").set_cpu_limit("8")
    train_task.set_accelerator_limit(1)
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_caching_options(False)

    # Step 5: Upload
    upload_task = upload_model_to_s3(
        model_bucket=model_bucket,
        s3_region=s3_region,
    )
    upload_task.after(train_task)
    kubernetes.mount_pvc(upload_task, pvc_name=model_pvc.outputs["name"], mount_path="/trained_models")
    upload_task.set_caching_options(False)

    # Cleanup PVCs
    # NOTE(review): cleanup is chained after upload_task, so a failure in any
    # earlier step presumably leaves both PVCs behind - confirm, and consider an
    # exit handler if leaked volumes are a concern.
    kubernetes.DeletePVC(pvc_name=data_pvc.outputs["name"]).after(upload_task)
    kubernetes.DeletePVC(pvc_name=model_pvc.outputs["name"]).after(upload_task)

print("Pipeline defined: fraud_detection_pipeline")

---
## Section 5: Submit & Monitor Pipeline Run

In [None]:
# Submit the pipeline run
# The pipeline function is compiled and submitted in one call; `arguments`
# overrides the pipeline's parameter defaults for this run only.
run = client.create_run_from_pipeline_func(
    fraud_detection_pipeline,
    arguments={
        "gnn_num_epochs": 8,
        "xgb_num_boost_round": 512,
    },
    run_name="notebook-demo-run",
    namespace=KFP_NAMESPACE,
    experiment_name="fraud-detection-demos",
)

print(f"Run submitted: {run.run_id}")
# The SDK exposes no public accessor for the UI base URL; _get_url_prefix() is
# the helper it uses itself to build the run-details link.
print(f"View in UI: {client._get_url_prefix()}/#/runs/details/{run.run_id}")

In [None]:
# Monitor progress
def monitor_run(run_id, poll_interval=30, timeout=None, kfp_client=None):
    """Poll a pipeline run until it reaches a terminal state.

    Args:
        run_id: ID of the run to watch.
        poll_interval: Seconds to sleep between polls.
        timeout: Optional overall limit in seconds; ``None`` waits indefinitely.
        kfp_client: Object exposing ``get_run(run_id)``; defaults to the
            notebook-level ``client``.

    Returns:
        The terminal state exactly as reported by the API.

    Raises:
        TimeoutError: If ``timeout`` elapses before a terminal state is seen.
    """
    # CANCELED is included so that a run terminated from the UI ends the loop
    # instead of being polled forever.
    terminal_states = {"SUCCEEDED", "FAILED", "SKIPPED", "ERROR", "CANCELED"}
    api = client if kfp_client is None else kfp_client
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        state = api.get_run(run_id).state
        print(f"[{time.strftime('%H:%M:%S')}] Status: {state}")
        # The state can be None before the run has been scheduled; treat that as
        # "not finished yet".
        if state is not None and str(state).upper() in terminal_states:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Run {run_id} did not finish within {timeout} seconds (last state: {state})")
        time.sleep(poll_interval)

# Uncomment to monitor:
# final_state = monitor_run(run.run_id)

In [None]:
# Check run status manually
# One-off snapshot of the submitted run; needs `run` from the submit cell above
# to exist in this kernel.
run_detail = client.get_run(run.run_id)
print(f"State: {run_detail.state}")
print(f"Created: {run_detail.created_at}")

---
## Section 6: Query Triton Inference Server

In [None]:
import requests

# Check Triton health
try:
    resp = requests.get(f"{TRITON_ENDPOINT}/v2/health/ready", timeout=5)
    print(f"Triton ready: {resp.status_code == 200}")
except requests.RequestException as e:
    # Only network/HTTP-level failures are reported as "cannot reach"; other
    # errors (e.g. the configuration cell was not run) surface normally.
    print(f"Cannot reach Triton: {e}")

In [None]:
# List loaded models
# Triton lists models through the model-repository extension:
# POST /v2/repository/index returns a JSON array with one entry per model.
try:
    resp = requests.post(f"{TRITON_ENDPOINT}/v2/repository/index", timeout=5)
    resp.raise_for_status()
    models = resp.json()
    print("Loaded models:")
    for model in models:
        print(f"  - {model['name']} (v{model.get('version', '?')}) [{model.get('state', '?')}]")
except (requests.RequestException, ValueError) as e:
    # ValueError covers a response body that is not valid JSON.
    print(f"Error: {e}")

In [None]:
# Get model metadata
MODEL_NAME = "prediction_and_shapley"
try:
    resp = requests.get(f"{TRITON_ENDPOINT}/v2/models/{MODEL_NAME}", timeout=5)
    # The body is printed even on an error status, so the server's own error
    # message (for example, for a model that is not loaded) stays visible.
    print(json.dumps(resp.json(), indent=2))
except (requests.RequestException, ValueError) as e:
    # ValueError covers a response body that is not valid JSON; anything else
    # is a programming error and should surface normally.
    print(f"Error: {e}")

---
## Cleanup

Delete old pipeline runs to free resources.

In [None]:
# List all runs
# list_runs returns a single page per call, so follow next_page_token until it
# comes back empty; otherwise only the first page would be shown.
page_token = ""
while True:
    runs = client.list_runs(namespace=KFP_NAMESPACE, page_token=page_token, page_size=100)
    for r in runs.runs or []:
        print(f"{r.run_id[:8]}... {r.display_name}: {r.state}")
    page_token = runs.next_page_token
    if not page_token:
        break

In [None]:
# Delete a specific run (uncomment and set run_id)
# client.delete_run(run_id="...")