# Using task-utils to compute CSG metrics from Climate Impact Lab (CIL) downscaled data


In [1]:
import pandas as pd
import yaml
import s3fs
import numpy as np
from jupiter.task_utils import TaskSet, TaskInventory, TaskInstructions, TaskLauncher
from jupiter.aws.s3 import upload_s3_file
!pip install pyarrow

ModuleNotFoundError: No module named 'jupiter.task_utils'

In [None]:
# Notebook-wide settings.

# Definitions that end up in the task-utils YAML; the per-job lists are added
# to this dictionary once the inventory has been read.
params = dict(
    # Parquet inventory file of the CIL downscaled CMIP6 data
    inventory_path='s3://jupiter-reference-data/CIL-Downscaled-CMIP/cmip6/cil_downscaled_cmip6_inventory.pq',
)

# AWS Batch queue to submit to (a special queue reserved for the author)
batch_queue = 'csg-global-dev-multi'

# Name of the task set; it is also the stem of the YAML file written later.
# NOTE: the Batch job definition used in the YAML is 'aarona-noah-mp-hue:1'.
taskset_name = 'test-runs-milwaukee-domains'



## Find expected output files
Load the inventory file to determine which output files to expect, and therefore how many jobs we need to loop over.

In [None]:
# Read the inventory table from the parquet path configured in `params`.
invdf = pd.read_parquet(params['inventory_path'])
# Show the first rows as the cell output.
invdf.head()

In [None]:
# Build parallel, job-indexed lists holding every GCM / experiment_id / year
# combination, plus the S3 path each job is expected to write.
all_gcms = []
all_scenarios = []
all_years = []
all_outpaths = []

# Only two inventory columns are needed, so walk them directly instead of
# materialising a Series per row with DataFrame.iterrows().
for gcm, scenario in zip(invdf['source_id'], invdf['experiment_id']):
    # 'historical' gets 1950, 1955, ..., 2010; every other experiment gets
    # 2015, 2020, ..., 2100 (range() excludes the stop value).
    if scenario == 'historical':
        years = range(1950, 2015, 5)
    else:
        years = range(2015, 2101, 5)

    for year in years:
        all_gcms.append(gcm)
        all_scenarios.append(scenario)
        all_years.append(year)
        all_outpaths.append(
            f's3://jupiter-reference-data/CIL-Downscaled-CMIP/cmip6/{scenario}/{gcm}/'
            f'{gcm}_{scenario}_{year}_csg_metrics.nc'
        )

# Position i of every list describes job i; job indices and years are stored
# as strings.
params['job_indices'] = [str(i) for i in range(len(all_gcms))]
params['gcms'] = all_gcms
params['scenarios'] = all_scenarios
params['years'] = [str(year) for year in all_years]
params['outpaths'] = all_outpaths

# Sanity check: total job count and a sample of the expected output paths.
print(len(all_gcms))
print(all_outpaths[:10])
    

## Formatting task-utils YAML

This dictionary holds the key components of the task-utils YAML file. The per-job parameter lists (GCM, scenario, year and output path) built above are inserted programmatically via `params`.

In [None]:
# Command template for a single job. The ${...} placeholders carry the same
# names as the loops declared in the task-set dictionary.
cmd_str = (
    'python cil_downscaled_projections/run.py'
    ' --gcm ${gcm}'
    ' --scenario ${scenario}'
    ' --year ${year}'
    ' --outpath ${outpath}'
)
print(cmd_str)



In [None]:
# Task-set definition that is dumped to the task-utils YAML file.
output_dict = {
    'name': taskset_name,
    'labels': {
        'env': 'dev',
        'project': 'csg',
        # Points at the author's personal S3 space
        'S3_ROOT': 's3://jupiter-intern-projects/aarona',
    },
    # `params` holds the inventory path plus the job-indexed lists
    'definitions': params,
    'launch_settings': {
        'batch': {
            # Plain string literal: there is nothing to interpolate here
            'job_def': 'aarona-noah-mp-hue:1',
            'queue': batch_queue,
            # Resources are requested through resourceRequirements rather
            # than flat 'vcpus' / 'memory' override keys.
            'overrides': {
                'resourceRequirements': [
                    {'type': 'MEMORY', 'value': '16000'},  # MB
                    {'type': 'VCPU', 'value': '4'},
                ],
            },
        },
        'run_keys': {
            'command_string': cmd_str,
        },
    },
    'indicators': {
        # "completed" is checked against each job's output path using the
        # 's3_sensor' method
        'completed': {
            'components': ['${outpath}'],
            'method': 's3_sensor',
        },
    },
    # The single real loop runs over the list stored under 'job_indices'
    'loops': {
        'job_index': 'job_indices',
    },
    # NOTE(review): presumably each mapped loop picks its value from the named
    # list according to job_index -- confirm against the task-utils docs
    'mapped_loops': {
        'outpath': {'job_index': 'outpaths'},
        'gcm': {'job_index': 'gcms'},
        'scenario': {'job_index': 'scenarios'},
        'year': {'job_index': 'years'},
    },
}
    


In [None]:
# Serialise the task-set definition to '<taskset_name>.yaml' in the working
# directory. yaml.dump() returns None when it is handed a stream, so its
# result is not bound to a name.
with open(f'{taskset_name}.yaml', 'w') as outfile:
    yaml.dump(output_dict, outfile)

## Run with task-utils

This follows the normal task-utils sequence.  Start by making an instruction list (which should also help confirm our YAML was formatted correctly).

In [None]:
# If True, no batch runs will actually be launched by the cells below.
DRY_RUN = False

# Load the task set from its YAML file and build the instructions for it
# (inspected through `instr.df`).
taskset_yaml_path = f'{taskset_name}.yaml'
ts = TaskSet(taskset_yaml_path)
instr = TaskInstructions(ts)

In [None]:
# Dimensions of the instruction table, shown as the cell output.
instr.df.shape

In [None]:
# First rows of the instruction table, shown as the cell output.
instr.df.head()

## Build the test set

In [None]:
# --- How to choose the test set ---------------------------------------------
# 'basic': just take the first `number_of_tests` runs
test_mode = 'basic'
number_of_tests = 5

# 'one_per_loop': one run for each value of the listed loop(s)
#test_mode = "one_per_loop"
#loop_names = ["peril"]

# 'query': define the test set with a custom query string
#test_mode = "query"
#test_query = f"(tileid in {testset}) and (projection_scenario == 'ssp585') and (metric == 'windSpeed500yr')"#" and scenario == 'worst_one'"


###################  ↑↑   OPTIONS   ↑↑  ###################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
###################  ↓↓ DON'T TOUCH ↓↓  ###################
load_existing_inventory_for_tests = True

task_inv = TaskInventory(ts, load_existing=load_existing_inventory_for_tests)

# Assign the test set according to the selected mode, then save the inventory.
# `loop_names` / `test_query` only exist if the matching option above is
# un-commented.
if test_mode == 'basic':
    task_inv.assign_basic_test_set(number_in_set=number_of_tests)
elif test_mode == 'one_per_loop':
    task_inv.assign_test_set_by_loop(loops=loop_names, combinations=True)
elif test_mode == 'query':
    task_inv.assign_test_set_by_query(query_str=test_query)
else:
    raise ValueError(f"test_mode {test_mode} not valid")
task_inv.save()

# Instructions filtered down to the test set, previewed before any launch.
task_instructions_test = TaskInstructions(ts)
task_instructions_test.filter_tests_only()
task_instructions_test.preview()
print(f'Notebook DRY_RUN value is set to {DRY_RUN}')

## Launch test jobs here

Please use job arrays to make this easier!

In [None]:
# Submit the test set as an AWS job array (True) or as individual jobs (False)
use_job_arrays = True
job_array_split_criteria = None  # e.g. 'peril'

## Advanced options, please do not use rashly!
num_attempts = None
timeout = None

# Set one of these options to run multiple commands in a loop within a
# single container execution.
jobs_per_execution = 1
group_criteria = None


###################  ↑↑   OPTIONS   ↑↑  ###################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
###################  ↓↓ DON'T TOUCH ↓↓  ###################

# Write the test instructions to a file and hand that file to the launcher.
instructions_file_test = task_instructions_test.write_instructions()
tl_test = TaskLauncher(instructions_file_test)

if use_job_arrays:
    tl_test.launch_via_aws_job_array(
        dry_run=DRY_RUN,
        split_on=job_array_split_criteria,
        jobs_per_execution=jobs_per_execution,
        group_on=group_criteria,
        num_attempts=num_attempts,
        timeout=timeout,
    )
else:
    tl_test.launch_via_aws_batch(
        dry_run=DRY_RUN,
        num_attempts=num_attempts,
        timeout=timeout,
    )

## Check status of test runs

In [None]:
# No options here, just execute it!

###################  ↑↑   OPTIONS   ↑↑  ###################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
###################  ↓↓ DON'T TOUCH ↓↓  ###################

load_existing_inventory = True
skip_s3_datacheck = {"completed": 1}
also_update_batch_status = True
batch_skip_statuses = ["SUCCEEDED"]
num_processors = None

# Refresh the status of the test runs only, print the summaries, and save the
# inventory.
task_inv = TaskInventory(ts, load_existing=load_existing_inventory)
task_inv.update_status(
    skip_values=skip_s3_datacheck,
    also_update_batch_status=also_update_batch_status,
    nproc=num_processors,
    batch_skip_statuses=batch_skip_statuses,
    tests_only=True,
)
task_inv.print_summary(tests_only=True)
if also_update_batch_status:
    task_inv.print_batch_summary(tests_only=True)
task_inv.save()

In [None]:
# Pass the test-set inventory (task_inv.test_df) through apply_style and show
# the result as the cell output.
task_inv.apply_style(task_inv.test_df)

## Rerun failures of test runs

In [None]:
# Inventory query selecting the test runs to resubmit.
# Broader alternative: anything not completed and not currently in flight.
#filter_query = 'completed != 1 and LAST_KNOWN_JOB_STATUS not in ["SUBMITTED","PENDING","STARTING","RUNNABLE","RUNNING"] and IN_TEST_SET == True'
filter_query = 'LAST_KNOWN_JOB_STATUS == "FAILED" and IN_TEST_SET == True and completed != 1'

update_first = True
skip_s3_datacheck = {'completed': 1}
also_update_batch_status = True
batch_skip_statuses = ['SUCCEEDED']
num_processors = None

###################  ↑↑   OPTIONS   ↑↑  ###################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
###################  ↓↓ DON'T TOUCH ↓↓  ###################

# Build instructions for the runs matching `filter_query` and preview them.
rerun_instructions = TaskInstructions(ts)
rerun_instructions.filter_on_inventory(
    query=filter_query,
    update_first=update_first,
    skip_values=skip_s3_datacheck,
    also_update_batch_status=also_update_batch_status,
    nproc=num_processors,
    batch_skip_statuses=batch_skip_statuses,
)
rerun_instructions.preview()
print(f'Notebook DRY_RUN value is set to {DRY_RUN}')

In [None]:
# Write the rerun instructions to a file and hand that file to a launcher.
rerun_instructions_file_test = rerun_instructions.write_instructions()
tl_test = TaskLauncher(rerun_instructions_file_test)

# Reruns of test jobs are submitted individually: the job-array flag is
# switched off for this launch and switched back on afterwards.
use_job_arrays = False
if use_job_arrays:
    tl_test.launch_via_aws_job_array(
        dry_run=DRY_RUN,
        split_on=job_array_split_criteria,
        jobs_per_execution=jobs_per_execution,
        group_on=group_criteria,
        num_attempts=num_attempts,
        timeout=timeout,
    )
else:
    tl_test.launch_via_aws_batch(
        dry_run=DRY_RUN,
        num_attempts=num_attempts,
        timeout=timeout,
    )
use_job_arrays = True
# Go back up and re-check the status

# FOR RUNNING ALL TASKS
## Create instructions for all remaining runs

In [None]:
## Create instructions and verify preview
# Fresh instruction set built from the task set `ts`.
task_instructions = TaskInstructions(ts)
task_instructions.filter_tests_excluded()  # comment this line if you want to run EVERYTHING, even previous tests
# Un-comment the next line to inspect the instructions before launching.
#task_instructions.preview()
# Reminder of whether the launch cells will actually submit jobs.
print(f'Notebook DRY_RUN value is set to {DRY_RUN}')

## Launch all runs
Please use job arrays!

In [None]:
# Submit as an AWS job array (True) or as individual jobs (False)
use_job_arrays = True
job_array_split_criteria = None  # example only, adjust for your use case

## Advanced options, please do not use rashly!
num_attempts = None
timeout = None

# Set one of these options to run multiple commands in a loop
# within a single container execution. Your container may need
# special code to handle this properly!
jobs_per_execution = 5
group_criteria = None


###################  ↑↑   OPTIONS   ↑↑  ###################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
###################  ↓↓ DON'T TOUCH ↓↓  ###################

# Write the instructions to a file and hand that file to the launcher.
instructions_file = task_instructions.write_instructions()
tl = TaskLauncher(instructions_file)

if use_job_arrays:
    tl.launch_via_aws_job_array(
        dry_run=DRY_RUN,
        split_on=job_array_split_criteria,
        jobs_per_execution=jobs_per_execution,
        group_on=group_criteria,
        num_attempts=num_attempts,
        timeout=timeout,
    )
else:
    tl.launch_via_aws_batch(
        dry_run=DRY_RUN,
        num_attempts=num_attempts,
        timeout=timeout,
    )

In [None]:
load_existing_inventory = True
skip_s3_datacheck = {'completed': 1}
also_update_batch_status = True
batch_skip_statuses = ['SUCCEEDED']
num_processors = None

###################  ↑↑   OPTIONS   ↑↑  ###################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
###################  ↓↓ DON'T TOUCH ↓↓  ###################

# Refresh the status of every run (no tests_only restriction here), print the
# summaries, and save the inventory.
task_inv = TaskInventory(ts, load_existing=load_existing_inventory)
task_inv.update_status(
    skip_values=skip_s3_datacheck,
    also_update_batch_status=also_update_batch_status,
    nproc=num_processors,
    batch_skip_statuses=batch_skip_statuses,
)
task_inv.print_summary()
if also_update_batch_status:
    task_inv.print_batch_summary()
task_inv.save()

In [None]:
# Show only the inventory rows whose last known job status is FAILED, passed
# through apply_style.
task_inv.apply_style(task_inv.df[task_inv.df['LAST_KNOWN_JOB_STATUS']=='FAILED'])

In [None]:
# List the jobs whose last known job status is FAILED. This task set loops
# over 'job_index' (see the 'loops' entry of the task-set dictionary); it has
# no 'tileid' loop, so selecting that column would raise KeyError.
# NOTE(review): assumes loop names appear as inventory columns -- confirm.
failed_rows = task_inv.df['LAST_KNOWN_JOB_STATUS'] == 'FAILED'
task_inv.df.loc[failed_rows, 'job_index'].unique()

## Rerun failures

In [None]:
# Inventory query selecting the runs to resubmit. Alternatives (the indicator
# defined for this task set is named 'completed'):
#filter_query = 'completed != 1 and LAST_KNOWN_JOB_STATUS not in ["SUBMITTED","PENDING","STARTING","RUNNABLE","RUNNING"]'
#filter_query = '(completed != 1)'
filter_query = 'LAST_KNOWN_JOB_STATUS in ["FAILED"]'

# The inventory is not refreshed first here (update_first is False).
update_first = False
skip_s3_datacheck = {'completed': 1}
also_update_batch_status = False
batch_skip_statuses = ['SUCCEEDED']
num_processors = None

###################  ↑↑   OPTIONS   ↑↑  ###################
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
###################  ↓↓ DON'T TOUCH ↓↓  ###################

# Build instructions for the runs matching `filter_query` and preview them.
rerun_instructions = TaskInstructions(ts)
rerun_instructions.filter_on_inventory(
    query=filter_query,
    update_first=update_first,
    skip_values=skip_s3_datacheck,
    also_update_batch_status=also_update_batch_status,
    nproc=num_processors,
    batch_skip_statuses=batch_skip_statuses,
)
rerun_instructions.preview()
print(f'Notebook DRY_RUN value is set to {DRY_RUN}')

In [None]:
# Launch options for the rerun.
use_job_arrays = True
job_array_split_criteria = None  # example only, adjust for your use case

## Advanced options, please do not use rashly!
num_attempts = timeout = None

# Set one of these options to run multiple commands in a loop
# within a single container execution. Your container may need
# special code to handle this properly!
jobs_per_execution = 1
group_criteria = None

In [None]:
# Write the rerun instructions to a file and hand that file to a launcher.
rerun_instructions_file = rerun_instructions.write_instructions()
tl_test = TaskLauncher(rerun_instructions_file)

if use_job_arrays:
    tl_test.launch_via_aws_job_array(
        dry_run=DRY_RUN,
        split_on=job_array_split_criteria,
        jobs_per_execution=jobs_per_execution,
        group_on=group_criteria,
        num_attempts=num_attempts,
        timeout=timeout,
    )
else:
    tl_test.launch_via_aws_batch(
        dry_run=DRY_RUN,
        num_attempts=num_attempts,
        timeout=timeout,
    )

# Go back up and re-check the status