Skip to content

Commit

Permalink
Add timeout to all http_hook run otherwise may stuck for long time
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-kotliar committed Jan 29, 2021
1 parent 403a1b0 commit f72ee23
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions cwl_airflow/utilities/report.py
Expand Up @@ -126,7 +126,7 @@ def post_progress(context, from_task=None):
)
}
try:
http_hook.run(endpoint=ROUTES["progress"], json=message)
http_hook.run(endpoint=ROUTES["progress"], json=message, extra_options={"timeout": 30})
except AirflowNotFoundException as err:
logging.debug(f"Failed to POST progress updates. Skipping \n {err}")
except Exception as err:
Expand Down Expand Up @@ -174,7 +174,7 @@ def post_results(context):
)
}
try:
http_hook.run(endpoint=ROUTES["results"], json=message)
http_hook.run(endpoint=ROUTES["results"], json=message, extra_options={"timeout": 30})
except AirflowNotFoundException as err:
logging.debug(f"Failed to POST results. Skipping \n {err}")
except Exception as err:
Expand Down Expand Up @@ -203,7 +203,7 @@ def post_status(context):
)
}
try:
http_hook.run(endpoint=ROUTES["status"], json=message)
http_hook.run(endpoint=ROUTES["status"], json=message, extra_options={"timeout": 30})
except Exception as err:
logging.debug(f"Failed to POST status updates. \n {err}")

Expand All @@ -213,18 +213,20 @@ def clean_up(context):
Loads "cwl" arguments from the DAG, just in case updates them with
all required defaults, and, unless "keep_tmp_data" was set to True,
tries to remove all remporary data and related records in the XCom
table
table. If this function is called from clean_dag_run or any other
DAG that doesn't have "cwl" in the "default_args" we will catch
KeyError exception
"""
try:
default_cwl_args = get_default_cwl_args(
context["dag"].default_args["cwl"]
)
if not default_cwl_args["keep_tmp_data"]:
dag_run = context["dag_run"]
remove_dag_run_tmp_data(dag_run)
remove_dag_run_tmp_data(dag_run) # safe to run as it has its own exception handling
for ti in dag_run.get_task_instances():
ti.clear_xcom_data()
except KeyError as err: # other than CWLDAG non of our DAGs should have "cwl" field in default_args
except KeyError as err: # will catch if called from clean_dag_run
logging.info(f"Failed to clean up data for current DAG, due to \n {err}")


Expand Down

0 comments on commit f72ee23

Please sign in to comment.