In [1]:
import glob
import os
import fsspec
import pandas as pd
import xarray as xr
from upath import UPath
import json
import time
import pathlib
from prefect.client import Client
from prefect.backend.flow import FlowView
from prefect.backend.flow_run import FlowRunView, watch_flow_run
from prefect.backend import FlowRunView
from cmip6_downscaling.methods.common.containers import RunParameters
from cmip6_downscaling.utils import str_to_hash
from cmip6_downscaling import __version__ as version, config
from cmip6_downscaling.methods.bcsd.flow import flow as bcsd_flow

# Directory (relative to this notebook) that holds the JSON parameter files for the BCSD test subset.
config_file_dir = '../../configs/bcsd_test_subset/'
# Versioned results location: the configured results URI with everything up to and
# including 'az://' removed, joined with the package version.
results_dir = UPath(config.get("storage.results.uri").split('az://')[1]) / version
# Sub-directory that check_finalized_json searches for '<run_id>/latest.json' files.
finalize_dir = results_dir / 'runs'
# Read from the environment; this is None when the variable is not set.
connection_string = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
# Azure filesystem handle used for the existence checks in check_finalized_json.
fs = fsspec.filesystem('az', connection_string=connection_string)



<class 'cmip6_downscaling.runtimes.CloudRuntime'>
  Storage    : <class 'prefect.storage.azure.Azure'>
  Run Config : <class 'prefect.run_configs.kubernetes.KubernetesRun'>
  Executor   : <class 'prefect.executors.dask.DaskExecutor'>



In [2]:
def retrieve_test_parms():
    """Return the paths of all ``.json`` parameter files that sit directly in ``config_file_dir``."""
    json_pattern = f'{config_file_dir}*.json'
    return glob.glob(json_pattern)


def create_run_params_from_json(parameter_fpath: str) -> "RunParameters":
    """Build a ``RunParameters`` object from a JSON parameter file.

    The file is loaded with ``pandas.read_json``; scalar entries are taken from the
    first row and the two date ranges from the first two rows of their column.

    The date ranges are accepted under either naming: ``train_period`` /
    ``predict_period`` (tried first) or ``train_dates`` / ``predict_dates``, the
    keys that ``run_flow`` expects to find in the same files.

    Parameters
    ----------
    parameter_fpath : str
        Path to the JSON parameter file.

    Returns
    -------
    RunParameters
        Run parameters populated from the file.

    Raises
    ------
    KeyError
        If a required entry (or both spellings of a date range) is missing.
    """
    df = pd.read_json(parameter_fpath)

    def _date_range(*candidate_columns: str) -> list:
        # Use the first spelling that is present in the file.
        for column in candidate_columns:
            if column in df.columns:
                return [df[column].iloc[0], df[column].iloc[1]]
        raise KeyError(
            f"none of {candidate_columns} found in parameter file {parameter_fpath}"
        )

    scalar_fields = (
        'method',
        'obs',
        'model',
        'member',
        'grid_label',
        'table_id',
        'scenario',
        'variable',
        'latmin',
        'latmax',
        'lonmin',
        'lonmax',
    )
    scalar_values = {field: df[field].iloc[0] for field in scalar_fields}

    return RunParameters(
        **scalar_values,
        train_dates=_date_range('train_period', 'train_dates'),
        predict_dates=_date_range('predict_period', 'predict_dates'),
    )


def run_flow(flow_id: str, param_file_path: str) -> tuple:
    """Create one flow run on Prefect Cloud from a JSON parameter file.

    The file's ``predict_dates`` / ``train_dates`` entries are submitted under the
    names ``predict_period`` / ``train_period``. Files that already use the latter
    names are passed through unchanged instead of raising ``KeyError``.

    Parameters
    ----------
    flow_id : str
        ID of the registered flow to run.
    param_file_path : str
        Path to the JSON file holding the flow parameters.

    Returns
    -------
    tuple
        ``(flow_run_id, flow_run, run_url, flow_hash, param_file_path)`` where
        ``flow_run`` is the ``FlowRunView`` of the new run and ``flow_hash`` is
        ``str_to_hash`` applied to the raw text of the parameter file.
    """
    param_text = pathlib.Path(param_file_path).read_text()
    # The hash is taken over the file exactly as written, before any key renaming.
    flow_hash = str_to_hash(param_text)
    param_dict = json.loads(param_text)

    for file_key, flow_key in (
        ("predict_dates", "predict_period"),
        ("train_dates", "train_period"),
    ):
        if file_key in param_dict:
            param_dict[flow_key] = param_dict.pop(file_key)

    client = Client()
    flow_run_id = client.create_flow_run(flow_id=flow_id, parameters=param_dict)
    flow_run = FlowRunView.from_flow_run_id(flow_run_id)
    run_url = client.get_cloud_url("flow-run", flow_run_id)
    print(run_url)
    return flow_run_id, flow_run, run_url, flow_hash, param_file_path


def run_all_params(param_list: list, flow_id: str):
    """Submit one flow run per parameter file in ``param_list`` via ``run_flow``.

    The values returned by ``run_flow`` are not kept; this function returns ``None``.
    """
    for json_file in param_list:
        run_flow(flow_id, json_file)


def check_flow_status(flow_run, poll_interval: float = 3):
    """Block until a flow run reaches a finished state and return its latest view.

    Parameters
    ----------
    flow_run
        Object exposing ``state.is_finished()`` and ``get_latest()``
        (``run_flow`` returns a ``FlowRunView`` in this role).
    poll_interval : float, optional
        Seconds to sleep between refreshes. Defaults to 3.

    Returns
    -------
    The most recently fetched view, whose state is finished.
    """
    while not flow_run.state.is_finished():
        time.sleep(poll_interval)
        # Views are snapshots; fetch a fresh one to observe state changes.
        flow_run = flow_run.get_latest()
    return flow_run


def check_finalized_json(param_file_list: list[str]) -> list[str]:
    """Return the parameter files whose run already has a finalized ``latest.json``.

    For each parameter file, the run ID is derived with
    ``create_run_params_from_json`` and the path
    ``<finalize_dir>/<run_id>/latest.json`` is checked on the ``fs`` filesystem.

    Parameters
    ----------
    param_file_list : list[str]
        Paths of the parameter files to check.

    Returns
    -------
    list[str]
        The entries of ``param_file_list`` (unchanged, in input order) for which
        the finalized file exists.
    """
    finalize_root = finalize_dir.as_posix()
    finalized_file_list = []
    for param_file in param_file_list:
        run_params = create_run_params_from_json(param_file)
        json_fpath = f'{finalize_root}/{run_params.run_id}/latest.json'
        if fs.exists(json_fpath):
            # Return the input path itself so callers can compare the result with
            # their own list; rebuilding it from the run ID would only match when
            # the parameter file happens to be named '<run_id>.json'.
            finalized_file_list.append(param_file)

    return finalized_file_list

## Register flow

In [3]:
# Register the BCSD flow under the 'cmip6' project; the returned ID is what run_flow submits against.
flow_id = bcsd_flow.register(project_name='cmip6')

[2022-05-13 06:19:27+0000] INFO - prefect.Azure | Uploading bcsd/2022-05-13t06-19-27-356653-00-00 to prefect
Flow URL: https://cloud.prefect.io/carbonplan/flow/0f2a250a-5f25-4a27-ab7a-7a6d8899c00e
 └── ID: bc4280b3-1432-4d6b-9161-f21dcd65ea48
 └── Project: cmip6
 └── Labels: ['az-eu-west']


## Run all parameter files on prefect cloud

In [4]:
all_parameter_files = retrieve_test_parms()
# Keep what run_flow returns for every submission (run ID, run view, URL, hash and
# parameter-file path) so the runs can be polled later, e.g. with check_flow_status.
runs = [run_flow(flow_id, param_file_path) for param_file_path in all_parameter_files]

https://cloud.prefect.io/carbonplan/flow-run/72213004-9f14-49f1-83df-a8651ac1ae01
https://cloud.prefect.io/carbonplan/flow-run/bfcba53b-8908-46e6-81a8-e7353cca6d35
https://cloud.prefect.io/carbonplan/flow-run/dadf192f-87b7-4050-9b3a-261de605e660
https://cloud.prefect.io/carbonplan/flow-run/a9e9b557-8769-4719-bbd4-fd9926df7974


### Wait until all flows have finished or failed (this can take several hours)

## Rerun flows that have not been finalized

In [None]:
successful_flows = check_finalized_json(all_parameter_files)
# Plain set difference, not a symmetric difference: a path reported by
# check_finalized_json that is not in all_parameter_files must not be scheduled for a rerun.
failed_flows = set(all_parameter_files) - set(successful_flows)

#### etc.

In [None]:
# Resubmit every parameter file that has no finalized output yet.
# failed_flows is a set, so the submission order is not fixed.
for param_file_path in failed_flows:
    run_flow(flow_id, param_file_path)

In [None]:
# def rerun_flow(flow_run, flow_run_id, param_file):

#     flow_run = check_flow_status(flow_run)

#     if flow_run.state.is_successful():
#         return ''
#     else:
#         # we can add more retries here -- more complex for now
#         flow_run_id, flow_run, run_url, flow_hash, param_file_path = run_flow(
#             flow_run_id, param_file
#         )
#         flow_run = check_flow_status(flow_run)

#         if flow_run.state.is_failed():
#             return param_file_path
#         else:
#             return ''

# failed_param_file_list = []
# for flow_run_id, flow_run, run_url, flow_hash, param_file_path in runs:
#     rerun_flow_status = rerun_flow(flow_run, flow_run_id, param_file_path)
#     if len(rerun_flow_status) != 0:
#         failed_param_file_list.append(rerun_flow_status)

In [None]:
# NOTE(review): this rebinds `results_dir`. Unlike the definition in the first cell, the
# 'az://' scheme is kept here, and `finalize_dir` (derived from the earlier value) is not
# recomputed — confirm which form is intended.
results_dir = UPath(config.get("storage.results.uri")) / version

downscaling_methods = ['bcsd']
# Passed to register_flow by run_all_param_files.
method = 'bcsd'
# CLI command templates; within this notebook they are referenced only from commented-out code.
_prefect_register_str = (
    """prefect register --project "cmip6" -p ../methods/{downscaling_method}/flow.py"""
)
_prefect_run_str = """prefect run -i "{flow_run_id}" --param-file {param_file}"""

In [None]:
def register_flow(method: str) -> str:
    """Register the BCSD flow in the 'cmip6' Prefect project and return its flow ID.

    ``method`` is accepted but not used: ``bcsd_flow`` is always the flow registered.
    """
    print('registering flow on prefect cloud')
    return bcsd_flow.register(project_name='cmip6')


def check_run_failed(run_id: str) -> bool:
    """Return True when the flow run identified by ``run_id`` is in a failed state.

    The run is looked up once through ``FlowRunView.from_flow_run_id``.
    """
    flow_run = FlowRunView.from_flow_run_id(run_id)
    return flow_run.state.is_failed()


def check_run_status(run_id: str) -> bool:
    """Return True when the flow run identified by ``run_id`` has reached a finished state."""
    return FlowRunView.from_flow_run_id(run_id).state.is_finished()


# failed_runs = []
# while flow_state_finished_status == False:
#         flow_run = FlowRunView.from_flow_run_id(run_id)
#         flow_state_finished_status = flow_run.state.is_finished()
#         time.sleep(60)
#     flow_status_is_failed = FlowRunView.from_flow_run_id(run_id).state.is_failed()
#     if flow_status_is_failed: #if True, run has failed
#         failed_runs.append(run_id)


# def run_flow(param_file: str, flow_id: str) -> list:

#         print(param_file)
#         sys_output = os.popen(
#             _prefect_run_str.format(flow_run_id=flow_id, param_file=param_file)
#         ).read()
#         run_id = sys_output.split('UUID: ')[1].split('\n')[0]
#         run_url = sys_output.split('URL: ')[1].split('\n')[0]
#         print(run_url)

#         return [run_id, run_url]


def check_run_status(run_id_list: list) -> list:
    """Return the subset of ``run_id_list`` whose flow runs are in a failed state.

    This definition replaces the single-run ``check_run_status(run_id)`` defined
    earlier in the same cell, so it must not call ``check_run_status`` for one
    ID: that call would resolve to this function and recurse without end. Only
    ``check_run_failed`` is consulted.

    Parameters
    ----------
    run_id_list : list
        Flow-run IDs to inspect.

    Returns
    -------
    list
        The failed run IDs, in input order.
    """
    return [run_id for run_id in run_id_list if check_run_failed(run_id)]


def run_all_param_files(param_file_list: list) -> list:
    """Register the flow once, submit a run per parameter file, and return the run IDs.

    Parameters
    ----------
    param_file_list : list
        Paths of the JSON parameter files to submit.

    Returns
    -------
    list
        Flow-run IDs, one per entry of ``param_file_list`` and in the same order.
    """
    run_id_list = []
    flow_id = register_flow(method)
    for param_file in param_file_list:
        # run_flow takes (flow_id, param_file_path) and returns a tuple whose first
        # element is the flow-run ID.
        flow_return = run_flow(flow_id, param_file)
        run_id_list.append(flow_return[0])
    print(run_id_list)
    return run_id_list

In [None]:
# first pass
# Collect every test parameter file and submit one flow run for each.
param_file_list = retrieve_test_parms()
# run_all_param_files is annotated as returning the list of flow-run IDs.
run_id_list = run_all_param_files(param_file_list)

In [None]:
# Display the flow-run IDs collected by the first pass.
run_id_list

In [None]:
# second pass
failed_runs = check_run_status(run_id_list)
# run_all_param_files expects parameter-file paths, not run IDs, so map each failed
# run back to the file it was submitted from. This assumes run_id_list is parallel
# to param_file_list (one run ID per parameter file, same order).
failed_run_ids = set(failed_runs)
failed_param_files = [
    param_file
    for param_file, run_id in zip(param_file_list, run_id_list)
    if run_id in failed_run_ids
]
second_failed_runs = run_all_param_files(failed_param_files)