In this third demo we will go over the basics of the Ray Job Submission Client in the SDK

In [None]:
# Import pieces from codeflare-sdk
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication, RayJobClient

In [None]:
# Create authentication object for user permissions
# IF unused, SDK will automatically check for default kubeconfig, then in-cluster config
# KubeConfigFileAuthentication can also be used to specify kubeconfig path manually

auth_token = "XXXXX" # The auth_token is used later for the RayJobClient
auth = TokenAuthentication(
    token = auth_token,
    server = "XXXXX",
    skip_tls=False
)
auth.login()

In [None]:
# Create and configure our cluster object
cluster = Cluster(ClusterConfiguration(
    name='jobtest',
    namespace='default',
    num_workers=2,
    min_cpus=1,
    max_cpus=1,
    min_memory=4,
    max_memory=4,
    num_gpus=0,
    image="quay.io/project-codeflare/ray:latest-py39-cu118"
))

In [None]:
# Bring up the cluster
cluster.up()
cluster.wait_ready()

In [None]:
cluster.details()

### Ray Job Submission - Authorized Ray Cluster

* Submit a job using an authorized Ray dashboard and the Job Submission Client
* Provide an entrypoint command directed to your job script
* Set up your runtime environment

In [None]:
# Gather the dashboard URL
ray_dashboard = cluster.cluster_dashboard_uri()

# Create the header for passing your bearer token
header = {
    'Authorization': f'Bearer {auth_token}'
}

# Initialize the RayJobClient
client = RayJobClient(address=ray_dashboard, headers=header, verify=True)

In [None]:
# Submit an example mnist job using the RayJobClient
submission_id = client.submit_job(
    entrypoint="python mnist.py",
    runtime_env={"working_dir": "./","pip": "requirements.txt"},
)
print(submission_id)

In [None]:
# Get the job's logs
client.get_job_logs(submission_id)

In [None]:
# Get the job's status
client.get_job_status(submission_id)

In [None]:
# Get job related info
client.get_job_info(submission_id)

In [None]:
# List all existing jobs
client.list_jobs()

In [None]:
# Iterate through the logs of a job 
async for lines in client.tail_job_logs(submission_id):
    print(lines, end="") 

In [None]:
# Delete a job
# Can run client.cancel_job(submission_id) first if job is still running
client.delete_job(submission_id)

### Unauthorized Ray Cluster with the Ray Job Client

In [None]:
"""
Initialise the RayJobClient with the Ray Dashboard
"""
ray_dashboard = cluster.cluster_dashboard_uri()
client = RayJobClient(address=ray_dashboard, verify=False)

In [None]:
# Submit an example mnist job using the RayJobClient
submission_id = client.submit_job(
    entrypoint="python mnist.py",
    runtime_env={"working_dir": "./","pip": "requirements.txt"},
)
print(submission_id)

In [None]:
# Stop the job 
client.stop_job(submission_id)

In [None]:
# Delete the job
client.delete_job(submission_id)

In [None]:
cluster.down()

In [None]:
auth.logout()