Skip to content

Commit

Permalink
Prepared the cluster for a more flexible state storage backend config…
Browse files Browse the repository at this point in the history
…uration.
  • Loading branch information
roidrage committed Jun 26, 2009
1 parent 902738f commit ecd142f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
38 changes: 25 additions & 13 deletions lib/nanite/cluster.rb
Expand Up @@ -2,23 +2,16 @@ module Nanite
class Cluster
attr_reader :agent_timeout, :nanites, :reaper, :serializer, :identity, :amq, :redis, :mapper, :callbacks

def initialize(amq, agent_timeout, identity, serializer, mapper, redis=nil, callbacks = {})
def initialize(amq, agent_timeout, identity, serializer, mapper, state_configuration=nil, callbacks = {})
@amq = amq
@agent_timeout = agent_timeout
@identity = identity
@serializer = serializer
@mapper = mapper
@redis = redis
@state = state_configuration
@security = SecurityProvider.get
@callbacks = callbacks
if redis
Nanite::Log.info("using redis for state storage")
require 'nanite/state'
@nanites = ::Nanite::State.new(redis)
else
require 'nanite/local_state'
@nanites = Nanite::LocalState.new
end
setup_state
@reaper = Reaper.new(agent_timeout)
setup_queues
end
Expand Down Expand Up @@ -167,7 +160,7 @@ def setup_queues
end

def setup_heartbeat_queue
if @redis
if shared_state?
amq.queue("heartbeat").bind(amq.fanout('heartbeat', :durable => true)).subscribe do |ping|
Nanite::Log.debug('got heartbeat')
handle_ping(serializer.load(ping))
Expand All @@ -181,7 +174,7 @@ def setup_heartbeat_queue
end

def setup_registration_queue
if @redis
if shared_state?
amq.queue("registration").bind(amq.fanout('registration', :durable => true)).subscribe do |msg|
Nanite::Log.debug('got registration')
register(serializer.load(msg))
Expand All @@ -195,7 +188,7 @@ def setup_registration_queue
end

def setup_request_queue
if @redis
if shared_state?
amq.queue("request").bind(amq.fanout('request', :durable => true)).subscribe do |msg|
Nanite::Log.debug('got request')
handle_request(serializer.load(msg))
Expand All @@ -207,5 +200,24 @@ def setup_request_queue
end
end
end

def setup_state
case @state
when String
# backwards compatibility, we assume redis if the configuration option
# was a string
Nanite::Log.info("using redis for state storage")
require 'nanite/state'
@nanites = Nanite::State.new(@state)
when Hash
else
require 'nanite/local_state'
@nanites = Nanite::LocalState.new
end
end

def shared_state?
!@state.nil?
end
end
end
14 changes: 14 additions & 0 deletions spec/cluster_spec.rb
Expand Up @@ -93,6 +93,20 @@

end # Reaper

describe "State" do
require 'nanite/state'
it "should use a local state by default" do
cluster = Nanite::Cluster.new(@amq, 443, "the_identity", @serializer, @mapper)
cluster.nanites.instance_of?(Nanite::LocalState).should == true
end

it "should set up a redis state when requested" do
state = Nanite::State.new("")
Nanite::State.should_receive(:new).with("localhost:1234").and_return(state)
cluster = Nanite::Cluster.new(@amq, 443, "the_identity", @serializer, @mapper, "localhost:1234")
cluster.nanites.instance_of?(Nanite::State).should == true
end
end
end # Intialization


Expand Down

0 comments on commit ecd142f

Please sign in to comment.