Skip to content

Commit

Permalink
Reports only the first found error per failed task.
Browse files Browse the repository at this point in the history
Error categories are sorted by priorities. If high level
error occur it might make the lower error categories
markers to appear in the log, so we need to skip them.
  • Loading branch information
michael-kotliar committed Dec 4, 2020
1 parent d77989a commit 4f709de
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions cwl_airflow/utilities/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ def get_error_category(context):
retry, because the get_error_category function is called when the dag_run
has failed, so all previous task retries didn't bring any positive results.
We load logs only for the actually failed task, not for upstream_failed
tasks. All error categories should be mutullly exclusive so they are not
supposed to appear in the same log file all at once. We report only unique
error categories that's why we use set. The "Failed to run workflow step"
category additionally is filled with failed task ids. The returned value is
always a string.
tasks. All error categories are sorted by priority from higher level to the
lower one. We report only one (the highest, the first found) error category
per failed task. Error categories from all failed tasks are combined and
deduplicated. The "Failed to run workflow step" category additionally is
filled with failed task ids. The returned value is always a string.
"""

ERROR_MARKERS = {
Expand Down Expand Up @@ -73,16 +73,16 @@ def get_error_category(context):
for ti in failed_tis:
ti.task = context["dag"].get_task(ti.task_id) # for some reasons when retreived from DagRun we need to set "task" property from the DAG
try: # in case log files were deleted or unavailable
logs, _ = log_handler.read(ti) # logs is always a list
for line in logs[-1].split("\n"): # first need to take the latest log and split it by "\n"
for marker, category in ERROR_MARKERS.items():
if marker in line:
categories.add(category)
logs, _ = log_handler.read(ti) # logs is always a list.
for marker, category in ERROR_MARKERS.items():
if marker in logs[-1]: # logs[-1] is a string with \n from the last task retry
categories.add(category)
break
except Exception as err:
logging.debug(f"Failed to define the error category for task {ti.task_id}. \n {err}")

if categories:
return "\n".join(categories).format(", ".join( [ti.task_id for ti in failed_tis] )) # mainly to fill in the placeholder with failed task ids
return ". ".join(categories).format(", ".join( [ti.task_id for ti in failed_tis] )) # mainly to fill in the placeholder with failed task ids
return "Unknown error. Contact support team"


Expand Down

0 comments on commit 4f709de

Please sign in to comment.