### DPS Coordinator

This notebook interacts with the MAAP API. It submits and runs a single coordinating DPS dask job that manages jobs
(see `fireatlas.FireRunDaskCoordinator.py`).

The `poll_on_job_status` and `wait_for_job` functions let us block and retrieve the DPS job status for multiple jobs before we continue.

In [None]:
#!pip install -e ..

In [None]:
import json
import os
from typing import Tuple
import concurrent
from concurrent.futures import ThreadPoolExecutor

from fireatlas.FireLog import logger


from maap.maap import MAAP
from maap.dps.dps_job import DPSJob
from maap.utils import algorithm_utils


class JobSubmissionException(Exception):
    """Raised when one or more DPS jobs report a failed submission."""


def get_algorithm_config_filepath(dir_names):
    """Build the path to `algorithm_config.yaml` inside each named directory.

    The base directory is derived from `__name__`, not `__file__`: because
    `__name__` is a bare module name, `os.path.abspath` resolves it against
    the current working directory, so the returned paths are anchored at the
    working directory of the running process.

    :param dir_names: iterable of directory names, one config path is built per entry
    :return: list of absolute paths, in the same order as `dir_names`
    """
    base_dir = os.path.dirname(os.path.abspath(__name__))
    config_paths = []
    for dir_name in dir_names:
        config_paths.append(
            os.path.join(base_dir, f'{dir_name}', 'algorithm_config.yaml')
        )
    return config_paths


def validate_job_submission(submitted_jobs: Tuple[DPSJob]) -> Tuple[DPSJob]:
    """Check that none of the submitted jobs failed at submission time.

    We don't retry job submissions, they should ideally always work, so any
    failed submission is raised immediately.

    :param submitted_jobs: job objects returned by the submit call; each one
        must expose a `status` attribute
    :return: the same `submitted_jobs` object, untouched, when no job reports
        a failed status
    :raises JobSubmissionException: if at least one job has the status
        'failed' (compared case-insensitively)
    """
    # Normalise the status before comparing so that 'Failed' / 'FAILED' are
    # caught as well, matching the lower-casing applied when job status is polled.
    failed_statuses = [
        job for job in submitted_jobs if str(job.status).lower() == 'failed'
    ]
    # Test the list itself: `any()` would look at the truthiness of each job
    # object, which says nothing about whether failures were collected.
    if failed_statuses:
        raise JobSubmissionException(f"[ SUBMISSION FAILED ]: the following jobs failed to submit {failed_statuses}")
    return submitted_jobs


def wait_for_job(dps_job: DPSJob) -> DPSJob:
    """Wait on a single DPS job and return whatever `wait_for_completion` returns.

    This delegates to the DPSJob method, which (per the maap-py source linked
    below) blocks until the job completes and uses exponential backoff:
    https://github.com/MAAP-Project/maap-py/blob/master/maap/dps/dps_job.py#L80C9-L80C28

    The lower-cased statuses appear to be: ['failed', 'succeeded', 'accepted', 'running']
    https://github.com/MAAP-Project/maap-py/blob/master/maap/dps/dps_job.py

    :param dps_job: the job to wait on
    :return: the value returned by `dps_job.wait_for_completion()`
    """
    completed_job = dps_job.wait_for_completion()
    return completed_job


def poll_on_job_status(jobs: Tuple[DPSJob]) -> Tuple[DPSJob]:
    """Wait on all jobs concurrently and collect the ones that did not succeed.

    Each job is handed to `wait_for_job` on a small thread pool. Once a wait
    finishes, the job's status is retrieved again and compared
    (case-insensitively) with 'succeeded'.

    Note that the returned list holds the *futures* of the unsuccessful
    waits, not the job objects themselves; call `.result()` on an entry to get
    the job back.

    :param jobs: iterable of jobs to wait on
    :return: list of futures whose job finished with a status other than 'succeeded'
    """
    unsuccessful = []
    # don't want to overwhelm the MAAP api so keeping max_workers relatively small
    with ThreadPoolExecutor(max_workers=5) as pool:
        pending = []
        for job in jobs:
            pending.append(pool.submit(wait_for_job, job))

        for finished in concurrent.futures.as_completed(pending):
            try:
                final_status = finished.result().retrieve_status()
                succeeded = final_status.lower() == 'succeeded'
            except Exception as e:
                # NOTE(review): a wait or status lookup that raises is only
                # logged; that job does not show up in the returned list.
                logger.exception(f"'poll_on_jobs_status' failed with {e}")
                continue
            if not succeeded:
                unsuccessful.append(finished)
    return unsuccessful


def track_submitted_jobs(submitted_jobs: Tuple[DPSJob]) -> Tuple[DPSJob]:
    """Validate the submissions, then wait on them and report the unsuccessful ones.

    :param submitted_jobs: jobs returned by the submit call
    :return: whatever `poll_on_job_status` returns for the validated jobs
    :raises JobSubmissionException: propagated from `validate_job_submission`
    """
    return poll_on_job_status(validate_job_submission(submitted_jobs))

#### Dask Coordinator

In [None]:
# Run parameters passed to the coordinator job below.
# NOTE(review): presumably start / end time as [year, month, day, 'AM'|'PM'] -- confirm
tst = [2023, 1, 1, 'AM']
ted = [2023, 7, 1, 'PM']
# Region name plus its bounding box.
# NOTE(review): bbox looks like [west, south, east, north] in degrees -- confirm
region = [
    "Oregon",
    [-124.925537, 41.672912, -115.565186, 46.513516],
]

In [None]:
# Locate and load the coordinator's algorithm config; the 'inputs' entry is
# dropped before the config is printed.
configs = get_algorithm_config_filepath(['coordinator',])
maap_api = MAAP(maap_host='api.maap-project.org')
algo_config = algorithm_utils.read_yaml_file(configs[0])
algo_config.pop('inputs')
print(algo_config)

submitted_jobs = []

# Job-level settings for the submit call, mostly read from the algorithm config.
submit_job_kwargs = dict(
    identifier=f"job-{algo_config['algorithm_name']}:{algo_config['algorithm_version']}",
    algo_id=algo_config["algorithm_name"],
    version=algo_config["algorithm_version"],
    username="gcorradini",
    queue=algo_config["queue"],
)

# Parameters forwarded with the job: the time bounds are JSON-encoded, the
# region name and bbox are passed through as-is.
param_kwargs = dict(
    regnm=region[0],
    tst=json.dumps(tst),
    bbox=region[1],
    ted=json.dumps(ted),
    operation="--coordinate-all",
)

# Submit the single coordinator job and make sure the submission did not fail.
result = maap_api.submitJob(**submit_job_kwargs, **param_kwargs)
submitted_jobs.append(result)
queued_jobs = validate_job_submission(submitted_jobs)

In [None]:
%%time
# Block until the queued jobs have finished; `failed_jobs` receives the futures
# of the jobs whose final status was not 'succeeded'.
failed_jobs = poll_on_job_status(queued_jobs)

In [None]:
# Show the result of the first unsuccessful job. `failed_jobs` is empty when
# every job succeeded, so guard the index to avoid an IndexError in that case
# (the cell then evaluates to None and displays nothing).
failed_jobs[0].result().retrieve_result() if failed_jobs else None