From 3237c7e31d008f73e6ba0ecc1f2331c7c80f0e17 Mon Sep 17 00:00:00 2001
From: Akshesh Doshi
Date: Sun, 26 Apr 2020 23:12:36 +0700
Subject: [PATCH] [AIRFLOW-5850] Capture task logs in DockerSwarmOperator (#6552)

* [AIRFLOW-5850] Capture task logs in DockerSwarmOperator

* [AIRFLOW-5850] Fix the mock in the docker swarm tests

* [AIRFLOW-5850] Squash me: Remove nested blocks in docker swarm operator
---
 .../docker/operators/docker_swarm.py      | 63 ++++++++++++++++---
 .../docker/operators/test_docker_swarm.py | 60 ++++++++++++++++--
 2 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index d13b210f5caa3..2caf6e587bfc8 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -16,6 +16,7 @@
 # under the License.
 """Run ephemeral Docker Swarm services"""
 
+import requests
 from docker import types
 
 from airflow.exceptions import AirflowException
@@ -88,17 +89,22 @@ class DockerSwarmOperator(DockerOperator):
     :param tty: Allocate pseudo-TTY to the container of this service.
         This needs to be set to see the logs of the Docker container / service.
     :type tty: bool
+    :param enable_logging: Show the application's logs in the operator's logs.
+        Supported only if the Docker engine is using json-file or journald logging drivers.
+        The `tty` parameter should be set to use this with Python applications.
+    :type enable_logging: bool
     """
 
     @apply_defaults
     def __init__(
             self,
             image,
+            enable_logging=True,
             *args,
             **kwargs):
         super().__init__(image=image, *args, **kwargs)
-
+        self.enable_logging = enable_logging
         self.service = None
 
     def _run_image(self):
@@ -122,24 +128,63 @@ def _run_image(self):
 
         self.log.info('Service started: %s', str(self.service))
 
-        status = None
         # wait for the service to start the task
         while not self.cli.tasks(filters={'service': self.service['ID']}):
             continue
 
-        while True:
-            status = self.cli.tasks(
-                filters={'service': self.service['ID']}
-            )[0]['Status']['State']
-            if status in ['failed', 'complete']:
-                self.log.info('Service status before exiting: %s', status)
+        if self.enable_logging:
+            self._stream_logs_to_output()
+
+        while True:
+            if self._has_service_terminated():
+                self.log.info('Service status before exiting: %s', self._service_status())
                 break
 
         if self.auto_remove:
             self.cli.remove_service(self.service['ID'])
-        if status == 'failed':
+        if self._service_status() == 'failed':
             raise AirflowException('Service failed: ' + repr(self.service))
 
+    def _service_status(self):
+        return self.cli.tasks(
+            filters={'service': self.service['ID']}
+        )[0]['Status']['State']
+
+    def _has_service_terminated(self):
+        status = self._service_status()
+        return (status in ['failed', 'complete'])
+
+    def _stream_logs_to_output(self):
+        logs = self.cli.service_logs(
+            self.service['ID'], follow=True, stdout=True, stderr=True, is_tty=self.tty
+        )
+        line = ''
+        while True:
+            try:
+                log = next(logs)
+            # TODO: Remove this clause once https://github.com/docker/docker-py/issues/931 is fixed
+            except requests.exceptions.ConnectionError:
+                # If the service log stream stopped sending messages, check if the service has
+                # terminated.
+                if self._has_service_terminated():
+                    break
+            except StopIteration:
+                # If the service log stream has terminated, stop fetching further logs.
+ break + else: + try: + log = log.decode() + except UnicodeDecodeError: + continue + if log == '\n': + self.log.info(line) + line = '' + else: + line += log + # flush any remaining log stream + if line: + self.log.info(line) + def on_kill(self): if self.cli is not None: self.log.info('Removing docker service: %s', self.service['ID']) diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index e4cb6df351339..039b50b850fb3 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -19,6 +19,7 @@ import unittest import mock +import requests from docker import APIClient from airflow.exceptions import AirflowException @@ -36,10 +37,15 @@ def test_execute(self, types_mock, client_class_mock): def _client_tasks_side_effect(): for _ in range(2): yield [{'Status': {'State': 'pending'}}] - yield [{'Status': {'State': 'complete'}}] + while True: + yield [{'Status': {'State': 'complete'}}] + + def _client_service_logs_effect(): + yield b'Testing is awesome.' client_mock = mock.Mock(spec=APIClient) client_mock.create_service.return_value = {'ID': 'some_id'} + client_mock.service_logs.return_value = _client_service_logs_effect() client_mock.images.return_value = [] client_mock.pull.return_value = [b'{"status":"pull log"}'] client_mock.tasks.side_effect = _client_tasks_side_effect() @@ -70,6 +76,10 @@ def _client_tasks_side_effect(): base_url='unix://var/run/docker.sock', tls=None, version='1.19' ) + client_mock.service_logs.assert_called_once_with( + 'some_id', follow=True, stdout=True, stderr=True, is_tty=True + ) + csargs, cskwargs = client_mock.create_service.call_args_list[0] self.assertEqual( len(csargs), 1, 'create_service called with different number of arguments than expected' @@ -77,7 +87,7 @@ def _client_tasks_side_effect(): self.assertEqual(csargs, (mock_obj, )) self.assertEqual(cskwargs['labels'], {'name': 'airflow__adhoc_airflow__unittest'}) self.assertTrue(cskwargs['name'].startswith('airflow-')) - self.assertEqual(client_mock.tasks.call_count, 3) + self.assertEqual(client_mock.tasks.call_count, 5) client_mock.remove_service.assert_called_once_with('some_id') @mock.patch('airflow.providers.docker.operators.docker.APIClient') @@ -98,7 +108,7 @@ def test_no_auto_remove(self, types_mock, client_class_mock): client_class_mock.return_value = client_mock - operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest') + operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest', enable_logging=False) operator.execute(None) self.assertEqual( @@ -124,16 +134,56 @@ def test_failed_service_raises_error(self, types_mock, client_class_mock): client_class_mock.return_value = client_mock - operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest') + operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest', enable_logging=False) msg = "Service failed: {'ID': 'some_id'}" with self.assertRaises(AirflowException) as error: operator.execute(None) self.assertEqual(str(error.exception), msg) + @mock.patch('airflow.providers.docker.operators.docker.APIClient') + @mock.patch('airflow.providers.docker.operators.docker_swarm.types') + def test_logging_with_requests_timeout(self, types_mock, client_class_mock): + + mock_obj = mock.Mock() + + def _client_tasks_side_effect(): + for _ in range(2): + yield [{'Status': {'State': 'pending'}}] + while True: + yield [{'Status': {'State': 'complete'}}] + + 
def _client_service_logs_effect(): + yield b'Testing is awesome.' + raise requests.exceptions.ConnectionError('') + + client_mock = mock.Mock(spec=APIClient) + client_mock.create_service.return_value = {'ID': 'some_id'} + client_mock.service_logs.return_value = _client_service_logs_effect() + client_mock.images.return_value = [] + client_mock.pull.return_value = [b'{"status":"pull log"}'] + client_mock.tasks.side_effect = _client_tasks_side_effect() + types_mock.TaskTemplate.return_value = mock_obj + types_mock.ContainerSpec.return_value = mock_obj + types_mock.RestartPolicy.return_value = mock_obj + types_mock.Resources.return_value = mock_obj + + client_class_mock.return_value = client_mock + + operator = DockerSwarmOperator( + api_version='1.19', command='env', environment={'UNIT': 'TEST'}, image='ubuntu:latest', + mem_limit='128m', user='unittest', task_id='unittest', auto_remove=True, tty=True, + enable_logging=True + ) + operator.execute(None) + + client_mock.service_logs.assert_called_once_with( + 'some_id', follow=True, stdout=True, stderr=True, is_tty=True + ) + def test_on_kill(self): client_mock = mock.Mock(spec=APIClient) - operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest') + operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest', enable_logging=False) operator.cli = client_mock operator.service = {'ID': 'some_id'}
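For reference, a minimal usage sketch of the new enable_logging flag introduced by this patch. It assumes only the import path touched above; the DAG id, start date, schedule, image and command below are illustrative placeholders, not taken from the patch.

from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

# Illustrative sketch only: dag_id, start_date, schedule_interval, image and
# command are placeholders and are not part of this patch.
with DAG(dag_id='docker_swarm_logging_example',
         start_date=datetime(2020, 4, 26),
         schedule_interval=None) as dag:
    DockerSwarmOperator(
        task_id='stream_service_logs',
        image='ubuntu:latest',
        command='env',
        auto_remove=True,
        tty=True,              # needed so the service's output is readable as logs (see docstring)
        enable_logging=True,   # new flag: stream the Swarm service's logs into the Airflow task log
    )

With enable_logging=True (the default), _stream_logs_to_output() follows the service's log stream and writes each completed line to the task log; passing enable_logging=False restores the previous behaviour of only polling the service state.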