From 4e4dd91b47c74e743d6e56fea51547f089128ae2 Mon Sep 17 00:00:00 2001 From: Ping Zhang Date: Fri, 20 Sep 2019 14:08:56 -0700 Subject: [PATCH] [AIRFLOW-5528] end_of_log_mark should not be a log record --- airflow/utils/log/es_task_handler.py | 2 +- tests/utils/log/test_es_task_handler.py | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py index 9bf6720f04db8..054fafce3f123 100644 --- a/airflow/utils/log/es_task_handler.py +++ b/airflow/utils/log/es_task_handler.py @@ -249,7 +249,7 @@ def close(self): # Mark the end of file using end of log mark, # so we know where to stop while auto-tailing. - self.handler.emit(logging.makeLogRecord({'msg': self.end_of_log_mark})) + self.handler.stream.write(self.end_of_log_mark) if self.write_stdout: self.handler.close() diff --git a/tests/utils/log/test_es_task_handler.py b/tests/utils/log/test_es_task_handler.py index eb69d83df5f72..11b58c51dfa08 100644 --- a/tests/utils/log/test_es_task_handler.py +++ b/tests/utils/log/test_es_task_handler.py @@ -20,6 +20,7 @@ import os import shutil import unittest +import logging import elasticsearch from unittest import mock @@ -260,12 +261,19 @@ def test_set_context_w_json_format_and_write_stdout(self): self.es_task_handler.set_context(self.ti) def test_close(self): + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + self.es_task_handler.formatter = formatter + self.es_task_handler.set_context(self.ti) self.es_task_handler.close() with open(os.path.join(self.local_log_location, self.filename_template.format(try_number=1)), 'r') as log_file: - self.assertIn(self.end_of_log_mark, log_file.read()) + # end_of_log_mark may contain characters like '\n' which is needed to + # have the log uploaded but will not be stored in elasticsearch. + # so apply the strip() to log_file.read() + log_line = log_file.read().strip() + self.assertEqual(self.end_of_log_mark.strip(), log_line) self.assertTrue(self.es_task_handler.closed) def test_close_no_mark_end(self):