Permalink
Browse files

Merge "[hm] resilience: db reconnection logic with exponential back-off"

  • Loading branch information...
2 parents e8489a8 + debad4d commit 35dcbcc8d19291f347dbb47e9dda7549e5720d5d @bnugmanov bnugmanov committed with Gerrit Code Review Feb 13, 2012
@@ -94,6 +94,7 @@ def initialize(config)
@flapping_timeout = config['intervals']['flapping_timeout']
@restart_timeout = config['intervals']['restart_timeout']
@stable_state = config['intervals']['stable_state']
+ @max_db_reconnect_wait = config['intervals']['max_db_reconnect_wait'] || 300 #up to five minutes by default
@dequeueing_rate = config['dequeueing_rate'] || 50
@database_environment = config['database_environment']
@@ -136,17 +137,13 @@ def create_version_entry
def run
@started = Time.now.to_i
- register_error_handler
-
NATS.on_error do |e|
@logger.error("NATS problem, #{e}")
@logger.error(e)
exit!
end
-
EM.error_handler do |e|
@logger.error "Eventmachine problem, #{e}"
- @logger.error("#{e.backtrace.join("\n")}")
@logger.error(e)
exit!
end
@@ -174,6 +171,36 @@ def configure_database
establish_database_connection(config, logger)
end
+ def ensure_connected(&block)
+ sleep_time = 1
+ total_sleep_time = 0
+ failure_count = 0
+ begin
+ yield
+ rescue ActiveRecord::StatementInvalid
+ # This exception is raised when a connection was previously connected, but
+ # upon executing a statement detects that the connection is actually gone.
+ # Calling #disconnect! on the connection pool will make it reconnect.
+ @logger.warn('Possibly lost db connection, attempting to re-connect')
+ ActiveRecord::Base.connection_pool.disconnect!
+ retry
+ rescue ActiveSupport::Dependencies::Blamable => e
+ @logger.warn("Attempting to recover from: #{e}")
+ failure_count += 1
+ if total_sleep_time < @max_db_reconnect_wait
+ @logger.warn("Waiting for #{sleep_time} seconds before re-attempting database operation")
+ sleep sleep_time
+ total_sleep_time += sleep_time
+ sleep_time *= 2 unless sleep_time > 60
+ retry
+ else
+ @logger.error("Unable to reconnect after #{failure_count} attempts over #{total_sleep_time} seconds of waiting, giving up. Error information follows.")
+ @logger.error(e)
+ exit!
+ end
+ end
+ end
+
def establish_database_connection(db_config, logger)
expand_database_path_for_sqlite3(db_config)
ActiveRecord::Base.establish_connection(db_config)
@@ -287,13 +314,15 @@ def analyze_app(app_id, droplet_entry, stats)
end
end
- # don't act if we were looking at a stale droplet
- if update_droplet(App.find_by_id(app_id))
- if missing_indices.any? || extra_instances.any?
- @logger.info("Droplet information is stale for app id #{app_id}, not taking action.")
- @logger.info("(#{missing_indices.length} instances need to be started, #{extra_instances.length} instances need to be stopped.)")
+ ensure_connected do
+ # don't act if we were looking at a stale droplet
+ if update_droplet(App.find_by_id(app_id))
+ if missing_indices.any? || extra_instances.any?
+ @logger.info("Droplet information is stale for app id #{app_id}, not taking action.")
+ @logger.info("(#{missing_indices.length} instances need to be started, #{extra_instances.length} instances need to be stopped.)")
+ end
+ return
end
- return
end
if missing_indices.any?
@@ -360,7 +389,7 @@ def elapsed_time_in_ms(start)
def process_updated_message(message)
VCAP::Component.varz[:droplet_updated_msgs_received] += 1
droplet_id = parse_json(message)['droplet']
- update_droplet App.find_by_id(droplet_id)
+ ensure_connected { update_droplet App.find_by_id(droplet_id) }
end
def process_exited_message(message)
@@ -549,66 +578,70 @@ def process_status_message(message, reply)
end
def update_from_db
- start = Time.now
- old_droplet_ids = Set.new(@droplets.keys)
- App.all.each do |droplet|
- old_droplet_ids.delete(droplet.id)
- update_droplet(droplet)
- end
- old_droplet_ids.each {|id| @droplets.delete(id)}
- # TODO - Devise a version of the below that works with vast numbers of apps and users.
- VCAP::Component.varz[:total_users] = User.count
- VCAP::Component.varz[:users] = User.all_email_addresses.map {|e| {:email => e}}
- VCAP::Component.varz[:apps] = App.health_manager_representations
- @logger.info("Database scan took #{elapsed_time_in_ms(start)} and found #{@droplets.size} apps")
+ ensure_connected do
+ start = Time.now
+ old_droplet_ids = Set.new(@droplets.keys)
- start = Time.now
+ App.all.each do |droplet|
+ old_droplet_ids.delete(droplet.id)
+ update_droplet(droplet)
+ end
- VCAP::Component.varz[:total] = {
- :frameworks => {},
- :runtimes => {}
- }
+ old_droplet_ids.each {|id| @droplets.delete(id)}
+ # TODO - Devise a version of the below that works with vast numbers of apps and users.
+ VCAP::Component.varz[:total_users] = User.count
+ VCAP::Component.varz[:users] = User.all_email_addresses.map {|e| {:email => e}}
+ VCAP::Component.varz[:apps] = App.health_manager_representations
+ @logger.info("Database scan took #{elapsed_time_in_ms(start)} and found #{@droplets.size} apps")
- App.count(:group => ["framework", "runtime", "state"]).each do |grouping, count|
- framework, runtime, state = grouping
+ start = Time.now
- framework_stats = VCAP::Component.varz[:total][:frameworks][framework] ||= create_db_metrics
- framework_stats[:apps] += count
- framework_stats[:started_apps] += count if state == "STARTED"
+ VCAP::Component.varz[:total] = {
+ :frameworks => {},
+ :runtimes => {}
+ }
- runtime_stats = VCAP::Component.varz[:total][:runtimes][runtime] ||= create_db_metrics
- runtime_stats[:apps] += count
- runtime_stats[:started_apps] += count if state == "STARTED"
- end
+ App.count(:group => ["framework", "runtime", "state"]).each do |grouping, count|
+ framework, runtime, state = grouping
- App.sum(:instances, :group => ["framework", "runtime", "state"]).each do |grouping, count|
- framework, runtime, state = grouping
+ framework_stats = VCAP::Component.varz[:total][:frameworks][framework] ||= create_db_metrics
+ framework_stats[:apps] += count
+ framework_stats[:started_apps] += count if state == "STARTED"
- framework_stats = VCAP::Component.varz[:total][:frameworks][framework] ||= create_db_metrics
- framework_stats[:instances] += count
- framework_stats[:started_instances] += count if state == "STARTED"
+ runtime_stats = VCAP::Component.varz[:total][:runtimes][runtime] ||= create_db_metrics
+ runtime_stats[:apps] += count
+ runtime_stats[:started_apps] += count if state == "STARTED"
+ end
- runtime_stats = VCAP::Component.varz[:total][:runtimes][runtime] ||= create_db_metrics
- runtime_stats[:instances] += count
- runtime_stats[:started_instances] += count if state == "STARTED"
- end
+ App.sum(:instances, :group => ["framework", "runtime", "state"]).each do |grouping, count|
+ framework, runtime, state = grouping
- App.sum("instances * memory", :group => ["framework", "runtime", "state"]).each do |grouping, count|
- # memory is stored as a string
- count = count.to_i
- framework, runtime, state = grouping
+ framework_stats = VCAP::Component.varz[:total][:frameworks][framework] ||= create_db_metrics
+ framework_stats[:instances] += count
+ framework_stats[:started_instances] += count if state == "STARTED"
- framework_stats = VCAP::Component.varz[:total][:frameworks][framework] ||= create_db_metrics
- framework_stats[:memory] += count
- framework_stats[:started_memory] += count if state == "STARTED"
+ runtime_stats = VCAP::Component.varz[:total][:runtimes][runtime] ||= create_db_metrics
+ runtime_stats[:instances] += count
+ runtime_stats[:started_instances] += count if state == "STARTED"
+ end
+ App.sum("instances * memory", :group => ["framework", "runtime", "state"]).each do |grouping, count|
+ # memory is stored as a string
+ count = count.to_i
+ framework, runtime, state = grouping
- runtime_stats = VCAP::Component.varz[:total][:runtimes][runtime] ||= create_db_metrics
- runtime_stats[:memory] += count
- runtime_stats[:started_memory] += count if state == "STARTED"
- end
+ framework_stats = VCAP::Component.varz[:total][:frameworks][framework] ||= create_db_metrics
+ framework_stats[:memory] += count
+ framework_stats[:started_memory] += count if state == "STARTED"
- @logger.info("Database stat scan took #{elapsed_time_in_ms(start)}")
+
+ runtime_stats = VCAP::Component.varz[:total][:runtimes][runtime] ||= create_db_metrics
+ runtime_stats[:memory] += count
+ runtime_stats[:started_memory] += count if state == "STARTED"
+ end
+
+ @logger.info("Database stat scan took #{elapsed_time_in_ms(start)}")
+ end
end
def droplet_version(droplet)
@@ -678,18 +711,6 @@ def stop_instances(droplet_id, instances)
@logger.info("Requesting the stop of extra instances: #{stop_message}")
end
- def register_error_handler
- EM.error_handler { |e|
- if e.kind_of? NATS::Error
- @logger.error("NATS problem, #{e}")
- exit
- else
- @logger.error "Eventmachine problem, #{e}"
- @logger.error "#{e.backtrace.join("\n")}"
- end
- }
- end
-
def configure_timers
EM.next_tick { update_from_db }
EM.add_periodic_timer(@database_scan) { update_from_db }
@@ -48,7 +48,7 @@
'stable_state' => -1, #ensures all apps are "quiescent" for the purpose of testing
'nats_ping' => 1,
},
- 'logging' => {'level' => 'debug'},
+ 'logging' => {'level' => 'warn'},
'pid' => File.join(@run_dir, 'health_manager.pid'),
'rails_environment' => 'test',
@@ -57,8 +57,8 @@ def should_publish_to_nats(message, payload)
end
after(:all) do
- #::User.destroy_all
- #::App.destroy_all
+ ::User.destroy_all
+ ::App.destroy_all
end
before(:each) do
@@ -104,55 +104,6 @@ def make_heartbeat_message(indices, state)
{ 'droplets' => droplets }.to_json
end
- pending "should not do anything when everything is running" do
- NATS.should_receive(:start).with(:uri => 'nats://localhost:4222/')
-
- NATS.should_receive(:subscribe).with('dea.heartbeat').and_return { |_, block| @hb_block = block }
- NATS.should_receive(:subscribe).with('droplet.exited')
- NATS.should_receive(:subscribe).with('droplet.updated')
- NATS.should_receive(:subscribe).with('healthmanager.status')
- NATS.should_receive(:subscribe).with('healthmanager.health')
- NATS.should_receive(:publish).with('healthmanager.start')
-
- NATS.should_receive(:subscribe).with('vcap.component.discover')
- NATS.should_receive(:publish).with('vcap.component.announce', /\{.*\}/)
-
- EM.run do
- @hm.stub!(:register_error_handler)
- @hm.run
-
- EM.add_periodic_timer(1) do
- @hb_block.call({:droplets => [
- {
- :droplet => @app.id,
- :version => @app.staged_package_hash,
- :state => :RUNNING,
- :instance => 'instance 1',
- :index => 0
- },
- {
- :droplet => @app.id,
- :version => @app.staged_package_hash,
- :state => :RUNNING,
- :instance => 'instance 2',
- :index => 1
- },
- {
- :droplet => @app.id,
- :version => @app.staged_package_hash,
- :state => :RUNNING,
- :instance => 'instance 3',
- :index => 2
- }
- ]}.to_json)
- end
-
- EM.add_timer(4.5) do
- EM.stop_event_loop
- end
- end
- end
-
it "should detect instances that are down and send a START request" do
stats = { :frameworks => {}, :runtimes => {}, :down => 0 }
should_publish_to_nats "cloudcontrollers.hm.requests", {

0 comments on commit 35dcbcc

Please sign in to comment.