diff --git a/cwl_airflow/utilities/report.py b/cwl_airflow/utilities/report.py index 72fdb92..68141b2 100644 --- a/cwl_airflow/utilities/report.py +++ b/cwl_airflow/utilities/report.py @@ -126,7 +126,7 @@ def post_progress(context, from_task=None): ) } try: - http_hook.run(endpoint=ROUTES["progress"], json=message) + http_hook.run(endpoint=ROUTES["progress"], json=message, extra_options={"timeout": 30}) except AirflowNotFoundException as err: logging.debug(f"Failed to POST progress updates. Skipping \n {err}") except Exception as err: @@ -174,7 +174,7 @@ def post_results(context): ) } try: - http_hook.run(endpoint=ROUTES["results"], json=message) + http_hook.run(endpoint=ROUTES["results"], json=message, extra_options={"timeout": 30}) except AirflowNotFoundException as err: logging.debug(f"Failed to POST results. Skipping \n {err}") except Exception as err: @@ -203,7 +203,7 @@ def post_status(context): ) } try: - http_hook.run(endpoint=ROUTES["status"], json=message) + http_hook.run(endpoint=ROUTES["status"], json=message, extra_options={"timeout": 30}) except Exception as err: logging.debug(f"Failed to POST status updates. \n {err}") @@ -213,7 +213,9 @@ def clean_up(context): Loads "cwl" arguments from the DAG, just in case updates them with all required defaults, and, unless "keep_tmp_data" was set to True, tries to remove all remporary data and related records in the XCom - table + table. If this function is called from clean_dag_run or any other + DAG that doesn't have "cwl" in the "default_args" we will catch + KeyError exception """ try: default_cwl_args = get_default_cwl_args( @@ -221,10 +223,10 @@ def clean_up(context): ) if not default_cwl_args["keep_tmp_data"]: dag_run = context["dag_run"] - remove_dag_run_tmp_data(dag_run) + remove_dag_run_tmp_data(dag_run) # safe to run as it has its own exception handling for ti in dag_run.get_task_instances(): ti.clear_xcom_data() - except KeyError as err: # other than CWLDAG non of our DAGs should have "cwl" field in default_args + except KeyError as err: # will catch if called from clean_dag_run logging.info(f"Failed to clean up data for current DAG, due to \n {err}")