Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

extract staging nats interaction into Stage responder

Change-Id: Ic6e35cfb6c13388bd958430843cecc78ee205415
  • Loading branch information...
commit b1e3df8085f10ae769a1b7c761b3a028d61d1d53 1 parent 295c1d7
Dmitriy Kalinin and Jesse Zhang authored
17 lib/dea/bootstrap.rb
View
@@ -314,6 +314,9 @@ def setup_nats
def start_nats
@nats.start
+
+ @responders = [Dea::Responders::Stage.new(nats, self, config)]
+ @responders.map(&:start)
end
def start_component
@@ -563,19 +566,6 @@ def handle_dea_stop(message)
end
end
- def handle_dea_stage(message)
- logger.info("<staging> Got staging request with #{message.data.inspect}")
- staging_task = StagingTask.new(self, message.data)
- staging_task.start do |error|
- result = {
- "task_id" => staging_task.task_id,
- "task_log" => staging_task.task_log
- }
- result["error"] = error.to_s if error
- message.respond(result)
- end
- end
-
def handle_dea_discover(message)
runtime = message.data["runtime"]
rs = message.data["limits"]
@@ -693,6 +683,7 @@ def shutdown
ignore_signals
+ @responders.map(&:stop) if @responders
nats.stop
unregister_directory_server_v2
13 lib/dea/nats.rb
View
@@ -57,12 +57,6 @@ def start
subscribe("droplet.status") do |message|
bootstrap.handle_droplet_status(message)
end
-
- if config["staging"] && config["staging"]["enabled"]
- subscribe("staging", :queue => "staging") do |message|
- bootstrap.handle_dea_stage(message)
- end
- end
end
def stop
@@ -75,13 +69,18 @@ def publish(subject, data)
end
def subscribe(subject, opts={})
+ # Do not track subscription option is used with responders
+ # since we want them to be responsible for subscribe/unsubscribe.
+ do_not_track_subscription = opts.delete(:do_not_track_subscription)
+
sid = client.subscribe(subject, opts) do |raw_data, respond_to|
message = Message.decode(self, subject, raw_data, respond_to)
logger.debug "Received on #{subject.inspect}: #{message.data.inspect}"
yield message
end
- @sids[subject] = sid
+ @sids[subject] = sid unless do_not_track_subscription
+ sid
end
def client
38 lib/dea/responders/stage.rb
View
@@ -0,0 +1,38 @@
+require "dea/staging_task"
+
+module Dea::Responders
+ class Stage
+ def initialize(nats, bootstrap, config)
+ @nats = nats
+ @bootstrap = bootstrap
+ @config = config
+ end
+
+ def start
+ if @config["staging"] && @config["staging"]["enabled"]
+ options = {:do_not_track_subscription => true, :queue => "staging"}
+ @sid = @nats.subscribe("staging", options) do |message|
+ handle(message)
+ end
+ end
+ end
+
+ def stop
+ @nats.unsubscribe(@sid) if @sid
+ end
+
+ def handle(message)
+ logger.info("<staging> Got staging request with #{message.data.inspect}")
+
+ task = Dea::StagingTask.new(@bootstrap, message.data)
+ task.start do |error|
+ result = {
+ "task_id" => task.task_id,
+ "task_log" => task.task_log
+ }
+ result["error"] = error.to_s if error
+ message.respond(result)
+ end
+ end
+ end
+end
27 spec/dea/bootstrap_spec.rb
View
@@ -22,33 +22,6 @@
Dea::Bootstrap.new(@config)
end
- describe "#handle_dea_stage" do
- let(:nats) { stub(:nats) }
- let(:message) { Dea::Nats::Message.new(nats, nil, {}, nil) }
-
- it "responds to the message when staging is finished" do
- Dea::StagingTask.any_instance.should_receive(:start).and_yield(nil)
- Dea::StagingTask.any_instance.should_receive(:task_id).and_return("the_uuid")
- Dea::StagingTask.any_instance.should_receive(:task_log).and_return("some log here")
-
- expected_message = {"task_id" => "the_uuid", "task_log" => "some log here"}
- nats.should_receive(:publish).with(nil, expected_message)
-
- bootstrap.handle_dea_stage(message)
- end
-
- it "responds to the message when staging raises an error" do
- Dea::StagingTask.any_instance.should_receive(:start).and_yield(RuntimeError.new("Staging Failed"))
- Dea::StagingTask.any_instance.should_receive(:task_id).and_return("the_uuid")
- Dea::StagingTask.any_instance.should_receive(:task_log).and_return("some log here")
-
- expected_message = {"task_id" => "the_uuid", "task_log" => "some log here", "error" => "Staging Failed"}
- nats.should_receive(:publish).with(nil, expected_message)
-
- bootstrap.handle_dea_stage(message)
- end
- end
-
describe "logging setup" do
after do
bootstrap.setup_logging
50 spec/dea/nats_spec.rb
View
@@ -58,43 +58,6 @@
nats_client.receive_message(subject, data)
end
end
-
- describe "#staging" do
- it "does not listen to staging by default" do
- data = { "subject" => "staging"}
-
- bootstrap.should_not_receive(:handle_dea_stage)
-
- nats_client.receive_message("staging", data)
- end
-
- context "when the config says to stage things" do
- before do
- @config.merge!({"staging" => {"enabled" => true}})
-
- new_nats = Dea::Nats.new(bootstrap, @config)
-
- NATS.stub(:connect) do |options|
- options[:uri].should match(/^nats:/)
- @new_nats_client = NatsClientMock.new(options)
- end
-
- new_nats.start
- end
-
-
- it "responds to the staging message" do
- data = { "subject" => "staging" }
-
- bootstrap.should_receive(:handle_dea_stage).with(kind_of(Dea::Nats::Message)) do |message|
- message.subject.should == "staging"
- message.data.should == data
- end
-
- @new_nats_client.receive_message("staging", data)
- end
- end
- end
end
describe "subscription teardown" do
@@ -115,4 +78,17 @@
nats_client.receive_message("echo", { "hello" => "world" }, "echo.reply")
end
end
+
+ describe "#subscribe" do
+ it "returns subscription id" do
+ sids = [nats.subscribe("subject-2"), nats.subscribe("subject-1")]
+ sids.uniq.should == sids
+ end
+
+ it "does not unsubscribe if subscribed with do-not-track-subscription option" do
+ nats.subscribe("subject-1", :do_not_track_subscription => true)
+ nats_client.should_not_receive(:unsubscribe)
+ nats.stop
+ end
+ end
end
110 spec/dea/responders/stage_spec.rb
View
@@ -0,0 +1,110 @@
+require "spec_helper"
+require "dea/nats"
+require "dea/responders/stage"
+
+describe Dea::Responders::Stage do
+ let(:nats) { mock(:nats) }
+ let(:config) { {} }
+ let(:bootstrap) { mock(:bootstrap, :config => config) }
+ subject { described_class.new(nats, bootstrap, config) }
+
+ describe "#start" do
+ let(:nats) { NatsClientMock.new }
+
+ context "when config does not allow staging operations" do
+ let(:config) { {} }
+
+ it "does not listen to staging" do
+ subject.start
+ subject.should_not_receive(:handle)
+ nats.publish("staging")
+ end
+ end
+
+ context "when the config allows staging operation" do
+ let(:config) { {"staging" => {"enabled" => true}} }
+
+ it "subscribes to staging message" do
+ subject.start
+ subject.should_receive(:handle)
+ nats.publish("staging")
+ end
+
+ it "subscribes to staging message as part of the queue group" do
+ nats.should_receive(:subscribe).with("staging", hash_including(:queue => "staging"))
+ subject.start
+ end
+
+ it "subscribes to staging message but manually tracks the subscription" do
+ nats.should_receive(:subscribe).with("staging", hash_including(:do_not_track_subscription => true))
+ subject.start
+ end
+ end
+ end
+
+ describe "#stop" do
+ let(:nats) { NatsClientMock.new }
+ let(:config) { {"staging" => {"enabled" => true}} }
+
+ context "when subscription was made" do
+ before { subject.start }
+
+ it "unsubscribes to staging message" do
+ subject.should_receive(:handle) # sanity check
+ nats.publish("staging")
+
+ subject.stop
+ subject.should_not_receive(:handle)
+ nats.publish("staging")
+ end
+ end
+
+ context "when subscription was not made" do
+ it "does not unsubscribe" do
+ nats.should_not_receive(:unsubscribe)
+ subject.stop
+ end
+ end
+ end
+
+ describe "#handle" do
+ let(:message) { Dea::Nats::Message.new(nats, nil, {"something" => "value"}, "respond-to") }
+ let(:staging_task) { mock(:staging_task, :task_id => "task-id", :task_log => "task-log") }
+
+ before { Dea::StagingTask.stub(:new => staging_task) }
+
+ it "starts staging task" do
+ Dea::StagingTask
+ .should_receive(:new)
+ .with(bootstrap, message.data)
+ .and_return(staging_task)
+ staging_task.should_receive(:start)
+ subject.handle(message)
+ end
+
+ context "when staging is successful" do
+ before { staging_task.stub(:start).and_yield(nil) }
+
+ it "responds with successful message" do
+ nats.should_receive(:publish).with("respond-to", {
+ "task_id" => "task-id",
+ "task_log" => "task-log",
+ })
+ subject.handle(message)
+ end
+ end
+
+ context "when staging task fails" do
+ before { staging_task.stub(:start).and_yield(RuntimeError.new("error-description")) }
+
+ it "responds with error message" do
+ nats.should_receive(:publish).with("respond-to", {
+ "task_id" => "task-id",
+ "task_log" => "task-log",
+ "error" => "error-description",
+ })
+ subject.handle(message)
+ end
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.