[AIRFLOW-5850] Capture task logs in DockerSwarmOperator (#6552)
* [AIRFLOW-5850] Capture task logs in DockerSwarmOperator

* [AIRFLOW-5850] Fix the mock in the docker swarm tests

* [AIRFLOW-5850] Squash me: Remove nested blocks in docker swarm operator
akki committed Apr 26, 2020
1 parent e8d0f8f commit 3237c7e
Showing 2 changed files with 109 additions and 14 deletions.
63 changes: 54 additions & 9 deletions airflow/providers/docker/operators/docker_swarm.py
@@ -16,6 +16,7 @@
# under the License.
"""Run ephemeral Docker Swarm services"""

import requests
from docker import types

from airflow.exceptions import AirflowException
@@ -88,17 +89,22 @@ class DockerSwarmOperator(DockerOperator):
:param tty: Allocate pseudo-TTY to the container of this service
    This needs to be set to see logs of the Docker container / service.
:type tty: bool
:param enable_logging: Show the application's logs in operator's logs.
Supported only if the Docker engine is using json-file or journald logging drivers.
The `tty` parameter should be set to use this with Python applications.
:type enable_logging: bool
"""

@apply_defaults
def __init__(
        self,
        image,
        enable_logging=True,
        *args,
        **kwargs):

    super().__init__(image=image, *args, **kwargs)

    self.enable_logging = enable_logging
    self.service = None

def _run_image(self):
@@ -122,24 +128,63 @@ def _run_image(self):

self.log.info('Service started: %s', str(self.service))

-status = None
# wait for the service to start the task
while not self.cli.tasks(filters={'service': self.service['ID']}):
    continue

-while True:
-    status = self.cli.tasks(
-        filters={'service': self.service['ID']}
-    )[0]['Status']['State']
-    if status in ['failed', 'complete']:
-        self.log.info('Service status before exiting: %s', status)
+if self.enable_logging:
+    self._stream_logs_to_output()
+
+while True:
+    if self._has_service_terminated():
+        self.log.info('Service status before exiting: %s', self._service_status())
        break

if self.auto_remove:
    self.cli.remove_service(self.service['ID'])
-if status == 'failed':
+if self._service_status() == 'failed':
    raise AirflowException('Service failed: ' + repr(self.service))

def _service_status(self):
    return self.cli.tasks(
        filters={'service': self.service['ID']}
    )[0]['Status']['State']

def _has_service_terminated(self):
    status = self._service_status()
    return (status in ['failed', 'complete'])

def _stream_logs_to_output(self):
    logs = self.cli.service_logs(
        self.service['ID'], follow=True, stdout=True, stderr=True, is_tty=self.tty
    )
    line = ''
    while True:
        try:
            log = next(logs)
        # TODO: Remove this clause once https://github.com/docker/docker-py/issues/931 is fixed
        except requests.exceptions.ConnectionError:
            # If the service log stream stopped sending messages, check if the service
            # has terminated.
            if self._has_service_terminated():
                break
        except StopIteration:
            # If the service log stream terminated, stop fetching further logs.
            break
        else:
            try:
                log = log.decode()
            except UnicodeDecodeError:
                continue
            if log == '\n':
                self.log.info(line)
                line = ''
            else:
                line += log
    # flush any remaining log stream
    if line:
        self.log.info(line)

def on_kill(self):
if self.cli is not None:
self.log.info('Removing docker service: %s', self.service['ID'])
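For context, here is a minimal usage sketch of the new enable_logging flag (not part of this commit). It assumes Airflow with the Docker provider installed, a Swarm-enabled Docker engine using the json-file or journald logging driver, and placeholder values for the DAG id, schedule, image and command:

from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

# Hypothetical DAG: runs a one-off command as a Docker Swarm service and streams
# the service's logs into the Airflow task log (enable_logging defaults to True).
with DAG(
        dag_id='docker_swarm_logging_example',
        start_date=datetime(2020, 4, 1),
        schedule_interval=None,
) as dag:
    print_env = DockerSwarmOperator(
        task_id='print_env',
        image='ubuntu:latest',
        command='env',
        tty=True,              # required to read the service logs back
        enable_logging=True,   # new flag added by this commit
        auto_remove=True,
    )

With enable_logging=True the operator first tails the service via cli.service_logs(..., follow=True) and then polls until the service reaches the failed or complete state.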
60 changes: 55 additions & 5 deletions tests/providers/docker/operators/test_docker_swarm.py
@@ -19,6 +19,7 @@
import unittest

import mock
import requests
from docker import APIClient

from airflow.exceptions import AirflowException
@@ -36,10 +37,15 @@ def test_execute(self, types_mock, client_class_mock):
def _client_tasks_side_effect():
    for _ in range(2):
        yield [{'Status': {'State': 'pending'}}]
-    yield [{'Status': {'State': 'complete'}}]
+    while True:
+        yield [{'Status': {'State': 'complete'}}]

def _client_service_logs_effect():
yield b'Testing is awesome.'

client_mock = mock.Mock(spec=APIClient)
client_mock.create_service.return_value = {'ID': 'some_id'}
client_mock.service_logs.return_value = _client_service_logs_effect()
client_mock.images.return_value = []
client_mock.pull.return_value = [b'{"status":"pull log"}']
client_mock.tasks.side_effect = _client_tasks_side_effect()
@@ -70,14 +76,18 @@ def _client_tasks_side_effect():
base_url='unix://var/run/docker.sock', tls=None, version='1.19'
)

client_mock.service_logs.assert_called_once_with(
'some_id', follow=True, stdout=True, stderr=True, is_tty=True
)

csargs, cskwargs = client_mock.create_service.call_args_list[0]
self.assertEqual(
len(csargs), 1, 'create_service called with different number of arguments than expected'
)
self.assertEqual(csargs, (mock_obj, ))
self.assertEqual(cskwargs['labels'], {'name': 'airflow__adhoc_airflow__unittest'})
self.assertTrue(cskwargs['name'].startswith('airflow-'))
-self.assertEqual(client_mock.tasks.call_count, 3)
+self.assertEqual(client_mock.tasks.call_count, 5)
client_mock.remove_service.assert_called_once_with('some_id')

@mock.patch('airflow.providers.docker.operators.docker.APIClient')
@@ -98,7 +108,7 @@ def test_no_auto_remove(self, types_mock, client_class_mock):

client_class_mock.return_value = client_mock

-operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest')
+operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest', enable_logging=False)
operator.execute(None)

self.assertEqual(
@@ -124,16 +134,56 @@ def test_failed_service_raises_error(self, types_mock, client_class_mock):

client_class_mock.return_value = client_mock

-operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest')
+operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest', enable_logging=False)
msg = "Service failed: {'ID': 'some_id'}"
with self.assertRaises(AirflowException) as error:
operator.execute(None)
self.assertEqual(str(error.exception), msg)

@mock.patch('airflow.providers.docker.operators.docker.APIClient')
@mock.patch('airflow.providers.docker.operators.docker_swarm.types')
def test_logging_with_requests_timeout(self, types_mock, client_class_mock):

mock_obj = mock.Mock()

def _client_tasks_side_effect():
for _ in range(2):
yield [{'Status': {'State': 'pending'}}]
while True:
yield [{'Status': {'State': 'complete'}}]

def _client_service_logs_effect():
yield b'Testing is awesome.'
raise requests.exceptions.ConnectionError('')

client_mock = mock.Mock(spec=APIClient)
client_mock.create_service.return_value = {'ID': 'some_id'}
client_mock.service_logs.return_value = _client_service_logs_effect()
client_mock.images.return_value = []
client_mock.pull.return_value = [b'{"status":"pull log"}']
client_mock.tasks.side_effect = _client_tasks_side_effect()
types_mock.TaskTemplate.return_value = mock_obj
types_mock.ContainerSpec.return_value = mock_obj
types_mock.RestartPolicy.return_value = mock_obj
types_mock.Resources.return_value = mock_obj

client_class_mock.return_value = client_mock

operator = DockerSwarmOperator(
api_version='1.19', command='env', environment={'UNIT': 'TEST'}, image='ubuntu:latest',
mem_limit='128m', user='unittest', task_id='unittest', auto_remove=True, tty=True,
enable_logging=True
)
operator.execute(None)

client_mock.service_logs.assert_called_once_with(
'some_id', follow=True, stdout=True, stderr=True, is_tty=True
)

def test_on_kill(self):
client_mock = mock.Mock(spec=APIClient)

-operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest')
+operator = DockerSwarmOperator(image='', auto_remove=False, task_id='unittest', enable_logging=False)
operator.cli = client_mock
operator.service = {'ID': 'some_id'}

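For reference, a standalone sketch (also not part of this commit) of the low-level docker-py call the operator builds on: APIClient.service_logs() with follow=True yields raw byte chunks, which the operator reassembles into lines before logging them. The base_url and service id below are placeholders.

import docker

cli = docker.APIClient(base_url='unix://var/run/docker.sock')
service_id = 'replace_with_a_real_service_id'  # placeholder

logs = cli.service_logs(service_id, follow=True, stdout=True, stderr=True, is_tty=True)
line = ''
# With follow=True this generator only stops when the stream closes or the connection
# drops (the docker-py issue #931 referenced in the TODO above), which is why the
# operator also checks whether the service has terminated.
for chunk in logs:
    try:
        text = chunk.decode()
    except UnicodeDecodeError:
        continue
    if text == '\n':
        print(line)
        line = ''
    else:
        line += text
if line:
    print(line)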
