Permalink
Browse files

made MessageBus a class

Change-Id: Ib499fc9e82d326eb31769a8ec7371f8f788239bc
  • Loading branch information...
1 parent 3f3d12e commit 16e4126286fb659475255ac7f54296430b4402d7 @d d committed Feb 7, 2013
@@ -63,10 +63,10 @@ def after_update(app, changes)
end
def send_droplet_updated_message(app)
- json = Yajl::Encoder.encode(:droplet => app.guid,
- :cc_partition => config[:cc_partition])
- MessageBus.publish("droplet.updated", json)
- nil
+ MessageBus.instance.publish("droplet.updated", Yajl::Encoder.encode(
+ :droplet => app.guid,
+ :cc_partition => config[:cc_partition]
+ ))
end
def self.translate_validation_exception(e, attributes)
@@ -22,7 +22,8 @@ def stage_app(app)
LegacyStaging.with_upload_handle(app.guid) do |handle|
client_error = nil
results = EM.schedule_sync do |promise|
- client = VCAP::Stager::Client::EmAware.new(MessageBus.nats.client, queue)
+ client = VCAP::Stager::Client::EmAware.new(MessageBus.instance.nats.client, queue)
+
request = staging_request(app)
logger.debug "staging #{app.guid} request: #{request}"
deferrable = client.stage(request, staging_timeout)
@@ -143,19 +143,20 @@ def self.merge_defaults(config)
end
def self.configure(config)
- # TODO: this introduces 2 config styles. CC takes config
- # via per instance constructor. Remove that in favor of this
- # method as there will be more along these lines.
- VCAP::CloudController::MessageBus.configure(config)
+ mbus = VCAP::CloudController::MessageBus.new(config)
+ VCAP::CloudController::MessageBus.instance = mbus
+
VCAP::CloudController::AccountCapacity.configure(config)
VCAP::CloudController::ResourcePool.configure(config)
VCAP::CloudController::AppPackage.configure(config)
VCAP::CloudController::AppStager.configure(config)
VCAP::CloudController::LegacyStaging.configure(config)
- VCAP::CloudController::DeaPool.configure(config)
- VCAP::CloudController::DeaClient.configure(config)
- VCAP::CloudController::HealthManagerClient.configure
- VCAP::CloudController::LegacyBulk.configure(config)
+
+ VCAP::CloudController::DeaPool.configure(config, mbus)
+ VCAP::CloudController::DeaClient.configure(config, mbus)
+ VCAP::CloudController::HealthManagerClient.configure(mbus)
+
+ VCAP::CloudController::LegacyBulk.configure(config, mbus)
VCAP::CloudController::Models::QuotaDefinition.configure(config)
end
@@ -24,7 +24,7 @@ class << self
attr_reader :config, :message_bus, :dea_pool
- def configure(config, message_bus = MessageBus, dea_pool = DeaPool)
+ def configure(config, message_bus, dea_pool = DeaPool)
@config = config
@message_bus = message_bus
@dea_pool = dea_pool
@@ -9,7 +9,7 @@ module DeaPool
class << self
attr_reader :config, :message_bus
- def configure(config, message_bus = MessageBus)
+ def configure(config, message_bus)
@config = config
@message_bus = message_bus
@deas = {}
@@ -5,7 +5,7 @@ module HealthManagerClient
class << self
attr_reader :message_bus
- def configure(message_bus = MessageBus)
+ def configure(message_bus)
@message_bus = message_bus
end
@@ -37,13 +37,13 @@ class LegacyBulk < RestController::Base
allow_unauthenticated_access
class << self
- attr_reader :message_bus, :config
+ attr_reader :config, :message_bus
- def configure(config)
- @message_bus = config.fetch(:message_bus, MessageBus)
+ def configure(config, message_bus)
@config = config[:bulk_api].merge(
:cc_partition => config.fetch(:cc_partition),
)
+ @message_bus = message_bus
end
def register_subscription
@@ -62,7 +62,6 @@ def credentials
config[:auth_password],
]
end
-
end
def initialize(*)
@@ -4,19 +4,24 @@
require "vcap/component"
require "cloud_controller/json_patch"
-module VCAP::CloudController::MessageBus
+class VCAP::CloudController::MessageBus
class << self
- attr_reader :config
- attr_reader :nats
+ attr_reader :instance
- def configure(config)
- @config = config
- @nats = config[:nats] || NATS
+ def instance=(instance)
+ raise ArgumentError, "instance must not be nil" unless instance
+ @instance = instance
end
end
- def self.register_components
- # hook up with NATS
+ attr_reader :config, :nats
+
+ def initialize(config)
+ @config = config
+ @nats = config[:nats] || NATS
+ end
+
+ def register_components
# TODO: put useful metrics in varz
# TODO: subscribe to the two DEA channels
EM.schedule do
@@ -32,7 +37,7 @@ def self.register_components
end
end
- def self.register_routes
+ def register_routes
EM.schedule do
# TODO: blacklist api2 in legacy CC
# TODO: Yajl should probably also be injected
@@ -42,7 +47,9 @@ def self.register_routes
:uris => [config[:external_domain]],
:tags => {:component => "CloudController" },
})
+
nats.publish("router.register", router_register_message)
+
# Broadcast when a router restarts
nats.subscribe("router.start") do
nats.publish("router.register", router_register_message)
@@ -59,7 +66,7 @@ def self.register_routes
# @yield [payload, inbox] callback invoked when a message is posted on the subject
# @yieldparam [String] payload the message posted on the channel
# @yieldparam [optional, String] inbox an optional "reply to" subject, nil if not requested
- def self.subscribe(subject, opts = {}, &blk)
+ def subscribe(subject, opts = {}, &blk)
subscribe_on_reactor(subject, opts) do |payload, inbox|
EM.defer do
begin
@@ -74,21 +81,13 @@ def self.subscribe(subject, opts = {}, &blk)
end
end
- def self.subscribe_on_reactor(subject, opts = {}, &blk)
- EM.schedule do
- nats.subscribe(subject, opts) do |msg, inbox|
- process_message(msg, inbox, &blk)
- end
- end
- end
-
- def self.publish(subject, message = nil)
+ def publish(subject, message = nil)
EM.schedule do
nats.publish(subject, message)
end
end
- def self.request(subject, data = nil, opts = {})
+ def request(subject, data = nil, opts = {})
opts ||= {}
expected = opts[:expected] || 1
timeout = opts[:timeout] || -1
@@ -115,14 +114,22 @@ def self.request(subject, data = nil, opts = {})
private
- def self.process_message(msg, inbox, &blk)
+ def subscribe_on_reactor(subject, opts = {}, &blk)
+ EM.schedule do
+ nats.subscribe(subject, opts) do |msg, inbox|
+ process_message(msg, inbox, &blk)
+ end
+ end
+ end
+
+ def process_message(msg, inbox, &blk)
payload = Yajl::Parser.parse(msg, :symbolize_keys => true)
blk.yield(payload, inbox)
rescue => e
logger.error "exception processing: '#{msg}' '#{e}'"
end
- def self.logger
+ def logger
@logger ||= Steno.logger("cc.mbus")
end
end
@@ -116,8 +116,8 @@ def run!
# TODO: we really should put these bootstrapping into a place other
# than Rack::Builder
use Rack::CommonLogger
- VCAP::CloudController::MessageBus.register_components
- VCAP::CloudController::MessageBus.register_routes
+ VCAP::CloudController::MessageBus.instance.register_components
+ VCAP::CloudController::MessageBus.instance.register_routes
VCAP::CloudController::DeaPool.register_subscriptions
VCAP::CloudController::LegacyBulk.register_subscription
VCAP::CloudController.health_manager_respondent = VCAP::CloudController::HealthManagerRespondent.new(config)
View
@@ -108,7 +108,7 @@ module VCAP::CloudController
app_obj.save
app_obj.needs_staging?.should be_false
- MessageBus.should_not_receive(:publish).with("droplet.updated", anything)
+ MessageBus.instance.should_not_receive(:publish).with("droplet.updated", anything)
req = Yajl::Encoder.encode(:instances => app_obj.instances + 1)
put "/v2/apps/#{app_obj.guid}", req, json_headers(admin_headers)
@@ -126,7 +126,7 @@ module VCAP::CloudController
"cc_partition" => config[:cc_partition],
}
- MessageBus.should_not_receive(:publish)
+ MessageBus.instance.should_not_receive(:publish)
req = Yajl::Encoder.encode(:instances => app_obj.instances + 1)
put "/v2/apps/#{app_obj.guid}", req, json_headers(admin_headers)
@@ -138,17 +138,18 @@ module VCAP::CloudController
app_obj.state = "STARTED"
app_obj.save
app_obj.needs_staging?.should be_true
+
AppStager.should_receive(:stage_app) do |app|
app.update(:droplet_hash => "def")
end
- MessageBus.should_receive(:publish).with(
+
+ MessageBus.instance.should_receive(:publish).with(
"droplet.updated",
- json_match(
- hash_including(
- "droplet" => app_obj.guid,
- ),
- ),
+ json_match(hash_including(
+ "droplet" => app_obj.guid,
+ )),
)
+
req = Yajl::Encoder.encode(:instances => app_obj.instances + 1)
put "/v2/apps/#{app_obj.guid}", req, json_headers(admin_headers)
last_response.status.should == 201
@@ -175,9 +176,10 @@ module VCAP::CloudController
"cc_partition" => config[:cc_partition],
}
- MessageBus.should_receive(:publish).
- with("droplet.updated",
- json_match(hash_including(expected)))
+ MessageBus.instance.should_receive(:publish).with(
+ "droplet.updated",
+ json_match(hash_including(expected))
+ )
put "/v2/apps/#{app_obj.guid}", req, json_headers(admin_headers)
last_response.status.should == 201
@@ -194,9 +196,10 @@ module VCAP::CloudController
"cc_partition" => config[:cc_partition],
}
- MessageBus.should_receive(:publish).
- with("droplet.updated",
- json_match(hash_including(expected)))
+ MessageBus.instance.should_receive(:publish).with(
+ "droplet.updated",
+ json_match(hash_including(expected))
+ )
put "/v2/apps/#{app_obj.guid}", req, json_headers(admin_headers)
last_response.status.should == 201
@@ -225,9 +228,10 @@ module VCAP::CloudController
"cc_partition" => config[:cc_partition],
}
- MessageBus.should_receive(:publish).
- with("droplet.updated",
- json_match(hash_including(expected)))
+ MessageBus.instance.should_receive(:publish).with(
+ "droplet.updated",
+ json_match(hash_including(expected))
+ )
put "/v2/apps/#{app_obj.guid}", req, json_headers(admin_headers)
last_response.status.should == 201
@@ -301,25 +305,20 @@ module VCAP::CloudController
:space => space,
)
- nats = double("mock nats")
- config_override(:nats => nats)
- nats.should_receive(:publish).with(
+ MessageBus.instance.should_receive(:publish).with(
"dea.update",
json_match(
hash_including("uris" => ["app.jesse.cloud"]),
),
)
- EM.run do
- put(
- @app_url,
- App::UpdateMessage.new(
- :route_guids => [route.guid],
- ).encode(),
- @headers_for_user,
- )
- EM.next_tick { EM.stop }
- end
+ put(
+ @app_url,
+ App::UpdateMessage.new(
+ :route_guids => [route.guid],
+ ).encode(),
+ @headers_for_user,
+ )
last_response.status.should == 201
end
@@ -341,28 +340,20 @@ module VCAP::CloudController
r["metadata"]["guid"]
}.sort.should == [bar_route.guid, route.guid].sort
- nats = double("mock nats")
- config_override(:nats => nats)
- # inject mock nats
- MessageBus.configure(config)
-
- nats.should_receive(:publish).with(
+ MessageBus.instance.should_receive(:publish).with(
"dea.update",
- json_match(
- hash_including("uris" => ["foo.jesse.cloud"]),
- ),
+ json_match(hash_including(
+ "uris" => ["foo.jesse.cloud"],
+ )),
)
- EM.run do
- put(
- @app_url,
- App::UpdateMessage.new(
- :route_guids => [route.guid],
- ).encode,
- @headers_for_user,
- )
- EM.stop
- end
+ put(
+ @app_url,
+ App::UpdateMessage.new(
+ :route_guids => [route.guid],
+ ).encode,
+ @headers_for_user,
+ )
last_response.status.should == 201
end
Oops, something went wrong.

0 comments on commit 16e4126

Please sign in to comment.