# Distributed PyTorch Training on OpenShift using KubeRay Operator and Intel® Extension for PyTorch*

This notebook demonstrates how to use the Intel® Extension for PyTorch* to optimize distributed workloads on Intel hardware with Red Hat OpenShift AI and the KubeRay operator. In this demo, we fine-tune a large language model from Hugging Face Transformers on two or more nodes. The notebook uses the CodeFlare SDK to create a Ray cluster and launch a distributed training job on it.

## Install CodeFlare SDK

In [None]:
! pip install codeflare-sdk==0.14.1

## Importing the necessary CodeFlare SDK modules

In [None]:
from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration
from codeflare_sdk.cluster.auth import TokenAuthentication
from codeflare_sdk.job.ray_jobs import RayJobClient

## Authenticating to the OCP cluster

**NOTE: Please fill in the values of the variables `auth_token`, `api_server`, and `registry` below.**

To find the token, use the Red Hat OpenShift Container Platform web console.

In [None]:
# Variables to be set by the user.

auth_token = "XXXX"
api_server = "XXXX"
registry = "XXXX"
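As an alternative to typing secrets into the notebook, the three values can be read from environment variables. The following is a minimal sketch, not part of the original workflow: the helper names `read_setting` and `unset_settings` and the environment variable names are hypothetical and can be replaced with whatever fits your environment.

```python
import os

PLACEHOLDER = "XXXX"  # the placeholder value used in the cell above


def read_setting(env_name, fallback=PLACEHOLDER):
    """Return the environment variable's value, or the fallback if it is not set."""
    return os.environ.get(env_name, fallback)


def unset_settings(settings):
    """Return the names of settings that are empty or still hold the placeholder."""
    return [name for name, value in settings.items()
            if not value or value == PLACEHOLDER]


# Example usage, left commented out so values typed in above are not overwritten.
# The environment variable names are assumptions:
# auth_token = read_setting("OCP_AUTH_TOKEN", auth_token)
# api_server = read_setting("OCP_API_SERVER", api_server)
# registry = read_setting("IMAGE_REGISTRY", registry)
# print(unset_settings({"auth_token": auth_token,
#                       "api_server": api_server,
#                       "registry": registry}))
```

Checking for leftover placeholders before calling `auth.login()` gives a clearer error than a failed authentication request.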

In [None]:
auth = TokenAuthentication(
    token=auth_token,
    server=api_server,
    skip_tls=True,  # Disables TLS certificate verification; acceptable for a demo, not recommended in production
)

In [None]:
auth.login()

## Launch a Ray cluster using the CodeFlare SDK

In [None]:
cluster = Cluster(ClusterConfiguration(
    name='ray-ipex-demo',
    namespace='ray-ipex',
    num_workers=2,
    head_memory=20,
    head_cpus=32,
    min_cpus=32,
    max_cpus=32,
    min_memory=20,
    max_memory=20,
    num_gpus=0,
    image="{0}/ray-ipex/ray-ipex:latest".format(registry),
    instascale=False,
    openshift_oauth=True
))
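Before bringing the cluster up, it can help to estimate what the configuration above asks of the OpenShift cluster in total. The sketch below is illustrative only. It assumes that the `min_*`/`max_*` values apply to each worker and that memory values are in gigabytes. The helper `cluster_totals` is hypothetical and not part of the CodeFlare SDK.

```python
def cluster_totals(num_workers, head_cpus, head_memory_gb,
                   worker_cpus, worker_memory_gb):
    """Sum the CPU and memory requested by the head node and all workers."""
    return {
        "cpus": head_cpus + num_workers * worker_cpus,
        "memory_gb": head_memory_gb + num_workers * worker_memory_gb,
    }


# Values copied from the ClusterConfiguration above.
totals = cluster_totals(
    num_workers=2,
    head_cpus=32,
    head_memory_gb=20,
    worker_cpus=32,
    worker_memory_gb=20,
)
print(totals)  # {'cpus': 96, 'memory_gb': 60}
```

If the namespace quota or node capacity is below these totals, the pods are likely to stay pending and `cluster.wait_ready()` will not return.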

In [None]:
cluster.up()

In [None]:
# Wait for the cluster to be ready before moving on to the next cell
cluster.wait_ready()

## List the details of the created Ray cluster and the dashboard access link

In [None]:
cluster.details()

## Launch the distributed job

In [None]:
# Gather the dashboard URL
ray_dashboard = cluster.cluster_dashboard_uri()

# Create the header for passing your bearer token
header = {
    'Authorization': f'Bearer {auth_token}'
}

# Initialize the RayJobClient
client = RayJobClient(address=ray_dashboard, headers=header, verify=False)
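The job submitted in this section runs `python LLM.py` and installs dependencies from `requirementsLLM.txt`, both taken from the working directory `./`. A quick local check that these files exist can save a failed submission. This is a minimal sketch; the helper `missing_job_files` is hypothetical.

```python
from pathlib import Path


def missing_job_files(working_dir, required=("LLM.py", "requirementsLLM.txt")):
    """Return the required file names that are not present in working_dir."""
    base = Path(working_dir)
    return [name for name in required if not (base / name).is_file()]


# Check the notebook's current directory, which is used as the job's working_dir.
missing = missing_job_files("./")
if missing:
    print("Missing from working directory:", ", ".join(missing))
```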

In [None]:
# Submit the LLM fine-tuning job using the RayJobClient
submission_id = client.submit_job(
    entrypoint="python LLM.py",
    runtime_env={
        "working_dir": "./",
        "pip": "requirementsLLM.txt",
        "env_vars": {"CCL_WORKER_COUNT": "1"},
    },
)
print("The Job's submission ID is: {} which can be used to stop or delete the job.".format(submission_id))
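Once a job has been submitted, its state can also be polled until it reaches a terminal status, which is useful when the notebook is run unattended. The sketch below takes any status callable so it can be tried without a cluster. It assumes that the client exposes a `get_job_status` method and that the terminal states are `SUCCEEDED`, `FAILED`, and `STOPPED`, as in the Ray Jobs API. The helper `wait_for_job` is hypothetical.

```python
import time

# Assumed terminal job states, following the Ray Jobs API.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(get_status, job_id, poll_seconds=10.0, timeout_seconds=3600.0):
    """Poll get_status(job_id) until a terminal state is reached; return its name."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status(job_id)
        # Accept either a plain string or an enum member with a .value attribute.
        name = str(getattr(status, "value", status))
        if name in TERMINAL_STATES:
            return name
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still in state {name} after timeout")
        time.sleep(poll_seconds)


# Example usage against the cluster (assumes client.get_job_status exists):
# final_state = wait_for_job(client.get_job_status, submission_id)
# print(final_state)
```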

## Print the logs from the running job

In [None]:
async for lines in client.tail_job_logs(submission_id):
    print(lines, end="") 
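A top-level `async for` works in a notebook cell because Jupyter runs cells inside an event loop. In a plain Python script, the same loop has to be wrapped in a coroutine and run with `asyncio`. The following is a minimal sketch under that assumption; the helper `print_logs` is hypothetical and accepts any async iterator of log chunks.

```python
import asyncio


async def print_logs(log_stream):
    """Print each chunk from an async log stream; return the number of chunks."""
    count = 0
    async for chunk in log_stream:
        print(chunk, end="")
        count += 1
    return count


# Example usage in a standalone script (not inside a running event loop):
# asyncio.run(print_logs(client.tail_job_logs(submission_id)))
```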

#### NOTE: To stop or delete the job, uncomment the code below.

In [None]:
#client.stop_job(submission_id)
#client.delete_job(submission_id)

## Stopping the cluster once all jobs are finished

In [None]:
cluster.down()