Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
DEA sends heartbeats to HM9000 over HTTP
Browse files Browse the repository at this point in the history
 - change heartbeat method to only take uuid
 - no longer give entire bootstrap
 - create hm9000
 - dea sends heartbeats over http
 - Change integration helpers for http heartbeat
 - Fix dea logging and heartbeat URI

[#112002745]

Signed-off-by: Jonathan Berkhahn <jaberkha@us.ibm.com>
  • Loading branch information
swetharepakula authored and Anonymous Coward committed Mar 17, 2016
1 parent 08d8315 commit 298d25c
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 41 deletions.
3 changes: 3 additions & 0 deletions config/dea.yml
Expand Up @@ -77,3 +77,6 @@ placement_properties:
zone: "CRAZY_TOWN"

cc_url: https://user:password@cloud_controller_ng.service.cf.internal

hm9000:
uri: http://localhost:3569
13 changes: 10 additions & 3 deletions lib/dea/bootstrap.rb
Expand Up @@ -27,6 +27,7 @@
require "dea/http/httpserver"

require "dea/utils/download"
require "dea/utils/hm9000"

require "dea/staging/staging_task_registry"
require "dea/staging/staging_task"
Expand Down Expand Up @@ -55,6 +56,7 @@ class Bootstrap
attr_reader :directory_server_v2, :http_server
attr_reader :staging_task_registry
attr_reader :uuid
attr_reader :hm9000

def initialize(config = {})
@config = Config.new(config)
Expand All @@ -78,8 +80,9 @@ def setup
validate_config

@uuid = SecureRandom.uuid
setup_nats
setup_logging
setup_nats
setup_hm9000
setup_loggregator
setup_warden_container_lister
setup_droplet_registry
Expand Down Expand Up @@ -269,6 +272,10 @@ def setup_nats
@nats = Dea::Nats.new(self, config)
end

def setup_hm9000
@hm9000 = HM9000.new(config["hm9000"]["uri"], logger)
end

attr_reader :staging_responder

def start_nats
Expand Down Expand Up @@ -455,8 +462,8 @@ def send_heartbeat
instance.starting? || instance.running? || instance.crashed? || instance.evacuating?
end

hbs = Dea::Protocol::V1::HeartbeatResponse.generate(self, instances)
nats.publish("dea.heartbeat", hbs)
hbs = Dea::Protocol::V1::HeartbeatResponse.generate(uuid, instances)
hm9000.send_heartbeat(hbs)

nil
end
Expand Down
5 changes: 5 additions & 0 deletions lib/dea/config.rb
Expand Up @@ -34,6 +34,9 @@ class Config
"disk_inode_limit" => DEFAULT_STAGING_DISK_INODE_LIMIT,
},
"default_health_check_timeout" => 60,
"hm9000" => {
"uri" => "http://listener-hm9000.service.cf.internal:5335/dea/heartbeat"
},
}

def self.schema
Expand Down Expand Up @@ -71,6 +74,8 @@ def self.schema

"cc_url" => String,

"hm9000" => Hash,

optional("crash_lifetime_secs") => Integer,
optional("crash_block_usage_ratio_threshold") => Float,
optional("crash_inode_usage_ratio_threshold") => Float,
Expand Down
4 changes: 2 additions & 2 deletions lib/dea/protocol.rb
Expand Up @@ -8,7 +8,7 @@ module Protocol; end

module Dea::Protocol::V1
class HeartbeatResponse
def self.generate(bootstrap, instances)
def self.generate(uuid, instances)
hbs = instances.map do |instance|
{
"cc_partition" => instance.cc_partition,
Expand All @@ -22,7 +22,7 @@ def self.generate(bootstrap, instances)
end

{ "droplets" => hbs,
"dea" => bootstrap.uuid,
"dea" => uuid,
}
end
end
Expand Down
48 changes: 48 additions & 0 deletions lib/dea/utils/hm9000.rb
@@ -0,0 +1,48 @@
require 'dea/utils/uri_cleaner'

class HM9000
INACTIVITY_TIMEOUT = 300.freeze

attr_reader :logger

def initialize(destination, custom_logger=nil)
@destination = destination
@logger = custom_logger || self.class.logger
end

def send_heartbeat(heartbeat)
logger.info("send_heartbeat", destination: URICleaner.clean(@destination))

http = EM::HttpRequest.new(@destination, inactivity_timeout: INACTIVITY_TIMEOUT).post( body: Yajl::Encoder.encode(heartbeat))

http.errback do
handle_error(http)
end

http.callback do
handle_http_response(http)
end
end

def handle_http_response(http)
http_status = http.response_header.status

if http_status == 202
logger.debug("heartbeat accepted")
else
handle_error(http)
end
end


def handle_error(http)
open_connection_count = EM.connection_count # https://github.com/igrigorik/em-http-request/issues/190 says to check connection_count
logger.warn("Sending heartbeat failed",
destination: URICleaner.clean(@destination),
connection_count: open_connection_count,
http_error: http.error,
http_status: http.response_header.status,
http_response: http.response)
end
end

3 changes: 3 additions & 0 deletions spec/support/bootstrap_setup.rb
Expand Up @@ -45,6 +45,9 @@
"key_file" => "a_key_file",
"cert_file" => "a_cert_file"
},
"hm9000" => {
"uri" => 'http://127.0.0.1:25432'
}
}

bootstrap = Dea::Bootstrap.new(config)
Expand Down
24 changes: 20 additions & 4 deletions spec/support/integration_helpers/dea_helpers.rb
Expand Up @@ -75,8 +75,8 @@ def dea_start(extra_config={})

Timeout.timeout(10) do
begin
heartbeat = nats.with_subscription("dea.heartbeat") {}
puts "dea server started, heartbeat received"
adverise = nats.with_subscription("dea.advertise") {}
puts "dea server started, dea.advertise received"
rescue NATS::ConnectError, Timeout::Error
# Ignore because either NATS is not running, or DEA is not running.
end
Expand Down Expand Up @@ -115,8 +115,24 @@ def wait_until_instance_gone(app_id, timeout = 60)
end

def wait_until_instance_evacuating(app_id)
heartbeat = nats.with_subscription("dea.heartbeat") {}
heartbeat["droplets"].detect { |instance| instance.fetch("state") == "EVACUATING" && instance.fetch("droplet") == app_id }
uri = dea_config['hm9000']['uri']

without_http = uri[uri.index(':')+1..-1]
port = without_http[without_http.index(':')+1..-1].to_i

heartbeat = ""
with_event_machine(:timeout => 10) do
start_http_server(port) do |connection, data|
new_data = data[data.index('{')..-1]
heartbeat = Yajl::Parser.parse(new_data)

if heartbeat["droplets"].detect { |instance| instance.fetch("state") == "EVACUATING" && instance.fetch("droplet") == app_id }
done
end
end
end

return heartbeat["droplets"].detect { |instance| instance.fetch("state") == "EVACUATING" && instance.fetch("droplet") == app_id }
end

def wait_until(timeout = 5, &block)
Expand Down
31 changes: 20 additions & 11 deletions spec/unit/bootstrap/directed_start_spec.rb
Expand Up @@ -7,15 +7,17 @@
include_context "bootstrap_setup"

describe "directed start" do
def publish
def publish(next_tick=true)
with_event_machine do
bootstrap.setup
bootstrap.start

nats_mock.publish("dea.#{bootstrap.uuid}.start", valid_instance_attributes)

EM.next_tick do
done
if next_tick
EM.next_tick do
done
end
end
end
end
Expand Down Expand Up @@ -64,11 +66,14 @@ def publish

it "does not publish a heartbeat" do
received_heartbeat = false
nats_mock.subscribe("dea.heartbeat") do
received_heartbeat = true
end

publish
with_event_machine do
start_http_server(25432) do |connection, data|
received_heartbeat = true
end

publish
end

expect(received_heartbeat).to be false
end
Expand Down Expand Up @@ -101,11 +106,15 @@ def publish

it "publishes a heartbeat" do
received_heartbeat = false
nats_mock.subscribe("dea.heartbeat") do
received_heartbeat = true
end

publish
with_event_machine do
start_http_server(25432) do |connection, data|
received_heartbeat = true
done
end

publish(false)
end

expect(received_heartbeat).to be true
end
Expand Down
33 changes: 19 additions & 14 deletions spec/unit/bootstrap/heartbeat_spec.rb
Expand Up @@ -12,17 +12,19 @@
instances = []
heartbeats = []

# Unregister an instance with each heartbeat received
nats_mock.subscribe("dea.heartbeat") do |msg, _|
heartbeats << Yajl::Parser.parse(msg)
if heartbeats.size == 5
done
else
bootstrap.instance_registry.unregister(instances[heartbeats.size - 1])
end
end

with_event_machine(:timeout => 1) do

# Unregister an instance with each heartbeat received
start_http_server(25432) do |connection, data|
#strip hearder off of data
new_data = data[data.index('{')..-1]
heartbeats << Yajl::Parser.parse(new_data)
if heartbeats.size == 5
done
else
bootstrap.instance_registry.unregister(instances[heartbeats.size - 1])
end
end
# Hack to not have the test take too long because heartbeat interval is defined
# as an Integer in the schema.
bootstrap.config['intervals']['heartbeat'] = 0.01
Expand Down Expand Up @@ -57,12 +59,15 @@
describe "instance state filtering" do
def run
heartbeat = nil
nats_mock.subscribe("dea.heartbeat") do |msg, _|
heartbeat = Yajl::Parser.parse(msg)
done
end

with_event_machine(:timeout => 1) do
start_http_server(25432) do |connection, data|
#strip header off of data
new_data = data[data.index('{')..-1]
heartbeat = Yajl::Parser.parse(new_data)
done
end

bootstrap.setup
yield
bootstrap.start
Expand Down
30 changes: 23 additions & 7 deletions spec/unit/bootstrap_spec.rb
Expand Up @@ -56,6 +56,7 @@
expect(bootstrap).to receive(:setup_directory_server_v2)
expect(bootstrap).to receive(:setup_directories)
expect(bootstrap).to receive(:setup_pid_file)
expect(bootstrap).to receive(:setup_hm9000)

bootstrap.setup
end
Expand Down Expand Up @@ -438,6 +439,8 @@
bootstrap.setup_staging_task_registry
bootstrap.setup_resource_manager
bootstrap.start_nats
bootstrap.setup_hm9000
allow(bootstrap.hm9000).to receive(:send_heartbeat)
end

it "advertises dea" do
Expand All @@ -459,7 +462,7 @@
end

it "heartbeats its registry" do
allow(bootstrap).to receive(:send_heartbeat)
expect(bootstrap).to receive(:send_heartbeat)
bootstrap.start_finish
end
end
Expand Down Expand Up @@ -655,25 +658,38 @@
end
end

describe "send_heartbeat" do
describe '#send_heartbeat' do
before do
allow(EM).to receive(:add_periodic_timer).and_return(nil)
allow(EM).to receive(:add_timer).and_return(nil)
bootstrap.setup_nats
bootstrap.start_nats
bootstrap.setup_hm9000
# bootstrap.setup_nats
# bootstrap.start_nats
end

context "when there are no registered instances" do
let(:heartbeat) do
{
"droplets" => [],
"dea" => bootstrap.uuid,
}
end
it "publishes an empty dea.heartbeat" do
allow(nats_mock).to receive(:publish)
expect(bootstrap.hm9000).to receive(:send_heartbeat).with(heartbeat)

bootstrap.send_heartbeat

expect(nats_mock).to have_received(:publish).with("dea.heartbeat", anything)
end
end
end

describe '#setup_hm9000' do
it 'initializes hm9000' do
bootstrap.setup_hm9000
expect(bootstrap.hm9000).to_not be_nil
expect(bootstrap.hm9000).to be_a_kind_of(HM9000)
end
end

describe 'download_buildpacks' do
let(:buildpack_uri) { URI::join(@config['cc_url'], '/internal/buildpacks').to_s }

Expand Down

0 comments on commit 298d25c

Please sign in to comment.