diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..81aeea8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +/vendor/cache/* +*~ +.idea +\#*\# +.\#* +.bundle +bundler +spec_reports +spec_coverage +ci-artifacts-dir +ci-working-dir +*.rbc +*.swp +.rvmrc +*.pid diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..63ca258 --- /dev/null +++ b/Gemfile @@ -0,0 +1,13 @@ +source "http://rubygems.org" + + +gem "rake" +gem "rspec" + +gem "vcap_common", ">= 1.0.8" +gem "vcap_logging" + +gem 'eventmachine', :git => 'git://github.com/cloudfoundry/eventmachine.git', :branch => 'release-0.12.11-cf' +gem "yajl-ruby" +gem "rest-client" +gem "em-http-request" diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..98e7d5b --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,64 @@ +GIT + remote: git://github.com/cloudfoundry/eventmachine.git + revision: 2806c630d8631d5dcf9fb2555f665b829052aabe + branch: release-0.12.11-cf + specs: + eventmachine (0.12.11.cloudfoundry.3) + +GEM + remote: http://rubygems.org/ + specs: + addressable (2.2.7) + daemons (1.1.8) + diff-lcs (1.1.3) + em-http-request (0.3.0) + addressable (>= 2.0.0) + escape_utils + eventmachine (>= 0.12.9) + escape_utils (0.2.4) + json_pure (1.6.6) + mime-types (1.18) + nats (0.4.22) + daemons (>= 1.1.4) + eventmachine (>= 0.12.10) + json_pure (>= 1.6.1) + thin (>= 1.3.1) + posix-spawn (0.3.6) + rack (1.4.1) + rake (0.9.2.2) + rest-client (1.6.7) + mime-types (>= 1.16) + rspec (2.9.0) + rspec-core (~> 2.9.0) + rspec-expectations (~> 2.9.0) + rspec-mocks (~> 2.9.0) + rspec-core (2.9.0) + rspec-expectations (2.9.1) + diff-lcs (~> 1.1.3) + rspec-mocks (2.9.0) + thin (1.3.1) + daemons (>= 1.0.9) + eventmachine (>= 0.12.6) + rack (>= 1.0.0) + vcap_common (1.0.10) + eventmachine (~> 0.12.11.cloudfoundry.3) + nats (~> 0.4.22.beta.8) + posix-spawn (~> 0.3.6) + thin (~> 1.3.1) + yajl-ruby (~> 0.8.3) + vcap_logging (0.1.4) + rake + yajl-ruby (0.8.3) + +PLATFORMS + ruby + +DEPENDENCIES + 
#!/usr/bin/env ruby
# Debugging utility: connects to NATS, pulls the expected droplet state from
# the bulk API in small batches and prints every droplet to stdout. The run is
# ended by a 5 second timer or by INT/TERM.
#
# Environment overrides: NATS_URI, BULK_URL, LOG_LEVEL.
home = File.join(File.dirname(__FILE__), '..')
ENV['BUNDLE_GEMFILE'] = "#{home}/Gemfile"

require 'rubygems'
require 'bundler/setup'
require File.join(home, 'lib', 'health_manager')

# Single shutdown path: close the NATS connection first and stop the reactor
# from its completion callback. The timer below previously called
# `EM.stop { NATS.stop }`; EM.stop does not take a block, so NATS.stop was
# never reached on that path.
shutdown = proc { NATS.stop { EM.stop } }

trap('INT')     { shutdown.call }
trap('SIGTERM') { shutdown.call }

EM.run do
  nats_uri = ENV['NATS_URI'] || 'nats://nats:nats@192.168.24.128:4222'

  NATS.start :uri => nats_uri do
    config = {
      'bulk' => { 'host' => ENV['BULK_URL'] || 'api.vcap.me', 'batch_size' => '2' },
    }
    VCAP::Logging.setup_from_config({ 'level' => ENV['LOG_LEVEL'] || 'debug' })

    provider = HealthManager::BulkBasedExpectedStateProvider.new(config)
    provider.each_droplet do |id, droplet|
      puts "Droplet #{id}:"
      puts droplet.inspect
    end

    # Give the asynchronous batches a few seconds, then shut down cleanly.
    EM.add_timer(5) { shutdown.call }
  end
end
'bundler/setup' + +require File.join(home, 'lib','health_manager') + +hm = HealthManager::Manager.new() + +trap('INT') { hm.shutdown } +trap('SIGTERM') { hm.shutdown } + +hm.start diff --git a/config/health_manager.yml b/config/health_manager.yml new file mode 100644 index 0000000..4c76c9b --- /dev/null +++ b/config/health_manager.yml @@ -0,0 +1,47 @@ +--- +# Local_route is the IP address of a well known server on your network, it +# is used to choose the right ip address (think of hosts that have multiple nics +# and IP addresses assigned to them) of the host running the Health Manager. Default +# value of nil, should work in most cases. +# local_route: 127.0.0.1 + +# NATS message bus URI +mbus: nats://nats:nats@192.168.24.128:4222/ +logging: + level: warn +pid: /var/vcap/sys/run/healthmanager.pid + +queue_batch_size: 10 + +intervals: + # Interval for collecting statistics about this cloudfoundry instance. + # Amongst other things, data collected includes number of users, number of + # applications and memory usage. + database_scan: 10 + # Time to wait before starting analysis for stopped applications. + droplet_lost: 30 + # Interval between scans for analysis of applications. + droplets_analysis: 5 + # An application is deemed to be flapping if it is found to be in a crashed + # state (after a restart following every crash) for more than "flapping_death" + # number of times in an interval that is "flapping_timeout" long. + flapping_death: 2 + flapping_timeout: 180 + # Time to wait before trying to restart an application after a crash is + # detected + restart_timeout: 20 + # Time to wait before analyzing the state of an application that has been + # started/restarted + stable_state: 60 + +#number of start requests send each second (subject to EM timer limitations) +#default value is 50. +dequeueing_rate: 50 + +# Used for /healthz and /vars endpoints. If not provided random +# values will be generated on component start. Uncomment to use +# static values. 
# Builds the manager: loads configuration, configures logging, creates every
# HealthManager component and records them in the component registry.
#
# config - Hash merged on top of the settings read from the config file
#          (its keys win over the file's).
#
# NOTE(review): COMPONENTS lists :manager, but no @manager is assigned here,
# so register_hm_components appears to store nil for it - confirm whether
# that is intended.
def initialize(config={})
  cli_options = parse_args
  file_settings = read_config_from_file(cli_options[:config_file])
  @config = file_settings.merge(config)

  # Precedence: LOG_LEVEL from the environment, then the 'logging' section
  # of the config, then a plain 'info' level.
  env_level = ENV['LOG_LEVEL']
  @logging_config =
    if env_level
      {'level' => env_level}
    else
      @config['logging'] || {'level' => 'info'}
    end

  VCAP::Logging.setup_from_config(@logging_config)

  logger.info("HealthManager: initializing")

  # Components are created in the same order as before; they all receive
  # the same shared config hash.
  @varz, @reporter, @scheduler =
    [Varz, Reporter, Scheduler].map { |component_class| component_class.new(@config) }
  @known_state_provider = AppStateProvider.get_known_state_provider(@config)
  @expected_state_provider = AppStateProvider.get_expected_state_provider(@config)
  @nudger, @harmonizer =
    [Nudger, Harmonizer].map { |component_class| component_class.new(@config) }

  register_hm_components
end
# Writes the id of the current process into the file named by @config['pid'],
# creating the parent directory first when necessary.
#
# If the directory cannot be created the failure is logged as fatal and the
# process exits with status 1. Previously the message announced "exiting"
# but execution simply continued.
def create_pid_file
  # Required here so the method does not depend on another library having
  # loaded FileUtils already.
  require 'fileutils'

  @pid_file = @config['pid']
  begin
    FileUtils.mkdir_p(File.dirname(@pid_file))
  rescue => e
    logger.fatal("Can't create pid directory, exiting: #{e}")
    exit 1
  end
  File.open(@pid_file, 'wb') { |f| f.puts "#{Process.pid}" }
  logger.debug("pid file written: #{@pid_file}")
end
class << self
  # Settings shared by all AppState instances.
  attr_accessor :heartbeat_deadline
  attr_accessor :flapping_timeout

  # Returns the list of event types that listeners may be registered for.
  def known_event_types
    [
     :missing_instances,
     :extra_instances,
     :exit_crashed,
     :exit_stopped,
     :exit_dea,
     :droplet_updated,
    ]
  end

  # Registers +block+ to be called for every +event_type+ notification.
  #
  # Raises ArgumentError for an unknown event type or when no block is
  # given (a nil listener would only fail later, at notification time).
  def add_listener(event_type, &block)
    check_event_type(event_type)
    raise ArgumentError, 'block required' unless block
    @listeners ||= {}
    @listeners[event_type] ||= []
    @listeners[event_type] << block
  end

  # Calls every listener registered for +event_type+ with the app state
  # followed by +args+. Does nothing when no listener was ever registered;
  # this used to raise NoMethodError because @listeners was still nil.
  #
  # Raises ArgumentError for an unknown event type.
  def notify_listener(event_type, app_state, *args)
    check_event_type(event_type)
    listeners = (@listeners && @listeners[event_type]) || []
    listeners.each do |block|
      block.call(app_state, *args)
    end
  end

  # Raises ArgumentError unless +event_type+ is one of known_event_types.
  def check_event_type(event_type)
    raise ArgumentError, "Unknown event type: #{event_type}" unless known_event_types.include?(event_type)
  end

  # Drops every registered listener.
  def remove_all_listeners
    @listeners = {}
  end
end
# Walks every known version/instance of this droplet and
#   * marks STARTING/RUNNING instances as DOWN when their 'timestamp' is
#     older than AppState.heartbeat_deadline,
#   * removes instances that should not exist (droplet STOPPED, index beyond
#     @num_instances, or version other than @live_version),
#   * removes versions left without instances when the droplet is STOPPED or
#     the version is not the live one.
# Removed instances that were still in a running state are reported through
# one :extra_instances notification as [instance_id, reason] pairs.
def check_and_prune_extra_indices
  surplus = []

  @versions.delete_if do |version, version_entry|
    instances = version_entry['instances']

    instances.delete_if do |index, instance|
      heartbeat_expired =
        RUNNING_STATES.include?(instance['state']) &&
        timestamp_older_than?(instance['timestamp'], AppState.heartbeat_deadline)
      if heartbeat_expired
        instance['state'] = DOWN
        instance['state_timestamp'] = now
      end

      # All three conditions are evaluated up front; the first that holds
      # supplies the reason.
      droplet_stopped = (@state == STOPPED)
      beyond_count = (index >= @num_instances)
      stale_version = (version != @live_version)

      reason =
        if droplet_stopped
          'Droplet state is STOPPED'
        elsif beyond_count
          'Extra instance'
        elsif stale_version
          'Live version mismatch'
        end

      next false unless reason

      # Only instances that are (still) considered running need a stop request.
      surplus << [instance['instance'], reason] if RUNNING_STATES.include?(instance['state'])
      true
    end

    instances.empty? && (@state == STOPPED || version != @live_version)
  end

  notify :extra_instances, surplus unless surplus.empty?
end
# Handles a crash exit for the instance identified by message['version'] and
# message['index']: marks it CRASHED, updates its crash counter (restarting
# the count when the previous crash is older than AppState.flapping_timeout),
# records the crash in @crashes under the instance id and notifies
# :exit_crashed listeners with the original message.
def process_exit_crash(message)
  crashed_at = message['crash_timestamp']

  record = get_instance(message['version'], message['index'])
  record['state'] = CRASHED

  # The window check uses the previous crash timestamp, so it must happen
  # before that timestamp is overwritten below.
  outside_window = timestamp_older_than?(record['crash_timestamp'], AppState.flapping_timeout)
  prior_crashes = outside_window ? 0 : record['crashes']
  record['crashes'] = prior_crashes + 1
  record['crash_timestamp'] = crashed_at

  @crashes[record['instance']] = { 'timestamp' => now, 'crash_timestamp' => crashed_at }
  notify(:exit_crashed, message)
end
# Holds AppState objects keyed by integer droplet id and offers a simple
# cursor (rewind / next_droplet) for walking over them one at a time.
class AppStateProvider
  include HealthManager::Common

  # Hash of droplet_id => AppState instance.
  attr_reader :droplets

  def initialize(config={})
    @config = config
    @droplets = {}
    @cur_droplet_index = 0
  end

  # Hook for subclasses; the base implementation does nothing.
  def start; end

  # Moves the cursor back to the first droplet.
  def rewind
    @cur_droplet_index = 0
  end

  # Returns the droplet under the cursor and advances the cursor, or nil
  # once the cursor has moved past the last droplet.
  def next_droplet
    return nil if @cur_droplet_index >= @droplets.size
    current_key = @droplets.keys[@cur_droplet_index]
    @cur_droplet_index += 1
    @droplets[current_key]
  end

  # True when a droplet with this id (converted with to_i) is stored.
  def has_droplet?(id)
    @droplets.has_key?(id.to_i)
  end

  # Returns the AppState for +id+ (converted with to_i), creating and
  # storing a new one when none exists yet.
  def get_droplet(id)
    numeric_id = id.to_i
    @droplets[numeric_id] ||= AppState.new(numeric_id)
  end

  def get_state(id)
    get_droplet(id).state
  end

  def set_state(id, state)
    get_droplet(id).state = state
  end

  class << self
    # Builds the known-state provider: the class named by
    # config['known_state_provider'], else NatsBasedKnownStateProvider.
    def get_known_state_provider(config={})
      new_configured_class(config, 'known_state_provider', NatsBasedKnownStateProvider)
    end

    # Builds the expected-state provider: the class named by
    # config['expected_state_provider'], else BulkBasedExpectedStateProvider.
    def get_expected_state_provider(config={})
      new_configured_class(config, 'expected_state_provider', BulkBasedExpectedStateProvider)
    end

    # Instantiates, with +config+, the HealthManager class whose name is
    # stored under +config_key+ (looked up as given, as String and as
    # Symbol). Falls back to +default_class+ when nothing is configured or
    # the name is not a constant of HealthManager.
    def new_configured_class(config, config_key, default_class)
      klass_name = config[config_key] || config[config_key.to_s] || config[config_key.to_sym]
      configured = nil
      if klass_name && ::HealthManager.const_defined?(klass_name)
        configured = ::HealthManager.const_get(klass_name)
      end
      (configured || default_class).new(config)
    end
  end
end
# Copies the expected state received from the bulk API onto the known
# droplet state.
#
# known    - object with num_instances=, state=, live_version=, framework=,
#            runtime= and last_updated= writers.
# expected - Hash with 'instances', 'state', 'staged_package_hash',
#            'run_count', 'framework', 'runtime' and 'updated_at'.
#
# The live version is "<staged_package_hash>-<run_count>"; 'updated_at' is
# converted with parse_utc.
def set_expected_state(known, expected)
  logger.debug { "bulk: #set_expected_state: known: #{known.inspect} expected: #{expected.inspect}" }

  package_hash = expected['staged_package_hash']
  run_count = expected['run_count']

  known.num_instances = expected['instances']
  known.state = expected['state']
  known.live_version = package_hash.to_s + '-' + run_count.to_s
  known.framework = expected['framework']
  known.runtime = expected['runtime']
  known.last_updated = parse_utc(expected['updated_at'])
end
# URL of the bulk API apps endpoint: "<host>/bulk/apps".
#
# A host configured without a scheme gets "http://" prepended. A host that
# already carries "http://" or "https://" is used as is; previously an
# https host was turned into "http://https://...".
def app_url
  url = "#{host}/bulk/apps"
  url = "http://#{url}" unless url.start_with?('http://', 'https://')
  url
end
# Resolves the parameter +name+.
#
# The value is taken from +config+ (key tried as given, as Symbol, then as
# String). When the config has no truthy value, the HealthManager constant
# named after the upper-cased parameter is used, if it is defined.
#
# Returns the resolved value (also logged at debug level).
# Raises ArgumentError when neither source yields a truthy value.
def get_param_from_config_or_constant(name, config)
  configured = config[name] || config[name.to_sym] || config[name.to_s]

  resolved = configured || begin
    fallback_constant = name.to_s.upcase
    HealthManager.const_get(fallback_constant) if HealthManager.const_defined?(fallback_constant)
  end

  raise ArgumentError, "undefined parameter #{name}" unless resolved
  logger.debug("config: #{name}: #{resolved}")
  resolved
end
# Loads the YAML configuration stored in +config_file+.
#
# Returns the parsed configuration, or an empty Hash when the file contains
# no YAML document.
#
# When the file cannot be opened or parsed, the problem is reported on
# $stderr and the process exits with status 1. The old version printed to
# stdout and used a bare `exit`, which reports success to the caller.
def read_config_from_file(config_file)
  config = nil
  begin
    # The configuration file is operator-supplied, hence plain YAML.load.
    config = File.open(config_file) do |f|
      YAML.load(f)
    end
  rescue => e
    $stderr.puts "Could not read configuration file #{config_file}: #{e}"
    exit 1
  end
  config || {}
end
# Registers an AppState listener for +event+ that does nothing except write
# the event name and the listener arguments to the debug log.
def add_logger_listener(event)
  tracer = proc do |*payload|
    logger.debug { "app_state: event: #{event}: #{payload}" }
  end
  AppState.add_listener(event, &tracer)
end
# Tells whether the given instance of +droplet+ is flapping.
#
# As a side effect the instance is switched to the FLAPPING state when its
# crash counter exceeds interval(:flapping_death).
#
# NOTE(review): 'state_timestamp' is not touched on this transition, while
# Reporter reports it as :since for FLAPPING instances - confirm that is
# intended.
def flapping?(droplet, version, index)
  record = droplet.get_instance(version, index)
  crash_limit = interval(:flapping_death)
  record['state'] = FLAPPING if record['crashes'] > crash_limit
  record['state'] == FLAPPING
end
# Defers an expected-state update: schedules a single run of
# update_expected_state after the :postpone interval. While one deferred run
# is pending (@postponed is set) further requests are only logged. The
# pending marker is cleared right before the deferred run starts.
def postpone_expected_state_update
  if @postponed
    return logger.info("harmonizer: update_expected_state is currently running, and a postponed one is already scheduled. Ignoring.")
  end

  logger.info("harmonizer: postponing expected_state_update")
  @postponed = scheduler.after_interval(:postpone) do
    logger.info("harmonizer: starting postponed expected_state_update")
    @postponed = nil
    update_expected_state
  end
end
# Handles one 'dea.heartbeat' message: counts it in varz, decodes it and
# forwards every entry of its 'droplets' list to the AppState of the droplet
# named by the entry's 'droplet' id.
#
# A heartbeat that carries no 'droplets' list is logged and skipped; it used
# to raise NoMethodError from inside the NATS subscription callback.
#
# message - the raw (JSON) heartbeat payload.
def process_heartbeat(message)
  logger.debug {"known: #process_heartbeat: #{message}"}
  varz.inc(:heartbeat_msgs_received)

  message = parse_json(message)
  beats = message['droplets']
  unless beats
    logger.warn("known: heartbeat without droplets ignored")
    return []
  end

  beats.each do |beat|
    get_droplet(beat['droplet']).process_heartbeat(beat)
  end
end
# Puts a request message on the priority queue and publishes the new queue
# length to varz.
#
# message  - Hash describing the request.
# priority - queue priority; nil means NORMAL_PRIORITY.
#
# The key handed to the queue is the message without :last_updated. The old
# code used `message.clone.delete(:last_updated)`, but Hash#delete returns
# the removed value, so the key was the timestamp itself: unrelated requests
# sharing a timestamp (or all lacking one) got the same key.
def queue(message, priority)
  logger.debug { "nudger: queueing: #{message}, #{priority}" }
  priority ||= NORMAL_PRIORITY
  key = message.reject { |field, _| field == :last_updated }
  @queue.insert(message, priority, key)
  varz.set(:queue_length, @queue.size)
end
do |message, reply| + logger.info("shadower: received: #{subj}: #{message}") + subscribe_on(reply, "#{subj}.reply") + end + end + end + + def subscribe_on(subj, topic = nil) + topic ||= subj + logger.info("shadower: subscribing: #{subj}/#{topic}") + + #"subjet" is NATS subject. "Topic" is the label for the bin. + #they are the same except for NATS "reply-to" subjects. + + NATS.subscribe(subj) do |message| + logger.info{"shadower: received: #{subj}/#{topic}: #{message}"} + + @received[topic] ||= [] + @received[topic] << message + if @received[topic].size > 1000 + @received[topic] = @received[topic][500..-1] + end + + NATS.unsubscribe(subj) if subj!=topic #unsubscribe from "reply-to" subjects + end + end + end +end diff --git a/lib/health_manager/reporter.rb b/lib/health_manager/reporter.rb new file mode 100644 index 0000000..721e9be --- /dev/null +++ b/lib/health_manager/reporter.rb @@ -0,0 +1,74 @@ +# Responds to status messages, publishes varz and healthz through VCAP::Component + +module HealthManager + class Reporter + include HealthManager::Common + def initialize(config={}) + @config = config + end + + def prepare + NATS.subscribe('healthmanager.status') { |msg, reply| + process_status_message(msg,reply) + } + NATS.subscribe('healthmanager.health') { |msg, reply| + + process_health_message(msg,reply) + } + end + + def process_status_message(message, reply) + varz.inc(:healthmanager_status_msgs_received) + message = parse_json(message) + logger.debug { "reporter: status: message: #{message}" } + droplet_id = message['droplet'] + + return unless known_state_provider.has_droplet?(droplet_id) + known_droplet = known_state_provider.get_droplet(droplet_id) + state = message['state'] + + result = nil + case state + + when FLAPPING + version = message['version'] + result = known_droplet.get_instances(version). 
+ select { |i, instance| + FLAPPING == instance['state'] + }.map { |i, instance| + { :index => i, :since => instance['state_timestamp'] } + } + NATS.publish(reply, {:indices => result}.to_json) + when CRASHED + result = known_droplet.crashes.map { |instance, crash| + { :instance => instance, :since => crash['crash_timestamp'] } + } + NATS.publish(reply, {:instances => result}.to_json) + end + end + + def process_health_message(message, reply) + varz.inc(:healthmanager_health_request_msgs_received) + message = parse_json(message) + message['droplets'].each do |droplet| + droplet_id = droplet['droplet'] + + next unless known_state_provider.has_droplet?(droplet_id) + + version = droplet['version'] + known_droplet = known_state_provider.get_droplet(droplet_id) + + running = (0...known_droplet.num_instances).count { |i| + RUNNING == known_droplet.get_instance(version, i)['state'] + } + response = { + :droplet => droplet_id, + :version => version, + :healthy => running + } + NATS.publish(reply, encode_json(response)) + end + end + + end +end diff --git a/lib/health_manager/scheduler.rb b/lib/health_manager/scheduler.rb new file mode 100644 index 0000000..d91a143 --- /dev/null +++ b/lib/health_manager/scheduler.rb @@ -0,0 +1,111 @@ +module HealthManager + class Scheduler + include HealthManager::Common + + def initialize( config={} ) + @config = config + @schedule = [] + @last_receipt = 0 + @receipt_to_timer = {} + @running_tasks = {} + @run_loop_interval = get_param_from_config_or_constant(:run_loop_interval, @config) + end + + def schedule( options, &block) + raise ArgumentError unless options.length == 1 + raise ArgumentError, 'block required' unless block_given? 
+ arg = options.first + sendee = { + :periodic => [:add_periodic_timer], + :timer => [:add_timer], + }[arg.first] + + raise ArgumentError,"Unknown scheduling keyword, please use :immediate, :periodic or :timer" unless sendee + sendee << arg[1] + receipt = get_receipt + @schedule << [block, sendee, receipt] + receipt + end + + def after_interval(interval_name, &block) + after(interval(interval_name), &block) + end + + def at_interval(interval_name, &block) + every(interval(interval_name), &block) + end + + def every(interval, &block) + schedule(:periodic => interval, &block) + end + + def after(interval, &block) + schedule(:timer => interval, &block) + end + + def immediately(&block) + EM.next_tick(&block) + end + + def run + until @schedule.empty? + block, sendee, receipt = @schedule.shift + @receipt_to_timer[receipt] = EM.send(*sendee, &block) + end + + EM.add_timer(@run_loop_interval) { run } + end + + def cancel(receipt) + if @receipt_to_timer.has_key?(receipt) + EM.cancel_timer(@receipt_to_timer.delete(receipt)) + else + @schedule.reject! { |_,_,r| (r == receipt) } + end + end + + def quantize_task(task, &block) + if yield + EM.next_tick { quantize_task( task, &block) } + else + mark_task_stopped(task) + end + end + + def start_task(task, &block) + return if task_running?(task) + mark_task_started(task) + quantize_task(task, &block) + end + + def mark_task_started(task) + @running_tasks[task] = :started + end + + def mark_task_stopped(task) + raise ArgumentError, "task #{task} not started" unless @running_tasks.delete(task) + end + def task_running?(task) + @running_tasks[task] == :started + end + + def start + if EM.reactor_running? + run + else + EM.run do + run + end + end + end + + def stop + EM.stop if EM.reactor_running? 
+ end + + private + def get_receipt + @last_receipt += 1 + end + end +end diff --git a/lib/health_manager/varz.rb b/lib/health_manager/varz.rb new file mode 100644 index 0000000..7a281cb --- /dev/null +++ b/lib/health_manager/varz.rb @@ -0,0 +1,159 @@ +# concrete varz-s for the new healthmanager, backward-compatible with old. + +module HealthManager + class Varz + include HealthManager::Common + + # The "Realtime" stats come mostly from Known State Provider + + def prepare + declare_counter :total_apps + declare_counter :total_instances + declare_counter :running_instances + declare_counter :down_instances + declare_counter :crashed_instances + declare_counter :flapping_instances + + declare_node :running + declare_node :running, :frameworks + declare_node :running, :runtimes + + declare_counter :total_users + declare_collection :users # why have this? + declare_collection :apps + + declare_node :total + declare_node :total, :frameworks + declare_node :total, :runtimes + + declare_counter :queue_length + + declare_counter :heartbeat_msgs_received + declare_counter :droplet_exited_msgs_received + declare_counter :droplet_updated_msgs_received + declare_counter :healthmanager_status_msgs_received + declare_counter :healthmanager_health_request_msgs_received + + declare_counter :varz_publishes + declare_counter :varz_holds + declare_node :droplets + end + + REALTIME_STATS = [:total_apps, + :total_instances, + :running_instances, + :down_instances, + :crashed_instances, + :flapping_instances, + + :running, # node + ] + + EXPECTED_STATS = [:total, + :total_users, + :users, + :apps + ] + + def reset_realtime_stats + REALTIME_STATS.each { |s| hold(s); reset(s) } + end + + def reset_expected_stats + EXPECTED_STATS.each { |s| hold(s); reset(s) } + end + + def update_realtime_stats_for_droplet(droplet) + inc(:total_apps) + add(:total_instances, droplet.num_instances) + add(:crashed_instances, droplet.crashes.size) + + if droplet.state == STARTED && droplet.framework && 
droplet.runtime + + #top-level running/missing/flapping stats, i.e., empty path prefix + update_state_stats_for_instances(droplet) + + ['framework', 'runtime'].each { |metric| + path = [:running, "#{metric}s".to_sym, droplet.send(metric) ] + + #e.g., [:running, :frameworks, 'sinatra'] + #or, [:running, :runtimes, 'ruby19' ] + + create_runtime_metrics(*path) + + inc(*path, :apps) + add(*path, :crashes, droplet.crashes.size) + + #per framework, per runtime running/missing/flapping stats + update_state_stats_for_instances(*path, droplet) + } + end + end + + def update_state_stats_for_instances(*path, droplet) + + droplet.num_instances.times do |index| + instance = droplet.get_instance(droplet.live_version, index) + case instance['state'] + when STARTING, RUNNING + inc(*path, :running_instances) + when DOWN + inc(*path, :missing_instances) + when FLAPPING + inc(*path, :flapping_instances) + end + end + end + + def update_expected_stats_for_droplet(droplet_hash) + ['framework','runtime'].each { |metric| + path = [:total, "#{metric}s".to_sym, droplet_hash['metric']] + + create_db_metrics(*path) + + inc(*path, :apps) + add(*path, :instances, droplet_hash['instances']) + add(*path, :memory, droplet_hash['memory'] * droplet_hash['instances']) + + if droplet_hash['state'] == STARTED + inc(*path, :started_apps) + add(*path, :started_instances, droplet_hash['instances']) + add(*path, :started_memory, droplet_hash['memory'] * droplet_hash['instances']) + end + } + end + + def publish_realtime_stats + REALTIME_STATS.each { |s| varz.release(s) } + publish + end + + def publish_expected_stats + EXPECTED_STATS.each { |s| varz.release(s) } + publish + end + + private + def create_runtime_metrics(*path) + declare_node(*path) + set(*path, { + :apps => 0, + :crashes => 0, + :running_instances => 0, + :missing_instances => 0, + :flapping_instances => 0 + }) if get(*path).empty? 
+ end + def create_db_metrics(*path) + declare_node(*path) + set(*path, { + :apps => 0, + :started_apps => 0, + :instances => 0, + :started_instances => 0, + :memory => 0, + :started_memory => 0 + }) if get(*path).empty? + end + end +end diff --git a/lib/health_manager/varz_common.rb b/lib/health_manager/varz_common.rb new file mode 100644 index 0000000..96e0076 --- /dev/null +++ b/lib/health_manager/varz_common.rb @@ -0,0 +1,149 @@ +# This is a wrapper/helper over VCAP::Component varz/ functionality. +# perhaps a part of this should be lifted to common/component. + +module HealthManager + class Varz + def initialize(config = {}) + @config = config + @counters = {} + @holds = {} + end + + def real_varz + VCAP::Component.varz || {} + end + + def hold(*path) + check_var_exists(*path) + @holds[path] = true + end + + def held?(*path) + partial = [] + path.any? { |leg| partial << leg; @holds[partial] } + end + + def sync(*path) + return if held?(*path) + h, k = get_last_hash_and_key!(real_varz, *path) + h[k] = get(*path) + end + + def release(*path) + raise ArgumentError.new("Path #{path} is not held") unless @holds[path] + @holds.delete(path) + end + + def publish + inc(:varz_publishes) + set(:varz_holds, @holds.size) + publish_not_held_recursively(real_varz, get_varz) + end + + def publish_not_held_recursively(lvalue, rvalue, *path) + return if held?(*path) + if rvalue.kind_of?(Hash) + lvalue = lvalue[path.last] ||= {} unless path.empty? 
+ rvalue.keys.each { |key| + publish_not_held_recursively(lvalue, rvalue[key], *path, key) + } + else + lvalue[path.last] = rvalue + end + end + + def declare_node(*path) + check_var_exists(*path[0...-1]) + h,k = get_last_hash_and_key(get_varz, *path) + h[k] ||= {} + end + + def declare_collection(*path) + check_var_exists(*path[0...-1]) + h,k = get_last_hash_and_key(get_varz, *path) + h[k] ||= [] + end + + def declare_counter(*path) + check_var_exists(*path[0...-1]) + + h,k = get_last_hash_and_key(get_varz, *path) + raise ArgumentError.new("Counter #{path} already declared") if h[k] + h[k] = 0 + end + + def reset(*path) + check_var_exists(*path) + h,k = get_last_hash_and_key(get_varz, *path) + + if h[k].kind_of? Hash + h[k].keys.each { |key| reset(*path, key) } + elsif h[k].kind_of? Integer + h[k] = 0 + elsif h[k].kind_of? Array + h[k] = [] + else + raise ArgumentError.new("Don't know how to reset varz at path #{path}: #{h[k]} (#{h[k].class})") + end + end + + def push(*path, value) + check_var_exists(*path) + h,k= get_last_hash_and_key(get_varz, *path) + raise ArgumentError.new("Varz #{path} is not an Array, can't push") unless h[k].kind_of?(Array) + h[k] << value + sync(*path) + h[k] + end + + def add(*path, value) + check_var_exists(*path) + h,k= get_last_hash_and_key(get_varz, *path) + h[k] += value + sync(*path) + h[k] + end + + def inc(*path) + add(*path, 1) + end + + def get(*path) + check_var_exists(*path) + h,k = get_last_hash_and_key(get_varz, *path) + h[k] + end + + def set(*path, value) + check_var_exists(*path) + h,k = get_last_hash_and_key(get_varz, *path) + h[k] = value + end + + def get_varz + @counters + end + + private + + def get_last_hash_and_key!(source, *path) + counter = source ||= {} + path[0...-1].each { |p| counter = counter[p] ||= {}} + return counter, path.last + end + + def get_last_hash_and_key(source, *path) + counter = source + path[0...-1].each { |p| counter = counter[p] } + return counter, path.last + end + + def 
check_var_exists(*path) + c = @counters + path.each { |var| + raise ArgumentError.new("undeclared: #{var} in #{path}") unless c[var] + c = c[var] + } + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..30e1b7a --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,21 @@ +# Copyright (c) 2009-2012 VMware, Inc. +$:.unshift File.join(File.dirname(__FILE__), '..', 'lib') + +home = File.join(File.dirname(__FILE__), '..') +ENV['BUNDLE_GEMFILE'] = "#{home}/Gemfile" + +require 'rubygems' +require 'rspec' +require 'bundler/setup' + +require 'health_manager' + +def set_env(name, value) + @env_stack ||= [] + @env_stack.push(ENV[name]) + ENV[name] = value +end + +def restore_env(name) + ENV[name] = @env_stack.pop +end diff --git a/spec/unit/app_state_provider_spec.rb b/spec/unit/app_state_provider_spec.rb new file mode 100644 index 0000000..f53b978 --- /dev/null +++ b/spec/unit/app_state_provider_spec.rb @@ -0,0 +1,24 @@ +require File.join(File.dirname(__FILE__),'spec_helper') + +describe HealthManager do + #shortcuts + AppState = HealthManager::AppState + AppStateProvider = HealthManager::AppStateProvider + BulkBasedExpectedStateProvider = HealthManager::BulkBasedExpectedStateProvider + + describe AppStateProvider do + describe '#get_known_state_provider' do + it 'should return NATS-based provider by default' do + AppStateProvider.get_known_state_provider. + should be_an_instance_of(NatsBasedKnownStateProvider) + end + end + + describe '#get_expected_state_provider' do + it 'should return bulk-API-based provider by default' do + AppStateProvider.get_expected_state_provider. 
+ should be_an_instance_of(BulkBasedExpectedStateProvider) + end + end + end +end diff --git a/spec/unit/app_state_spec.rb b/spec/unit/app_state_spec.rb new file mode 100644 index 0000000..3f29ced --- /dev/null +++ b/spec/unit/app_state_spec.rb @@ -0,0 +1,94 @@ +require File.join(File.dirname(__FILE__),'spec_helper') + +describe HealthManager do + + AppState = HealthManager::AppState + + include HealthManager::Common + + after :each do + AppState.remove_all_listeners + end + + describe AppState do + before(:each) do + @id = 1 + @live_version = '123456abcdef' + @num_instances = 4 + @droplet = AppState.new(@id) + @droplet.live_version = @live_version + @droplet.num_instances = @num_instances + AppState.remove_all_listeners + AppState.heartbeat_deadline = @heartbeat_dealing = 10 + end + + it 'should invoke missing_instances event handler' do + future_answer = [1,3] + event_handler_invoked = false + app = AppState.new(1) + app.state = 'STARTED' + app.num_instances=4 + + #no heartbeats arrived yet, so all instances are assumed missing + app.missing_indices.should == [0,1,2,3] + + AppState.add_listener :missing_instances do |a, indices| + a.should == app + indices.should == future_answer + event_handler_invoked = true + end + + event_handler_invoked.should be_false + hbs = make_heartbeat([app])['droplets'] + + hbs.delete_at(3) + hbs.delete_at(1) + + hbs.each {|hb| + app.process_heartbeat(hb) + } + + app.missing_indices.should == future_answer + event_handler_invoked.should be_false + + app.analyze + + event_handler_invoked.should be_false + + AppState.heartbeat_deadline = 0 + app.analyze + + event_handler_invoked.should be_true + end + + it 'should invoke extra_instances event handler' do + future_answer = [["12345-0","Extra instance"]] + + event_handler_invoked = false + app = AppState.new(1) + app.live_version = '12345' + app.state = 'STARTED' + app.num_instances=4 + + #no heartbeats arrived yet, so all instances are assumed missing + + AppState.add_listener 
:extra_instances do |a, indices| + a.should == app + indices.should == future_answer + event_handler_invoked = true + end + + event_handler_invoked.should be_false + hbs = make_heartbeat([app])['droplets'] + + hbs << hbs.first.dup + hbs.first['index'] = 4 + + hbs.each {|hb| app.process_heartbeat(hb) } + event_handler_invoked.should be_false + app.analyze + event_handler_invoked.should be_true + + end + end +end diff --git a/spec/unit/health_manager_spec.rb b/spec/unit/health_manager_spec.rb new file mode 100644 index 0000000..a799591 --- /dev/null +++ b/spec/unit/health_manager_spec.rb @@ -0,0 +1,48 @@ +require 'spec_helper.rb' + +describe HealthManager do + + Manager = HealthManager::Manager + Harmonizer = HealthManager::Harmonizer + KnownStateProvider = HealthManager::KnownStateProvider + ExpectedStateProvider = HealthManager::ExpectedStateProvider + Reporter = HealthManager::Reporter + Nudger = HealthManager::Nudger + Varz = HealthManager::Varz + + before(:all) do + EM.error_handler do |e| + fail "EM error: #{e.message}" + end + end + + before(:each) do + @config = {:intervals => + { + :expected_state_update => 1.5, + } + } + @m = Manager.new(@config) + @m.varz.prepare + end + + describe Manager do + it 'should have all componets registered and available' do + + @m.harmonizer.should be_a_kind_of Harmonizer + + # chaining components should also work. 
+ # thus ensuring all components available from all components + @m.harmonizer.varz.should be_a_kind_of Varz + @m.varz.reporter.should be_a_kind_of Reporter + @m.reporter.known_state_provider.should be_a_kind_of KnownStateProvider + @m.known_state_provider.expected_state_provider.should be_a_kind_of ExpectedStateProvider + @m.expected_state_provider.nudger.should be_a_kind_of Nudger + @m.nudger.scheduler.should be_a_kind_of Scheduler + end + end + + describe Harmonizer do + it 'should be able to describe a policy of bringing a known state to expected state' + end +end diff --git a/spec/unit/nats_based_known_state_provider_spec.rb b/spec/unit/nats_based_known_state_provider_spec.rb new file mode 100644 index 0000000..b3cf9a8 --- /dev/null +++ b/spec/unit/nats_based_known_state_provider_spec.rb @@ -0,0 +1,83 @@ +require 'spec_helper' + +describe HealthManager do + + NatsBasedKnownStateProvider = HealthManager::NatsBasedKnownStateProvider + + include HealthManager::Common + + after :each do + AppState.remove_all_listeners + end + + describe AppStateProvider do + describe NatsBasedKnownStateProvider do + + before(:each) do + @nb = NatsBasedKnownStateProvider.new(build_valid_config) + end + + it 'should subscribe to heartbeat, droplet.exited/updated messages' do + NATS.should_receive(:subscribe).with('dea.heartbeat') + NATS.should_receive(:subscribe).with('droplet.exited') + NATS.should_receive(:subscribe).with('droplet.updated') + @nb.start + end + + it 'should forward heartbeats' do + + app = make_app({'num_instances' => 4 }) + + app1 = @nb.get_droplet(app.id) + + #setting the expected state, as an expected_state_provider would + app1.live_version = app.live_version + app1.state = app.state + app1.num_instances = app.num_instances + app1.framework = app.framework + app1.runtime = app.runtime + app1.last_updated = app.last_updated + + instance = app1.get_instance(@version, 0) + instance['state'].should == 'DOWN' + instance['last_heartbeat'].should be_nil + + hb = 
make_heartbeat([app]) + @nb.process_heartbeat(hb.to_json) + + instance = app1.get_instance(@version, 0) + instance['state'].should == 'RUNNING' + instance['last_heartbeat'].should_not be_nil + + end + end + end + + def make_app(options={}) + @app_id ||= 0 + @app_id += 1 + @version = '123456' + app = AppState.new(@app_id) + { + 'num_instances' => 2, + 'framework' => 'sinatra', + 'runtime' => 'ruby18', + 'live_version' => @version, + 'state' => ::HealthManager::STARTED, + 'last_updated' => now + + }.merge(options).each { |k,v| + app.send "#{k}=", v + } + app + end + + def build_valid_config(config={}) + @config = config + varz = Varz.new(@config) + varz.prepare + register_hm_component(:varz, varz) + register_hm_component(:scheduler, @scheduler = Scheduler.new(@config)) + @config + end +end diff --git a/spec/unit/nudger_spec.rb b/spec/unit/nudger_spec.rb new file mode 100644 index 0000000..aea750c --- /dev/null +++ b/spec/unit/nudger_spec.rb @@ -0,0 +1,31 @@ +require 'spec_helper.rb' + +describe HealthManager do + Manager = HealthManager::Manager + Nudger = HealthManager::Nudger + + before(:each) do + @m = Manager.new + @m.varz.prepare + end + + describe Nudger do + it 'should be able to start app instance' do + n = @m.nudger + NATS.should_receive(:publish).with('cloudcontrollers.hm.requests', match(/"op":"START"/)).once + n.start_instance(AppState.new(1), 0, 0) + set_env(::HealthManager::HM_SHADOW, 'false') + n.deque_batch_of_requests + restore_env(::HealthManager::HM_SHADOW) + end + + it 'should be able to stop app instance' do + n = @m.nudger + NATS.should_receive(:publish).with('cloudcontrollers.hm.requests', match(/"op":"STOP"/)).once + n.stop_instance(AppState.new(1), 0, 0) + set_env(::HealthManager::HM_SHADOW, 'false') + n.deque_batch_of_requests + restore_env(::HealthManager::HM_SHADOW) + end + end +end diff --git a/spec/unit/scheduler_spec.rb b/spec/unit/scheduler_spec.rb new file mode 100644 index 0000000..c781ae2 --- /dev/null +++ 
b/spec/unit/scheduler_spec.rb @@ -0,0 +1,152 @@ +require 'spec_helper' + +describe HealthManager do + + Scheduler = HealthManager::Scheduler + + include HealthManager::Common + + describe Scheduler do + + before(:each) do + @s = Scheduler.new + end + + describe '#interval' do + it 'should return configured interval values' do + s1 = Scheduler.new( :intervals => {:droplet_analysis =>7 } ) + s2 = Scheduler.new( 'intervals' => {'droplet_analysis' =>6 } ) + + s1.interval(:droplet_analysis).should == 7 + s1.interval('droplet_analysis').should == 7 + s2.interval(:droplet_analysis).should == 6 + s2.interval('droplet_analysis').should == 6 + end + + it 'should return default interval values' do + s = Scheduler.new + s.interval(:analysis_delay).should == ::HealthManager::ANALYSIS_DELAY + s.interval('analysis_delay').should == ::HealthManager::ANALYSIS_DELAY + end + + it 'should raise ArgumentError for invalid intervals' do + lambda { @s.interval(:bogus) }.should raise_error(ArgumentError, /undefined parameter/) + end + end + + it 'should be able to schedule own termination' do + @s.schedule :timer => 1 do + @s.stop + end + start_at = now + @s.start + stop_at = now + stop_at.should > start_at #at least a second should have elapsed + end + + it 'should be able to execute immediately' do + done = false + @s.immediately do + done = true + end + @s.immediately do + @s.stop + end + @s.start + done.should be_true + end + + it 'should be able to schedule periodic' do + count = 0 + @s.schedule :timer => 1.1 do + @s.stop + end + + @s.schedule :periodic => 0.3 do + count += 1 + end + + @s.start + count.should == 3 + end + + it 'should be able to schedule multiple blocks' do + #this shows running the scheduler within explicit EM.run + EM.run do + @counter = Hash.new(0) + @s.immediately do + @counter[:immediate] += 1 + end + @s.every 0.3 do + @counter[:periodic] += 1 + end + @s.every 0.7 do + @counter[:timer] += 1 + end + #set up expectations for two points in time: + EM.add_timer(0.5) 
do + @counter[:immediate].should == 1 + @counter[:periodic].should == 1 + @counter[:timer].should == 0 + end + EM.add_timer(1.1) do + @counter[:immediate].should == 1 + @counter[:periodic].should == 3 + @counter[:timer].should == 1 + EM.stop + end + @s.run + end + end + + it 'should allow cancelling scheduled blocks' do + flag = false + cancelled_flag = false + + cancelled_timer1 = @s.schedule(:timer => 0.1) do + cancelled_flag = true + end + + cancelled_timer2 = @s.schedule(:timer => 0.3) do + cancelled_flag = true + end + + @s.after 0.2 do + flag = true + @s.cancel(cancelled_timer2) + end + + @s.after 1 do + @s.stop + end + + @s.cancel(cancelled_timer1) + + @s.start + + cancelled_flag.should be_false + flag.should be_true + end + + it 'should be able to start/stop/quantize tasks' do + + iters = 0 + @s.after 0.1 do + @s.start_task(:boo) do + iters += 1 + @s.task_running?(:boo).should be_true + iters < 5 #continuation condition + end + end + + @s.after 0.2 do + @s.stop + end + + @s.task_running?(:boo).should be_false + @s.start + iters.should == 5 + @s.task_running?(:boo).should be_false + end + end +end diff --git a/spec/unit/spec_helper.rb b/spec/unit/spec_helper.rb new file mode 100644 index 0000000..bce8622 --- /dev/null +++ b/spec/unit/spec_helper.rb @@ -0,0 +1,32 @@ +require File.join(File.dirname(__FILE__), '..','spec_helper') + +VCAP::Logging.setup_from_config({'level' => ENV['LOG_LEVEL'] || 'warn'}) + +module HealthManager::Common + + def in_em(timeout=2) + EM.run do + EM.add_timer(timeout) do + EM.stop + end + yield + end + end + + def make_heartbeat(apps) + hb = [] + apps.each do |app| + app.num_instances.times {|index| + hb << { + 'droplet' => app.id, + 'version' => app.live_version, + 'instance' => "#{app.live_version}-#{index}", + 'index' => index, + 'state' => ::HealthManager::RUNNING, + 'state_timestamp' => now + } + } + end + {'droplets' => hb, 'dea'=>'123456789abcdefgh'} + end +end diff --git a/spec/unit/varz_spec.rb b/spec/unit/varz_spec.rb new 
file mode 100644 index 0000000..4f80694 --- /dev/null +++ b/spec/unit/varz_spec.rb @@ -0,0 +1,191 @@ +require 'spec_helper' + +describe HealthManager do + + Varz = HealthManager::Varz + + describe Varz do + before :each do + @v = Varz.new + end + + def v; @v; end + + it 'should allow declaring counters' do + v.declare_counter :counter1 + v.get(:counter1).should == 0 + end + + it 'should allow declaring nodes and subcounters' do + v.declare_node :node + v.declare_counter :node, :foo + v.declare_node :node, :node1 + v.declare_counter :node, :node1, :foo + end + + it 'should disallow double declarations' do + v.declare_counter :foo + v.declare_counter :bar + vv = Varz.new + vv.declare_counter :foo #ok to declare same counters for different Varz objects + lambda { v.declare_counter(:foo).should raise_error ArgumentError } + end + + it 'should disallow undeclared counters' do + lambda { v.get :counter_bogus }.should raise_error ArgumentError + lambda { v.inc :counter_bogus }.should raise_error ArgumentError + v.declare_node :foo + v.declare_counter :foo, :bar + lambda { v.reset :foo, :bogus }.should raise_error ArgumentError + end + + it 'should have correct held? 
predicate' do + v.declare_node(:n) + v.declare_counter(:n, :c1) + v.declare_counter(:n, :c2) + + v.held?(:n).should be_false + v.held?(:n, :c1).should be_false + v.held?(:n, :c2).should be_false + + v.hold(:n, :c1) + + v.held?(:n).should be_false + v.held?(:n, :c1).should be_true + v.held?(:n, :c2).should be_false + + v.release(:n, :c1) + v.hold(:n,:c2) + + v.held?(:n).should be_false + v.held?(:n, :c1).should be_false + v.held?(:n, :c2).should be_true + + v.hold(:n) + + v.held?(:n).should be_true + v.held?(:n, :c1).should be_true + v.held?(:n, :c2).should be_true + end + + it 'should prevent bogus holding and releasing' do + lambda { v.hold :bogus }.should raise_error ArgumentError + v.declare_counter :boo + lambda { v.release :boo }.should raise_error ArgumentError + + v.hold :boo + v.release :boo + end + + it 'should allow publishing, holding and releasing' do + v.declare_counter :counter1 + + v.declare_node :node1 + v.declare_counter :node1, :counter2 + + v.declare_node :node1, :node2 + v.declare_counter :node1, :node2, :counter3 + + #one held, but all incremented + v.hold(:node1, :counter2) + + v.inc(:counter1) + v.inc(:node1, :counter2) + v.inc(:node1, :node2, :counter3) + + v.publish_not_held_recursively(res = {}, v.get_varz) + + res[:counter1].should == 1 + res[:node1][:counter2].should be_nil + res[:node1][:node2][:counter3].should == 1 + + #after release and republish, value is again available + v.release(:node1,:counter2) + v.publish_not_held_recursively(res, v.get_varz) + + res[:node1][:counter2].should == 1 + + #now holding top-level entry + v.hold(:counter1) + v.inc(:counter1) + v.inc(:node1, :counter2) + v.inc(:node1, :node2, :counter3) + + v.publish_not_held_recursively(res, v.get_varz) + + res[:counter1].should == 1 + res[:node1][:counter2].should == 2 + res[:node1][:node2][:counter3].should == 2 + + v.release(:counter1) + v.publish_not_held_recursively(res, v.get_varz) + + res[:counter1].should == 2 + + #now holding third-level entry + + 
v.hold(:node1,:node2,:counter3) + + v.inc(:counter1) + v.inc(:node1, :counter2) + v.inc(:node1, :node2, :counter3) + + v.publish_not_held_recursively(res, v.get_varz) + + res[:counter1].should == 3 + res[:node1][:counter2].should == 3 + res[:node1][:node2][:counter3].should == 2 + + v.release(:node1,:node2,:counter3) + v.publish_not_held_recursively(res, v.get_varz) + + res[:node1][:node2][:counter3].should == 3 + end + + it 'should properly increment and reset counters' do + v.declare_counter :foo + v.declare_node :node + v.declare_counter :node, :bar + + v.get(:foo).should == 0 + v.inc(:foo).should == 1 + v.get(:foo).should == 1 + + v.add :foo, 10 + v.get(:foo).should == 11 + v.get(:node, :bar).should == 0 + v.inc(:node, :bar).should == 1 + v.get(:foo).should == 11 + + v.reset :foo + v.get(:foo).should == 0 + v.get(:node, :bar).should == 1 + + end + + it 'should allow setting of counters' do + v.declare_node :node + v.declare_node :node, 'subnode' + v.declare_counter :node, 'subnode', 'counter' + v.set :node, 'subnode', 'counter', 30 + v.get(:node, 'subnode', 'counter').should == 30 + + v.inc :node, 'subnode', 'counter' + v.get(:node, 'subnode', 'counter').should == 31 + end + + it 'should return valid varz' do + v.declare_counter :total_apps + v.declare_node :frameworks + v.declare_node :frameworks, 'sinatra' + v.declare_counter :frameworks, 'sinatra', :apps + + v.set :total_apps, 10 + 10.times { v.inc :frameworks, 'sinatra', :apps } + + v.get_varz.should == { + :total_apps => 10, + :frameworks => { 'sinatra' => {:apps => 10 }}} + end + end +end