Skip to content

Commit

Permalink
Append CWL log to the end of Airflow log
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-kotliar committed Oct 8, 2021
1 parent 35b1c6e commit f04577c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cwl_airflow/utilities/loggers.py
Expand Up @@ -14,7 +14,7 @@ def setup_cwl_logger(ti, level=None):
logs but not in the separate files.
"""

level = conf_get("core", "LOGGING_LEVEL", "INFO").upper() if level is None else level
level = conf_get("logging", "LOGGING_LEVEL", "INFO").upper() if level is None else level
cwl_logger = logging.getLogger("cwltool")
for handler in cwl_logger.handlers:
try:
Expand Down
17 changes: 16 additions & 1 deletion cwl_airflow/utilities/report.py
Expand Up @@ -49,6 +49,19 @@ def resend_reports():
logging.debug(f"Failed to POST value from {var.key} variable. Will retry in the next run \n {err}")


def append_cwl_log(context):
"""
Appends the latest CWL log to the end of the Airflow log.
"""

ti = context["ti"]
cwl_log_handler = get_log_handler("cwltool", "cwltool")
cwl_logs, _ = cwl_log_handler.read(ti)
latest_cwl_log_content = cwl_logs[-1][0][1] # [-1] - to get only the last task retry, [0] - there is only one item in array. [1] - to get the actual log content as a string
ti.log.info("CWL LOGS")
ti.log.info(latest_cwl_log_content)


def get_error_info(context):
"""
This function should be called only from the dag_run failure callback.
Expand Down Expand Up @@ -96,7 +109,7 @@ def get_error_info(context):
try: # in case log files were deleted or unavailable
logs, _ = airflow_handler.read(ti) # logs is always a list, so we need to take [0]
for marker, category in ERROR_MARKERS.items():
if marker in logs[0][-1][1]: # [-1] - to get only the last task retry, [1] - to get the actual log string with "\n"
if marker in logs[-1][0][1]: # [-1] - to get only the last task retry, [0] - there is only one item in array. [1] - to get the actual log content as a string
categories.add(category)
break
except Exception as err:
Expand Down Expand Up @@ -307,11 +320,13 @@ def clean_up(context):
def task_on_success(context):
report_progress(context, True)
report_status(context)
append_cwl_log(context)


def task_on_failure(context):
# no need to report progress as it hasn't been changed
report_status(context)
append_cwl_log(context)


def task_on_retry(context):
Expand Down

0 comments on commit f04577c

Please sign in to comment.