-
Notifications
You must be signed in to change notification settings - Fork 357
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This change adds support for processing Health Manager requests to start, stop or spin down apps. Test plan: all unit tests passed Change-Id: I35a9bb69a46a73fbd2f9cd0591d4df516b6a81eb
- Loading branch information
Showing
3 changed files
with
342 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
# Copyright (c) 2009-2012 VMware, Inc. | ||
|
||
require "steno" | ||
|
||
require File.expand_path("../message_bus.rb", __FILE__) | ||
require File.expand_path("../dea/dea_client", __FILE__) | ||
|
||
module VCAP::CloudController | ||
class << self | ||
attr_accessor :health_manager_respondent | ||
end | ||
|
||
class HealthManagerRespondent | ||
attr_reader :logger, :config | ||
attr_reader :message_bus, :dea_client | ||
|
||
# Semantically there should only be one such thing, although | ||
# I'm hesitant about making singletons | ||
# - Jesse | ||
def initialize(config) | ||
@logger = config.fetch(:logger, Steno.logger("cc.hm")) | ||
@dea_client = config.fetch(:dea_client, DeaClient) | ||
@message_bus = dea_client.message_bus | ||
|
||
@config = config | ||
|
||
message_bus.subscribe("cloudcontrollers.hm.requests", :queue => "cc") do |decoded_msg| | ||
process_hm_request(decoded_msg) | ||
end | ||
end | ||
|
||
# @param [Hash] payload the decoded request message | ||
def process_hm_request(payload) | ||
case payload[:op] | ||
when "START" | ||
process_hm_start(payload) | ||
when "STOP" | ||
process_hm_stop(payload) | ||
when "SPINDOWN" | ||
process_hm_spindown(payload) | ||
else | ||
logger.warn("Unknown operated requested: #{payload[:op]}, payload: #{payload.inspect}") | ||
end | ||
end | ||
|
||
private | ||
def process_hm_start(payload) | ||
# TODO: Ideally we should validate the message here with Membrane | ||
begin | ||
app_id = payload.fetch(:droplet) | ||
indices = payload.fetch(:indices) | ||
last_updated = payload.fetch(:last_updated).to_i | ||
version = payload.fetch(:version) | ||
rescue KeyError => e | ||
logger.error("Malformed start request: #{payload}, #{e.message}") | ||
return | ||
end | ||
|
||
app = Models::App[:guid => app_id] | ||
return unless app | ||
return unless app.started? | ||
return unless version == app.version | ||
return unless last_updated == app.updated_at.to_i | ||
|
||
message_override = {} | ||
if payload[:flapping] | ||
message_override[:flapping] = true | ||
end | ||
dea_client.start_instances_with_message(app, indices, message_override) | ||
end | ||
|
||
def process_hm_stop(payload) | ||
# TODO: Ideally we should validate the message here with Membrane | ||
begin | ||
app_id = payload.fetch(:droplet) | ||
indices = payload.fetch(:indices) | ||
last_updated = payload.fetch(:last_updated).to_i | ||
rescue KeyError => e | ||
logger.error("Malformed stop request: #{payload}, #{e.message}") | ||
return | ||
end | ||
|
||
app = Models::App[:guid => app_id] | ||
|
||
# TODO: do we want to keep this behavior? | ||
unless app | ||
dea_client.stop( | ||
Models::App.new(:guid => app_id), | ||
) | ||
return | ||
end | ||
|
||
return unless last_updated == app.updated_at.to_i | ||
|
||
dea_client.stop_instances(app, indices) | ||
end | ||
|
||
def process_hm_spindown(payload) | ||
# TODO: Ideally we should validate the message here with Membrane | ||
begin | ||
app_id = payload.fetch(:droplet) | ||
rescue KeyError => e | ||
logger.error("Malformed spindown request: #{payload}, #{e.message}") | ||
return | ||
end | ||
|
||
app = Models::App[:guid => app_id] | ||
|
||
# TODO: do we want to keep this behavior? | ||
unless app | ||
dea_client.stop( | ||
Models::App.new(:guid => app_id), | ||
) | ||
return | ||
end | ||
|
||
if app.update(:state => "STOPPED") | ||
dea_client.stop(app) | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
# Copyright (c) 2009-2012 VMware, Inc. | ||
|
||
require File.expand_path("../spec_helper", __FILE__) | ||
require "cloud_controller/health_manager_respondent" | ||
|
||
module VCAP::CloudController | ||
describe HealthManagerRespondent do | ||
before :each do | ||
# #save refreshes the timestamp | ||
@app = Models::App.make( | ||
:instances => 2, | ||
).save | ||
@mbus = double("mock nats") | ||
@dea_client = double("mock dea client", | ||
:message_bus => @mbus, | ||
) | ||
|
||
@mbus.should_receive(:subscribe).with( | ||
"cloudcontrollers.hm.requests", | ||
:queue => "cc", | ||
) | ||
|
||
@respondent = HealthManagerRespondent.new( | ||
config.merge( | ||
:message_bus => @mbus, | ||
:dea_client => @dea_client, | ||
) | ||
) | ||
end | ||
|
||
describe "#process_hm_request" do | ||
describe "on START request" do | ||
it "should drop request if timestamps mismatch" do | ||
payload = { | ||
:droplet => @app.guid, | ||
:op => "START", | ||
:last_updated => Time.now - 86400, | ||
:version => @app.version, | ||
:indices => [0,1], | ||
} | ||
|
||
@dea_client.should_not_receive(:start_instances_with_message) | ||
@mbus.should_not_receive(:publish).with(/^dea.+.start$/, anything) | ||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should drop request if versions mismatch" do | ||
payload = { | ||
:droplet => @app.guid, | ||
:op => "START", | ||
:last_updated => @app.updated_at, | ||
:version => 'deadbeaf-0', | ||
:indices => [0,1], | ||
} | ||
|
||
@dea_client.should_not_receive(:start_instances_with_message) | ||
@mbus.should_not_receive(:publish).with(/^dea.+.start$/, anything) | ||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should drop request if app isn't started" do | ||
payload = { | ||
:droplet => @app.guid, | ||
:op => "START", | ||
:last_updated => @app.updated_at, | ||
:version => @app.version, | ||
:indices => [0,1], | ||
} | ||
@dea_client.should_not_receive(:start_instances_with_message) | ||
@mbus.should_not_receive(:publish).with(/^dea.+.start$/, anything) | ||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should send a start request to dea" do | ||
@app.update( | ||
:state => "STARTED", | ||
) | ||
|
||
payload = { | ||
:droplet => @app.guid, | ||
:op => "START", | ||
:last_updated => @app.updated_at, | ||
:version => @app.version, | ||
:indices => [1], | ||
} | ||
@dea_client.should_receive(:start_instances_with_message).with( | ||
# XXX: we should do something about this, like overriding | ||
# Sequel::Model#eql? or something that ignores the nanosecond | ||
# nonsense | ||
respond_with(:guid => @app.guid), | ||
[1], | ||
{}, | ||
) | ||
|
||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should send a start request indicating a flapping app" do | ||
@app.update( | ||
:state => "STARTED", | ||
) | ||
|
||
payload = { | ||
:droplet => @app.guid, | ||
:op => "START", | ||
:last_updated => @app.updated_at, | ||
:version => @app.version, | ||
:indices => [1], | ||
:flapping => true, | ||
} | ||
@dea_client.should_receive(:start_instances_with_message).with( | ||
respond_with(:guid => @app.guid), | ||
[1], | ||
:flapping => true, | ||
) | ||
|
||
@respondent.process_hm_request(payload) | ||
end | ||
end | ||
|
||
describe "on STOP request" do | ||
it "should drop request if timestamps mismatch" do | ||
payload = { | ||
:droplet => @app.guid, | ||
:op => "STOP", | ||
:last_updated => Time.now - 86400, | ||
:indices => [0,1], | ||
} | ||
|
||
@dea_client.should_not_receive(:stop_instances) | ||
@mbus.should_not_receive(:publish).with(/^dea.+.start$/, anything) | ||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should send a stop request to dea for a runaway app" do | ||
@app.destroy | ||
|
||
payload = { | ||
:droplet => @app.guid, | ||
:op => "STOP", | ||
:last_updated => @app.updated_at, | ||
:indices => [1], | ||
} | ||
@dea_client.should_receive(:stop) do |app| | ||
app.guid.should == @app.guid | ||
end | ||
|
||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should send a stop request to dea" do | ||
payload = { | ||
:droplet => @app.guid, | ||
:op => "STOP", | ||
:last_updated => @app.updated_at, | ||
:indices => [1], | ||
} | ||
|
||
@dea_client.should_receive(:stop_instances).with( | ||
respond_with(:guid => @app.guid), | ||
[1], | ||
) | ||
|
||
@respondent.process_hm_request(payload) | ||
end | ||
end | ||
|
||
describe "on SPINDOWN request" do | ||
it "should drop the request if app already stopped" do | ||
@app.update( | ||
:state => "STOPPED", | ||
) | ||
|
||
payload = { | ||
:droplet => @app.guid, | ||
:op => "SPINDOWN", | ||
} | ||
@dea_client.should_not_receive(:stop) | ||
@mbus.should_not_receive(:publish).with( | ||
"dea.stop", | ||
anything, | ||
) | ||
|
||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should stop an app" do | ||
@app.update( | ||
:state => "STARTED", | ||
) | ||
|
||
payload = { | ||
:droplet => @app.guid, | ||
:op => "SPINDOWN", | ||
} | ||
@dea_client.should_receive(:stop).with( | ||
respond_with(:guid => @app.guid), | ||
) | ||
@respondent.process_hm_request(payload) | ||
end | ||
|
||
it "should update the state of an app to stopped" do | ||
@app.update( | ||
:state => "STARTED", | ||
) | ||
payload = { | ||
:droplet => @app.guid, | ||
:op => "SPINDOWN", | ||
} | ||
|
||
@dea_client.should_receive(:stop) | ||
@respondent.process_hm_request(payload) | ||
|
||
@app.reload.should be_stopped | ||
end | ||
end | ||
end | ||
end | ||
end |