## Setup

Add your model JAR file to the following location
```
domino_project_name=os.environ['DOMINO_PROJECT_NAME']
root_folder = f'/domino/datasets/local/{domino_project_name}/jar/model.jar'
```

The input files are placed in the folder:
```
/mnt/input/
          /high_r0.yaml
          /large_experiment_1.yaml
          /low_r0.yaml
```

When the worker function `train_function` is called, the YAML is passed to it as a parameter (a `dict`) along with the path to the executable JAR.
The worker writes its output locally to its `/tmp/` folder and logs the output files as artifacts of the experiment run. Except for the executable JAR,
no data location is shared between the workspace and the Ray workers. Other methods to distribute the JAR to Ray workers also exist; we chose this one for simplicity.


## Best Practices for Ray Worker Configuration

Say you want to process 1000 files (1000 trials). Starting a 1000-worker Ray cluster on a small HW tier is not the best strategy, because you might need 1000 nodes. Kubernetes clusters are operationally complex to manage at that many nodes: typically 100–200 nodes is a reasonable number, and going beyond that needs finer tuning of your Kubernetes cluster for stability.


Instead, use a larger HW tier, say 64 cores, and choose 16 workers, each using the whole HW tier (ask our PS team how). Inside each worker, process 64 files in 64 threads (or processes) and return 64 results from each worker. You will achieve a high degree of parallelism (16 × 64 = 1024) with just 16 nodes added to your Ray cluster.

If you want to use GPUs, a similar strategy applies. NVIDIA's Multi-Instance GPU (MIG) feature allows a GPU to be shared: a single MIG-capable GPU (such as an A100 or H100) can be partitioned into up to 7 isolated GPU instances, so a node with 8 such GPUs provides up to 56 logical GPUs. This allows a parallelism of up to 56 per node. Ask our PS team for more details.


In [None]:
import subprocess
import os
import uuid
import pandas
import yaml
import json
from mlflow.utils.mlflow_tags import MLFLOW_PARENT_RUN_ID
import ray
import yaml

# Root of this project's local Domino dataset; the project name comes from the
# environment Domino sets for the workspace.
domino_project_name = os.environ['DOMINO_PROJECT_NAME']
root_folder = '/domino/datasets/local/{}/'.format(domino_project_name)


def write_yaml_file(base_path, f_name, obj):
    """Serialise ``obj`` as block-style YAML to ``{base_path}/{f_name}``.

    Args:
        base_path: Directory to write into; it must already exist.
        f_name: Name of the file to create (overwritten if present).
        obj: Object to dump with PyYAML.

    Returns:
        The path that was written, built as ``f'{base_path}/{f_name}'``.
    """
    input_file = f'{base_path}/{f_name}'
    # allow_unicode=True makes PyYAML emit non-ASCII characters verbatim, so the
    # file must be opened as UTF-8 explicitly rather than in the locale's default
    # encoding (which may not be able to represent them).
    with open(input_file, 'w', encoding='utf-8') as f:
        yaml.dump(obj, f, allow_unicode=True, default_flow_style=False)
    return input_file

from pathlib import Path
def _daily_metrics(report):
    """Group a person-property report into ``{day: {metric_name: person_count}}``.

    Metric names have the form ``Scenario-<scenario>-Infection_Status-<value>``.
    Days keep the order in which they first appear in ``report``.

    NOTE(review): ``region`` is not part of the metric name, so if a report holds
    several regions the row read last for a given (day, scenario, status)
    overwrites the earlier ones — confirm whether counts should be summed per
    region or logged separately.
    """
    per_day = {}
    columns = zip(report['scenario'], report['day'], report['value'], report['person_count'])
    for scenario, day, infection_status, person_count in columns:
        metric = f"Scenario-{scenario}-Infection_Status-{infection_status}"
        per_day.setdefault(day, {})[metric] = person_count
    return per_day


@ray.remote
def train_function(iteration_no, exp_id, parent_run_id, input_yaml, executable):
    """Run one model trial on a Ray worker and record it as a nested MLflow run.

    The YAML is written to the worker's ``/tmp/input/`` and the JAR is run with its
    output in a fresh folder under ``/tmp/output/``. On success the two TSV reports
    are logged as artifacts under ``results`` and the person-property counts are
    logged as per-day metrics.

    Args:
        iteration_no: Outer-loop iteration index, used in the run name.
        exp_id: MLflow experiment id to create the nested run under.
        parent_run_id: Run id stored in the ``MLFLOW_PARENT_RUN_ID`` tag.
        input_yaml: ``(file_name, parsed_yaml_dict)`` as built by ``get_input_files``;
            the dict must contain a ``baseScenario`` mapping (logged as params).
        executable: Path to the model JAR, readable from the worker.

    Returns:
        dict with ``success`` (JAR exit code was 0), ``i`` (YAML path written on the
        worker), and ``e`` / ``p`` (JSON strings of the experiment and
        person-property reports, or ``None`` when the JAR failed).
    """
    # The cell defining this task does not import mlflow itself.
    import mlflow

    yaml_f_name = input_yaml[0]
    yaml_o = input_yaml[1]
    root, _ = os.path.splitext(os.path.basename(yaml_f_name))

    # Bound up front so the return below is valid when the JAR fails; previously
    # a non-zero exit raised UnboundLocalError here and hid the real failure.
    experiment_report = None
    person_property_report = None

    with mlflow.start_run(
        experiment_id=exp_id,
        run_name=f"iteration{iteration_no}-{root}",
        tags={MLFLOW_PARENT_RUN_ID: parent_run_id},
        nested=True,
    ):
        input_folder = '/tmp/input/'
        Path(input_folder).mkdir(parents=True, exist_ok=True)
        input_file = write_yaml_file(input_folder, yaml_f_name, yaml_o)

        mlflow.log_params(yaml_o['baseScenario'])

        # A UUID-named folder per trial so outputs of trials sharing a worker's
        # /tmp never collide.
        output_folder = f'/tmp/output/{uuid.uuid4()}'
        os.makedirs(output_folder)

        out = subprocess.run(["java", "-jar", executable, "-o", output_folder, "-t=1", input_file])
        success = out.returncode == 0

        if success:
            experiment_path = f'{output_folder}/experiment_report.tsv'
            person_path = f'{output_folder}/person_property_report.tsv'

            experiment_report = pandas.read_csv(experiment_path, sep='\t').to_json()
            # Read once and reuse for both the JSON payload and the metrics.
            person_df = pandas.read_csv(person_path, sep='\t')
            person_property_report = person_df.to_json()

            mlflow.log_artifact(experiment_path, "results")
            mlflow.log_artifact(person_path, "results")

            for day, metrics in _daily_metrics(person_df).items():
                mlflow.log_metrics(metrics, step=day)
            print('Done logging metrics')
        else:
            print(f'Model exited with code {out.returncode} for {input_file}; no metrics logged')

    return {"success": success, "i": input_file, "e": experiment_report, "p": person_property_report}


## Create an MLflow Experiment

You can call it anything you like. This notebook builds the name from the project and user names:

```
user_name = os.environ['DOMINO_USER_NAME']
project_name = os.environ['DOMINO_PROJECT_NAME']
exp_name = f'demov2-{project_name}-{user_name}'
```
    

In [None]:
import pandas as pd
import os
import mlflow

# Identify the experiment by project and user, then fetch it, creating it the
# first time this notebook runs. The trailing bare `exp` displays it in the cell.
user_name = os.environ['DOMINO_USER_NAME']
project_id = os.environ['DOMINO_PROJECT_ID']
project_name = os.environ['DOMINO_PROJECT_NAME']
exp_name = 'demov2-{}-{}'.format(project_name, user_name)

exp = mlflow.get_experiment_by_name(exp_name)
if exp is None:
    print('Experiment Not Found Create it')
    mlflow.create_experiment(exp_name)
    exp = mlflow.get_experiment_by_name(exp_name)
exp

In [None]:
### Initialize the Ray Cluster

In [None]:
import os
import time
import ray
import mlflow
import boto3
# Connect to the Ray head once; re-running the cell is a no-op when already connected.
if not ray.is_initialized():
    service_host, service_port = (
        os.environ["RAY_HEAD_SERVICE_HOST"],
        os.environ["RAY_HEAD_SERVICE_PORT"],
    )
    # ray:// selects the Ray Client endpoint exposed by the head service.
    address = "ray://{}:{}".format(service_host, service_port)
    # NOTE(review): temp_dir is not passed to ray.init() in this cell, so it has
    # no effect here — confirm whether it is still needed.
    temp_dir = f"/mnt/data//{os.environ['DOMINO_PROJECT_NAME']}/"  # set to a dataset
    ray.init(address=address)

### Add your input files to the folder `/mnt/input` and create a list of these files.

In [None]:
import yaml
import json
from os import listdir
from os.path import isfile, join


def get_input_files(input_folder="/mnt/input"):
    """Load every regular file in ``input_folder`` as YAML.

    Args:
        input_folder: Directory holding the trial inputs (default ``/mnt/input``).
            Sub-directories are skipped.

    Returns:
        List of ``(file_name, parsed_yaml)`` tuples sorted by file name. Sorting
        matters because ``os.listdir`` returns entries in arbitrary order, and the
        list order decides both trial order and the ``iteration_<n>_input_<i>``
        params logged for it.

    Raises:
        FileNotFoundError: If ``input_folder`` does not exist.
    """
    input_files = []
    for file in sorted(listdir(input_folder)):
        path = join(input_folder, file)
        if isfile(path):
            with open(path, 'r', encoding='utf-8') as i:
                input_files.append((file, yaml.safe_load(i)))
    return input_files
            

In [None]:
import yaml
import json
from os import listdir
from os.path import isfile, join
# Load the trial inputs from /mnt/input into `d`; in this cell it is only used by
# the commented-out update_input_parameters trial at the end of the cell.
d = get_input_files()
def update_input_parameters(input_files, delta=0.1):
    """Return copies of ``input_files`` with the first ``r0`` level raised by ``delta``.

    For every ``(file_name, scenario)`` tuple, each entry of
    ``scenario['experimentDimensions']`` whose ``name`` is ``'r0'`` gets
    ``levels[0]['parameters']['r0']`` increased by ``delta`` and ``levels[0]['label']``
    set to the string form of the new value. Other dimensions are left unchanged.

    Each scenario is deep-copied first, so the caller's dicts are not modified
    (previously they were edited in place as a hidden side effect).

    Args:
        input_files: List of ``(file_name, parsed_yaml_dict)`` tuples.
        delta: Amount added to ``r0``; defaults to the previous fixed step of 0.1.

    Returns:
        New list of ``(file_name, updated_dict)`` tuples in the same order.

    Raises:
        KeyError / IndexError: If a scenario lacks ``experimentDimensions`` or an
            ``r0`` dimension lacks ``levels[0]['parameters']['r0']``.
    """
    import copy

    updated_files = []
    for item in input_files:
        file_name, scenario = item[0], copy.deepcopy(item[1])
        for dimension in scenario['experimentDimensions']:
            if dimension['name'] == 'r0':
                level = dimension['levels'][0]
                new_r0 = level['parameters']['r0'] + delta
                level['parameters']['r0'] = new_r0
                level['label'] = str(new_r0)
        updated_files.append((file_name, scenario))
    return updated_files
#d = update_input_parameters(d)
#print(d)

In [None]:
# Start Experiment: one root MLflow run, with one nested run per trial (created
# inside train_function on the Ray workers) for each iteration.
import os
import mlflow

from datetime import datetime
now = datetime.now()  # current date and time
now_str = now.strftime("%m-%d-%Y-%H-%M-%S")

# End any run left open by a previous execution so the root run starts cleanly.
mlflow.end_run()

data_set_folder = '/domino/datasets/local/{}'.format(os.environ['DOMINO_PROJECT_NAME'])  # set to a dataset
model_executable = f'{data_set_folder}/jar/model.jar'

input_files = get_input_files()

with mlflow.start_run(experiment_id=exp.experiment_id, run_name=f"root-{now_str}") as parent_run:
    parent_run_id = parent_run.info.run_id
    mlflow.log_param("model_executable", model_executable)
    iteration_max = 2
    for iteration_no in range(iteration_max):
        print(f'\n\nStarting iteration {iteration_no}')
        for index, f in enumerate(input_files):
            mlflow.log_param(f"iteration_{iteration_no}_input_{index}", f[0])

        # Fan out one remote trial per input file and block until all finish.
        results = ray.get([
            train_function.remote(iteration_no, exp.experiment_id, parent_run_id, input_yaml, model_executable)
            for input_yaml in input_files
        ])

        all_succeeded = True
        for r in results:
            if r["success"]:
                print(f'Trial successful for input {r["i"]}')
                print("You can evaluate here, change input files and rerun. We always assume success for the demo")
            else:
                all_succeeded = False
                print(f'Trial unsuccessful for input {r["i"]} may want to retry')
                print("You can retry failed trials here, change input files and rerun")

        if not all_succeeded:
            # For now we stop the whole experiment on the first failed iteration.
            print("For the demo we break")
            break

        # Update once per iteration. Previously this ran once per successful
        # result, bumping r0 len(input_files) times per iteration.
        print("We update the input parameters here")
        input_files = update_input_parameters(input_files)
