/
docker_operator.py
236 lines (211 loc) · 9.17 KB
/
docker_operator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from airflow.hooks.docker_hook import DockerHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory
from docker import Client, tls
import ast
class DockerOperator(BaseOperator):
"""
Execute a command inside a docker container.
A temporary directory is created on the host and mounted into a container to allow storing files
that together exceed the default disk size of 10GB in a container. The path to the mounted
directory can be accessed via the environment variable ``AIRFLOW_TMP_DIR``.
If a login to a private registry is required prior to pulling the image, a
Docker connection needs to be configured in Airflow and the connection ID
be provided with the parameter ``docker_conn_id``.
:param image: Docker image from which to create the container.
:type image: str
:param api_version: Remote API version. Set to ``auto`` to automatically
detect the server's version.
:type api_version: str
:param command: Command to be run in the container.
:type command: str or list
:param cpus: Number of CPUs to assign to the container.
This value gets multiplied with 1024. See
https://docs.docker.com/engine/reference/run/#cpu-share-constraint
:type cpus: float
:param docker_url: URL of the host running the docker daemon.
Default is unix://var/run/docker.sock
:type docker_url: str
:param environment: Environment variables to set in the container.
:type environment: dict
:param force_pull: Pull the docker image on every run. Default is false.
:type force_pull: bool
:param mem_limit: Maximum amount of memory the container can use. Either a float value, which
represents the limit in bytes, or a string like ``128m`` or ``1g``.
:type mem_limit: float or str
:param network_mode: Network mode for the container.
:type network_mode: str
:param tls_ca_cert: Path to a PEM-encoded certificate authority to secure the docker connection.
:type tls_ca_cert: str
:param tls_client_cert: Path to the PEM-encoded certificate used to authenticate docker client.
:type tls_client_cert: str
:param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
:type tls_client_key: str
:param tls_hostname: Hostname to match against the docker server certificate or False to
disable the check.
:type tls_hostname: str or bool
:param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
:type tls_ssl_version: str
:param tmp_dir: Mount point inside the container to a temporary directory created on the host by
the operator. The path is also made available via the environment variable
``AIRFLOW_TMP_DIR`` inside the container.
:type tmp_dir: str
:param user: Default user inside the docker container.
:type user: int or str
:param volumes: List of volumes to mount into the container, e.g.
``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
:param working_dir: Working directory to set on the container (equivalent to the -w switch
the docker client)
:type working_dir: str
:param xcom_push: Does the stdout will be pushed to the next step using XCom.
The default is False.
:type xcom_push: bool
:param xcom_all: Push all the stdout or just the last line. The default is False (last line).
:type xcom_all: bool
:param docker_conn_id: ID of the Airflow connection to use
:type docker_conn_id: str
"""
template_fields = ('command',)
template_ext = ('.sh', '.bash',)
@apply_defaults
def __init__(
self,
image,
api_version=None,
command=None,
cpus=1.0,
docker_url='unix://var/run/docker.sock',
environment=None,
force_pull=False,
mem_limit=None,
network_mode=None,
tls_ca_cert=None,
tls_client_cert=None,
tls_client_key=None,
tls_hostname=None,
tls_ssl_version=None,
tmp_dir='/tmp/airflow',
user=None,
volumes=None,
working_dir=None,
xcom_push=False,
xcom_all=False,
docker_conn_id=None,
*args,
**kwargs):
super(DockerOperator, self).__init__(*args, **kwargs)
self.api_version = api_version
self.command = command
self.cpus = cpus
self.docker_url = docker_url
self.environment = environment or {}
self.force_pull = force_pull
self.image = image
self.mem_limit = mem_limit
self.network_mode = network_mode
self.tls_ca_cert = tls_ca_cert
self.tls_client_cert = tls_client_cert
self.tls_client_key = tls_client_key
self.tls_hostname = tls_hostname
self.tls_ssl_version = tls_ssl_version
self.tmp_dir = tmp_dir
self.user = user
self.volumes = volumes or []
self.working_dir = working_dir
self.xcom_push_flag = xcom_push
self.xcom_all = xcom_all
self.docker_conn_id = docker_conn_id
self.cli = None
self.container = None
def get_hook(self):
return DockerHook(
docker_conn_id=self.docker_conn_id,
base_url=self.base_url,
version=self.api_version,
tls=self.__get_tls_config()
)
def execute(self, context):
self.log.info('Starting docker container from image %s', self.image)
tls_config = self.__get_tls_config()
if self.docker_conn_id:
self.cli = self.get_hook().get_conn()
else:
self.cli = Client(
base_url=self.docker_url,
version=self.api_version,
tls=tls_config
)
if ':' not in self.image:
image = self.image + ':latest'
else:
image = self.image
if self.force_pull or len(self.cli.images(name=image)) == 0:
self.log.info('Pulling docker image %s', image)
for l in self.cli.pull(image, stream=True):
output = json.loads(l.decode('utf-8'))
self.log.info("%s", output['status'])
cpu_shares = int(round(self.cpus * 1024))
with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir:
self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
self.volumes.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir))
self.container = self.cli.create_container(
command=self.get_command(),
cpu_shares=cpu_shares,
environment=self.environment,
host_config=self.cli.create_host_config(binds=self.volumes,
network_mode=self.network_mode),
image=image,
mem_limit=self.mem_limit,
user=self.user,
working_dir=self.working_dir
)
self.cli.start(self.container['Id'])
line = ''
for line in self.cli.logs(container=self.container['Id'], stream=True):
line = line.strip()
if hasattr(line, 'decode'):
line = line.decode('utf-8')
self.log.info(line)
exit_code = self.cli.wait(self.container['Id'])
if exit_code != 0:
raise AirflowException('docker container failed')
if self.xcom_push_flag:
return self.cli.logs(container=self.container['Id']) if self.xcom_all else str(line)
def get_command(self):
if self.command is not None and self.command.strip().find('[') == 0:
commands = ast.literal_eval(self.command)
else:
commands = self.command
return commands
def on_kill(self):
if self.cli is not None:
self.log.info('Stopping docker container')
self.cli.stop(self.container['Id'])
def __get_tls_config(self):
tls_config = None
if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
tls_config = tls.TLSConfig(
ca_cert=self.tls_ca_cert,
client_cert=(self.tls_client_cert, self.tls_client_key),
verify=True,
ssl_version=self.tls_ssl_version,
assert_hostname=self.tls_hostname
)
self.docker_url = self.docker_url.replace('tcp://', 'https://')
return tls_config