# Prepare and launch a DPS batch of jobs for a particular algorithm

**Goal**
Provide a template for DPS job submission that can be adapted to the specific algorithm being run in DPS.

**Motivation**  
Learning how to run many jobs of your script (where some input changes for each job) is easier if you can first see an example.

Paul Montesano, PhD  
paul.m.montesano@nasa.gov  
June 2024

In [126]:
from maap.maap import MAAP
maap = MAAP()

In [127]:
maap._MAAP_HOST

'api.maap-project.org'

In [None]:
import os
import pandas as pd
import glob
import datetime
import sys

### Use the MAAP registration call in a notebook cell to register the DPS algorithm
 - You need to register the DPS algorithm before you loop over the jobs that will use it.
 - If you register your algorithm using the Register Algorithm UI in Jupyter, a configuration file (in YAML format) is placed in your workspace home folder. You can reuse this file as a template.

In [None]:
maap.register_algorithm_from_yaml_file("/projects/.../.../<my_algorithms_yaml_file>.yml").text

### Build a dictionary of the argument names and values needed to run the algorithm in the way you want

We call this a `parameters dictionary`.  

 - These are the arguments that the `.sh` wrapper (which calls your `.py` or `.R` code) is hard-coded to accept.  
 - The `.yml` file that you use to register your algorithm is what connects this `parameters dictionary` to your `.sh` wrapper.  
 - Together, the `parameters dictionary`, the `.yml`, and the `.sh` provide a specific (and repeatable) way of running your `.py` or `.R` code.
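For illustration, the underlying `.py` script might read these arguments with `argparse`. This is a minimal sketch, assuming the `.sh` wrapper forwards each parameter as a `--name value` flag; the argument names below mirror the example parameters dictionary and are hypothetical, and your wrapper may pass them positionally instead.

```python
import argparse


def build_parser():
    """Hypothetical command-line interface for the .py script called by the .sh wrapper."""
    parser = argparse.ArgumentParser(description="Example DPS algorithm entry point")
    # Names mirror the keys of the example parameters dictionary (hypothetical)
    parser.add_argument("--arg_name_1", type=str, required=True)
    parser.add_argument("--arg_name_2", type=str, required=True)
    parser.add_argument("--in_tile_num", type=int, required=True)
    return parser


# Simulate the arguments one DPS job might receive
args = build_parser().parse_args(
    ["--arg_name_1", "some_value", "--arg_name_2", "another_value", "--in_tile_num", "1"]
)
print(args.in_tile_num)
```

Because every argument is declared explicitly, a name mismatch between the wrapper and the script fails immediately with a usage error instead of silently using a default.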

#### Note: make sure the `in_params_dict` matches the arguments of your underlying Python/R code

In [3]:
# Placeholder argument names and values; replace with your algorithm's arguments
in_params_dict = {
    'arg_name_1': 'some_value',
    'arg_name_2': 'another_value',
    'in_tile_num': 1
}
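A quick way to catch mismatches, such as a stray space in a key name, is to compare the dictionary's keys with the argument names your algorithm expects. This is a minimal sketch; `check_params` and `EXPECTED_ARGS` are hypothetical, and you would fill `EXPECTED_ARGS` by hand from the inputs declared in your registered `.yml`.

```python
def check_params(params, expected_args):
    """Return (missing, unexpected) argument names for a parameters dictionary."""
    given = set(params)
    expected = set(expected_args)
    missing = expected - given      # required by the algorithm but not supplied
    unexpected = given - expected   # supplied but not declared (often a typo)
    return missing, unexpected


# Hypothetical argument names declared in the registered .yml
EXPECTED_ARGS = ["arg_name_1", "arg_name_2", "in_tile_num"]

# A dictionary with a typo: a space instead of an underscore in the first key
params = {"arg name_1": "some_value", "arg_name_2": "another_value", "in_tile_num": 1}
missing, unexpected = check_params(params, EXPECTED_ARGS)
print(missing, unexpected)  # {'arg_name_1'} {'arg name_1'}
```

Running a check like this once before the submission loop costs nothing and avoids submitting a whole batch of jobs that fail for the same reason.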

### Set up a list of items to iterate over: an example of an algorithm input that varies by job

In this example, we use geographic `tiles` to break up our processing. These tiles are defined by vector polygons and have IDs that our `.sh`, `.py`, and `.yml` files are set up to accept as arguments. We use these IDs as the basis for a loop that sequentially submits our jobs to DPS.

There are many ways to split up DPS jobs; tiles are used here only as an example.
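As one alternative to one tile per job, you could group a long list of inputs into fixed-size batches and submit one job per batch. This is a minimal sketch; the granule names, the batch size, and `make_batches` are hypothetical, and your `.sh` wrapper would need to accept a list of inputs (for example, as one comma-separated string) for this to work.

```python
def make_batches(items, batch_size):
    """Split items into consecutive batches of at most batch_size elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


# Hypothetical input granules; each batch would become one DPS job
granules = [f"granule_{n:03d}.h5" for n in range(7)]
for job_num, batch in enumerate(make_batches(granules, 3), start=1):
    # e.g., pass the batch to the wrapper as one comma-separated argument
    print(job_num, ",".join(batch))
```

Larger batches mean fewer, longer jobs; smaller batches mean more jobs that each finish sooner and are cheaper to rerun if one fails.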

In [15]:
# Example list of an input parameter that varies for each job; each value becomes one DPS job
DPS_INPUT_TILE_NUM_LIST = [1, 3, 5, 7, 13, 17, 19]

### Set up the general submission variables that will be applied to all runs of this DPS batch

These variables also determine the path of the DPS output (under `/projects/my-private-bucket/dps_output`):  
`/projects/my-private-bucket/dps_output/<ALGO_ID>/<ALGO_VERSION>/<IDENTIFIER>`
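The same pattern can be built in code, which is handy later for finding a batch's outputs. This is a minimal sketch using the example values from the next cell; `dps_output_dir` is a hypothetical helper, and it builds only the prefix shown above without modeling any subfolders DPS may create beneath it.

```python
import posixpath

DPS_OUTPUT_ROOT = "/projects/my-private-bucket/dps_output"


def dps_output_dir(algo_id, algo_version, identifier, root=DPS_OUTPUT_ROOT):
    """Return <root>/<ALGO_ID>/<ALGO_VERSION>/<IDENTIFIER> for one DPS batch."""
    return posixpath.join(root, algo_id, algo_version, identifier)


print(dps_output_dir("run_my_biomass_algorithm", "my_biomass_algorithm_v2024_1", "BIOMASS_2020"))
# /projects/my-private-bucket/dps_output/run_my_biomass_algorithm/my_biomass_algorithm_v2024_1/BIOMASS_2020
```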

In [7]:
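# Submission settings shared by every job in this batch
IDENTIFIER = 'BIOMASS_2020'                     # tag for this batch; last folder of the DPS output path
ALGO_VERSION = 'my_biomass_algorithm_v2024_1'   # version name of the registered algorithm
ALGO_ID = "run_my_biomass_algorithm"            # name of the registered algorithm
USER = 'montesano'                              # MAAP username
WORKER_TYPE = 'maap-dps-worker-8gb'             # DPS queue (worker size) to run on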

In [8]:
RUN_NAME = IDENTIFIER
RUN_NAME

'BIOMASS_2020'

In [13]:
DPS_INPUT_TILE_NUM_LIST[0:2]

[1, 3]

### Set up a directory to hold the metadata output table from the DPS submission

In [None]:
DPS_SUBMISSION_RESULTS_DIR = '/projects/my-public-bucket/dps_submission_results'
!mkdir -p $DPS_SUBMISSION_RESULTS_DIR
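The `!mkdir -p` line is IPython shell syntax and works only inside a notebook. If you move this submission code into a plain `.py` script, a standard-library equivalent is sketched below; `ensure_dir` is a hypothetical helper name.

```python
import os


def ensure_dir(path):
    """Create path and any missing parents (like mkdir -p); return path unchanged."""
    os.makedirs(path, exist_ok=True)  # no error if the folder already exists
    return path


# Usage in a script:
# DPS_SUBMISSION_RESULTS_DIR = ensure_dir('/projects/my-public-bucket/dps_submission_results')
```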

## Submit a DPS job for each item in the list

The submission is done in a loop.  

Each submission returns quickly, so the loop does not need to be parallelized. Jobs start soon after they are submitted and are processed in parallel, subject to administrator settings.
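Before submitting for real, it can help to dry-run the loop and inspect the exact parameters each job would receive. This sketch builds a fresh copy of the parameters dictionary for each job (rather than updating one shared dictionary in place, as the submission cell does) and passes it to a stand-in submit function. `build_job_params` and `fake_submit` are hypothetical; `fake_submit` only echoes its inputs and does not call MAAP.

```python
def build_job_params(base_params, tile_num):
    """Return a new parameters dict for one job; base_params is not modified."""
    return {**base_params, "in_tile_num": tile_num}


def fake_submit(**kwargs):
    """Hypothetical stand-in for maap.submitJob that just returns its arguments."""
    return kwargs


base_params = {"arg_name_1": "some_value", "arg_name_2": "another_value", "in_tile_num": None}
tile_list = [1, 3, 5]

dry_run = [fake_submit(**build_job_params(base_params, t)) for t in tile_list]
for job_num, params in enumerate(dry_run, start=1):
    print(job_num, params)
```

Once the printed parameters look right, swapping `fake_submit` for the real submission call is the only change needed.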

In [None]:
%%time

import json

submit_results_df_list = []
len_input_list = len(DPS_INPUT_TILE_NUM_LIST)
print(f"# of input tiles for DPS: {len_input_list}")

for i, INPUT_TILE_NUM in enumerate(DPS_INPUT_TILE_NUM_LIST):
    
    # Just a way to keep track of the job number associated with this submission's loop
    DPS_num = i+1
    
    # Update the in_params_dict with the current INPUT_TILE_NUM from this loop
    in_params_dict['in_tile_num'] = INPUT_TILE_NUM
    
    submit_result = maap.submitJob(
            identifier=IDENTIFIER,
            algo_id=ALGO_ID,
            version=ALGO_VERSION,
            username=USER, # username needs to be the same as whoever created the workspace
            queue=WORKER_TYPE,
            **in_params_dict
        )
    
    # Build a dataframe of submission details - this holds metadata about your DPS job
    submit_time = datetime.datetime.now()
    submit_result_df = pd.DataFrame(
        {
            'dps_num': [DPS_num],
            'tile_num': [INPUT_TILE_NUM],
            'submit_time': [submit_time.strftime('%Y-%m-%d-%H-%M-%S')],  # %S = seconds (%s is platform-specific)
            'dps_job_hour': [submit_time.hour],
            'algo_id': [ALGO_ID],
            'user': [USER],
            'worker_type': [WORKER_TYPE],
            'job_id': [submit_result.id],
            'submit_status': [submit_result.status],
        }
    )
    
    # Append to a list of data frames of DPS submission results
    submit_results_df_list.append(submit_result_df)
    
    if DPS_num in [1, 5, 10, 50, 100, 250, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 7000, 9000, 11000, 13000, 15000, 17000, 19000, 21000, 24000, len_input_list]:
        print(f"DPS run #: {DPS_num}\t| tile num: {INPUT_TILE_NUM}\t| submit status: {submit_result.status}\t| job id: {submit_result.id}") 
        
# Build a final submission results data frame and save
submit_results_df = pd.concat(submit_results_df_list, ignore_index=True)  # renumber rows 0..n-1
submit_results_df['run_name'] = RUN_NAME
nowtime = pd.Timestamp.now().strftime('%Y%m%d%H%M')
print(f"Current time:\t{nowtime}")

# This creates a CSV of the metadata associated with the DPS jobs you have just submitted
submit_results_df.to_csv(f'{DPS_SUBMISSION_RESULTS_DIR}/DPS_{ALGO_ID}_{RUN_NAME}_submission_results_{len_input_list}_{nowtime}.csv')
submit_results_df.info()
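After the loop, the saved CSV can be read back to check which submissions did not succeed, for example to build a list of tiles to resubmit. This is a minimal sketch using only the standard library; `summarize_submissions` is hypothetical, the sample rows are made up, and `ok_status` is a placeholder because the actual status strings depend on what MAAP returns (inspect a few rows of your own CSV first).

```python
import csv
import io
from collections import Counter


def summarize_submissions(csv_file, ok_status):
    """Count submit_status values and list tile_num values whose status != ok_status."""
    counts = Counter()
    not_ok_tiles = []
    for row in csv.DictReader(csv_file):
        status = row["submit_status"]
        counts[status] += 1
        if status != ok_status:
            not_ok_tiles.append(int(row["tile_num"]))
    return counts, not_ok_tiles


# Tiny made-up sample; the status strings are placeholders, not real MAAP values
sample = io.StringIO(
    ",dps_num,tile_num,submit_status\n"
    "0,1,1,ok\n"
    "1,2,3,failed\n"
    "2,3,5,ok\n"
)
counts, retry_tiles = summarize_submissions(sample, ok_status="ok")
print(dict(counts), retry_tiles)  # {'ok': 2, 'failed': 1} [3]
```

For a real results file, open it with `open(path, newline="")` and pass the file object in place of `sample`.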