# Test submitting jobs to a running Ray cluster

## Setup

In [None]:
# Install Ray; the job-submission cell further down imports ray.job_submission.
!pip install ray

## Upload Data to S3

In [None]:
import os
import boto3
import botocore

# S3 connection settings come from the environment; an unset variable
# yields None.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
endpoint_url = os.getenv('AWS_S3_ENDPOINT')
region_name = os.getenv('AWS_DEFAULT_REGION')
bucket_name = os.getenv('AWS_S3_BUCKET')

# Session carrying the explicit credentials read above.
session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# S3 resource pointed at the configured endpoint and region, using
# signature version "s3v4".
s3_resource = session.resource(
    's3',
    endpoint_url=endpoint_url,
    region_name=region_name,
    config=botocore.client.Config(signature_version='s3v4'),
)

# Handle to the bucket used by the helper functions below.
bucket = s3_resource.Bucket(bucket_name)


def upload_directory_to_s3(local_directory, s3_prefix):
    """Upload every file under ``local_directory`` to the module-level bucket.

    The directory tree is walked recursively. Each file is stored under
    ``s3_prefix`` followed by its path relative to ``local_directory``, and
    a ``"<local path> -> <key>"`` line is printed for every upload.

    Args:
        local_directory: Path of the local directory to upload.
        s3_prefix: Key prefix the relative file paths are appended to.
    """
    # Local import keeps this cell self-contained.
    import posixpath

    for root, _dirs, files in os.walk(local_directory):
        for filename in files:
            file_path = os.path.join(root, filename)
            relative_path = os.path.relpath(file_path, local_directory)
            # S3 keys always use "/" as the delimiter, so normalise the
            # OS-specific separator instead of joining with os.path.join
            # (which would produce backslashes on Windows).
            s3_key = posixpath.join(
                s3_prefix, relative_path.replace(os.sep, "/")
            )
            print(f"{file_path} -> {s3_key}")
            bucket.upload_file(file_path, s3_key)

def list_objects(prefix):
    """Print the key of each bucket object returned by the ``Prefix`` filter.

    Args:
        prefix: Value passed as ``Prefix`` to the bucket's object filter.
    """
    matching = bucket.objects.filter(Prefix=prefix)
    for s3_object in matching.all():
        print(s3_object.key)
        
# Upload the local ./data directory under the "data" prefix, then print
# the keys stored under that prefix.
upload_directory_to_s3(local_directory="./data", s3_prefix="data")
list_objects(prefix="data")

### Use the Kubernetes service corresponding to the Ray cluster

In [None]:
# Address handed to JobSubmissionClient when submitting the job.
RAY_DASHBOARD = "http://raycluster-codeflare-head-svc:8265"
# Script run by the job entrypoint (`python {SCRIPT}`); presumably located in
# the `scripts/` working_dir shipped with the job — verify.
SCRIPT = "fraud_detection_train.py"

## Testing TensorFlow Jobs

In [None]:
from ray.job_submission import JobSubmissionClient, JobStatus
import time
import os

# S3 connection settings are read from the environment so they can be
# forwarded to the job; an unset variable yields None.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
endpoint_url = os.environ.get('AWS_S3_ENDPOINT')
region_name = os.environ.get('AWS_DEFAULT_REGION')
bucket_name = os.environ.get('AWS_S3_BUCKET')

# Packages installed into the job's runtime environment.
pip = [
    "boto3",
    "botocore",
    "tensorflow==2.13.1",
    "pandas",
    "scikit-learn",
]

runtime_env = {
    # Local directory uploaded with the job; the entrypoint runs inside it.
    "working_dir": "scripts/",
    "excludes": ["data/"],
    "pip": pip,
    # Ray requires env_vars to map str -> str, so settings that are not set
    # in this notebook's environment are left out instead of being sent as
    # None (which fails runtime_env validation).
    "env_vars": {
        name: value
        for name, value in {
            "AWS_ACCESS_KEY_ID": aws_access_key_id,
            "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
            "AWS_S3_ENDPOINT": endpoint_url,
            "AWS_DEFAULT_REGION": region_name,
            "AWS_S3_BUCKET": bucket_name,
        }.items()
        if value is not None
    },
}

# Connect to the Ray cluster through the address configured in RAY_DASHBOARD.
client = JobSubmissionClient(RAY_DASHBOARD)
job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint=f"python {SCRIPT}",
    # Working directory, pip packages and environment variables for the job
    runtime_env=runtime_env
)
print(job_id)

def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    """Poll a job's status until it matches or the timeout elapses.

    The status is fetched from the module-level ``client`` roughly once per
    second and printed on every poll.

    Args:
        job_id: Identifier of the job to poll.
        status_to_wait_for: Collection of statuses that end the wait.
        timeout_seconds: Maximum time to keep polling, in seconds.

    Returns:
        The last status observed, or ``None`` if no poll was made. The
        caller can compare it against ``status_to_wait_for`` to tell a
        match from a timeout.
    """
    status = None
    # A monotonic clock keeps the timeout correct even if the system wall
    # clock is adjusted while waiting.
    start = time.monotonic()
    while time.monotonic() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(1)
    return status


# Poll until the job reports SUCCEEDED, STOPPED or FAILED.
# NOTE(review): no timeout is passed, so the 5-second default applies and
# polling may stop while the job is still running — confirm this is intended.
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})

In [None]:
# Fetch the submitted job's logs from the cluster and print them.
logs = client.get_job_logs(job_id)
print(logs)