# Training the Fraud Detection model with Kubeflow Training Operator

The example fraud detection model is very small and trains quickly. However, training many large models requires multiple GPUs and often multiple machines. In this notebook, you learn how to use the Kubeflow Training Operator on OpenShift AI to scale out model training. You use the Training Operator SDK to create a PyTorchJob that runs the provided training script.

### Preparing the Training Operator SDK

The Training Operator SDK is not available by default in TensorFlow notebooks, so you must install it first.

In [None]:
%pip install -qqU kubeflow-training==1.9.2
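If the notebook had already imported an older version of the SDK, restart the kernel so that the new version is picked up. As a quick sanity check, the following sketch uses the standard-library `importlib.metadata` module to report which version of `kubeflow-training` is installed. The helper name `installed_version` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


# Prints the installed version string, or None if the package is not installed
print(installed_version("kubeflow-training"))
```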

### Preparing the data

Normally, the training data for your model would be available in a shared location. For this example, the data is local. You must upload it to your object storage so that you can see how data loading from a shared data source works. The training script downloads the training data, and PyTorch's `DistributedSampler` distributes it among the workers.
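To make the last point concrete, the following stdlib-only sketch mimics how `DistributedSampler` splits dataset indices across workers when `shuffle=False` and `drop_last=False`: the index list is padded by repeating indices from the start until its length is divisible by the number of replicas, and each rank then takes every `num_replicas`-th index starting at its own rank. With shuffling enabled (the default), the same split is applied to a permutation seeded identically on every rank. The function `shard_indices` is hypothetical; it only illustrates the partitioning and is not code from the training script.

```python
import math
from typing import List


def shard_indices(dataset_len: int, num_replicas: int, rank: int) -> List[int]:
    """Mimic DistributedSampler(shuffle=False, drop_last=False) index partitioning."""
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    padding = total_size - len(indices)
    # Repeat indices from the start so that every rank gets the same number of samples
    if padding <= len(indices):
        indices += indices[:padding]
    else:
        indices += (indices * math.ceil(padding / len(indices)))[:padding]
    # Each rank takes every num_replicas-th index, beginning at its own rank
    return indices[rank:total_size:num_replicas]


for rank in range(2):
    print(f"rank {rank}: {shard_indices(5, 2, rank)}")
# rank 0: [0, 2, 4]
# rank 1: [1, 3, 0]
```

Padding means that some samples can be seen twice per epoch when the dataset size is not divisible by the number of workers, which is the trade-off `DistributedSampler` makes to keep every worker's batch count equal.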

In [None]:
import sys
sys.path.append('./utils')

import utils.s3

utils.s3.upload_directory_to_s3("data", "data")
print("---")
utils.s3.list_objects("data")

### Authenticate to the cluster by using the OpenShift console login

The Training Operator SDK requires authenticated access to the OpenShift cluster to create PyTorchJobs. The easiest way to obtain the access details is through the OpenShift web console.

1. To generate the command, select **Copy login command** from the username drop-down menu at the top right of the web console.

    <figure>
        <img src="./assets/copy-login.png" alt="copy login">
    </figure>

2. Click **Display token**.

3. Below **Log in with this token**, take note of the `token` and `server` parameters.
   For example:
    ```
    oc login --token=sha256~LongString --server=https://api.your-cluster.domain.com:6443
    ```    
    - token: `sha256~LongString`
    - server: `https://api.your-cluster.domain.com:6443`
    
4. In the following code cell, replace the token and server values with the values that you noted in Step 3.
   For example:
   ```
   api_server = "https://api.your-cluster.domain.com:6443"
   token = "sha256~LongString"
   ```
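If you prefer to paste the whole login command instead of copying the two values separately, the following sketch extracts them with the standard-library `shlex` module. The helper `parse_oc_login` is hypothetical, and it assumes that the command uses the `--token=...` and `--server=...` form shown in Step 3.

```python
import shlex
from typing import Dict


def parse_oc_login(command: str) -> Dict[str, str]:
    """Extract the --token and --server values from an `oc login` command string."""
    values = {}
    for arg in shlex.split(command):
        # Only the --flag=value form is handled in this sketch
        for flag in ("token", "server"):
            prefix = f"--{flag}="
            if arg.startswith(prefix):
                values[flag] = arg[len(prefix):]
    missing = {"token", "server"} - set(values)
    if missing:
        raise ValueError(f"Missing values in login command: {sorted(missing)}")
    return values


login = parse_oc_login(
    "oc login --token=sha256~LongString --server=https://api.your-cluster.domain.com:6443"
)
print(login["server"])  # https://api.your-cluster.domain.com:6443
```

You could then set `api_server = login["server"]` and `token = login["token"]` in the code cell.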


In [None]:
from kubernetes import client

api_server = "https://XXXX"
token = "sha256~XXXX"

configuration = client.Configuration()
configuration.host = api_server
configuration.api_key = {"authorization": f"Bearer {token}"}
# Uncomment the following line if your cluster API server uses a self-signed certificate or an untrusted CA
#configuration.verify_ssl = False

## Running the distributed training

### Initialize the Training client

Initialize the Training client by using the credentials that you provided.

In [None]:
from kubeflow.training import TrainingClient

client = TrainingClient(client_configuration=configuration)

### Create PyTorchJob

Submit a PyTorchJob by using the Training Operator SDK client.

The training script is imported from the `kfto-scripts` folder.

The training script loads the training dataset and distributes it among the nodes, performs distributed training, evaluates the model on the test dataset, exports the trained model to ONNX format, and uploads it to the S3 bucket specified in the provided connection.
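Each worker learns its place in the distributed job from environment variables. PyTorch's `env://` initialization for `torch.distributed` reads `MASTER_ADDR`, `MASTER_PORT`, `RANK`, and `WORLD_SIZE`, and for PyTorchJobs these are typically populated by the Training Operator and the PyTorch launcher. The stdlib sketch below only reads those variables and falls back to single-process defaults (port 29500 is PyTorch's usual default). The `read_dist_env` helper and `DistEnv` class are hypothetical and are not taken from `train_pytorch_cpu.py`.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DistEnv:
    rank: int
    world_size: int
    master_addr: str
    master_port: int


def read_dist_env(env: Optional[Mapping[str, str]] = None) -> DistEnv:
    """Read torch.distributed-style settings, defaulting to a single local process."""
    env = os.environ if env is None else env
    return DistEnv(
        rank=int(env.get("RANK", "0")),
        world_size=int(env.get("WORLD_SIZE", "1")),
        master_addr=env.get("MASTER_ADDR", "localhost"),
        master_port=int(env.get("MASTER_PORT", "29500")),
    )


settings = read_dist_env()
# A common convention is to let only rank 0 perform one-off work such as exporting the model
is_primary = settings.rank == 0
print(settings, "primary" if is_primary else "secondary")
```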

**Important:** If the Kueue component is enabled in Red Hat OpenShift AI (RHOAI), you must create all Kueue-related resources (ResourceFlavor, ClusterQueue, and LocalQueue), set the LocalQueue name in the following code cell, and uncomment the `labels` declaration in the `create_job` call.
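If you need to create the Kueue resources yourself, the following sketch builds minimal ResourceFlavor, ClusterQueue, and LocalQueue manifests as Python dictionaries following the `kueue.x-k8s.io/v1beta1` API. The names `default-flavor` and `cluster-queue`, the namespace, and the CPU and memory quotas are placeholder assumptions; adjust them for your cluster and check which Kueue API version your RHOAI release provides. ResourceFlavor and ClusterQueue are cluster-scoped, so creating them typically requires administrator privileges.

```python
from typing import Any, Dict, List

KUEUE_API = "kueue.x-k8s.io/v1beta1"


def kueue_manifests(
    namespace: str,
    local_queue: str = "local-queue",
    cluster_queue: str = "cluster-queue",  # hypothetical name
    flavor: str = "default-flavor",        # hypothetical name
    cpu_quota: int = 8,                    # placeholder quota
    memory_quota: str = "32Gi",            # placeholder quota
) -> List[Dict[str, Any]]:
    """Return ResourceFlavor, ClusterQueue, and LocalQueue manifests that reference each other."""
    resource_flavor = {
        "apiVersion": KUEUE_API,
        "kind": "ResourceFlavor",
        "metadata": {"name": flavor},
    }
    cluster_queue_obj = {
        "apiVersion": KUEUE_API,
        "kind": "ClusterQueue",
        "metadata": {"name": cluster_queue},
        "spec": {
            # An empty selector admits workloads from every namespace
            "namespaceSelector": {},
            "resourceGroups": [{
                "coveredResources": ["cpu", "memory"],
                "flavors": [{
                    "name": flavor,
                    "resources": [
                        {"name": "cpu", "nominalQuota": cpu_quota},
                        {"name": "memory", "nominalQuota": memory_quota},
                    ],
                }],
            }],
        },
    }
    local_queue_obj = {
        "apiVersion": KUEUE_API,
        "kind": "LocalQueue",
        "metadata": {"name": local_queue, "namespace": namespace},
        "spec": {"clusterQueue": cluster_queue},
    }
    return [resource_flavor, cluster_queue_obj, local_queue_obj]


# A job opts in to the LocalQueue through this label (the commented-out `labels` argument)
queue_label = {"kueue.x-k8s.io/queue-name": "local-queue"}
```

Each dictionary could be applied with the Kubernetes Python client's `CustomObjectsApi`, using `create_cluster_custom_object` for the two cluster-scoped objects and `create_namespaced_custom_object` for the LocalQueue.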

In [None]:
import sys
import os
sys.path.append("./kfto-scripts")  # needed to make training function available in the notebook
from train_pytorch_cpu import train_func
from kubernetes.client import (
    V1EnvVar,
    V1EnvVarSource,
    V1SecretKeySelector
)

# The job name is a unique identifier used to retrieve job-related information through the SDK
job_name = "fraud-detection"

# Specifies the Kueue LocalQueue name.
# If the Kueue component is enabled, you must create all Kueue-related resources (ResourceFlavor, ClusterQueue, and LocalQueue) and provide the LocalQueue name here.
local_queue_name = "local-queue"

client.create_job(
    job_kind="PyTorchJob",
    name=job_name,
    train_func=train_func,
    num_workers=2,
    num_procs_per_worker="1",
    resources_per_worker={
        "memory": "4Gi",
        "cpu": 1,
    },
    base_image="quay.io/modh/training:py311-cuda124-torch251",
    # Uncomment the following line to add the queue-name label if the Kueue component is enabled in RHOAI and all Kueue-related resources exist. Set `local_queue_name` above to the name of your LocalQueue.
#    labels={"kueue.x-k8s.io/queue-name": local_queue_name},
    env_vars=[
        V1EnvVar(name="AWS_ACCESS_KEY_ID", value=os.environ.get("AWS_ACCESS_KEY_ID")),
        V1EnvVar(name="AWS_S3_BUCKET", value=os.environ.get("AWS_S3_BUCKET")),
        V1EnvVar(name="AWS_S3_ENDPOINT", value=os.environ.get("AWS_S3_ENDPOINT")),
        V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value=os.environ.get("AWS_SECRET_ACCESS_KEY")),
    ],
    packages_to_install=[
        "s3fs",
        "boto3",
        "scikit-learn",
        "onnx",
    ],
)
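With these settings, the job runs `num_workers * num_procs_per_worker` training processes in total. The following sketch computes that world size and the pod names you can expect, assuming that the SDK creates one master replica plus `num_workers - 1` worker replicas named `<job>-master-<i>` and `<job>-worker-<i>`. This assumption is consistent with the `fraud-detection-master-0` pod name used when reading the logs later in this notebook. The helper `expected_topology` is hypothetical; once the pods exist, the SDK's `get_job_pod_names` method lists the actual names.

```python
from typing import List, Tuple


def expected_topology(job_name: str, num_workers: int, procs_per_worker: int) -> Tuple[int, List[str]]:
    """Return (world_size, pod_names), assuming one master plus num_workers - 1 workers."""
    if num_workers < 1 or procs_per_worker < 1:
        raise ValueError("num_workers and procs_per_worker must be positive")
    pods = [f"{job_name}-master-0"]
    # Worker replicas are indexed from 0 under this naming assumption
    pods += [f"{job_name}-worker-{i}" for i in range(num_workers - 1)]
    return num_workers * procs_per_worker, pods


world_size, pods = expected_topology("fraud-detection", num_workers=2, procs_per_worker=1)
print(world_size, pods)  # 2 ['fraud-detection-master-0', 'fraud-detection-worker-0']
```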

### Query important job information

In [None]:
import time


# Wait until the job finishes
print(f"PyTorchJob '{job_name}' is running.", end='')
while True:
    try:
        if client.is_job_succeeded(name=job_name):
            print(".")
            print([x.message for x in client.get_job_conditions(name=job_name) if x.type == "Succeeded"][0])
            break
        elif client.is_job_failed(name=job_name):
            print(".")
            print([x.message for x in client.get_job_conditions(name=job_name) if x.type == "Failed"][0])
            break
        else:
            # The job is still being created or is running: keep waiting instead of exiting early
            print(".", end='')

    except Exception as e:
        # Report API errors and retry on the next poll
        print(f"Error getting PyTorchJob status: {e}.")

    time.sleep(3)
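The loop above has no upper bound on how long it waits. If you want one, the following sketch factors the polling into a generic helper with a timeout. It takes a hypothetical `get_status` callable that returns `"Succeeded"`, `"Failed"`, or any other string while the job is still in progress; in this notebook, you could build that callable from `client.is_job_succeeded` and `client.is_job_failed`, as shown in the commented wiring. The Training Operator SDK also offers a `wait_for_job_conditions` method for a similar purpose; check its parameters in the SDK version you installed.

```python
import time
from typing import Callable


def wait_for_terminal_status(
    get_status: Callable[[], str],
    timeout_s: float = 600.0,
    poll_interval_s: float = 3.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns "Succeeded" or "Failed", or raise TimeoutError."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if status in ("Succeeded", "Failed"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Job did not finish within {timeout_s} seconds (last status: {status})")
        sleep(poll_interval_s)


# Example wiring for this notebook (not executed here):
# def job_status():
#     if client.is_job_succeeded(name=job_name):
#         return "Succeeded"
#     if client.is_job_failed(name=job_name):
#         return "Failed"
#     return "Running"
# print(wait_for_terminal_status(job_status, timeout_s=1800))
```

Injecting `clock` and `sleep` keeps the helper easy to test without waiting in real time.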

In [None]:
# Get the job logs
# The first element returned by get_job_logs is a dictionary of logs keyed by pod name
print(client.get_job_logs(name=job_name)[0][f"{job_name}-master-0"])
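The first element of the value returned by `get_job_logs` is a dictionary keyed by pod name, which is why the cell indexes `[0]` and then a pod name. Rather than hard-coding a pod name, you could print whatever pods that dictionary contains. The sketch below prints the last few lines of each pod's log; `print_log_tails` is a hypothetical helper, and the `sample` dictionary is placeholder input standing in for real logs.

```python
from typing import Dict


def print_log_tails(logs: Dict[str, str], last_n: int = 5) -> Dict[str, str]:
    """Print and return the last `last_n` lines of each pod's log, sorted by pod name."""
    tails = {}
    for pod_name in sorted(logs):
        lines = logs[pod_name].splitlines()
        tails[pod_name] = "\n".join(lines[-last_n:])
        print(f"==== {pod_name} ====")
        print(tails[pod_name])
    return tails


# In the notebook: print_log_tails(client.get_job_logs(name=job_name)[0])
sample = {
    "fraud-detection-worker-0": "line 1\nline 2\nline 3",
    "fraud-detection-master-0": "epoch 1\nepoch 2",
}
print_log_tails(sample, last_n=2)
```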

### Delete the job

When you are finished, you can delete the PyTorchJob.

In [None]:
client.delete_job(name=job_name)