This repository has been archived by the owner on Jan 26, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 351
/
app_manager.rb
584 lines (501 loc) · 18.7 KB
/
app_manager.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
# Coordinates app staging and instance lifecycle for the cloud controller.
# Class-level state implements a bounded FIFO queue of staging jobs run via
# EventMachine; instance methods operate on a single App record.
class AppManager
attr_reader :app
# Fallbacks used when AppConfig does not specify staging limits.
DEFAULT_MAX_CONCURRENT_STAGERS = 10
DEFAULT_MAX_STAGING_RUNTIME = 60
class << self
# Maximum number of staging commands allowed to run concurrently.
def max_running
AppConfig[:staging][:max_concurrent_stagers] || DEFAULT_MAX_CONCURRENT_STAGERS
end
# Seconds a staging command may run before it is forcibly killed.
def max_runtime
AppConfig[:staging][:max_staging_runtime] || DEFAULT_MAX_STAGING_RUNTIME
end
# Directory holding the per-framework staging manifest YAML files.
def staging_manifest_directory
AppConfig[:directories][:staging_manifests]
end
# Staging jobs waiting for a free stager slot (FIFO).
def pending
@pending ||= []
end
# Currently executing staging jobs, keyed by child process pid.
def running
@running ||= {}
end
# Restricts ownership/permissions of a staging directory to the given
# secure user so other local users cannot read the app bits.
# NOTE(review): reads user[:user]/user[:group] while the rest of this file
# reads user[:uid]/user[:gid] -- presumably the hash from
# SecureUserManager carries all four keys; confirm.
def secure_staging_dir(user, dir)
CloudController.logger.debug("Securing directory '#{dir}'", :tags => [:staging])
system("chown -R #{user[:user]} #{dir}")
system("chgrp -R #{user[:group]} #{dir}")
system("chmod -R o-rwx #{dir}")
system("chmod -R g-rwx #{dir}")
end
# Enqueues a staging job, updates the varz gauge, and tries to dispatch.
def queue_staging_job(job)
pending << job
VCAP::Component.varz[:pending_stage_cmds] = pending.length
process_queue
end
# Timer callback: a staging command exceeded max_runtime. Kill it hard
# and release its slot; EM.system's exit callback will still fire.
def staging_job_expired(job)
CloudController.logger.warn("Killing long running staging process: #{job.inspect}", :tags => [:staging])
job.delete(:expire_timer)
# Forcefully take out long running stager
`kill -9 #{job[:pid]}`
complete_running(job)
end
# Registers a spawned staging command and arms its expiration timer.
def mark_running(job, pid)
job[:pid] = pid
job[:start] = Time.now
job[:expire_timer] = EM.add_timer(max_runtime) { AppManager.staging_job_expired(job) }
running[pid] = job
VCAP::Component.varz[:running_stage_cmds] = running.length
end
# Tears down a finished (or killed) job: cancels its timer, reaps any
# processes still owned by the checked-out secure user, returns that user
# to the pool, then dispatches the next pending job.
def complete_running(job)
EM.cancel_timer(job[:expire_timer]) if job[:expire_timer]
if job[:user]
CloudController.logger.debug("Checking in user #{job[:user]}", :tags => [:staging, :secure])
`sudo -u '##{job[:user][:uid]}' pkill -9 -U #{job[:user][:uid]} 2>&1`
SecureUserManager.instance.return_secure_user(job[:user])
job[:user] = nil
end
running.delete job[:pid]
VCAP::Component.varz[:running_stage_cmds] = running.length
process_queue
end
# Dispatches the next pending staging job if a slot is free. In secure
# mode the command runs under a checked-out secure user whose uid/gid are
# appended to the command line. When the command exits, staging is
# finalized inside a fresh Fiber (packaging/saving needs a Fiber context)
# and the slot is released.
def process_queue
return if running.length >= max_running
return if pending.empty?
job = pending.shift
VCAP::Component.varz[:pending_stage_cmds] = pending.length
CloudController.logger.debug("Starting staging command: #{job[:cmd]}", :tags => [:staging])
if AppConfig[:staging][:secure]
job[:user] = user = SecureUserManager.instance.grab_secure_user
CloudController.logger.debug("Checked out user #{user.inspect}", :tags => [:staging, :secure])
job[:cmd] = "#{job[:cmd]} #{user[:uid]} #{user[:gid]}"
CloudController.logger.debug("Command changed to '#{job[:cmd]}'", :tags => [:staging, :secure])
AppManager.secure_staging_dir(job[:user], job[:staging_dir])
AppManager.secure_staging_dir(job[:user], job[:exploded_dir])
end
# Run outside Bundler's environment so the staging plugin sees a clean
# RUBYOPT/GEM_PATH rather than the cloud controller's bundle.
Bundler.with_clean_env do
pid = EM.system(job[:cmd]) do |output, status|
if status.exitstatus != 0
CloudController.logger.debug("Staging command FAILED with #{status.exitstatus}: #{output}", :tags => [:staging])
else
CloudController.logger.debug('Staging command SUCCEEDED', :tags => [:staging])
end
Fiber.new do
# Finalize staging here if all is well.
manager = AppManager.new(job[:app])
begin
if manager.app_still_exists? # Will reload app
# Save the app even if staging failed to display the log to the user
manager.package_staged_app(job[:staging_dir])
job[:app].package_state = 'FAILED' if status.exitstatus != 0
manager.save_staged_app_state
end
rescue => e
CloudController.logger.warn("Exception after return from staging: #{e}", :tags => [:staging])
CloudController.logger.error(e, :tags => [:staging])
ensure
# Always remove the temp dirs, even on failure.
FileUtils.rm_rf(job[:staging_dir])
FileUtils.rm_rf(job[:exploded_dir])
end
end.resume
# Clean up running reference
complete_running(job)
end
mark_running(job, pid)
end
end
end
# @param app [App] the application record this manager operates on
def initialize(app)
@app = app
end
# Builds a staging job for this app and hands it to the class-level queue.
# The command invokes the staging script with the exploded app source, the
# output directory, the JSON-encoded environment, and the manifest dir.
def run_staging_command(script, exploded_dir, staging_dir, env_json)
  staging_cmd = "#{script} #{exploded_dir} #{staging_dir} #{env_json} #{AppManager.staging_manifest_directory}"
  job = {
    :app => @app,
    :cmd => staging_cmd,
    :staging_dir => staging_dir,
    :exploded_dir => exploded_dir
  }
  CloudController.logger.debug("Queueing staging command #{staging_cmd}", :tags => [:staging])
  AppManager.queue_staging_job(job)
end
# Handles a START/STOP request from the health manager for this app.
# START: re-launches the missing indices listed in payload[:indices], but
# only if the app is still started and the request matches the current
# version. STOP: stops the listed instances, but only if the request is
# against the app's current last_updated timestamp.
def health_manager_message_received(payload)
CloudController.logger.debug("[HealthManager] Received #{payload[:op]} request for app #{app.id} - #{app.name}")
indices = payload[:indices]
message = new_message
case payload[:op]
when /START/i
# Check if App is started.
unless app.started?
CloudController.logger.debug("[HealthManager] App no longer running, ignoring")
return
end
# Only process start requests for current version.
unless app.generate_version == payload[:version]
CloudController.logger.debug("[HealthManager] Request for older version of app, ignoring")
return
end
CloudController.logger.debug("[HealthManager] Starting #{indices.length} missing instances for app: #{app.id}")
# FIXME - Check capacity
indices.each { |i| start_instance(message, i) }
when /STOP/i
# If HM detects older versions, let's clean up here versus suppressing
# and leaving old versions in the system. HM will start new ones if needed.
if payload[:last_updated] == app.last_updated
stop_msg = { :droplet => app.id, :instances => payload[:instances] }
NATS.publish('dea.stop', Yajl::Encoder.encode(stop_msg))
end
end
end
# Polls (via fiber_sleep inside perform_deferred_app_operation) until the
# app is staged or staging has failed, then runs the given block once.
# Returns the elapsed polling time in seconds (bounded by the 360s limit).
def once_app_is_staged
  # The 'elapsed = ' local previously captured here was never used; the
  # call's value is the method's implicit return either way.
  perform_deferred_app_operation(360.0) do |app|
    if app.staged? || app.staging_failed?
      yield
      true
    end
  end
end
# Sends start messages for instance indices [index, max_to_start) of the
# app. Work is deferred to the next reactor tick inside a fresh Fiber
# because find_dea_for performs a fiber-blocking NATS request.
def start_instances(start_message, index, max_to_start)
  EM.next_tick do
    f = Fiber.new do
      message = start_message.dup
      message[:executableUri] = download_app_uri(message[:executableUri])
      message[:debug] = @app.metadata[:debug]
      (index...max_to_start).each do |i|
        message[:index] = i
        dea_id = find_dea_for(message)
        json = Yajl::Encoder.encode(message)
        if dea_id
          CloudController.logger.debug("Sending start message #{json} to DEA #{dea_id}")
          NATS.publish("dea.#{dea_id}.start", json)
        else
          # BUG FIX: 'json' was previously assigned only in the success
          # branch, so this warning logged an empty payload whenever the
          # very first placement failed.
          CloudController.logger.warn("No resources available to start instance #{json}")
        end
      end
    end
    f.resume
  end
end
# Called when the app transitions to STARTED: waits for staging to finish,
# persists the app (bumping its run count), then launches instances.
def started
  once_app_is_staged do
    save_staged_app_state # Bumps runcount
    start_message = new_message
    # On staging failure we still start one instance so the user can see
    # the staging errors; otherwise start the configured instance count.
    desired = app.staging_failed? ? 1 : app.instances
    start_instances(start_message, 0, desired)
  end
end
# Called when the app transitions to STOPPED: stops every instance.
def stopped
stop_all
end
# Adjusts running instances after app.instances has already been updated.
# delta > 0 starts the newly-added indices; delta < 0 stops the indices
# beyond the new count. No-op unless the app is started.
def change_running_instances(delta)
  return unless app.started?
  message = new_message
  if delta > 0
    start_instances(message, app.instances - delta, app.instances)
  else
    # app.instances is the new (smaller) count; stop everything from there
    # up to the old count (new - delta == old when delta is negative).
    # '.to_a' replaces the previous identity '.collect { |i| i }'.
    indices = (app.instances...(app.instances - delta)).to_a
    stop_instances(indices)
  end
end
# Pushes the app's current URI mappings (via the full droplet message) to
# all DEAs; only meaningful once the app has been staged.
def update_uris
  return unless app.staged?
  NATS.publish('dea.update', Yajl::Encoder.encode(new_message))
end
# Called after the app's bits change: once (re)staging completes
# successfully, notifies interested parties that the droplet was updated.
def updated
  once_app_is_staged do
    NATS.publish('droplet.updated', Yajl::Encoder.encode(:droplet => app.id)) unless app.staging_failed?
  end
end
# Resets the run count when the staged package changed; otherwise bumps it.
def update_run_count
  app.run_count = app.staged_package_hash_changed? ? 0 : app.run_count + 1
end
# Bumps/resets the run count and persists the app; a failed save is logged
# (with the validation errors) rather than raised.
def save_staged_app_state
  update_run_count
  return if app.save
  errors = app.errors.full_messages
  CloudController.logger.warn("App #{app.id} was not valid after attempted staging: #{errors.join(',')}", :tags => [:staging])
end
# Loads the staging manifest for the given framework name, or returns nil
# when no manifest file exists for it.
def manifest_for_framework(framework)
  manifest_path = File.join(AppManager.staging_manifest_directory, "#{framework}.yml")
  # File.exists? is deprecated (and removed in Ruby 3.2); use File.exist?.
  return nil unless File.exist?(manifest_path)
  StagingPlugin.load_manifest(manifest_path)
end
# Kicks off asynchronous staging for the app: locates the framework
# manifest, validates the configured runtime, explodes the app bits into a
# temp dir, and queues the staging command. Raises CloudError for an
# unknown framework or runtime; any other failure marks the app FAILED,
# persists it, and re-raises for the API layer.
def stage
return if app.package_hash.blank? || app.staged?
CloudController.logger.debug "app: #{app.id} Staging starting"
manifest = manifest_for_framework(app.framework)
unless manifest && manifest['runtimes']
raise CloudError.new(CloudError::APP_INVALID_FRAMEWORK, app.framework)
end
# Find the first runtime entry matching the app's configured runtime name.
runtime = nil
manifest['runtimes'].each do |hash|
runtime ||= hash[app.runtime]
end
unless runtime
raise CloudError.new(CloudError::APP_INVALID_RUNTIME, app.runtime, app.framework)
end
env_json = Yajl::Encoder.encode(app.staging_environment)
app_source_dir = Dir.mktmpdir
app.explode_into(app_source_dir)
output_dir = Dir.mktmpdir
# Call the selected staging script without changing directories.
run_plugin_path = Rails.root.join('script', 'run_plugin.rb')
staging_script = "#{CloudController.current_ruby} #{run_plugin_path} #{app.framework}"
# Perform staging command
run_staging_command(staging_script, app_source_dir, output_dir, env_json)
once_app_is_staged do
CloudController.logger.debug "app: #{app.id} Staging complete"
end
rescue => e
CloudController.logger.error("Failed on exception! #{e}", :tags => [:staging])
CloudController.logger.error(e, :tags => [:staging])
app.package_state = 'FAILED'
save_staged_app_state
raise e # re-raise here to propagate to the API call.
end
# Returns an array of hashes containing 'index', 'state', 'since'(timestamp),
# 'debug_ip', and 'debug_port' for all instances running, or trying to run,
# the app.
def find_instances
return [] unless app.started?
instances = app.instances
indices = []
# First ask the health manager which indices are FLAPPING.
message = {
:droplet => app.id,
:version => app.generate_version,
:state => :FLAPPING
}
flapping_indices_json = NATS.timed_request('healthmanager.status', message.to_json, :timeout => 2).first
flapping_indices = Yajl::Parser.parse(flapping_indices_json, :symbolize_keys => true) rescue nil
if flapping_indices && flapping_indices[:indices]
flapping_indices[:indices].each do |entry|
index = entry[:index]
if index >= 0 && index < instances
indices[index] = {
:index => index,
:state => :FLAPPING,
:since => entry[:since]
}
end
end
end
# Then ask the DEAs about STARTING/RUNNING instances for the remaining slots.
message = {
:droplet => app.id,
:version => app.generate_version,
:states => ['STARTING', 'RUNNING']
}
expected_running_instances = instances - indices.length
if expected_running_instances > 0
opts = { :timeout => 2, :expected => expected_running_instances }
running_instances = NATS.timed_request('dea.find.droplet', message.to_json, opts)
running_instances.each do |instance|
# Skip unparseable responses rather than aborting the whole query.
instance_json = Yajl::Parser.parse(instance, :symbolize_keys => true) rescue nil
next unless instance_json
# A missing :index defaults to 'instances', which the range check rejects.
index = instance_json[:index] || instances
if index >= 0 && index < instances
indices[index] = {
:index => index,
:state => instance_json[:state],
:since => instance_json[:state_timestamp],
:debug_ip => instance_json[:debug_ip],
:debug_port => instance_json[:debug_port]
}
end
end
end
# Any slot still unaccounted for is reported as DOWN.
instances.times do |index|
index_entry = indices[index]
unless index_entry
indices[index] = { :index => index, :state => :DOWN, :since => Time.now.to_i }
end
end
indices
end
# Asks the health manager for CRASHED instances of this app.
# Returns an array of instance hashes, or [] when nothing (or nothing
# parseable) comes back.
def find_crashes
  message = { :droplet => app.id, :state => :CRASHED }
  crashed_indices_json = NATS.timed_request('healthmanager.status', message.to_json, :timeout => 2).first
  crashed_indices = Yajl::Parser.parse(crashed_indices_json, :symbolize_keys => true) rescue nil
  # Guard the :instances lookup too: previously a response without that key
  # made this method return nil instead of the documented empty array.
  (crashed_indices && crashed_indices[:instances]) || []
end
# TODO, this should be calling one generic find_instances
# Queries the DEAs for a single instance matching the given options
# (merged onto the droplet id). Returns the parsed instance hash, or nil
# on timeout / unparseable response.
def find_specific_instance(options)
  message = { :droplet => app.id }.merge(options)
  instance_json = NATS.timed_request('dea.find.droplet', message.to_json, :timeout => 2).first
  # The dead 'instance =' local that used to wrap this expression is gone;
  # the parse result is the implicit return value either way.
  Yajl::Parser.parse(instance_json, :symbolize_keys => true) rescue nil
end
# TODO - This has a lot in common with 'find_instances'; at the very
# least, the 'fill remaining slots with 'DOWN' instances' code should
# be refactored out.
# Collects per-index runtime stats for RUNNING instances; indices with no
# response are reported as :DOWN. Returns a hash keyed by instance index.
def find_stats
  indices = {}
  return indices if (app.nil? || !app.started?)
  message = { :droplet => app.id, :version => app.generate_version,
              :states => ['RUNNING'], :include_stats => true }
  opt = { :timeout => 2, :expected => app.instances }
  running_instances = NATS.timed_request('dea.find.droplet', message.to_json, opt)
  running_instances.each do |instance|
    # Parse defensively, matching find_instances: previously a malformed
    # response (or one without :index) raised and aborted the whole call.
    instance_json = Yajl::Parser.parse(instance, :symbolize_keys => true) rescue nil
    next unless instance_json
    index = instance_json[:index] || app.instances
    if index >= 0 && index < app.instances
      indices[index] = {
        :state => instance_json[:state],
        :stats => instance_json[:stats]
      }
    end
  end
  # Fill remaining slots with DOWN entries.
  app.instances.times do |index|
    indices[index] ||= { :state => :DOWN, :since => Time.now.to_i }
  end
  indices
end
# Builds the absolute URL a DEA uses to fetch the droplet from this
# cloud controller instance.
def download_app_uri(path)
  "http://#{CloudController.bind_address}:#{CloudController.external_port}#{path}"
end
# start_instance involves several moving pieces, from sending requests for help to the
# dea_pool, to sending the actual start messages. In addition, many of these can be
# triggered by one update call, so we simply queue them for the next go around through
# the event loop with their own fiber context
def start_instance(message, index)
  # Release any pending api call.
  EM.next_tick do
    wf = Fiber.new do
      message = message.dup
      message[:executableUri] = download_app_uri(message[:executableUri])
      message[:index] = index
      dea_id = find_dea_for(message)
      json = Yajl::Encoder.encode(message)
      if dea_id
        CloudController.logger.debug("Sending start message #{json} to DEA #{dea_id}")
        NATS.publish("dea.#{dea_id}.start", json)
      else
        # BUG FIX: 'json' was previously assigned only in the success
        # branch, so this warning always logged an empty payload.
        CloudController.logger.warn("No resources available to start instance #{json}")
      end
    end
    wf.resume
  end
end
# Asks the DEA pool (via a NATS 'dea.discover' request) for a DEA able to
# run the described droplet. Returns the chosen DEA's id, or nil when no
# DEA answers within the timeout.
def find_dea_for(message)
  discover_payload = Yajl::Encoder.encode(
    :droplet => message[:droplet],
    :limits => message[:limits],
    :name => message[:name],
    :runtime => message[:runtime],
    :sha => message[:sha1]
  )
  result = NATS.timed_request('dea.discover', discover_payload, :timeout => 2).first
  return nil if result.nil?
  CloudController.logger.debug "Received #{result.inspect} in response to dea.discover request"
  Yajl::Parser.parse(result, :symbolize_keys => true)[:id]
end
# Tells the DEAs to stop the given instance indices of the app's current
# version.
def stop_instances(indices)
  payload = Yajl::Encoder.encode(:droplet => app.id, :version => app.generate_version, :indices => indices)
  NATS.publish('dea.stop', payload)
end
# Broadcasts a stop for every instance of this app, regardless of version.
def stop_all
NATS.publish('dea.stop', Yajl::Encoder.encode(:droplet => app.id))
end
# Resolves the DEA file-server URL (and credentials) for a file belonging
# to an instance of the app. 'instance' is either a numeric index (string)
# or an opaque instance id; numeric indices are bounds-checked against the
# configured instance count. Returns [url, credentials] or nil if the
# instance cannot be located; raises CloudError if the app is stopped or
# the index is out of range.
def get_file_url(instance, path=nil)
  raise CloudError.new(CloudError::APP_STOPPED) if app.stopped?
  search_options = {}
  # \A/\z anchor the whole string; the previous ^/$ anchors matched per
  # line, so an id with an embedded newline (e.g. "2\nabc") was wrongly
  # treated as the numeric index 2.
  if instance =~ /\A\d{1,10}\z/
    instance = instance.to_i
    if instance < 0 || instance >= app.instances
      raise CloudError.new(CloudError::APP_INSTANCE_NOT_FOUND, instance)
    end
    search_options[:indices] = [instance]
    search_options[:states] = [:STARTING, :RUNNING, :CRASHED]
    search_options[:version] = app.generate_version
  else
    search_options[:instance_ids] = [instance]
  end
  if instance = find_specific_instance(search_options)
    ["#{instance[:file_uri]}#{instance[:staged]}/#{path}", instance[:credentials]]
  end
end
# Repeatedly yields the (freshly reloaded) app to the block every 0.5s
# until the block returns truthy or time_limit seconds elapse; bails out
# early if the app has been deleted. Must run inside a Fiber (relies on
# fiber_sleep). Returns the elapsed time in seconds.
def perform_deferred_app_operation(time_limit = 30.0)
raise ArgumentError, "method requires a block" unless block_given?
start_time = Time.now
elapsed = 0.0
should_exit = false
until should_exit || elapsed > time_limit
# app_still_exists? also refreshes @app from the database.
break unless app_still_exists?
should_exit = yield(app)
elapsed = (Time.now - start_time)
fiber_sleep(0.5) unless should_exit
end
elapsed
end
# Builds the canonical droplet description sent to DEAs and the health
# manager: identity, package location/hash, version, bound services,
# resource limits, environment, and owning users.
def new_message
  service_entries = app.service_bindings.map do |binding|
    config = binding.service_config
    service = config.service
    { :name => config.alias,
      :type => service.synthesize_service_type,
      :label => service.label,
      :vendor => service.name,
      :version => service.version,
      :tags => service.tags,
      :plan => config.plan,
      :plan_option => config.plan_option,
      :credentials => binding.credentials,
    }
  end
  {
    :droplet => app.id,
    :name => app.name,
    :uris => app.mapped_urls,
    :runtime => app.runtime,
    :framework => app.framework,
    :sha1 => app.staged_package_hash,
    :executableFile => app.staged_package_path,
    :executableUri => "/staged_droplets/#{app.id}/#{app.staged_package_hash}",
    :version => app.generate_version,
    :services => service_entries,
    :limits => app.limits,
    :env => app.environment_variables,
    # XXX - should we collect all collabs here?
    :users => [app.owner.email],
  }
end
# Reloads @app from the database (bypassing the query cache) and replaces
# the cached reference with the fresh record. Returns the fresh record,
# or nil/false when @app was never set or the row has since been deleted.
# NOTE: '&& @app =' deliberately combines the guard and the reassignment.
def app_still_exists?
@app && @app = App.uncached { App.find_by_id(@app.id) }
end
# Non-blocking sleep: yields the current Fiber and resumes it from an
# EventMachine timer after 'secs' seconds, so the reactor keeps running.
# Must be called from within a Fiber (Fiber.yield outside one raises).
def fiber_sleep(secs)
f = Fiber.current
EM.add_timer(secs) { f.resume }
Fiber.yield
end
# Update the SHA1 stored for the app, repack the new bits, and mark the app as staged.
# repack does the right thing but needs a Fiber context, which will not be present here
def package_staged_app(staging_dir)
  tmpdir = Dir.mktmpdir # we create the working directory ourselves so we can clean it up.
  staged_file = AppPackage.repack_app_in(staging_dir, tmpdir, :tgz)
  # Remove the previously staged package, if any, before storing the new one.
  unless app.staged_package_hash.nil?
    staged_package = File.join(AppPackage.package_dir, app.staged_package_hash)
    FileUtils.rm_f(staged_package)
  end
  app.staged_package_hash = Digest::SHA1.file(staged_file).hexdigest
  # File.exists? is deprecated (removed in Ruby 3.2); use File.exist?.
  FileUtils.mv(staged_file, app.staged_package_path) unless File.exist?(app.staged_package_path)
  app.package_state = 'STAGED'
rescue
  # Deliberate best-effort: any repack/move failure marks the app FAILED;
  # the caller persists the state and surfaces the staging log to the user.
  app.package_state = 'FAILED'
ensure
  FileUtils.rm_rf(tmpdir)
  FileUtils.rm_rf(File.dirname(staged_file)) if staged_file
end
end