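# Training Fraud Detection using CodeFlare

In this demo we use the CodeFlare SDK to bring up a Ray cluster, then walk through the basics of the Ray Job Submission Client: submitting a fraud-detection training job, monitoring it, and cleaning up afterwards.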

### Authenticate to the cluster either using the SDK or OpenShift console login

In [None]:
# To launch a Ray cluster, you will need to authenticate yourself against the OpenShift cluster.
# Run this cell to get the full instructions.

import re
import os
from typing import Optional

# Extracts the cluster's apps domain (e.g. "apps.example.com") from the "hub_host" entry of
# NOTEBOOK_ARGS. `[^"]*?` keeps the match inside the hub_host value instead of letting it run
# on into later JSON fields, and `\s*` tolerates whitespace around the colon.
HUB_HOST_PATTERN = re.compile(r'"hub_host"\s*:\s*"https://[^"]*?(apps\.[^"]+)"')


def hub_host_from_notebook_args(notebook_args: str) -> Optional[str]:
    """Return the ``apps.<domain>`` part of the hub_host URL in ``notebook_args``.

    Returns None when no hub_host entry containing an ``apps.`` domain is present
    (for example when NOTEBOOK_ARGS is unset outside the workbench).
    """
    match = HUB_HOST_PATTERN.search(notebook_args)
    return match.group(1) if match else None


NOTEBOOK_ARGS = os.environ.get('NOTEBOOK_ARGS', '')
hub_host_value = hub_host_from_notebook_args(NOTEBOOK_ARGS)

if hub_host_value is None:
    # Previously this crashed with AttributeError on `None.group(1)`; give actionable guidance instead.
    login_url = None
    print('Could not detect the cluster domain from NOTEBOOK_ARGS.')
    print('Log in to the OpenShift web console and use "Copy login command" from your user menu to get a token.')
else:
    login_url = 'https://oauth-openshift.' + hub_host_value + "/oauth/token/request"

    print('Open the following URL to get your authentication token.')
    print('Authenticate, then click on "Display token", and copy the content of the line under "Log in with this token"')
    print('You can then come back here and paste this content in the next cell.')
    print('Login URL: '+login_url)

In [None]:
# Paste the login command copied in the previous step right after the `!` on the last line
# (it has the form shown in the commented example below), then run this cell.
#!oc login --token=sha256~XXXX --server=https://XXXX 
!

### Create Cluster

In [None]:
# Configuration of our Ray cluster
name = "raycluster-cpu"

# The namespace this notebook pod runs in, taken from the mounted service-account file.
# Read it with Python instead of `!cat`: IPython's `!` capture merges stderr, so a missing
# file would silently turn cat's error message into the namespace. open() raises instead.
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as namespace_file:
    namespace = namespace_file.read().strip()

# Ray container image: the tag combines the Ray, Python and CUDA versions.
ray_version = "2.33.0"
python_version = "py311"
cuda_version = "cu118"
image = f"docker.io/rayproject/ray:{ray_version}-{python_version}-{cuda_version}"
print(name, namespace, image)

Unless you specify a local queue manually below (`local_queue=...`), the SDK will look up your default Kueue local queue, i.e. the one carrying the annotation `"kueue.x-k8s.io/default-queue": "true"`.


In [None]:
from codeflare_sdk import Cluster, ClusterConfiguration

# CPU-only Ray cluster: two workers, no GPUs on the head or the workers.
# NOTE(review): the cpu/memory bounds are presumably per-worker requests/limits (memory in GB)
# — confirm against the codeflare_sdk ClusterConfiguration docs for the installed version.
cluster_config = ClusterConfiguration(
    name=name,
    namespace=namespace,
    image=image,
    head_gpus=0,
    num_gpus=0,
    num_workers=2,
    min_cpus=1,
    max_cpus=6,
    min_memory=4,
    max_memory=28,
    write_to_file=True,  # When enabled, the RayCluster YAML is written to $HOME/.codeflare/resources
    # local_queue="local-queue-name",  # Uncomment to pin a specific Kueue local queue
)
cluster = Cluster(cluster_config)


### Bring up the cluster

In [None]:
# Create the Ray cluster from the configuration above, then wait for it to report ready.
cluster.up()
cluster.wait_ready()

### Alternatively, get a running cluster object

In [None]:
from codeflare_sdk import get_cluster

# Attach to an existing Ray cluster with this name in this namespace instead of creating one;
# use this cell in place of the previous one when the cluster is already running.
cluster = get_cluster(name, namespace=namespace)

In [None]:
# Display the cluster's details.
cluster.details()

### Upload data to S3


In [None]:
import sys

# Put ./utils on the import path (presumably required by imports inside utils/s3.py — confirm).
# Guarded so re-running this cell does not append a duplicate entry each time.
if './utils' not in sys.path:
    sys.path.append('./utils')

import utils.s3

# NOTE(review): the arguments are presumably (local directory, S3 prefix) — confirm in utils/s3.py.
utils.s3.upload_directory_to_s3("data", "data")
print("---")
utils.s3.list_objects("data")

### Ray Job Submission

* Initialize the Cluster Job Client 
* Provide an entrypoint command directed to your job script
* Set up your [runtime environment](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#runtime-environments)

Some common runtime environment configurations include:

```python
runtime_env={
    "working_dir": "./", # relative path to files uploaded to the job
    "excludes": ["local_data/"], # directories and files to exclude from being uploaded to the job
    "pip": ["boto3", "botocore"], # can also be a string path to a requirements.txt file
    "env_vars": {
        "MY_ENV_VAR": "MY_ENV_VAR_VALUE",
        "MY_ENV_VAR_2": os.environ.get("MY_ENV_VAR_2"),
    },
}
```

Initialize the Job Submission Client


In [None]:
# Ray Job Submission client for this cluster; every job operation below goes through it.
client = cluster.job_client

See if there are any existing jobs

In [None]:
# List the jobs the cluster already knows about (e.g. left over from earlier runs).
client.list_jobs()

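#### Clean up existing jobs and define a sample runtime environment

The next cell removes jobs previously submitted to the cluster. The cell after it defines the runtime environment used by the training job.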

In [None]:
import time

# Maximum time to wait for a stopped job to reach a terminal state before giving up.
STOP_TIMEOUT_S = 60

# Remove jobs left over from earlier runs.
# - Ray's delete_job only accepts jobs in a terminal state, so running jobs are stopped
#   first and polled until the asynchronous stop completes.
# - Jobs started directly by a driver (not via job submission) have no submission_id and
#   cannot be deleted through this client, so they are skipped.
for job_details in client.list_jobs():
    job_id = job_details.submission_id
    if job_id is None:
        continue
    print(job_id)
    if not job_details.status.is_terminal():
        client.stop_job(job_id)
        deadline = time.monotonic() + STOP_TIMEOUT_S
        while not client.get_job_status(job_id).is_terminal():
            if time.monotonic() > deadline:
                raise TimeoutError(f"Job {job_id} did not stop within {STOP_TIMEOUT_S}s")
            time.sleep(1)
    client.delete_job(job_id)

In [None]:
import os
import warnings

# Training script to run on the cluster; it lives in working_dir below.
# Alternative entrypoint kept for reference (presumably a data-loading check — confirm in ./ray-scripts):
# script = "test_data_loader.py"
script = "train_cpu.py"

# S3 connection settings forwarded from this notebook's environment to the job.
S3_ENV_VAR_NAMES = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_S3_ENDPOINT",
    "AWS_DEFAULT_REGION",
    "AWS_S3_BUCKET",
)

# Ray requires runtime_env["env_vars"] to be Dict[str, str]. Passing os.environ.get(...) for an
# unset variable puts None in the dict, and submit_job then fails with a type error. Forward only
# the variables that are set; inside the job an omitted variable reads as unset, just as it does here.
s3_env = {key: os.environ[key] for key in S3_ENV_VAR_NAMES if key in os.environ}
missing_s3_env = [key for key in S3_ENV_VAR_NAMES if key not in s3_env]
if missing_s3_env:
    warnings.warn(
        "Not forwarding unset environment variables to the Ray job: " + ", ".join(missing_s3_env)
    )

runtime_env = {
    "working_dir": "./ray-scripts",
    "excludes": [],
    "pip": "./ray-scripts/requirements.txt",  # requirements file installed for the job
    "env_vars": {
        **s3_env,
        "TRAIN_DATA": "data/train.csv",
        "VALIDATE_DATA": "data/validate.csv",
        "MODEL_OUTPUT": "models/fraud/1/",
    },
}

### Submit the configured job

In [None]:
# Launch the training script on the cluster with the runtime environment defined above.
# The returned submission ID identifies the job in the cells that follow.
entrypoint = f"python {script}"
submission_id = client.submit_job(runtime_env=runtime_env, entrypoint=entrypoint)

print(submission_id)

### Query Important Job Information

In [None]:
# Show the job's status, then its full job info, each followed by a blank line.
for describe_job in (client.get_job_status, client.get_job_info):
    print(describe_job(submission_id), "\n")

# Logs produced by the job so far.
print(client.get_job_logs(submission_id))

In [None]:
# Iterate through the logs of a job 
async for lines in client.tail_job_logs(submission_id):
    print(lines, end="")

### Delete a job

In [None]:
import time

# Request the job to stop. Stopping is asynchronous, and Ray's delete_job raises for a job
# that is not yet in a terminal state, so poll the status (bounded) before deleting.
STOP_TIMEOUT_S = 60

client.stop_job(submission_id)
deadline = time.monotonic() + STOP_TIMEOUT_S
while not client.get_job_status(submission_id).is_terminal():
    if time.monotonic() > deadline:
        raise TimeoutError(f"Job {submission_id} did not stop within {STOP_TIMEOUT_S}s")
    time.sleep(1)
client.delete_job(submission_id)

### Delete the Cluster

In [None]:
# Tear down the Ray cluster.
cluster.down()