## SAMPLE - Generate AWS Session Tokens from an AWS Profile

In [None]:
import os
import boto3
import boto3.session
# Account number and role name come from the environment; a missing variable
# raises KeyError here.
AWS_ACCOUNT_NO=os.environ['AWS_ACCOUNT_NO']
AWS_ROLE_NAME=os.environ['AWS_ROLE_NAME']
# Full ARN of the IAM role, assembled from the two values above.
AWS_ROLE_ARN=f'arn:aws:iam::{AWS_ACCOUNT_NO}:role/{AWS_ROLE_NAME}'
# Export the ARN before creating the session.
# NOTE(review): presumably picked up by boto3's credential resolution
# (web-identity role assumption) - confirm against the cluster setup.
os.environ['AWS_ROLE_ARN'] = AWS_ROLE_ARN
session = boto3.session.Session()
sts_client = session.client('sts')
# Ask STS which identity the session resolves to; the result is not stored.
sts_client.get_caller_identity()

## SAMPLE - Ray Worker method

1. You pass the AWS Config File Contents directly to the worker
2. The worker writes the contents to the path AWS_CONFIG_FILE
3. The worker generates AWS Credentials based on AWS PROFILE
4. The worker creates a 100 MB file
5. The worker writes the 100 MB file to a custom bucket which is accessible by the chosen AWS Profile

Note: The experiment is created in the workspace, and its name is passed directly to the worker as a parameter.

In [None]:
import os
import time
import ray
import mlflow
import boto3
import multiprocessing
import uuid
import math
import numpy as np
# S3 bucket name used by mlflow_write_remote for direct boto3 uploads.
# Read from the environment; raises KeyError if the variable is unset.
bucket = os.environ['MLFLOW_ARTIFACTS_BUCKET_NAME']
def generate_cpu_load(interval=1*10, utilization=100):
    """Keep the calling process busy at about ``utilization`` % for ``interval`` seconds.

    Time is split into one-second windows. In each window the function
    busy-loops for ``utilization / 100`` seconds and sleeps for the rest.

    Args:
        interval: Number of one-second windows to run. Truncated with
            ``int()``, so a fractional part is ignored.
        utilization: Target busy percentage, from 0 to 100 inclusive.

    Raises:
        ValueError: If ``utilization`` is outside the range 0-100. Without
            this check a value above 100 would only fail later, inside
            ``time.sleep``, after already burning more than a second.
    """
    if not 0 <= utilization <= 100:
        raise ValueError(
            f"utilization must be between 0 and 100, got {utilization!r}"
        )

    busy_seconds = utilization / 100.0
    idle_seconds = 1 - busy_seconds
    window_start = time.time()
    for _ in range(int(interval)):
        # Burn CPU until the busy share of the current window is used up.
        while time.time() - window_start < busy_seconds:
            math.sqrt(64*64*64*64*64)
        time.sleep(idle_seconds)
        # Advance by a fixed second so timing errors do not accumulate.
        window_start += 1
        
def get_aws_credentials(aws_role_arn):
    """Resolve AWS credentials through boto3 after exporting the role ARN.

    Args:
        aws_role_arn: ARN of the IAM role. It is written to the
            ``AWS_ROLE_ARN`` environment variable before the boto3 session
            is created.
            NOTE(review): presumably consumed by boto3's web-identity
            provider - confirm against the deployment.

    Returns:
        A tuple ``(access_key_id, secret_access_key, session_token)``.

    Raises:
        RuntimeError: If boto3 cannot resolve any credentials.
    """
    os.environ['AWS_ROLE_ARN'] = aws_role_arn
    credentials = boto3.Session().get_credentials()
    if credentials is None:
        raise RuntimeError(
            f"boto3 could not resolve AWS credentials for role {aws_role_arn!r}"
        )
    # Take one frozen snapshot. Reading access_key / secret_key / token one
    # at a time from refreshable credentials can straddle a refresh and
    # return values that do not belong together.
    frozen = credentials.get_frozen_credentials()
    return frozen.access_key, frozen.secret_key, frozen.token

def create_file(size_mb=100, filename="/tmp/f1.txt"):
    """Create a zero-filled file of ``size_mb`` MiB and return its path.

    The size used to be derived from a loop variable ``i`` leaked by another
    notebook cell, so it changed with the number of workers and failed with
    ``NameError`` if that cell had not run. It is now an explicit parameter.

    Args:
        size_mb: File size in MiB (1 MiB = 1024 * 1024 bytes). Must yield at
            least one byte.
        filename: Path of the file to create; an existing file is
            overwritten.

    Returns:
        The path that was written (``filename``).

    Raises:
        ValueError: If ``size_mb`` does not yield a positive byte count.
    """
    size_bytes = int(size_mb * 1024 * 1024)
    if size_bytes < 1:
        raise ValueError(f"size_mb must be positive, got {size_mb!r}")
    with open(filename, "wb") as f:
        # Seek to the last byte and write it: the file is extended to
        # size_bytes without writing all the zeros by hand.
        f.seek(size_bytes - 1)
        f.write(b"\0")
    return filename

@ray.remote
def mlflow_write_remote(exp_name,idx,project_id,aws_role_arn,direct_s3_write=False):
    """Ray task: load the CPUs, then time the upload of a generated CSV to S3.

    Steps:
      1. Resolve AWS credentials for ``aws_role_arn`` and export them as
         environment variables.
      2. Run ``generate_cpu_load`` in one process per CPU and wait for all.
      3. Write a 1,000,000 x 10 random-integer DataFrame to ``/tmp/{idx}.csv``.
      4. Inside a new MLflow run, upload the CSV either with boto3 or with
         ``mlflow.log_artifact``, making up to five attempts.

    Args:
        exp_name: Name of an existing MLflow experiment.
        idx: Worker index; used only to name the local CSV file.
        project_id: Used in the S3 object key for direct uploads.
        aws_role_arn: Role ARN passed to ``get_aws_credentials``.
        direct_s3_write: If true, upload with ``s3_client.upload_file`` to
            the module-level ``bucket``; otherwise use ``mlflow.log_artifact``.

    Returns:
        Seconds, as a string, from just before the MLflow run starts until
        just after it ends.

    Raises:
        ValueError: If the experiment ``exp_name`` does not exist.
        Exception: The last upload error, re-raised once all attempts have
            failed (previously failures were swallowed and a duration was
            returned as if the upload had succeeded).
    """
    # Imported here because this cell does not import pandas itself; the
    # function would otherwise depend on another cell having bound `pd`.
    import pandas as pd

    access_key, secret_key, session_token = get_aws_credentials(aws_role_arn)
    # The MLflow client needs these env variables set.
    os.environ['AWS_ACCESS_KEY_ID'] = access_key
    os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
    if session_token:
        # Credentials without a session token report None, which os.environ
        # cannot store.
        os.environ['AWS_SESSION_TOKEN'] = session_token

    # Spin the CPUs for a bit: one load-generating process per CPU.
    num_cpus = multiprocessing.cpu_count()
    print(f"Spinning CPUs (num_cpus={num_cpus})")
    processes = [
        multiprocessing.Process(target=generate_cpu_load)
        for _ in range(num_cpus)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

    # NOTE(review): the file created here is never uploaded - only the CSV
    # below is. The call is kept so the on-disk side effect is unchanged.
    create_file()

    exp = mlflow.get_experiment_by_name(exp_name)
    if exp is None:
        raise ValueError(f"MLflow experiment {exp_name!r} does not exist")
    experiment_id = exp.experiment_id

    # Generate a large DataFrame, write it as CSV, and upload it to S3.
    print("Uploading stuff to S3")
    local_path = f"/tmp/{idx}.csv"
    df = pd.DataFrame(np.random.randint(0, 100, size=(1000000, 10)), columns=list('ABCDEFGHIJ'))
    df.to_csv(local_path)
    s3_client = boto3.client('s3')
    uuid_str = str(uuid.uuid4())

    max_attempts = 5
    # Timing starts here, so CPU spinning and CSV generation are excluded.
    st = time.time()
    with mlflow.start_run(experiment_id=experiment_id) as run:
        run_id = run.info.run_id
        for attempt in range(1, max_attempts + 1):
            try:
                print(direct_s3_write)
                if direct_s3_write:
                    object_name = f'mlflow/{project_id}/{run_id}/artifacts/large_files-direct/{uuid_str}.txt'
                    print(object_name)
                    s3_client.upload_file(local_path, bucket, object_name)
                else:
                    mlflow.log_artifact(local_path, artifact_path='large_files')
                break
            except Exception as err:
                print(f"Upload attempt {attempt}/{max_attempts} failed: {err!r}")
                if attempt == max_attempts:
                    # Do not report a duration for an upload that never
                    # happened.
                    raise

    end = time.time()
    return str(end - st)

## SAMPLE - Initialize the Ray Connection

In [None]:
import os
import time
import ray
import mlflow
import boto3
# Connect to the Ray cluster only if this process has not already done so.
if not ray.is_initialized():
    # Host and port of the Ray head service; KeyError if either is unset.
    service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
    service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
    address=f"ray://{service_host}:{service_port}"
    # NOTE(review): temp_dir is computed but not passed to ray.init in this
    # cell, and the path contains a double slash. This line still requires
    # DOMINO_PROJECT_NAME to be set - confirm whether it is still needed.
    temp_dir='/mnt/data//{}/'.format(os.environ['DOMINO_PROJECT_NAME']) #set to a dataset
    ray.init(address=address)

## SAMPLE - Configure number of workers and start job

In [None]:
# Number of remote tasks launched per benchmark run.
no_of_workers=1
# Row labels for the result tables: "1".."N" for the workers, followed by one
# label for the overall wall-clock time.
cols = []
# NOTE(review): the loop variable `i` stays bound at module level after this
# loop, and create_file() reads a name `i` it does not define itself - so
# rewriting this loop as a comprehension would change create_file's behavior.
for i in range(no_of_workers):
    cols.append(str(i+1))
cols.append('Total Run Duration')

In [None]:
import pandas as pd
# Identify the current user and project from the environment; a missing
# variable raises KeyError.
user_name = os.environ['DOMINO_USER_NAME']
project_id = os.environ['DOMINO_PROJECT_ID']
# One experiment per user.
exp_name = f'Exp-{user_name}'
# get_experiment_by_name returns None for a missing experiment rather than
# raising, so "not found" has to be detected with an explicit None check;
# a try/except around the lookup never reaches the create step.
exp = mlflow.get_experiment_by_name(exp_name)
if exp is None:
    print('Experiment Not Found Create it')
    mlflow.create_experiment(exp_name)
    exp = mlflow.get_experiment_by_name(exp_name)
print(exp)


In [None]:

# Benchmark 1: every worker uploads its CSV directly to S3 with boto3.
start_time = time.time()
# NOTE(review): `results` is assigned but never read in this cell.
results=[]
direct_s3_write = True
# Launch one remote task per worker and block until all of them have returned
# their own upload duration (a string).
results_s4_direct = ray.get([mlflow_write_remote.remote(exp_name,worker_id,project_id,AWS_ROLE_ARN,direct_s3_write) for worker_id in range(no_of_workers)])
duration = time.time() - start_time
# The extra last entry is the wall-clock time of the whole batch; it lines up
# with the trailing 'Total Run Duration' label in `cols`.
results_s4_direct.append(str(duration))
s4_pd_direct = pd.DataFrame.from_dict({'worker_index': cols, 'durations': results_s4_direct})


# Benchmark 2: same workload, but the upload goes through mlflow.log_artifact.
start_time = time.time()
results=[]
direct_s3_write = False
results_s4_mlflow = ray.get([mlflow_write_remote.remote(exp_name,worker_id,project_id,AWS_ROLE_ARN,direct_s3_write) for worker_id in range(no_of_workers)])
duration = time.time() - start_time
results_s4_mlflow.append(str(duration))
s4_pd_mlflow = pd.DataFrame.from_dict({'worker_index': cols, 'durations': results_s4_mlflow})




In [None]:
# Print both result tables as Markdown, separated by a horizontal rule.
# Note: pandas' to_markdown relies on the optional `tabulate` package.
print('Using Boto3 - Explicit boto3 based writes to S3 from Ray Workers')
# Alternative for notebooks: display(s4_pd_direct)
print(s4_pd_direct.to_markdown())
print('\n\n\n------------------------------------------------\n\n\n')
print('Using MLFLOW client - MLFLOW client uses boto3 to write to S3 from Ray Workers')
# Alternative for notebooks: display(s4_pd_mlflow)
print(s4_pd_mlflow.to_markdown())

## Scaling Ray Workers

In [None]:
# IPython shell escape: request the Ray cluster scaler service's /healthz
# endpoint with curl.
!curl http://rayclusterscaler-svc.domino-field/healthz

In [None]:
import requests
import os
# Fetch an access token from the local token endpoint.
access_token_endpoint='http://localhost:8899/access-token'
# A timeout keeps the cell from hanging forever if the endpoint is down.
resp = requests.get(access_token_endpoint, timeout=30)
# Fail loudly on an HTTP error; otherwise the error body would be used as the
# bearer token below.
resp.raise_for_status()

# Strip surrounding whitespace so a trailing newline cannot end up inside the
# Authorization header value.
token = resp.text.strip()
headers = {
             "Content-Type": "application/json",
             "Authorization": "Bearer " + token,
        }
#Example
endpoint='http://rayclusterscaler-svc.domino-field/rayclusterscaler/list'
resp = requests.get(endpoint,headers=headers,timeout=30)
resp.raise_for_status()

In [None]:
#Example
# Ask the Ray cluster scaler to resize this run's Ray cluster.
replicas=850
endpoint='http://rayclusterscaler-svc.domino-field/rayclusterscaler/scale'
# The cluster name is derived from the current run id; KeyError if
# DOMINO_RUN_ID is unset.
run_id = os.environ['DOMINO_RUN_ID']
ray_cluster_id = f'ray-{run_id}'
body = {
        "cluster_name":ray_cluster_id,
        "replicas" : replicas
    }
# NOTE(review): the 60 s timeout is a guess - raise it if the scaler answers
# only after the resize has completed.
resp = requests.post(endpoint,headers=headers,json=body,timeout=60)
# Surface a rejected scale request instead of silently ignoring it.
resp.raise_for_status()
