diff --git a/config/dea.yml b/config/dea.yml index fcb007b6..63f02e28 100644 --- a/config/dea.yml +++ b/config/dea.yml @@ -77,3 +77,6 @@ placement_properties: zone: "CRAZY_TOWN" cc_url: https://user:password@cloud_controller_ng.service.cf.internal + +hm9000: + uri: http://localhost:3569 diff --git a/lib/dea/bootstrap.rb b/lib/dea/bootstrap.rb index 2e01b028..032455ac 100644 --- a/lib/dea/bootstrap.rb +++ b/lib/dea/bootstrap.rb @@ -27,6 +27,7 @@ require "dea/http/httpserver" require "dea/utils/download" +require "dea/utils/hm9000" require "dea/staging/staging_task_registry" require "dea/staging/staging_task" @@ -55,6 +56,7 @@ class Bootstrap attr_reader :directory_server_v2, :http_server attr_reader :staging_task_registry attr_reader :uuid + attr_reader :hm9000 def initialize(config = {}) @config = Config.new(config) @@ -78,8 +80,9 @@ def setup validate_config @uuid = SecureRandom.uuid - setup_nats setup_logging + setup_nats + setup_hm9000 setup_loggregator setup_warden_container_lister setup_droplet_registry @@ -269,6 +272,10 @@ def setup_nats @nats = Dea::Nats.new(self, config) end + def setup_hm9000 + @hm9000 = HM9000.new(config["hm9000"]["uri"], logger) + end + attr_reader :staging_responder def start_nats @@ -455,8 +462,8 @@ def send_heartbeat instance.starting? || instance.running? || instance.crashed? || instance.evacuating? end - hbs = Dea::Protocol::V1::HeartbeatResponse.generate(self, instances) - nats.publish("dea.heartbeat", hbs) + hbs = Dea::Protocol::V1::HeartbeatResponse.generate(uuid, instances) + hm9000.send_heartbeat(hbs) nil end diff --git a/lib/dea/config.rb b/lib/dea/config.rb index b2e013f5..c1f3e8ce 100644 --- a/lib/dea/config.rb +++ b/lib/dea/config.rb @@ -34,6 +34,9 @@ class Config "disk_inode_limit" => DEFAULT_STAGING_DISK_INODE_LIMIT, }, "default_health_check_timeout" => 60, + "hm9000" => { + "uri" => "http://listener-hm9000.service.cf.internal:5335/dea/heartbeat" + }, } def self.schema @@ -71,6 +74,8 @@ def self.schema "cc_url" => String, + "hm9000" => Hash, + optional("crash_lifetime_secs") => Integer, optional("crash_block_usage_ratio_threshold") => Float, optional("crash_inode_usage_ratio_threshold") => Float, diff --git a/lib/dea/protocol.rb b/lib/dea/protocol.rb index 34b8b7d7..0c10e643 100644 --- a/lib/dea/protocol.rb +++ b/lib/dea/protocol.rb @@ -8,7 +8,7 @@ module Protocol; end module Dea::Protocol::V1 class HeartbeatResponse - def self.generate(bootstrap, instances) + def self.generate(uuid, instances) hbs = instances.map do |instance| { "cc_partition" => instance.cc_partition, @@ -22,7 +22,7 @@ def self.generate(bootstrap, instances) end { "droplets" => hbs, - "dea" => bootstrap.uuid, + "dea" => uuid, } end end diff --git a/lib/dea/utils/hm9000.rb b/lib/dea/utils/hm9000.rb new file mode 100644 index 00000000..2588cabb --- /dev/null +++ b/lib/dea/utils/hm9000.rb @@ -0,0 +1,48 @@ +require 'dea/utils/uri_cleaner' + +class HM9000 + INACTIVITY_TIMEOUT = 300.freeze + + attr_reader :logger + + def initialize(destination, custom_logger=nil) + @destination = destination + @logger = custom_logger || self.class.logger + end + + def send_heartbeat(heartbeat) + logger.info("send_heartbeat", destination: URICleaner.clean(@destination)) + + http = EM::HttpRequest.new(@destination, inactivity_timeout: INACTIVITY_TIMEOUT).post( body: Yajl::Encoder.encode(heartbeat)) + + http.errback do + handle_error(http) + end + + http.callback do + handle_http_response(http) + end + end + + def handle_http_response(http) + http_status = http.response_header.status + + if http_status == 202 + logger.debug("heartbeat accepted") + else + handle_error(http) + end + end + + + def handle_error(http) + open_connection_count = EM.connection_count # https://github.com/igrigorik/em-http-request/issues/190 says to check connection_count + logger.warn("Sending heartbeat failed", + destination: URICleaner.clean(@destination), + connection_count: open_connection_count, + http_error: http.error, + http_status: http.response_header.status, + http_response: http.response) + end +end + diff --git a/spec/support/bootstrap_setup.rb b/spec/support/bootstrap_setup.rb index 263df8d8..d811f602 100644 --- a/spec/support/bootstrap_setup.rb +++ b/spec/support/bootstrap_setup.rb @@ -45,6 +45,9 @@ "key_file" => "a_key_file", "cert_file" => "a_cert_file" }, + "hm9000" => { + "uri" => 'http://127.0.0.1:25432' + } } bootstrap = Dea::Bootstrap.new(config) diff --git a/spec/support/integration_helpers/dea_helpers.rb b/spec/support/integration_helpers/dea_helpers.rb index 3652ab1d..d96fdae6 100644 --- a/spec/support/integration_helpers/dea_helpers.rb +++ b/spec/support/integration_helpers/dea_helpers.rb @@ -75,8 +75,8 @@ def dea_start(extra_config={}) Timeout.timeout(10) do begin - heartbeat = nats.with_subscription("dea.heartbeat") {} - puts "dea server started, heartbeat received" + adverise = nats.with_subscription("dea.advertise") {} + puts "dea server started, dea.advertise received" rescue NATS::ConnectError, Timeout::Error # Ignore because either NATS is not running, or DEA is not running. end @@ -115,8 +115,24 @@ def wait_until_instance_gone(app_id, timeout = 60) end def wait_until_instance_evacuating(app_id) - heartbeat = nats.with_subscription("dea.heartbeat") {} - heartbeat["droplets"].detect { |instance| instance.fetch("state") == "EVACUATING" && instance.fetch("droplet") == app_id } + uri = dea_config['hm9000']['uri'] + + without_http = uri[uri.index(':')+1..-1] + port = without_http[without_http.index(':')+1..-1].to_i + + heartbeat = "" + with_event_machine(:timeout => 10) do + start_http_server(port) do |connection, data| + new_data = data[data.index('{')..-1] + heartbeat = Yajl::Parser.parse(new_data) + + if heartbeat["droplets"].detect { |instance| instance.fetch("state") == "EVACUATING" && instance.fetch("droplet") == app_id } + done + end + end + end + + return heartbeat["droplets"].detect { |instance| instance.fetch("state") == "EVACUATING" && instance.fetch("droplet") == app_id } end def wait_until(timeout = 5, &block) diff --git a/spec/unit/bootstrap/directed_start_spec.rb b/spec/unit/bootstrap/directed_start_spec.rb index d6a278d9..d8ec1ebb 100644 --- a/spec/unit/bootstrap/directed_start_spec.rb +++ b/spec/unit/bootstrap/directed_start_spec.rb @@ -7,15 +7,17 @@ include_context "bootstrap_setup" describe "directed start" do - def publish + def publish(next_tick=true) with_event_machine do bootstrap.setup bootstrap.start nats_mock.publish("dea.#{bootstrap.uuid}.start", valid_instance_attributes) - EM.next_tick do - done + if next_tick + EM.next_tick do + done + end end end end @@ -64,11 +66,14 @@ def publish it "does not publish a heartbeat" do received_heartbeat = false - nats_mock.subscribe("dea.heartbeat") do - received_heartbeat = true - end - publish + with_event_machine do + start_http_server(25432) do |connection, data| + received_heartbeat = true + end + + publish + end expect(received_heartbeat).to be false end @@ -101,11 +106,15 @@ def publish it "publishes a heartbeat" do received_heartbeat = false - nats_mock.subscribe("dea.heartbeat") do - received_heartbeat = true - end - publish + with_event_machine do + start_http_server(25432) do |connection, data| + received_heartbeat = true + done + end + + publish(false) + end expect(received_heartbeat).to be true end diff --git a/spec/unit/bootstrap/heartbeat_spec.rb b/spec/unit/bootstrap/heartbeat_spec.rb index 3b8e715a..9d414b47 100644 --- a/spec/unit/bootstrap/heartbeat_spec.rb +++ b/spec/unit/bootstrap/heartbeat_spec.rb @@ -12,17 +12,19 @@ instances = [] heartbeats = [] - # Unregister an instance with each heartbeat received - nats_mock.subscribe("dea.heartbeat") do |msg, _| - heartbeats << Yajl::Parser.parse(msg) - if heartbeats.size == 5 - done - else - bootstrap.instance_registry.unregister(instances[heartbeats.size - 1]) - end - end - with_event_machine(:timeout => 1) do + + # Unregister an instance with each heartbeat received + start_http_server(25432) do |connection, data| + #strip hearder off of data + new_data = data[data.index('{')..-1] + heartbeats << Yajl::Parser.parse(new_data) + if heartbeats.size == 5 + done + else + bootstrap.instance_registry.unregister(instances[heartbeats.size - 1]) + end + end # Hack to not have the test take too long because heartbeat interval is defined # as an Integer in the schema. bootstrap.config['intervals']['heartbeat'] = 0.01 @@ -57,12 +59,15 @@ describe "instance state filtering" do def run heartbeat = nil - nats_mock.subscribe("dea.heartbeat") do |msg, _| - heartbeat = Yajl::Parser.parse(msg) - done - end with_event_machine(:timeout => 1) do + start_http_server(25432) do |connection, data| + #strip header off of data + new_data = data[data.index('{')..-1] + heartbeat = Yajl::Parser.parse(new_data) + done + end + bootstrap.setup yield bootstrap.start diff --git a/spec/unit/bootstrap_spec.rb b/spec/unit/bootstrap_spec.rb index 91c79208..6b8d3259 100644 --- a/spec/unit/bootstrap_spec.rb +++ b/spec/unit/bootstrap_spec.rb @@ -56,6 +56,7 @@ expect(bootstrap).to receive(:setup_directory_server_v2) expect(bootstrap).to receive(:setup_directories) expect(bootstrap).to receive(:setup_pid_file) + expect(bootstrap).to receive(:setup_hm9000) bootstrap.setup end @@ -438,6 +439,8 @@ bootstrap.setup_staging_task_registry bootstrap.setup_resource_manager bootstrap.start_nats + bootstrap.setup_hm9000 + allow(bootstrap.hm9000).to receive(:send_heartbeat) end it "advertises dea" do @@ -459,7 +462,7 @@ end it "heartbeats its registry" do - allow(bootstrap).to receive(:send_heartbeat) + expect(bootstrap).to receive(:send_heartbeat) bootstrap.start_finish end end @@ -655,25 +658,38 @@ end end - describe "send_heartbeat" do + describe '#send_heartbeat' do before do allow(EM).to receive(:add_periodic_timer).and_return(nil) allow(EM).to receive(:add_timer).and_return(nil) - bootstrap.setup_nats - bootstrap.start_nats + bootstrap.setup_hm9000 + # bootstrap.setup_nats + # bootstrap.start_nats end context "when there are no registered instances" do + let(:heartbeat) do + { + "droplets" => [], + "dea" => bootstrap.uuid, + } + end it "publishes an empty dea.heartbeat" do - allow(nats_mock).to receive(:publish) + expect(bootstrap.hm9000).to receive(:send_heartbeat).with(heartbeat) bootstrap.send_heartbeat - - expect(nats_mock).to have_received(:publish).with("dea.heartbeat", anything) end end end + describe '#setup_hm9000' do + it 'initializes hm9000' do + bootstrap.setup_hm9000 + expect(bootstrap.hm9000).to_not be_nil + expect(bootstrap.hm9000).to be_a_kind_of(HM9000) + end + end + describe 'download_buildpacks' do let(:buildpack_uri) { URI::join(@config['cc_url'], '/internal/buildpacks').to_s } diff --git a/spec/unit/utils/hm9000_spec.rb b/spec/unit/utils/hm9000_spec.rb new file mode 100644 index 00000000..09cfdb40 --- /dev/null +++ b/spec/unit/utils/hm9000_spec.rb @@ -0,0 +1,89 @@ +require 'spec_helper' +require 'dea/utils/hm9000' +require 'dea/protocol' + +describe HM9000 do + around do |example| + with_event_machine { example.call } + end + + let(:to_uri) { URI("http://127.0.0.1:12345/") } + + let(:polling_timeout_in_second) { 3 } + let(:logger) { double(:logger, info: nil, warn: nil, debug: nil) } + + subject { HM9000.new(to_uri, logger) } + + describe "#send_heartbeat" do + let(:request) { double(:request, method: 'delete').as_null_object } + let(:http) { double(:http, req: request).as_null_object } + + let(:heartbeat) { Dea::Protocol::V1::HeartbeatResponse.generate("dea-uuid", []) } + + let(:status) { 202 } + + it "creates the correct request with the heartbeat in json" do + expect(EM::HttpRequest).to receive(:new).with(to_uri, inactivity_timeout: 300).and_return(http) + expect(http).to receive(:post).with( + body: Yajl::Encoder.encode(heartbeat) + ) + expect(http).to receive(:callback) + + subject.send_heartbeat(heartbeat) + + done + end + + context "when everything works perfectly" do + let(:http) { double(:http, req: request, response_header: { status: 202 } ).as_null_object } + it "logs the success" do + expect(EM::HttpRequest).to receive(:new).with(to_uri, inactivity_timeout: 300).and_return(http) + + expect(subject.logger).to receive(:info) + expect(http).to receive(:callback) + expect(http).to receive(:errback) + + subject.send_heartbeat(heartbeat) + + done + end + end + end + + describe '#handle_http_response' do + let(:response_header) { double(:response_header, status: status)} + let(:http) { double(:http, req: nil, response_header: response_header ).as_null_object } + context 'when status is 202' do + let(:status) { 202 } + it 'logs the success' do + expect(subject.logger).to receive(:debug) + subject.handle_http_response(http) + + done + end + end + + context 'when status is not 202' do + let(:status) { 401 } + it 'calls handle_error' do + expect(subject).to receive(:handle_error) + + subject.handle_http_response(http) + + done + end + end + end + + describe '#handle_error' do + let(:status) { 0 } + let(:response_header) { double(:response_header, status: status)} + let(:http) { double(:http, req: nil, response_header: response_header ).as_null_object } + it 'logs the error' do + expect(subject.logger).to receive(:warn) + subject.handle_error(http) + + done + end + end +end