From 9fc893d5bc8e2a126df688f820673e360c36eeb0 Mon Sep 17 00:00:00 2001 From: Bolke de Bruin Date: Sat, 2 Dec 2017 09:25:56 +0100 Subject: [PATCH] [AIRFLOW-1878] Fix stderr/stdout redirection for tasks logging.StreamHandler keeps a reference to the initial stream it has been assigned. This prevents redirection after initalization to a logging facility. --- airflow/bin/cli.py | 2 +- .../airflow_local_settings.py | 4 +-- airflow/utils/log/logging_mixin.py | 29 +++++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 782e58d96a3c6..1367362fe630f 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -367,7 +367,7 @@ def run(args, dag=None): hostname = socket.getfqdn() log.info("Running %s on host %s", ti, hostname) - with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN): + with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN): if args.local: run_job = jobs.LocalTaskJob( task_instance=ti, diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 020df8dbdeb51..aa5b8daabbe05 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -43,9 +43,9 @@ }, 'handlers': { 'console': { - 'class': 'logging.StreamHandler', + 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler', 'formatter': 'airflow.task', - 'stream': 'ext://sys.stdout' + 'stream': 'sys.stdout' }, 'file.task': { 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 03437bf740f69..fb3b85fe6f50b 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -21,8 +21,11 @@ import sys import warnings +import six + from builtins import object from contextlib import contextmanager +from logging import Handler, StreamHandler class LoggingMixin(object): @@ -103,6 +106,32 @@ def isatty(self): return False +class RedirectStdHandler(StreamHandler): + """ + This class is like a StreamHandler using sys.stderr/stdout, but always uses + whatever sys.stderr/stderr is currently set to rather than the value of + sys.stderr/stdout at handler construction time. + """ + def __init__(self, stream): + if not isinstance(stream, six.string_types): + raise Exception("Cannot use file like objects. Use 'stdout' or 'stderr'" + " as a str and without 'ext://'.") + + self._use_stderr = True + if 'stdout' in stream: + self._use_stderr = False + + # StreamHandler tries to set self.stream + Handler.__init__(self) + + @property + def stream(self): + if self._use_stderr: + return sys.stderr + + return sys.stdout + + @contextmanager def redirect_stdout(logger, level): writer = StreamLogWriter(logger, level)