In [2]:
# Import pieces from codeflare-sdk
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication
import os
import sys

In [3]:
# Authenticate the CodeFlare SDK
# On OpenShift, you can retrieve the token by running `oc whoami -t`,
# and the server with `oc cluster-info`.
auth = TokenAuthentication(
    token = '',
    server = 'https://api.sno.sandbox1990.opentlc.com:6443',
    skip_tls=True
)
auth.login()



'Logged into https://api.sno.sandbox1990.opentlc.com:6443'

In [4]:
cluster = Cluster(ClusterConfiguration(
    name='ray',
    namespace='llama-serving', # Update to your namespace
    num_workers=2,
    head_cpus=2,
    head_memory=8,
    min_cpus=1,
    max_cpus=2,
    min_memory=2,
    max_memory=8,
    num_gpus=1,
    #worker_extended_resource_requests={"nvidia.com/gpu": 1}, # gpu
    image="quay.io/eformat/ray-runner:latest", # cuda image
    write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources 
    # local_queue="local-queue-name" # Specify the local queue manually
))


Yaml resources loaded for ray




In [5]:
# Bring up the cluster
cluster.up()
cluster.wait_ready()

Waiting for requested resources to be set up...
Requested cluster is up and running!
Dashboard is ready!


In [9]:
cluster.details()

RayCluster(name='ray', status=<RayClusterStatus.READY: 'ready'>, head_cpus=2, head_mem='8G', workers=2, worker_mem_min='2G', worker_mem_max='8G', worker_cpu=1, namespace='llama-serving', dashboard='https://ray-dashboard-ray-llama-serving.apps.sno.sandbox1990.opentlc.com', worker_extended_resources={'nvidia.com/gpu': 1}, head_extended_resources={})

In [10]:
# Initialize the Job Submission Client
"""
The SDK will automatically gather the dashboard address and authenticate using the Ray Job Submission Client
"""
client = cluster.job_client

In [11]:
# Submit an example mnist job using the Job Submission Client
submission_id = client.submit_job(
    entrypoint="python read-map.py",
    runtime_env={"working_dir": "./"}, # "pip": "requirements.txt"
)
print(submission_id)

2024-10-03 21:30:23,836	INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_648c077d37ca2b5e.zip.
2024-10-03 21:30:23,838	INFO packaging.py:530 -- Creating a file package for local directory './'.


raysubmit_X1HxPUk95azHDjwh


In [13]:
# Get the job's logs
client.get_job_logs(submission_id)

"2024-10-03 21:30:23,920\tINFO job_manager.py:528 -- Runtime env is setting up.\n2024-10-03 21:30:25,595\tINFO worker.py:1461 -- Using address 10.128.1.89:6379 set in the environment variable RAY_ADDRESS\n2024-10-03 21:30:25,595\tINFO worker.py:1601 -- Connecting to existing Ray cluster at address: 10.128.1.89:6379...\n2024-10-03 21:30:25,610\tINFO worker.py:1777 -- Connected to Ray cluster. View the dashboard at \x1b[1m\x1b[32m10.128.1.89:8265 \x1b[39m\x1b[22m\n2024-10-03 21:30:25,638\tINFO context.py:344 -- Disabling operator-level progress bars by default in Ray Jobs. To enable progress bars for all operators, set `ray.data.DataContext.get_current().enable_operator_progress_bars = True`.\n2024-10-03 21:30:25,702\tINFO dataset.py:2416 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n2024-10-03 21:30:25,705\tINFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-03_07-57-45_8

In [14]:
# Get the job's status
client.get_job_status(submission_id)

<JobStatus.SUCCEEDED: 'SUCCEEDED'>

In [15]:
# Get job related info
client.get_job_info(submission_id)

JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id='04000000', submission_id='raysubmit_X1HxPUk95azHDjwh', driver_info=DriverInfo(id='04000000', node_ip_address='10.128.1.89', pid='108351'), status=<JobStatus.SUCCEEDED: 'SUCCEEDED'>, entrypoint='python read-map.py', message='Job finished successfully.', error_type=None, start_time=1727991023918, end_time=1727991027681, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_648c077d37ca2b5e.zip'}, driver_agent_http_address='http://10.128.1.89:52365', driver_node_id='9598191802064e4d89cee3173edf94df592124a27c1c5a58aa1d77ab', driver_exit_code=0)

In [16]:
# List all existing jobs
client.list_jobs()

[JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id='04000000', submission_id='raysubmit_X1HxPUk95azHDjwh', driver_info=DriverInfo(id='04000000', node_ip_address='10.128.1.89', pid='108351'), status=<JobStatus.SUCCEEDED: 'SUCCEEDED'>, entrypoint='python read-map.py', message='Job finished successfully.', error_type=None, start_time=1727991023918, end_time=1727991027681, metadata={}, runtime_env={'working_dir': 'gcs://_ray_pkg_648c077d37ca2b5e.zip'}, driver_agent_http_address='http://10.128.1.89:52365', driver_node_id='9598191802064e4d89cee3173edf94df592124a27c1c5a58aa1d77ab', driver_exit_code=0),
 JobDetails(type=<JobType.SUBMISSION: 'SUBMISSION'>, job_id='02000000', submission_id='raysubmit_6SQHFKaTgrq9pLU2', driver_info=DriverInfo(id='02000000', node_ip_address='10.128.1.89', pid='7615'), status=<JobStatus.SUCCEEDED: 'SUCCEEDED'>, entrypoint='python read-map.py', message='Job finished successfully.', error_type=None, start_time=1727945413278, end_time=1727945418754, metadata=

In [None]:
# Iterate through the logs of a job 
async for lines in client.tail_job_logs(submission_id):
    print(lines, end="")

In [None]:
# Delete a job
# Can run client.cancel_job(submission_id) first if job is still running
client.delete_job(submission_id)

In [None]:
cluster.down()