This repository has been archived by the owner on May 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 235
/
thermos_runner.py
273 lines (221 loc) · 7.34 KB
/
thermos_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
import getpass
import os
import pwd
import signal
import sys
import traceback
from twitter.common import app, log
from twitter.common.log.options import LogOptions
from apache.thermos.common.excepthook import ExceptionTerminationHandler
from apache.thermos.common.options import add_port_to
from apache.thermos.common.planner import TaskPlanner
from apache.thermos.common.statuses import (
INTERNAL_ERROR,
INVALID_TASK,
TERMINAL_TASK,
UNKNOWN_ERROR,
UNKNOWN_USER
)
from apache.thermos.config.loader import ThermosConfigLoader
from apache.thermos.core.process import Process
from apache.thermos.core.runner import TaskRunner
# Command-line options consumed by proxy_main() below.

# -- Task definition and filesystem locations --------------------------------
app.add_option('--thermos_json', dest='thermos_json', default=None,
               help='read a thermos Task from a serialized json blob')
app.add_option('--sandbox', dest='sandbox', metavar='PATH', default=None,
               help='The path on the host filesystem to the sandbox in which '
                    'this task should run.')
app.add_option('--container_sandbox', dest='container_sandbox', type=str, default=None,
               help='If running in an isolated filesystem, the path within that '
                    'filesystem where the sandbox is mounted.')
app.add_option('--checkpoint_root', dest='checkpoint_root', metavar='PATH', default=None,
               help='the path where we will store checkpoints')
app.add_option('--task_id', dest='task_id', metavar='STRING', default=None,
               help='The id to which this task should be bound, created if it does '
                    'not exist.')

# -- Process isolation and identity ------------------------------------------
app.add_option('--setuid', dest='setuid', metavar='USER', default=None,
               help='setuid tasks to this user, requires superuser privileges.')
app.add_option('--enable_chroot', dest='chroot', action='store_true', default=False,
               help='chroot tasks to the sandbox before executing them.')
app.add_option('--mesos_containerizer_path', dest='mesos_containerizer_path',
               metavar='PATH', default=None,
               help="The path to the mesos-containerizer executable that will be "
                    "used to isolate the task's filesystem (if using a filesystem "
                    "image).")
app.add_option('--preserve_env', dest='preserve_env', action='store_true', default=False,
               help="Preserve thermos runners' environment variables for the task "
                    "being run.")

# -- Networking --------------------------------------------------------------
# NOTE(review): add_port_to('prebound_ports') is an opaque callback factory;
# presumably it parses each NAME:PORT occurrence into the opts.prebound_ports
# dict that proxy_main reads. Confirm in apache.thermos.common.options.
app.add_option('--port', dest='prebound_ports', metavar='NAME:PORT', default={},
               type='string', nargs=1, action='callback',
               callback=add_port_to('prebound_ports'),
               help='bind a numbered port PORT to name NAME')
app.add_option('--hostname', dest='hostname', default=None,
               help='The hostname to advertise in ZooKeeper and the thermos observer '
                    'instead of the locally-resolved hostname.')

# -- Process logging ---------------------------------------------------------
app.add_option('--process_logger_destination', dest='process_logger_destination',
               type=str, default=None,
               help='The destination of logger to use for all processes run by thermos.')
app.add_option('--process_logger_mode', dest='process_logger_mode',
               type=str, default=None,
               help='The logger mode to use for all processes run by thermos.')
app.add_option('--rotate_log_size_mb', dest='rotate_log_size_mb',
               type=int, default=None,
               help='Maximum size of the rotated stdout/stderr logs emitted by the thermos '
                    'runner in MiB.')
app.add_option('--rotate_log_backups', dest='rotate_log_backups',
               type=int, default=None,
               help='Maximum number of rotated stdout/stderr logs emitted by the thermos '
                    'runner.')
def get_task_from_options(opts):
  """Load the single Thermos task named by ``--thermos_json``.

  Args:
    opts: parsed command-line options; only ``opts.thermos_json`` is read.

  Returns:
    The sole entry of ``ThermosConfigLoader.load_json(...).tasks()``, whose
    ``task`` attribute has passed ``check()``.

  Calls ``app.error`` when the config holds no tasks, more than one task, or
  a task that fails its ``check()``. The code after each call assumes
  ``app.error`` does not return.
  """
  # Evaluate the task list once instead of re-calling tasks() for every test.
  tasks = ThermosConfigLoader.load_json(opts.thermos_json).tasks()
  if not tasks:
    app.error("No tasks specified!")
  if len(tasks) > 1:
    app.error("Multiple tasks in config but no task name specified!")
  task = tasks[0]
  # Run the schema check once and reuse its result for both ok() and message().
  check_result = task.task.check()
  if not check_result.ok():
    app.error(check_result.message())
  return task
def runner_teardown(runner, sig=signal.SIGUSR1, frame=None):
  """Tear down ``runner`` in response to a signal, then exit the process.

  Bound to a runner with functools.partial and installed as the SIGUSR1 and
  SIGUSR2 handler. It is also called directly when the runner is interrupted
  with ^C.

  Args:
    runner: the TaskRunner to tear down.
    sig: the signal number received. SIGUSR1 (the default) selects
      ``runner.kill()``; any other value selects ``runner.lose()``.
    frame: the interrupted stack frame, or None. When present, its stack is
      written to the log line by line.

  The checkpoint is closed with ``runner.close_ckpt()`` before the chosen
  teardown method runs. The process always ends with ``sys.exit(0)``.
  """
  if sig == signal.SIGUSR1:
    op = 'kill'
  else:
    op = 'lose'

  log.info('Thermos runner got signal %s, shutting down.', sig)
  log.info('Interrupted frame:')
  if frame:
    stack_dump = ''.join(traceback.format_stack(frame))
    for stack_line in stack_dump.splitlines():
      log.info(stack_line)

  runner.close_ckpt()
  log.info('Calling runner.%s()', op)
  teardown = getattr(runner, op)
  teardown()
  sys.exit(0)
class CappedTaskPlanner(TaskPlanner):
  """TaskPlanner subclass whose ``TOTAL_RUN_LIMIT`` is fixed at 100.

  proxy_main hands this class to TaskRunner as its ``planner_class``.
  """

  # NOTE(review): presumably bounds the total number of process runs the
  # planner will permit; confirm against TaskPlanner's use of this attribute.
  TOTAL_RUN_LIMIT = 100
def _check_required_options(opts):
  """Reject missing mandatory options via ``app.error``.

  Uses explicit checks rather than ``assert``, which is stripped when Python
  runs with -O. As in get_task_from_options, ``app.error`` is assumed not to
  return.
  """
  if not opts.thermos_json or not os.path.exists(opts.thermos_json):
    app.error('--thermos_json must point to an existing file.')
  if not opts.sandbox:
    app.error('--sandbox is required.')
  if not opts.checkpoint_root:
    app.error('--checkpoint_root is required.')


def _check_ports_bound(thermos_task, prebound_ports):
  """Exit with INTERNAL_ERROR if the task uses a port not supplied via --port."""
  missing_ports = set(thermos_task.ports()) - set(prebound_ports)
  if missing_ports:
    # Sorted so the reported list is deterministic across runs.
    log.error('ERROR! Unbound ports: %s', ' '.join(sorted(missing_ports)))
    sys.exit(INTERNAL_ERROR)


def _check_user_exists(user):
  """Exit with UNKNOWN_USER if ``user`` has no passwd entry."""
  try:
    pwd.getpwnam(user)
  except KeyError:
    log.error('Unknown user: %s', user)
    sys.exit(UNKNOWN_USER)


def proxy_main(args, opts):
  """Validate options, build a TaskRunner for the configured task, and run it.

  Args:
    args: positional command-line arguments (unused).
    opts: parsed command-line options declared via ``app.add_option``.

  Returns:
    None, once ``task_runner.run()`` returns normally.

  On failure the process exits instead of returning:
    - ``app.error`` for missing or invalid mandatory options.
    - INTERNAL_ERROR for unbound ports or ``TaskRunner.InternalError``.
    - INVALID_TASK for ``TaskRunner.InvalidTask``.
    - TERMINAL_TASK for ``TaskRunner.StateError``.
    - UNKNOWN_USER for an unknown user or ``Process.UnknownUserError``.
    - UNKNOWN_ERROR for any other Exception.
  On ^C the runner is torn down through ``runner_teardown``.
  """
  _check_required_options(opts)

  thermos_task = get_task_from_options(opts)
  prebound_ports = opts.prebound_ports
  _check_ports_bound(thermos_task, prebound_ports)

  # Validate the account the task will run as: the --setuid target if given,
  # otherwise the user running this process.
  _check_user_exists(opts.setuid or getpass.getuser())

  task_runner = TaskRunner(
      thermos_task.task,
      opts.checkpoint_root,
      opts.sandbox,
      task_id=opts.task_id,
      # Passes opts.setuid as given (None without --setuid), not the user
      # validated above. NOTE(review): presumably None means "do not switch
      # users"; confirm against TaskRunner.
      user=opts.setuid,
      portmap=prebound_ports,
      chroot=opts.chroot,
      planner_class=CappedTaskPlanner,
      hostname=opts.hostname,
      process_logger_destination=opts.process_logger_destination,
      process_logger_mode=opts.process_logger_mode,
      rotate_log_size_mb=opts.rotate_log_size_mb,
      rotate_log_backups=opts.rotate_log_backups,
      preserve_env=opts.preserve_env,
      mesos_containerizer_path=opts.mesos_containerizer_path,
      container_sandbox=opts.container_sandbox)

  # SIGUSR1 -> runner.kill(), SIGUSR2 -> runner.lose() (see runner_teardown).
  for sig in (signal.SIGUSR1, signal.SIGUSR2):
    signal.signal(sig, functools.partial(runner_teardown, task_runner))

  try:
    task_runner.run()
  except TaskRunner.InternalError as err:
    log.error('Internal error: %s', err)
    sys.exit(INTERNAL_ERROR)
  except TaskRunner.InvalidTask as err:
    log.error('Invalid task: %s', err)
    sys.exit(INVALID_TASK)
  except TaskRunner.StateError as err:
    log.error('Checkpoint error: %s', err)
    sys.exit(TERMINAL_TASK)
  except Process.UnknownUserError as err:
    log.error('User ceased to exist: %s', err)
    sys.exit(UNKNOWN_USER)
  except KeyboardInterrupt:
    log.info('Caught ^C, tearing down runner.')
    runner_teardown(task_runner)
  except Exception as e:
    log.error('Unknown exception: %s', e)
    for line in traceback.format_exc().splitlines():
      log.error(line)
    sys.exit(UNKNOWN_ERROR)
def main(args, opts):
  """Application entry point for the thermos runner.

  Args:
    args: positional command-line arguments, forwarded unchanged.
    opts: options parsed from this module's ``app.add_option`` declarations.

  Returns:
    Whatever ``proxy_main`` returns.
  """
  # NOTE(review): presumably discovered and invoked by twitter.common's
  # app.main(); confirm.
  outcome = proxy_main(args, opts)
  return outcome
# Module-level startup. These statements execute at import time (there is no
# `if __name__ == '__main__'` guard), ending with the call to app.main().
# Configure twitter.common logging: enable "simple" mode and set the on-disk
# log level to DEBUG.
LogOptions.set_simple(True)
LogOptions.set_disk_log_level('DEBUG')
# Register the Thermos exception-termination module with the app framework.
# NOTE(review): presumably reacts to uncaught exceptions; confirm in
# apache.thermos.common.excepthook.
app.register_module(ExceptionTerminationHandler())
# Hand control to twitter.common.app, which parses the options declared above
# and (presumably) dispatches to main() in this module.
app.main()