In [1]:
!pip install torch==1.13.1 mlflow==2.3.1 dill==0.3.6 numpy==1.23.5 pandas==2.0.1  tqdm==4.64.1 boto3 kagglehub 

Collecting mlflow
  Downloading mlflow-2.19.0-py3-none-any.whl.metadata (30 kB)
Collecting kfp-kubernetes
  Using cached kfp_kubernetes-1.4.0-py3-none-any.whl
Collecting boto3
  Downloading boto3-1.35.78-py3-none-any.whl.metadata (6.7 kB)
Collecting mlflow-skinny==2.19.0 (from mlflow)
  Downloading mlflow_skinny-2.19.0-py3-none-any.whl.metadata (31 kB)
Collecting Flask<4 (from mlflow)
  Using cached flask-3.1.0-py3-none-any.whl.metadata (2.7 kB)
Collecting alembic!=1.10.0,<2 (from mlflow)
  Using cached alembic-1.14.0-py3-none-any.whl.metadata (7.4 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Using cached docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting graphene<4 (from mlflow)
  Using cached graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Using cached gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting pyarrow<19,>=4.0.0 (from mlflow)
  Using cached pyarrow-18.1.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.3 kB

In [2]:
from kfp import dsl, compiler
from kfp.dsl import InputPath, OutputPath, Output
import os

@dsl.component(
    base_image="python:3.9",
    packages_to_install=["kagglehub", "pandas", "scikit-learn"],
)
def download_and_split_data(
    output_train: OutputPath(),
    output_val: OutputPath(),
    output_test: OutputPath(),
):
    """Download the fraud dataset from Kaggle and write a 60/20/20 split as CSV.

    Args:
        output_train: Path the training split (60% of rows) is written to.
        output_val: Path the validation split (20% of rows) is written to.
        output_test: Path the test split (20% of rows) is written to.
    """
    # Imports live inside the function body, next to the code that uses them.
    import kagglehub
    import pandas as pd
    from sklearn.model_selection import train_test_split

    dataset_dir = kagglehub.dataset_download("nelgiriyewithana/credit-card-fraud-detection-dataset-2023")
    transactions = pd.read_csv(f"{dataset_dir}/creditcard_2023.csv")

    # First carve off 60% for training, then halve the remainder into
    # validation and test. The fixed random_state keeps the split repeatable.
    train_part, holdout = train_test_split(transactions, train_size=0.6, random_state=42)
    val_part, test_part = train_test_split(holdout, train_size=0.5, random_state=42)

    splits = (
        (train_part, output_train),
        (val_part, output_val),
        (test_part, output_test),
    )
    for frame, destination in splits:
        frame.to_csv(destination, index=False)

@dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        "torch==1.13.1",
        "scikit-learn",
        "pandas",
        "mlflow==2.3.1",
        "boto3"
    ],
)
def train_model(
    train_data: InputPath(),
    val_data: InputPath(),
    model_output: OutputPath(),
    mlflow_tracking_uri: str,
    minio_endpoint: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Train the fraud classifier, log it to MLflow and save it as TorchScript.

    The model is trained for a single pass over ``train_data``, evaluated on
    ``val_data`` (mean per-batch BCE loss is logged as ``val_loss``), scripted
    with TorchScript, written to ``model_output`` and registered in MLflow
    under the name ``fraud-detection-training`` together with its signature.

    Args:
        train_data: Path to the training CSV; must contain ``Class`` and ``id``
            columns, all remaining columns are used as features.
        val_data: Path to the validation CSV with the same columns.
        model_output: Path the TorchScript model file is written to.
        mlflow_tracking_uri: URI passed to ``mlflow.set_tracking_uri``.
        minio_endpoint: Value exported as ``MLFLOW_S3_ENDPOINT_URL``.
        aws_access_key_id: Value exported as ``AWS_ACCESS_KEY_ID``.
        aws_secret_access_key: Value exported as ``AWS_SECRET_ACCESS_KEY``.
    """
    import os

    import mlflow
    import numpy as np
    import pandas as pd
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from mlflow.models import infer_signature
    from torch.utils.data import DataLoader, Dataset

    # NOTE(review): the loop below is plain PyTorch; autolog presumably only
    # instruments PyTorch Lightning training -- confirm whether this call is needed.
    mlflow.pytorch.autolog()

    class FraudDataset(Dataset):
        """In-memory dataset of (features, label) float32 tensors read from a CSV."""

        def __init__(self, file_path):
            data = pd.read_csv(file_path)
            self.X = data.drop(['Class', 'id'], axis=1).values
            self.y = data['Class'].values

            # Keep the first row as a NumPy array; it is used later to infer
            # the MLflow model signature.
            self.sample_input = self.X[0:1].astype(np.float32)

            self.X = torch.tensor(self.X, dtype=torch.float32)
            # unsqueeze(1) gives the labels the same (N, 1) shape as the model output.
            self.y = torch.tensor(self.y, dtype=torch.float32).unsqueeze(1)

        def __len__(self):
            return len(self.y)

        def __getitem__(self, idx):
            return self.X[idx], self.y[idx]

    class FraudDetectionModel(nn.Module):
        """Feed-forward network mapping 29 features to one sigmoid probability."""

        def __init__(self):
            super(FraudDetectionModel, self).__init__()
            self.model = nn.Sequential(
                nn.Linear(29, 64),
                nn.ReLU(),
                nn.Dropout(0.3),
                nn.Linear(64, 32),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(32, 16),
                nn.ReLU(),
                nn.Linear(16, 1),
                nn.Sigmoid()
            )

        def forward(self, x):
            # Accept non-tensor or non-float32 input by converting it first.
            if not isinstance(x, torch.Tensor):
                x = torch.tensor(x, dtype=torch.float32)
            elif x.dtype != torch.float32:
                x = x.float()
            return self.model(x)

    # Point MLflow at the tracking server and expose the object-store
    # credentials through the environment for artifact uploads.
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    os.environ['MLFLOW_S3_ENDPOINT_URL'] = minio_endpoint
    os.environ['AWS_ACCESS_KEY_ID'] = aws_access_key_id
    os.environ['AWS_SECRET_ACCESS_KEY'] = aws_secret_access_key

    with mlflow.start_run(run_name="fraud-detection-training"):
        train_dataset = FraudDataset(train_data)
        val_dataset = FraudDataset(val_data)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32)

        model = FraudDetectionModel()
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.0001)

        # Training: a single pass over the training data.
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

        # Validation: average of the per-batch losses.
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_loss += loss.item()
            val_loss /= len(val_loader)
            mlflow.log_metric("val_loss", val_loss)

        # Infer the model signature from one sample row and its prediction.
        with torch.no_grad():
            sample_output = model(torch.tensor(train_dataset.sample_input, dtype=torch.float32)).numpy()
        signature = infer_signature(train_dataset.sample_input, sample_output)

        # Convert to TorchScript (the model is in eval mode at this point).
        scripted_model = torch.jit.script(model)

        # Produce the declared `model_output` artifact; create the parent
        # directory first in case it does not exist yet.
        os.makedirs(os.path.dirname(model_output) or ".", exist_ok=True)
        torch.jit.save(scripted_model, model_output)

        # Log and register the model, attaching the inferred signature so the
        # input/output schema is stored with it.
        mlflow.pytorch.log_model(
            scripted_model,
            "model",
            registered_model_name="fraud-detection-training",
            signature=signature,
        )

@dsl.component(
    base_image="python:3.9",
    packages_to_install=["mlflow==2.3.1", "kserve==0.14.0", "kubernetes==25.3.0"]
)
def deploy_model_kserve(
    mlflow_tracking_uri: str,
    minio_endpoint: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    namespace: str = "admin"
) -> str:
    """Deploy the newest registered ``fraud-detection-training`` model with KServe.

    Looks up the most recently created model version in MLflow, (re)creates an
    InferenceService named ``fraud-detection-<version>`` pointing at the run's
    model artifacts and waits up to 300 seconds for it to become ready.

    Args:
        mlflow_tracking_uri: URI passed to ``mlflow.set_tracking_uri``.
        minio_endpoint: Not referenced in the body; kept for interface
            compatibility.
        aws_access_key_id: Not referenced in the body; kept for interface
            compatibility.
        aws_secret_access_key: Not referenced in the body; kept for interface
            compatibility.
        namespace: Kubernetes namespace of the InferenceService.

    Returns:
        A status string of the form ``"Model deployed as: <service name>"``.

    Raises:
        RuntimeError: If no version of the model is registered, or if deleting
            a previous service fails for a reason other than "NotFound".
    """
    import time

    import mlflow
    from kubernetes.client import V1ObjectMeta
    from kserve import (
        constants,
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1ModelSpec,
        V1beta1ModelFormat,
    )

    # Initialize KServe client with its default configuration.
    kserve_client = KServeClient()

    # Get latest model version
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow_client = mlflow.MlflowClient()
    model_name = "fraud-detection-training"
    versions = mlflow_client.search_model_versions(f"name='{model_name}'")
    if not versions:
        # Fail with an actionable message instead of an IndexError.
        raise RuntimeError(
            f"No registered versions found for model '{model_name}'; "
            "train and register the model before deploying."
        )
    latest_version = max(versions, key=lambda version: version.creation_timestamp)

    service_name = f"fraud-detection-{latest_version.version}"
    # NOTE(review): bucket "mlflow" and experiment id "0" are hard-coded here;
    # confirm they match the tracking server's artifact location.
    storage_uri = f"s3://mlflow/0/{latest_version.run_id}/artifacts/model/"

    # Create InferenceService
    isvc = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=V1ObjectMeta(
            name=service_name,
            namespace=namespace,
            annotations={
                "sidecar.istio.io/inject": "false",
                "serving.kserve.io/deploymentMode": "Serverless"
            },
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                model=V1beta1ModelSpec(
                    model_format=V1beta1ModelFormat(name="mlflow"),
                    storage_uri=storage_uri,
                    protocol_version="v2",
                    resources={
                        "requests": {
                            "cpu": "100m",
                            "memory": "1Gi"
                        },
                        "limits": {
                            "cpu": "1",
                            "memory": "2Gi"
                        }
                    }
                ),
                service_account_name="kserve-controller-s3",
            )
        ),
    )

    # Deploy service
    try:
        # Delete existing service if it exists
        try:
            kserve_client.delete(name=service_name, namespace=namespace)
            print(f"Deleted existing service {service_name}")
            # Give the cluster time to finish the deletion before re-creating.
            time.sleep(30)
        except RuntimeError as e:
            if "NotFound" in str(e):
                print(f"No existing service {service_name} found")
            else:
                # Bare raise keeps the original traceback.
                raise

        # Create new service
        kserve_client.create(isvc)
        print(f"Created new service {service_name}")

        # Wait for the service to be ready
        kserve_client.wait_isvc_ready(
            name=service_name,
            namespace=namespace,
            timeout_seconds=300
        )

        # Return deployment status
        return f"Model deployed as: {service_name}"

    except Exception as e:
        print(f"Full error details: {str(e)}")
        raise
@dsl.pipeline(name='fraud-detection-pipeline')
def fraud_detection_pipeline():
    """Download data, train the fraud model, then deploy it with KServe.

    The MLflow / object-store settings are read from the environment while the
    pipeline graph is being built and are passed to the components as
    constant arguments.

    Raises:
        EnvironmentError: If any of ``MLFLOW_TRACKING_URI``,
            ``MLFLOW_S3_ENDPOINT_URL``, ``AWS_ACCESS_KEY_ID`` or
            ``AWS_SECRET_ACCESS_KEY`` is not set.
    """
    required_env = (
        'MLFLOW_TRACKING_URI',
        'MLFLOW_S3_ENDPOINT_URL',
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
    )
    settings = {name: os.getenv(name) for name in required_env}
    missing = [name for name, value in settings.items() if value is None]
    if missing:
        # Fail early with a clear message instead of handing None to the
        # components.
        raise EnvironmentError(
            "Missing required environment variables: " + ", ".join(missing)
        )

    mlflow_tracking_uri = settings['MLFLOW_TRACKING_URI']
    minio_endpoint = settings['MLFLOW_S3_ENDPOINT_URL']
    aws_access_key_id = settings['AWS_ACCESS_KEY_ID']
    aws_secret_access_key = settings['AWS_SECRET_ACCESS_KEY']

    data_op = download_and_split_data()

    train_op = train_model(
        train_data=data_op.outputs['output_train'],
        val_data=data_op.outputs['output_val'],
        mlflow_tracking_uri=mlflow_tracking_uri,
        minio_endpoint=minio_endpoint,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )

    deploy_op = deploy_model_kserve(
        mlflow_tracking_uri=mlflow_tracking_uri,
        minio_endpoint=minio_endpoint,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key
    )
    # The deploy step consumes no output of the train step, so the ordering
    # must be declared explicitly; otherwise both tasks may run concurrently
    # and the deploy step could pick up a stale (or missing) model version.
    deploy_op.after(train_op)
    # NOTE(review): if step caching is enabled, the side-effecting train and
    # deploy steps may be skipped on re-runs -- confirm the desired behaviour.

if __name__ == '__main__':
    # Compile the pipeline definition into a YAML package that can be
    # submitted as a run.
    compiler.Compiler().compile(
        package_path='fraud_detection_pipeline.yaml',
        pipeline_func=fraud_detection_pipeline,
    )

In [3]:
import kfp
# Submit the compiled pipeline package as a run in the 'Fraud Detection'
# experiment, using a client created with default settings. The call is left
# as the last expression so the run result is shown as the cell output.
client = kfp.Client()
client.create_run_from_pipeline_package(
    pipeline_file='fraud_detection_pipeline.yaml',
    arguments={},
    experiment_name='Fraud Detection',
)

RunPipelineResult(run_id=fb54f8d9-b1d1-4464-b643-646175011809)