Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 835 lines (706 sloc) 28.139 kB
3ffc02a Initial commit
Cloud Foundry Engineer authored
1 # Copyright (c) 2009-2011 VMware, Inc.
# Bootstrap helpers that locate the CloudController code base, put its
# libraries on the load path and load its models and configuration files.
module CloudController
  require 'pathname'
  require 'erb'
  require 'yaml'
  require 'fileutils'
  require 'logger'
  require 'optparse'
  require 'set'

  class << self
    # Root of the cloud_controller application as a (memoized) Pathname,
    # resolved relative to this file.
    def root
      @root ||= Pathname.new(File.expand_path('../../../cloud_controller', __FILE__))
    end

    # The CloudController's own lib directory (a Pathname).
    def lib_dir
      root.join('lib')
    end

    # The lib directory shared between components (a String).
    def common_lib_dir
      File.expand_path('../../../lib', __FILE__)
    end

    # NOTE - Any models that rely on appconfig.yml settings must likewise
    # be initialized by the Health Manager before they are used.
    def all_models
      Dir.glob(root.join('app', 'models', '*.rb'))
    end

    # Extends the load path, boots the CloudController configuration, loads the
    # libraries the Health Manager depends on and finally every model file.
    def setup
      [lib_dir.to_s, common_lib_dir].each do |dir|
        $:.unshift(dir) unless $:.include?(dir)
      end

      require root.join('config', 'boot')
      %w[
        active_record
        active_support/core_ext
        yajl
        eventmachine
        nats/client
        vcap/common
        vcap/component
        vcap/logging
        vcap/rolling_metric
        vcap/priority_queue
      ].each { |library| require library }
      all_models.each { |model_file| require(model_file) }

      # This is needed for comparisons between the last_updated time of an app and the current time
      # Time.zone = :utc
      ActiveRecord::Base.time_zone_aware_attributes = true
      ActiveRecord::Base.default_timezone = :utc
    end

    # Reads the file at +path+, runs it through ERB and parses the result as YAML.
    def load_yaml(path)
      File.open(path, 'rb') do |fh|
        YAML.load(ERB.new(fh.read).result(binding))
      end
    end
  end
end
58
# Run the CloudController bootstrap at load time: it extends the load path and
# requires the libraries and model files before the HealthManager class is defined.
59 CloudController.setup
60 class HealthManager
# Version number of the health manager.
61 VERSION = 0.98
62
# Read-only access to the configured intervals (set in initialize from
# config['intervals']) and to the in-memory droplet table.
63 attr_reader :database_scan, :droplet_lost, :droplets_analysis, :flapping_death, :flapping_timeout
64 attr_reader :restart_timeout, :stable_state, :droplets
f4ceed5 @bnugmanov [hm] [common] priority queue FIFO tests, bump version of vcap_common …
bnugmanov authored
# The queue of pending START requests (a VCAP::PrioritySet created in initialize).
65 attr_reader :request_queue
3ffc02a Initial commit
Cloud Foundry Engineer authored
66
67 # TODO - Oh these need comments so badly..
# State / reason strings, as used by the methods of this class:
#   DOWN     - instance state assigned by analyze_app / process_exited_message
#              when an instance is missing, lost or has exited.
#   STARTED, STOPPED - droplet (app) states taken from the database record
#              (droplet.state.upcase in update_droplet).
#   CRASHED  - exit reason / heartbeat state that is recorded in the droplet's
#              :crashes table.
#   STARTING, RUNNING - instance states reported in heartbeats.
#   FLAPPING - instance state assigned once an index has crashed more than
#              @flapping_death times.
#   DEA_SHUTDOWN, DEA_EVACUATION - exit reasons; DEA_EVACUATION restarts are
#              queued with high priority.
68 DOWN = 'DOWN'
69 STARTED = 'STARTED'
70 STOPPED = 'STOPPED'
71 CRASHED = 'CRASHED'
72 STARTING = 'STARTING'
73 RUNNING = 'RUNNING'
74 FLAPPING = 'FLAPPING'
75 DEA_SHUTDOWN = 'DEA_SHUTDOWN'
76 DEA_EVACUATION = 'DEA_EVACUATION'
# Droplet states in which analyze_app is willing to act on an app.
77 APP_STABLE_STATES = Set.new([STARTED, STOPPED])
# Instance states that count as "running".
78 RUNNING_STATES = Set.new([STARTING, RUNNING])
# Exit reasons for which process_exited_message considers restarting the instance.
79 RESTART_REASONS = Set.new([CRASHED, DEA_SHUTDOWN, DEA_EVACUATION])
80
71f3fe4 @bnugmanov hm: DEA_EVAC prioritization
bnugmanov authored
81
# Priority given to high-priority requests in queue_request; ordinary requests
# use the age of the droplet's last update in seconds.
82 INFINITE_PRIORITY = 2_000_000_000
83
84
3ffc02a Initial commit
Cloud Foundry Engineer authored
# Builds a health manager from +options+, runs it and returns the instance.
def self.start(options)
  new(options).tap { |health_manager| health_manager.run }
end
90
# Sets up logging, reads the timing configuration, connects to the database
# and, when config['pid'] is given, writes the process id to that file.
#
# config - Hash with at least the 'logging' and 'intervals' keys; optional
#          keys read here are 'dequeueing_rate', 'database_environment' and 'pid'.
def initialize(config)
  @config = config
  VCAP::Logging.setup_from_config(config['logging'])
  @logger = VCAP::Logging.logger('hm')

  intervals = config['intervals']
  @database_scan = intervals['database_scan']
  @droplet_lost = intervals['droplet_lost']
  @droplets_analysis = intervals['droplets_analysis'] || 10
  @flapping_death = intervals['flapping_death']
  @flapping_timeout = intervals['flapping_timeout']
  @restart_timeout = intervals['restart_timeout']
  @stable_state = intervals['stable_state']
  @dequeueing_rate = config['dequeueing_rate'] || 50
  @database_environment = config['database_environment']

  @droplets = {}
  @request_queue = VCAP::PrioritySet.new

  configure_database

  if config['pid']
    @pid_file = config['pid']
    # Create pid file
    begin
      FileUtils.mkdir_p(File.dirname(@pid_file))
    rescue => e
      @logger.fatal "Can't create pid directory, exiting: #{e}"
      # Actually exit, as the message promises, instead of falling through to
      # a File.open that cannot succeed without the directory.
      exit 1
    end
    File.open(@pid_file, 'wb') { |f| f.puts "#{Process.pid}" }
  end
end
121
# Serializes +obj+ (an empty hash when omitted) with the Yajl encoder.
def encode_json(obj = {})
  encoder = Yajl::Encoder
  encoder.encode(obj)
end
125
# Parses +string+ (an empty JSON object when omitted) with the Yajl parser.
def parse_json(string = '{}')
  parser = Yajl::Parser
  parser.parse(string)
end
129
# Fresh bookkeeping record for a droplet: no known versions, no crashes.
def create_droplet_entry
  entry = {}
  entry[:versions] = {}
  entry[:crashes] = {}
  entry
end
133
# Fresh bookkeeping record for one instance index: no action taken yet,
# no crashes counted and no crash timestamp (-1 marks "never").
def create_index_entry
  entry = {}
  entry[:last_action] = -1
  entry[:crashes] = 0
  entry[:crash_timestamp] = -1
  entry
end
137
# Fresh bookkeeping record for one droplet version: no instance indices yet.
def create_version_entry
  entry = {}
  entry[:indices] = {}
  entry
end
141
# Records the start time, installs the NATS and EventMachine error handlers
# (both log the problem and terminate the process immediately) and then
# connects to the message bus, where the timers, component registration and
# subscriptions are set up.
def run
  @started = Time.now.to_i
  register_error_handler

  NATS.on_error { |e|
    @logger.error("NATS problem, #{e}")
    @logger.error(e)
    exit!
  }

  EM.error_handler { |e|
    @logger.error "Eventmachine problem, #{e}"
    @logger.error("#{e.backtrace.join("\n")}")
    @logger.error(e)
    exit!
  }

  mbus_uri = @config['mbus']
  NATS.start(:uri => mbus_uri) {
    configure_timers
    register_as_component
    subscribe_to_messages
  }
end
165
166 # We use the CloudController database configuration
167 # if none is specified in our config file.
168 # By default, we connect to the development database.
# Picks the database settings for the current environment - our own
# 'database_environment' section when present, the CloudController's
# otherwise - and connects using a STDOUT logger at INFO level.
def configure_database
  env = @config['rails_environment'] || CloudController.environment
  # Fall back to the CloudController db configuration when we have none.
  environments = @database_environment || AppConfig[:database_environment]
  db_settings = environments[env]

  stdout_logger = Logger.new(STDOUT)
  stdout_logger.level = Logger::INFO
  establish_database_connection(db_settings, stdout_logger)
end
181
# Connects ActiveRecord using +db_config+ (after resolving a relative sqlite3
# path) and makes +logger+ the ActiveRecord logger.
def establish_database_connection(db_config, logger)
  expand_database_path_for_sqlite3(db_config)
  ActiveRecord::Base.tap do |base|
    base.establish_connection(db_config)
    base.logger = logger
  end
  logger.debug "Connected to CloudController database"
end
188
189 # If the adapter is sqlite3 and the path is relative, expand it
190 # in reference to the CloudController root.
# For the sqlite3 adapter, rewrites db_config['database'] in place to an
# absolute path; relative paths are resolved against the CloudController
# root. Other adapters are left untouched.
def expand_database_path_for_sqlite3(db_config)
  return unless db_config['adapter'] == 'sqlite3'

  location = db_config['database']
  location = File.join(CloudController.root, location) if location[0, 1] != '/'
  db_config['database'] = File.expand_path(location)
end
200
# Logs the shutdown, removes the pid file (if one was configured) and stops
# NATS, stopping EventMachine once NATS has stopped.
def shutdown
  @logger.info('Shutting down.')
  pid_file = @pid_file
  FileUtils.rm_f(pid_file) if pid_file
  NATS.stop do
    EM.stop
  end
end
206
# Reconciles the tracked instances of one app with its desired state.
#
# Nothing happens unless the droplet is in a stable state (STARTED/STOPPED)
# and was last updated more than @stable_state seconds ago. Otherwise the
# method ages out old crash records, marks instances without a recent
# heartbeat as DOWN, collects instances that should be stopped and indices
# that should be started, accumulates per-framework/runtime statistics and -
# provided the droplet record did not change in the meantime - sends the
# corresponding start/stop requests.
#
# app_id        - id of the app; key into @droplets and the App table.
# droplet_entry - the tracking hash for that app.
# stats         - accumulator with :running, :down, :frameworks and :runtimes
#                 (mutated in place).
207 def analyze_app(app_id, droplet_entry, stats)
208 now = Time.now.to_i
209 update_timestamp = droplet_entry[:last_updated]
210 quiescent = (now - update_timestamp) > @stable_state
211 if APP_STABLE_STATES.include?(droplet_entry[:state]) && quiescent
212 extra_instances = []
213 missing_indices = []
214
# Forget crash records older than @droplet_lost seconds.
215 droplet_entry[:crashes].delete_if do |_, crash_entry|
216 now - crash_entry[:timestamp] > @droplet_lost
217 end
218
# Both delete_if blocks below remove an entry when the block's last
# expression is truthy.
219 droplet_entry[:versions].delete_if do |version, version_entry|
220 version_entry[:indices].delete_if do |index, index_entry|
# A running instance whose last heartbeat is older than @droplet_lost is
# considered lost.
221 if RUNNING_STATES.include?(index_entry[:state]) && now - index_entry[:timestamp] > @droplet_lost
222 index_entry[:state] = DOWN
223 index_entry[:state_timestamp] = now
224 end
225
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
226 reason = nil
3ffc02a Initial commit
Cloud Foundry Engineer authored
# An instance is "extra" when the app is stopped, its index is beyond the
# desired instance count, or it belongs to a non-live version.
227 if droplet_entry[:state] == STOPPED
228 extra_instance = true
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
229 reason = "Droplet state is STOPPED."
3ffc02a Initial commit
Cloud Foundry Engineer authored
230 elsif index >= droplet_entry[:instances]
231 extra_instance = true
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
232 reason = "Extra instance. Droplet should have #{droplet_entry[:instances]} instances running."
3ffc02a Initial commit
Cloud Foundry Engineer authored
233 elsif version != droplet_entry[:live_version]
234 extra_instance = true
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
235 reason = "Live version mismatch. Live version is #{droplet_entry[:live_version]} instance version is #{version}."
3ffc02a Initial commit
Cloud Foundry Engineer authored
236 end
237
# NOTE(review): this if/elsif is the block's return value. The first branch
# returns the extra_instances array (truthy), so a running extra instance is
# also dropped from tracking, not only the non-running ones - confirm that
# this is intended.
238 if RUNNING_STATES.include?(index_entry[:state]) && extra_instance
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
239 @logger.info("Preparing to stop instance (app_id=#{app_id}, index=#{index}, instance=#{index_entry[:instance]}). Reason: #{reason}")
3ffc02a Initial commit
Cloud Foundry Engineer authored
240 extra_instances << index_entry[:instance]
241 elsif extra_instance
242 # stop tracking extra instances
243 true
244 end
245 end
246
247 # delete empty version entries for non live versions
248 if version_entry[:indices].empty?
249 droplet_entry[:state] == STOPPED || version != droplet_entry[:live_version]
250 end
251 end
252
253 if droplet_entry[:state] == STARTED
# A version entry created here by the || fallback is not stored back into
# droplet_entry[:versions].
254 live_version_entry = droplet_entry[:versions][droplet_entry[:live_version]] || create_version_entry
255
cd94c4f HM now tracks stats per framework/runtime
Vadim Spivak authored
256 framework_stats = stats[:frameworks][droplet_entry[:framework]] ||= create_runtime_metrics
257 runtime_stats = stats[:runtimes][droplet_entry[:runtime]] ||= create_runtime_metrics
258
6034ef4 HealthManager varz changes
Vadim Spivak authored
259 framework_stats[:apps] += 1
260 runtime_stats[:apps] += 1
cd94c4f HM now tracks stats per framework/runtime
Vadim Spivak authored
261
262 framework_stats[:crashes] += droplet_entry[:crashes].length
263 runtime_stats[:crashes] += droplet_entry[:crashes].length
264
3ffc02a Initial commit
Cloud Foundry Engineer authored
# Walk every desired index of the live version; an index we have never
# heard from gets a new entry in state DOWN.
265 index_entries = live_version_entry[:indices]
266 droplet_entry[:instances].times do |index|
267 index_entry = index_entries[index]
268 unless index_entry
269 index_entry = index_entries[index] = create_index_entry
270 index_entry[:state] = DOWN
271 index_entry[:state_timestamp] = now
272 end
273
cd94c4f HM now tracks stats per framework/runtime
Vadim Spivak authored
274 if RUNNING_STATES.include?(index_entry[:state])
275 stats[:running] += 1
276 framework_stats[:running_instances] += 1
277 runtime_stats[:running_instances] += 1
278 elsif index_entry[:state] == DOWN
# The trailing "if" repeats the elsif condition and is always true here.
279 stats[:down] += 1 if index_entry[:state] == DOWN
280 framework_stats[:missing_instances] += 1
281 runtime_stats[:missing_instances] += 1
282 elsif index_entry[:state] == FLAPPING
283 framework_stats[:flapping_instances] += 1
284 runtime_stats[:flapping_instances] += 1
285 end
3ffc02a Initial commit
Cloud Foundry Engineer authored
286
# Request a restart only if the last action on this index is more than
# @restart_timeout seconds old.
287 if index_entry[:state] == DOWN && now - index_entry[:last_action] > @restart_timeout
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
288 @logger.info("Preparing to restart instance (app_id=#{app_id}, index=#{index}). Reason: droplet state is STARTED, but instance state is DOWN.")
3ffc02a Initial commit
Cloud Foundry Engineer authored
289 index_entry[:last_action] = now
290 missing_indices << index
291 end
292 end
293 end
294
295 # don't act if we were looking at a stale droplet
# update_droplet returns true when the app no longer exists or its
# last_updated value differs from the tracked one.
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
296 if update_droplet(App.find_by_id(app_id))
297 if missing_indices.any? || extra_instances.any?
298 @logger.info("Droplet information is stale for app id #{app_id}, not taking action.")
299 @logger.info("(#{missing_indices.length} instances need to be started, #{extra_instances.length} instances need to be stopped.)")
300 end
301 return
302 end
3ffc02a Initial commit
Cloud Foundry Engineer authored
303
304 if missing_indices.any?
305 start_instances(app_id, missing_indices)
306 end
307 if extra_instances.any?
308 stop_instances(app_id, extra_instances)
309 end
310 end
311 end
312
# Walks every tracked droplet, publishing app/instance/crash totals to varz.
# With +collect_stats+ (the default) each app is also analyzed individually
# via analyze_app and the running/down and per-framework/runtime numbers are
# published as well.
def analyze_all_apps(collect_stats = true)
  start = Time.now
  stats = {:running => 0, :down => 0, :frameworks => {}, :runtimes => {}}
  instance_total = 0
  crash_total = 0

  @droplets.each do |app_id, entry|
    analyze_app(app_id, entry, stats) if collect_stats
    instance_total += entry[:instances]
    crash_records = entry[:crashes]
    crash_total += crash_records.size if crash_records
  end

  VCAP::Component.varz[:total_apps] = @droplets.size
  VCAP::Component.varz[:total_instances] = instance_total
  VCAP::Component.varz[:crashed_instances] = crash_total

  unless collect_stats
    return @logger.info("Analyzed #{@droplets.size} apps in #{elapsed_time_in_ms(start)}")
  end

  VCAP::Component.varz[:running_instances] = stats[:running]
  VCAP::Component.varz[:down_instances] = stats[:down]
  VCAP::Component.varz[:running][:frameworks] = stats[:frameworks]
  VCAP::Component.varz[:running][:runtimes] = stats[:runtimes]
  @logger.info("Analyzed #{stats[:running]} running and #{stats[:down]} down apps in #{elapsed_time_in_ms(start)}")
end
338
cd94c4f HM now tracks stats per framework/runtime
Vadim Spivak authored
# Zeroed counters for the live (heartbeat based) per-framework / per-runtime
# statistics.
def create_runtime_metrics
  metrics = {}
  [:apps, :crashes, :running_instances, :missing_instances, :flapping_instances].each do |counter|
    metrics[counter] = 0
  end
  metrics
end
348
6034ef4 HealthManager varz changes
Vadim Spivak authored
# Zeroed counters for the database derived per-framework / per-runtime totals.
def create_db_metrics
  metrics = {}
  [:apps, :started_apps, :instances, :started_instances, :memory, :started_memory].each do |counter|
    metrics[counter] = 0
  end
  metrics
end
359
3ffc02a Initial commit
Cloud Foundry Engineer authored
# Time elapsed since +start+ (a Time), formatted in milliseconds with one
# decimal place, e.g. "12.3ms".
def elapsed_time_in_ms(start)
  format('%.1fms', (Time.now - start) * 1000)
end
364
# Handles a "droplet updated" message: counts it in varz and refreshes the
# tracked entry of the referenced app from the database.
def process_updated_message(message)
  VCAP::Component.varz[:droplet_updated_msgs_received] += 1
  payload = parse_json(message)
  app = App.find_by_id(payload['droplet'])
  update_droplet(app)
end
370
# Handles a "droplet exited" message for a single instance.
#
# For an index that belongs to the live version and lies within the desired
# instance count, the exit is recorded on the index entry: crashes are
# counted (the counter is reset once the last crash is older than
# @flapping_timeout), the index becomes FLAPPING after more than
# @flapping_death crashes, and otherwise becomes DOWN and a restart is
# requested (with high priority for DEA_EVACUATION). Tracked indices outside
# the live version / instance range are dropped instead. A CRASHED exit is
# additionally stored in the droplet's :crashes table.
#
# message - JSON string with 'droplet', 'version', 'index', 'instance',
#           'reason' and 'crash_timestamp'.
# Returns the droplet entry that was examined (nil for unknown droplets).
371 def process_exited_message(message)
372 VCAP::Component.varz[:droplet_exited_msgs_received] += 1
373 now = Time.now.to_i
374
375 exit_message = parse_json(message)
376 droplet_id = exit_message['droplet']
377 version = exit_message['version']
378 index = exit_message['index']
379 instance = exit_message['instance']
380
381 droplet_entry = @droplets[droplet_id]
382 index_entry = nil
383
384 if droplet_entry
385 version_entry = droplet_entry[:versions][version]
386 if version_entry
387 index_entry = version_entry[:indices][index]
388 end
389
# NOTE(review): assumes 'index' is always a number; a message without it
# would raise on the comparison below - confirm against the sender.
390 if version == droplet_entry[:live_version] && index >= 0 && index < droplet_entry[:instances]
391 unless version_entry
392 version_entry = droplet_entry[:versions][version] = create_version_entry
393 end
394
395 unless index_entry
396 index_entry = version_entry[:indices][index] = create_index_entry
397 index_entry[:instance] = instance
398 end
399
# Ignore the exit when a different instance is currently tracked as running
# on this index.
400 if index_entry[:instance].nil? || index_entry[:instance] == instance ||
401 !RUNNING_STATES.include?(index_entry[:state])
402 if RESTART_REASONS.include?(exit_message['reason'])
# Reset the crash counter once the previous crash is old enough.
403 if index_entry[:crash_timestamp] > 0 && now - index_entry[:crash_timestamp] > @flapping_timeout
404 index_entry[:crashes] = 0
405 index_entry[:crash_timestamp] = -1
406 end
407
408 if exit_message['reason'] == CRASHED
409 index_entry[:crashes] += 1
410 index_entry[:crash_timestamp] = now
411 end
412
# Too many crashes: mark the index FLAPPING and do not request a restart.
413 if index_entry[:crashes] > @flapping_death
414 index_entry[:state] = FLAPPING
415 index_entry[:state_timestamp] = Time.now.to_i
416 else
417 index_entry[:state] = DOWN
418 index_entry[:state_timestamp] = Time.now.to_i
419 index_entry[:last_action] = now
71f3fe4 @bnugmanov hm: DEA_EVAC prioritization
bnugmanov authored
420
421 high_priority = (exit_message['reason'] == DEA_EVACUATION)
422
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
423 @logger.info("Preparing to start instance (app_id=#{droplet_id}, index=#{index}). Reason: Instance exited with reason '#{exit_message['reason']}'.")
71f3fe4 @bnugmanov hm: DEA_EVAC prioritization
bnugmanov authored
424 start_instances(droplet_id, [index], high_priority)
3ffc02a Initial commit
Cloud Foundry Engineer authored
425 end
426 end
427 end
# Exit of an instance we track but no longer want: stop tracking it and
# drop the version entry once it has no indices left.
428 elsif index_entry
429 version_entry[:indices].delete(index)
430 droplet_entry[:versions].delete(version) if version_entry[:indices].empty?
431 end
432
# Remember the crash, keyed by instance id.
433 if exit_message['reason'] == CRASHED
434 droplet_entry[:crashes][instance] = {
435 :timestamp => Time.now.to_i,
436 :crash_timestamp => exit_message['crash_timestamp']
437 }
438 end
439 end
d6a32e8 @richardcloudsoft Add health_manager spec: should restart an instance that exits unexpe…
richardcloudsoft authored
440
441 droplet_entry # return the droplet that we changed. This allows the spec tests to ensure the behaviour is correct.
3ffc02a Initial commit
Cloud Foundry Engineer authored
442 end
443
# Handles a heartbeat message, which carries one entry per instance under
# the 'droplets' key.
#
# For known droplets, running/starting instances refresh (or create) the
# index entry of their version, and CRASHED instances are stored in the
# droplet's :crashes table. A heartbeat from an instance whose index is
# already tracked as RUNNING under a different instance id results in a stop
# request for the heartbeating instance. Instances of unknown droplets are
# stopped once both the health manager and the instance have been up for
# more than twice the database scan interval.
#
# Returns the array of droplet entries that were found for the heartbeats.
444 def process_heartbeat_message(message)
445 VCAP::Component.varz[:heartbeat_msgs_received] += 1
ee8f6d7 @richardcloudsoft Add health_manager spec: should update its internal state to reflect …
richardcloudsoft authored
446 result = []
3ffc02a Initial commit
Cloud Foundry Engineer authored
447 parse_json(message)['droplets'].each do |heartbeat|
448 droplet_id = heartbeat['droplet']
449 instance = heartbeat['instance']
450 droplet_entry = @droplets[droplet_id]
451 if droplet_entry
ee8f6d7 @richardcloudsoft Add health_manager spec: should update its internal state to reflect …
richardcloudsoft authored
452 result << droplet_entry
3ffc02a Initial commit
Cloud Foundry Engineer authored
453 state = heartbeat['state']
454 if RUNNING_STATES.include?(state)
# Look up the version and index entries, creating them on first sight.
455 version_entry = droplet_entry[:versions][heartbeat['version']]
456 unless version_entry
457 version_entry = droplet_entry[:versions][heartbeat['version']] = create_version_entry
458 end
459
460 index_entry = version_entry[:indices][heartbeat['index']]
461 unless index_entry
462 index_entry = version_entry[:indices][heartbeat['index']] = create_index_entry
463 end
464
# Another instance is already tracked as RUNNING on this index: stop the
# one that sent this heartbeat and keep the tracked entry unchanged.
465 if index_entry[:state] == RUNNING && index_entry[:instance] != instance
466 stop_instances(droplet_id, [instance])
467 else
# :timestamp is the time the heartbeat was received here; :state_timestamp
# is taken from the heartbeat itself.
468 index_entry[:instance] = instance
469 index_entry[:timestamp] = Time.now.to_i
470 index_entry[:state] = state.to_s
471 index_entry[:state_timestamp] = heartbeat['state_timestamp']
472 end
473 elsif state == CRASHED
474 droplet_entry[:crashes][instance] = {
475 :timestamp => Time.now.to_i,
476 :crash_timestamp => heartbeat['state_timestamp']
477 }
478 end
479 else
# Unknown droplet: wait until at least two database scan intervals have
# passed for both the health manager and the instance before stopping it.
480 instance_uptime = Time.now.to_i - heartbeat['state_timestamp']
481 health_manager_uptime = Time.now.to_i - @started
482 threshold = @database_scan * 2
483
484 if health_manager_uptime > threshold && instance_uptime > threshold
9dbc7ea Log all actions take by the HM at the INFO level, provide a short des…
mpage authored
485 @logger.info("Stopping unknown app: #{droplet_id}/#{instance}.")
3ffc02a Initial commit
Cloud Foundry Engineer authored
486 stop_instances(droplet_id, [instance])
487 end
488 end
489 end
ee8f6d7 @richardcloudsoft Add health_manager spec: should update its internal state to reflect …
richardcloudsoft authored
490
491 result # return the droplets that we changed. This allows the spec tests to ensure the behaviour is correct.
3ffc02a Initial commit
Cloud Foundry Engineer authored
492 end
493
# Answers a health request: for every requested droplet that we track,
# publishes the number of instances in state RUNNING for the requested
# version to the +reply+ subject.
#
# message - JSON string whose 'droplets' list holds hashes with 'droplet'
#           and 'version'.
# reply   - NATS subject the per-droplet responses are published to.
def process_health_message(message, reply)
  VCAP::Component.varz[:healthmanager_health_request_msgs_received] += 1
  message_json = parse_json(message)

  # A request without a 'droplets' list is treated as empty instead of
  # raising on nil.
  (message_json['droplets'] || []).each do |droplet|
    droplet_id = droplet['droplet']
    droplet_entry = @droplets[droplet_id]
    next unless droplet_entry

    version = droplet['version']
    version_entry = droplet_entry[:versions][version]
    running = 0
    if version_entry
      running = version_entry[:indices].values.count do |index_entry|
        index_entry[:state] == RUNNING
      end
    end

    response_json = encode_json(:droplet => droplet_id, :version => version, :healthy => running)
    NATS.publish(reply, response_json)
  end
end
518
# Answers a status request for one droplet on the +reply+ subject.
#
# For state FLAPPING the reply lists the flapping indices of the requested
# version ({:indices => [{:index, :since}]}); for state CRASHED it lists the
# recorded crashes ({:instances => [{:instance, :since}]}). Unknown droplets
# and other states get no reply.
#
# JSON is read and written through parse_json / encode_json, like the other
# message handlers of this class.
def process_status_message(message, reply)
  VCAP::Component.varz[:healthmanager_status_msgs_received] += 1
  message_json = parse_json(message)
  droplet_id = message_json['droplet']
  droplet_entry = @droplets[droplet_id]
  return unless droplet_entry

  case message_json['state']
  when FLAPPING
    version_entry = droplet_entry[:versions][message_json['version']]
    result = []
    if version_entry
      version_entry[:indices].each do |index, index_entry|
        next unless index_entry[:state] == FLAPPING
        result << { :index => index, :since => index_entry[:state_timestamp] }
      end
    end
    NATS.publish(reply, encode_json(:indices => result))
  when CRASHED
    result = droplet_entry[:crashes].map do |instance, crash_entry|
      { :instance => instance, :since => crash_entry[:crash_timestamp] }
    end
    NATS.publish(reply, encode_json(:instances => result))
  end
end
555
# Synchronizes the in-memory droplet table with the App table (adding or
# updating every app, dropping entries whose app no longer exists) and
# refreshes the database derived varz: user totals, app representations and
# the per-framework / per-runtime app, instance and memory totals.
def update_from_db
  start = Time.now
  old_droplet_ids = Set.new(@droplets.keys)
  App.all.each do |droplet|
    old_droplet_ids.delete(droplet.id)
    update_droplet(droplet)
  end
  old_droplet_ids.each { |id| @droplets.delete(id) }
  # TODO - Devise a version of the below that works with vast numbers of apps and users.
  VCAP::Component.varz[:total_users] = User.count
  VCAP::Component.varz[:users] = User.all_email_addresses.map { |e| { :email => e } }
  VCAP::Component.varz[:apps] = App.health_manager_representations
  @logger.info("Database scan took #{elapsed_time_in_ms(start)} and found #{@droplets.size} apps")

  start = Time.now

  VCAP::Component.varz[:total] = {
    :frameworks => {},
    :runtimes => {}
  }

  accumulate_db_metrics(App.count(:group => ["framework", "runtime", "state"]),
                        :apps, :started_apps)
  accumulate_db_metrics(App.sum(:instances, :group => ["framework", "runtime", "state"]),
                        :instances, :started_instances)
  # memory is stored as a string, so the summed value is coerced with to_i
  # inside the helper
  accumulate_db_metrics(App.sum("instances * memory", :group => ["framework", "runtime", "state"]),
                        :memory, :started_memory)

  @logger.info("Database stat scan took #{elapsed_time_in_ms(start)}")
end

# Adds one grouped aggregate to varz[:total].
#
# grouped_totals - Hash mapping [framework, runtime, state] to a number.
# total_key      - metric incremented for every group.
# started_key    - metric additionally incremented for groups in state "STARTED".
def accumulate_db_metrics(grouped_totals, total_key, started_key)
  totals = VCAP::Component.varz[:total]
  grouped_totals.each do |(framework, runtime, state), amount|
    amount = amount.to_i
    framework_stats = totals[:frameworks][framework] ||= create_db_metrics
    runtime_stats = totals[:runtimes][runtime] ||= create_db_metrics

    [framework_stats, runtime_stats].each do |metrics|
      metrics[total_key] += amount
      metrics[started_key] += amount if state == "STARTED"
    end
  end
end
private :accumulate_db_metrics
618
# Version identifier of a droplet: its staged package hash and run count
# joined by a dash.
def droplet_version(droplet)
  [droplet.staged_package_hash, droplet.run_count].join('-')
end
622
# Copies the current database values of +droplet+ into its tracking entry,
# creating the entry when the droplet is not tracked yet.
#
# Returns true when there is no droplet or when its last_updated value
# differs from the one tracked before this call; false otherwise.
def update_droplet(droplet)
  return true unless droplet

  entry = (@droplets[droplet.id] ||= create_droplet_entry)
  # Compare before overwriting, so the caller learns whether the record moved.
  changed = entry[:last_updated] != droplet.last_updated

  entry[:instances] = droplet.instances
  entry[:framework] = droplet.framework
  entry[:runtime] = droplet.runtime
  entry[:state] = droplet.state.upcase
  entry[:last_updated] = droplet.last_updated
  entry[:live_version] = droplet_version(droplet)

  changed
end
642
71f3fe4 @bnugmanov hm: DEA_EVAC prioritization
bnugmanov authored
# Requests the start of the given instance +indices+ of a tracked droplet.
#
# When request queueing is enabled the START message is put on the priority
# queue (see queue_request); otherwise it is published immediately.
#
# droplet_id    - id of a droplet present in @droplets.
# indices       - Array of instance indices to start.
# high_priority - when true the queued request gets INFINITE_PRIORITY.
def start_instances(droplet_id, indices, high_priority = false)
  droplet_entry = @droplets[droplet_id]
  start_message = {
    :droplet => droplet_id,
    :op => :START,
    :last_updated => droplet_entry[:last_updated],
    :version => droplet_entry[:live_version],
    :indices => indices
  }

  if queue_requests?
    queue_request(start_message, high_priority)
  else
    # old behavior: send the message immediately, encoded the same way as
    # the queued requests are when they are dequeued
    NATS.publish('cloudcontrollers.hm.requests', encode_json(start_message))
    # These are missing instances being started; "extra instances" is the
    # wording of the stop path.
    @logger.info("Requesting the start of missing instances: #{start_message}")
  end
end
661
71f3fe4 @bnugmanov hm: DEA_EVAC prioritization
bnugmanov authored
# Puts a request on the priority queue.
#
# The priority is the age of the droplet's last update in seconds (never
# negative), so older items rank higher and flapping items are
# de-prioritized; high priority requests get INFINITE_PRIORITY. The queue key
# is the message without its :last_updated field.
def queue_request(message, high_priority)
  age = Time.now.to_i - message[:last_updated]
  # a negative age is clamped to zero to avoid timezone drama
  priority = high_priority ? INFINITE_PRIORITY : [age, 0].max

  key = message.reject { |field, _| field == :last_updated }
  @logger.info("Queueing priority '#{priority}' request: #{message}, using key: #{key}. Queue size: #{@request_queue.size}")
  @request_queue.insert(message, priority, key)
end
672
# Publishes a STOP request for the given instance ids of a droplet. For a
# droplet we do not track, last_updated is sent as 0.
def stop_instances(droplet_id, instances)
  tracked = @droplets[droplet_id]
  payload = {
    :droplet => droplet_id,
    :op => :STOP,
    :last_updated => tracked ? tracked[:last_updated] : 0,
    :instances => instances
  }
  stop_message = payload.to_json
  NATS.publish('cloudcontrollers.hm.requests', stop_message)
  @logger.info("Requesting the stop of extra instances: #{stop_message}")
end
685
# Installs an EventMachine error handler: NATS errors are logged and end the
# process, any other error is logged together with its backtrace.
def register_error_handler
  EM.error_handler do |error|
    case error
    when NATS::Error
      @logger.error("NATS problem, #{error}")
      exit
    else
      @logger.error "Eventmachine problem, #{error}"
      @logger.error "#{error.backtrace.join("\n")}"
    end
  end
end
697
# Schedules the recurring work on the EventMachine reactor: the database
# scan (immediately and every @database_scan seconds), a first analysis pass
# without per-app analysis, the periodic droplet analysis (started only after
# @droplet_lost seconds) and, when request queueing is enabled, the
# once-per-second dequeueing of start requests.
def configure_timers
  EM.next_tick { update_from_db }
  EM.add_periodic_timer(@database_scan) { update_from_db }

  # Do first pass without the individual analysis
  # (false is passed directly; "collect_stats = false" in the argument list
  # only assigned a stray local variable)
  EM.next_tick { analyze_all_apps(false) }

  # Start the droplet analysis timer after the droplet lost timeout to make sure all the heartbeats came in.
  EM.add_timer(@droplet_lost) do
    EM.add_periodic_timer(@droplets_analysis) { analyze_all_apps }
  end

  if queue_requests?
    EM.add_periodic_timer(1) do
      deque_a_batch_of_requests
    end
  end
end
716
f4ceed5 @bnugmanov [hm] [common] priority queue FIFO tests, bump version of vcap_common …
bnugmanov authored
# Publishes up to +num_requests+ queued requests (by default the configured
# dequeueing rate), highest priority first, and keeps the queue length in
# varz up to date.
def deque_a_batch_of_requests(num_requests=@dequeueing_rate)
  num_requests.times do
    next if @request_queue.empty?

    #TODO: if STOP requests are also queued, refactor this to be generic, particularly the log message
    encoded_request = encode_json(@request_queue.remove)
    NATS.publish('cloudcontrollers.hm.requests', encoded_request)
    @logger.info("Requesting the start of missing instances: #{encoded_request}")
    VCAP::Component.varz[:queue_length] = @request_queue.size
  end
end
728
# Registers this process as the 'HealthManager' VCAP component and seeds
# every varz entry it reports, then logs the startup banner.
#
# Status-server settings (port, user, password) are read from
# @config['status']; a missing section is treated as an empty hash.
def register_as_component
  status = @config['status'] || {}

  VCAP::Component.register(
    :type     => 'HealthManager',
    :host     => VCAP.local_ip(@config['local_route']),
    :index    => @config['index'],
    :config   => @config,
    :port     => status['port'],
    :user     => status['user'],
    :password => status['password']
  )

  # Initialize VCAP component varzs..
  [:total_apps, :total_users, :total_instances].each do |key|
    VCAP::Component.varz[key] = 0
  end

  # These will get processed after a small delay..
  [:running_instances, :crashed_instances, :down_instances].each do |key|
    VCAP::Component.varz[key] = -1
  end

  VCAP::Component.varz[:queue_length] = 0

  # Per-framework / per-runtime buckets; each entry gets its own hashes.
  [:total, :running].each do |bucket|
    VCAP::Component.varz[bucket] = { :frameworks => {}, :runtimes => {} }
  end

  # Counters for the NATS messages received, by kind.
  [
    :heartbeat_msgs_received,
    :droplet_exited_msgs_received,
    :droplet_updated_msgs_received,
    :healthmanager_status_msgs_received,
    :healthmanager_health_request_msgs_received
  ].each do |key|
    VCAP::Component.varz[key] = 0
  end

  @logger.info("Starting VCAP Health Manager (#{VERSION})")
end
769
# Installs the TERM/INT signal handlers, subscribes to every NATS subject the
# health manager handles and finally publishes 'healthmanager.start'.
#
# Each subscription logs the raw message at debug level and hands it to the
# matching process_*_message method.
def subscribe_to_messages
  # Now we have something worth cleaning up at shutdown.
  %w[TERM INT].each do |signal|
    trap(signal) { shutdown }
  end

  # Subjects whose handlers take only the message: [subject, log label, handler].
  [
    ['dea.heartbeat',   'heartbeat',       :process_heartbeat_message],
    ['droplet.exited',  'droplet.exited',  :process_exited_message],
    ['droplet.updated', 'droplet.updated', :process_updated_message]
  ].each do |subject, label, handler|
    NATS.subscribe(subject) do |message|
      @logger.debug("#{label}: #{message}")
      __send__(handler, message)
    end
  end

  # Subjects whose handlers also receive the reply subject: [subject, handler].
  [
    ['healthmanager.status', :process_status_message],
    ['healthmanager.health', :process_health_message]
  ].each do |subject, handler|
    NATS.subscribe(subject) do |message, reply|
      @logger.debug("#{subject}: #{message}")
      __send__(handler, message, reply)
    end
  end

  NATS.publish('healthmanager.start')
end
1b5ffe0 @bnugmanov health manager START requests are queued in a PrioritySet: a priority…
bnugmanov authored
802
# Tells whether request queueing is enabled, i.e. whether @dequeueing_rate
# is a positive number. configure_timers only installs the periodic
# dequeue timer when this returns true.
#
# A nil or negative rate counts as disabled: deque_a_batch_of_requests
# calls +times+ on the rate, which raises for nil and is a no-op for a
# negative Integer, so a queue could never be drained with such a rate.
def queue_requests?
  @dequeueing_rate.to_i > 0
end
3ffc02a Initial commit
Cloud Foundry Engineer authored
806 end
807
# Command-line entry point: runs only when this file is executed directly or
# through the ../bin/health_manager launcher.
if $0 == __FILE__ ||
   File.expand_path($0) == File.expand_path(File.join(File.dirname(__FILE__), '../bin/health_manager'))

  # Default configuration file; the directory can be overridden through
  # CLOUD_FOUNDRY_CONFIG_PATH and the whole path through -c/--config.
  config_dir = ENV["CLOUD_FOUNDRY_CONFIG_PATH"] || File.join(File.dirname(__FILE__), '../config')
  config_file = File.join(config_dir, "health_manager.yml")

  parser = OptionParser.new do |o|
    o.banner = 'Usage: healthmanager [OPTIONS]'
    o.on("-c", "--config [ARG]", "Configuration File") { |path| config_file = path }
    o.on("-h", "--help", "Help") do
      puts o
      exit
    end
  end
  # Parse a copy so that ARGV itself is left untouched.
  parser.parse!(ARGV.dup)

  config =
    begin
      YAML.load_file(config_file)
    rescue => e
      $stderr.puts "Could not read configuration file: #{e}"
      exit 1
    end

  EM.epoll

  EM.run do
    HealthManager.start(config)
  end
end
Something went wrong with that request. Please try again.