This repository has been archived by the owner on Feb 8, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 24
/
mysos_scheduler.py
332 lines (276 loc) · 11.2 KB
/
mysos_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import os
import tempfile
from mysos.common import pkgutil, zookeeper
from .http import MysosServer
from .scheduler import MysosScheduler
from .state import LocalStateProvider, Scheduler, StateProvider
from .zk_state import ZooKeeperStateProvider
from kazoo.client import KazooClient
import mesos.interface
from mesos.interface.mesos_pb2 import Credential, FrameworkInfo
import mesos.native
from twitter.common import app, log
from twitter.common.exceptions import ExceptionalThread
from twitter.common.http import HttpServer
from twitter.common.log.options import LogOptions
from twitter.common.metrics import MetricSampler, RootMetrics
from twitter.common.quantity import Time
from twitter.common.quantity.parse_simple import InvalidTime, parse_time
import yaml
# Name the framework registers under with Mesos (used as FrameworkInfo.name).
FRAMEWORK_NAME = "mysos"

# Package and in-package relative path handed to pkgutil.unpack_assets() to extract the
# web UI assets when the scheduler starts.
MYSOS_MODULE, ASSET_RELPATH = "mysos.scheduler", "assets"

# Logging configuration for the scheduler process: no on-disk log files, and the stderr
# log level set to 'google:INFO'.
LogOptions.disable_disk_logging()
LogOptions.set_stderr_log_level("google:INFO")
# Command-line options that main() requires, checked in this order, as
# (optparse dest, flag named in the error message) pairs.
_REQUIRED_OPTIONS = (
    ('api_port', '--port'),
    ('mesos_master', '--mesos_master'),
    ('framework_user', '--framework_user'),
    ('executor_uri', '--executor_uri'),
    ('executor_cmd', '--executor_cmd'),
    ('zk_url', '--zk_url'),
    ('admin_keypath', '--admin_keypath'),
    ('scheduler_keypath', '--scheduler_keypath'),
)


def _parse_framework_credential(cred):
  """Extract the (principal, secret) pair from a parsed framework authentication file.

  :param cred: The deserialized contents of the authentication file; expected to be a mapping
      with 'principal' and 'secret' keys.
  :returns: A ``(principal, secret)`` tuple.
  :raises ValueError: If ``cred`` is not a mapping (e.g. the file was empty or held a list).
  :raises KeyError: If the 'principal' or 'secret' key is missing.
  """
  if not isinstance(cred, dict):
    raise ValueError(
        "expected a mapping with 'principal' and 'secret' keys, got %r" % (cred,))
  return cred["principal"], cred["secret"]


def _read_scheduler_key(path):
  """Read the scheduler's secret key from the file at ``path``.

  The file is read in binary mode and surrounding whitespace (e.g. a trailing newline) is
  stripped. The length of the key is not checked here.

  :returns: The key as a byte string.
  :raises ValueError: If the file is empty or contains only whitespace.
  :raises IOError/OSError: If the file cannot be opened or read.
  """
  with open(path, 'rb') as f:
    key = f.read().strip()
  if not key:
    raise ValueError("The key file is empty")
  return key


def proxy_main():
  """Entry point: register Mysos scheduler command-line options and run the scheduler.

  Options are registered with twitter.common.app; control is then handed to ``app.main()``,
  which is expected to invoke the nested ``main(args, options)`` defined below.
  """
  app.add_option(
      '--port',
      dest='api_port',
      type='int',
      default=None,
      help='Port for the HTTP API server')

  app.add_option(
      '--mesos_master',
      dest='mesos_master',
      default=None,
      help='Mesos master address. It can be a ZooKeeper URL through which the master can be '
           'detected')

  app.add_option(
      '--framework_user',
      dest='framework_user',
      help='The Unix user that Mysos executor runs as')

  app.add_option(
      '--framework_role',
      dest='framework_role',
      default='*',
      help="The role that Mysos framework runs as. If set, Mysos only uses Mesos pool resources "
           "with that role. The default value '*' is what Mesos considers as the default role.\n"
           "NOTE: Mesos master needs to be configured to allow the specified role. See its --roles "
           "flag")

  app.add_option(
      '--executor_uri',
      dest='executor_uri',
      default=None,
      help='URI for the Mysos executor package')

  app.add_option(
      '--executor_cmd',
      dest='executor_cmd',
      default=None,
      help='Command to execute the executor package')

  app.add_option(
      '--executor_environ',
      dest='executor_environ',
      default=None,
      help="Environment variables for the executors (and the tasks) as a list of dicts keyed by "
           "{name, value} in JSON. Note that these variables don't affect Mesos slave components "
           "such as the fetcher")

  app.add_option(
      '--zk_url',
      dest='zk_url',
      default=None,
      help='ZooKeeper URL for various Mysos operations, in the form of '
           '"zk://username:password@servers/path". The sub-directory <zk_url>/discover is used for '
           'communicating MySQL cluster information between Mysos scheduler and executors')

  # TODO(jyx): This could also be made a per-cluster configuration.
  app.add_option(
      '--election_timeout',
      dest='election_timeout',
      default='60s',
      help='The amount of time the scheduler waits for all slaves to respond during a MySQL master '
           'election, e.g., 60s. After the timeout the master is elected from only the slaves that '
           'have responded')

  app.add_option(
      '--admin_keypath',
      dest='admin_keypath',
      default=None,
      help='The path to the key file with MySQL admin credentials on Mesos slaves')

  app.add_option(
      '--work_dir',
      dest='work_dir',
      default=os.path.join(tempfile.gettempdir(), 'mysos'),
      help="Directory path to place Mysos work directories, e.g., web assets, state files if "
           "--state_storage=local. Default to a system temp directory.")

  app.add_option(
      '--state_storage',
      dest='state_storage',
      default='zk',
      help="Mechanism to persist scheduler state. Available options are 'zk' and 'local'. If 'zk' "
           "is chosen, the scheduler state is stored under <zk_url>/state; see --zk_url. Otherwise "
           "'local' is chosen and the state is persisted under <work_dir>/state; see --work_dir")

  app.add_option(
      '--scheduler_keypath',
      dest='scheduler_keypath',
      help="Path to the key file that the scheduler uses to store secrets such as MySQL "
           "cluster passwords. This key must be exactly 32 bytes long")

  app.add_option(
      '--framework_failover_timeout',
      dest='framework_failover_timeout',
      default='14d',
      help='Time after which Mysos framework is considered deleted. This implies losing all tasks. '
           'SHOULD BE VERY HIGH')

  # TODO(jyx): Flags like this are generally optional but specific executor implementations may
  # require them. Consider adding validators that can be plugged in so configuration errors can be
  # caught in the scheduler.
  app.add_option(
      '--installer_args',
      dest='installer_args',
      default=None,
      help='Arguments for MySQL installer directly passed along to and parsed by the installer. '
           'e.g., a serialized JSON string'
  )

  app.add_option(
      '--backup_store_args',
      dest='backup_store_args',
      default=None,
      help="Arguments for the store for MySQL backups. Its use and format are defined by the "
           "backup store implementation. e.g., It can be a serialized JSON string"
  )

  app.add_option(
      '--framework_authentication_file',
      dest='framework_authentication_file',
      default=None,
      help="Path to the key file for authenticating the framework against Mesos master. Framework "
           "will fail to register with Mesos if authentication is required by Mesos and this "
           "option is not provided"
  )

  app.add_option(
      '--executor_source_prefix',
      dest='executor_source_prefix',
      default=None,
      help="Mysos uses the 'source' field in ExecutorInfo (See Mesos documentation) to group tasks "
           "to support metrics tracking by external utilities. The format of ExecutorInfo.source "
           "is '<prefix>.<cluster_name>.<server_id>'. This flag specifies the prefix to use in the "
           "'source' field. e.g., it can be '<availability_zone>.<mesos_cluster>'. There is no "
           "preceding period if <prefix> is empty"
  )

  def main(args, options):
    """Validate options, then run the Mysos scheduler until it stops or is interrupted.

    Steps: check required options, parse timeouts and the ZooKeeper URL, unpack web assets,
    load the optional framework credential and the scheduler key, restore (or create and
    persist) scheduler state, start the Mesos scheduler driver and the HTTP API server, and
    block until the scheduler's ``stopped`` event is set.

    NOTE(review): every ``app.error`` call below is relied upon to terminate the process;
    otherwise later code would reference unbound locals.
    """
    log.info("Options in use: %s", options)

    for dest, flag in _REQUIRED_OPTIONS:
      if not getattr(options, dest):
        app.error('Must specify %s' % flag)

    try:
      election_timeout = parse_time(options.election_timeout)
      framework_failover_timeout = parse_time(options.framework_failover_timeout)
    except InvalidTime as e:
      # str(e) rather than e.message: .message is deprecated (PEP 352) and absent on Python 3.
      app.error(str(e))

    try:
      _, zk_servers, zk_root = zookeeper.parse(options.zk_url)
    except Exception as e:  # The parser's failure modes are not known here; report any of them.
      app.error("Invalid --zk_url: %s" % e)

    web_assets_dir = os.path.join(options.work_dir, "web")
    pkgutil.unpack_assets(web_assets_dir, MYSOS_MODULE, ASSET_RELPATH)
    log.info("Extracted web assets into %s", web_assets_dir)

    fw_principal = None
    fw_secret = None
    if options.framework_authentication_file:
      try:
        with open(options.framework_authentication_file, "r") as f:
          # safe_load: the file only needs plain YAML mappings/scalars. yaml.load without an
          # explicit Loader can construct arbitrary Python objects and is rejected by PyYAML >= 6.
          fw_principal, fw_secret = _parse_framework_credential(yaml.safe_load(f))
        log.info("Loaded credential (principal=%s) for framework authentication", fw_principal)
      except IOError as e:
        app.error("Unable to read the framework authentication key file: %s" % e)
      except (KeyError, ValueError, yaml.YAMLError) as e:
        app.error("Invalid framework authentication key file format %s" % e)

    try:
      scheduler_key = _read_scheduler_key(options.scheduler_keypath)
    except (IOError, OSError, ValueError) as e:
      app.error("Cannot read --scheduler_keypath: %s" % e)

    log.info("Starting Mysos scheduler")

    kazoo = KazooClient(zk_servers)
    kazoo.start()

    # Any value other than 'zk' falls back to local-disk state under --work_dir.
    if options.state_storage == 'zk':
      log.info("Using ZooKeeper (path: %s) for state storage", zk_root)
      state_provider = ZooKeeperStateProvider(kazoo, zk_root)
    else:
      log.info("Using local disk for state storage")
      state_provider = LocalStateProvider(options.work_dir)

    try:
      state = state_provider.load_scheduler_state()
    except StateProvider.Error as e:
      app.error(str(e))

    if state:
      log.info("Successfully restored scheduler state")
      framework_info = state.framework_info
      if framework_info.HasField('id'):
        log.info("Recovered scheduler's FrameworkID is %s", framework_info.id.value)
    else:
      log.info("No scheduler state to restore")
      framework_info = FrameworkInfo(
          user=options.framework_user,
          name=FRAMEWORK_NAME,
          checkpoint=True,
          failover_timeout=framework_failover_timeout.as_(Time.SECONDS),
          role=options.framework_role)
      if fw_principal:
        framework_info.principal = fw_principal
      state = Scheduler(framework_info)
      # Persist the freshly created state right away.
      state_provider.dump_scheduler_state(state)

    scheduler = MysosScheduler(
        state,
        state_provider,
        options.framework_user,
        options.executor_uri,
        options.executor_cmd,
        kazoo,
        options.zk_url,
        election_timeout,
        options.admin_keypath,
        scheduler_key,
        installer_args=options.installer_args,
        backup_store_args=options.backup_store_args,
        executor_environ=options.executor_environ,
        executor_source_prefix=options.executor_source_prefix,
        framework_role=options.framework_role)

    RootMetrics().register_observable('scheduler', scheduler)

    # Pass a Credential to the driver only when both halves were loaded.
    if fw_principal and fw_secret:
      credential = Credential(principal=fw_principal, secret=fw_secret)
      scheduler_driver = mesos.native.MesosSchedulerDriver(
          scheduler,
          framework_info,
          options.mesos_master,
          credential)
    else:
      scheduler_driver = mesos.native.MesosSchedulerDriver(
          scheduler,
          framework_info,
          options.mesos_master)

    scheduler_driver.start()

    metric_sampler = MetricSampler(RootMetrics())
    metric_sampler.start()

    server = HttpServer()
    server.mount_routes(MysosServer(scheduler, web_assets_dir, metric_sampler))

    # Serve the HTTP API on a daemon thread so it does not keep the process alive on its own.
    http_thread = ExceptionalThread(
        target=server.run, args=('0.0.0.0', options.api_port, 'cherrypy'))
    http_thread.daemon = True
    http_thread.start()

    try:
      # Wait for the scheduler to stop.
      # The use of 'stopped' event instead of scheduler_driver.join() is necessary to stop the
      # process with SIGINT.
      while not scheduler.stopped.wait(timeout=0.5):
        pass
    except KeyboardInterrupt:
      log.info('Interrupted, exiting.')
    else:
      log.info('Scheduler exited.')

    app.shutdown(1)  # Mysos scheduler is supposed to be long-running thus the use of exit status 1.

  # Hand control to twitter.common.app, which parses the options registered above and
  # (presumably by locating the caller's `main`) invokes main(args, options).
  app.main()