Navigation Menu

Skip to content

Commit

Permalink
Make NodeMetadata required parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 16, 2015
1 parent 0802bed commit 869df87
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 37 deletions.
10 changes: 3 additions & 7 deletions lib/droonga/cluster.rb
Expand Up @@ -26,13 +26,13 @@ class Cluster
attr_accessor :catalog
attr_writer :on_change

def initialize(loop, options={})
def initialize(loop, params)
@loop = loop

@catalog = nil
@state = nil
@on_change = nil
@node_metadata = options[:metadata]
@node_metadata = params[:metadata]

reload
end
Expand Down Expand Up @@ -161,18 +161,14 @@ def create_engine_nodes
EngineNode.new(name,
node_state,
@loop,
:metadata => node_metadata)
:metadata => @node_metadata)
end
end

def default_state
{}
end

def node_metadata
@node_metadata ||= NodeMetadata.new
end

def log_tag
"cluster_state"
end
Expand Down
20 changes: 10 additions & 10 deletions lib/droonga/engine.rb
Expand Up @@ -32,20 +32,24 @@ class Engine

attr_writer :on_ready
def initialize(loop, name, internal_name)
@node_metadata = NodeMetadata.new
@state = EngineState.new(loop, name,
internal_name,
:metadata => node_metadata)
:metadata => @node_metadata)
@cluster = Cluster.new(loop,
:metadata => node_metadata)
:metadata => @node_metadata)
@catalog = load_catalog
@state.catalog = @cluster.catalog = @catalog

@dispatcher = create_dispatcher

@node_metadata_observer = FileObserver.new(loop, Path.node_metadata)
@node_metadata_observer.on_change = lambda do
logger.trace("reloading node_metadata: start")
node_metadata.reload
@node_metadata.reload
logger.trace("reloading node_metadata: done")
end

@on_ready = nil
end

Expand Down Expand Up @@ -103,10 +107,6 @@ def process(message)
@dispatcher.process_message(message)
end

def node_metadata
@node_metadata ||= NodeMetadata.new
end

private
def load_catalog
catalog_path = Path.catalog
Expand All @@ -125,7 +125,7 @@ def create_dispatcher
def save_last_processed_message_timestamp
logger.trace("output_last_processed_message_timestamp: start")
if @last_processed_message_timestamp
node_metadata.set(:last_processed_message_timestamp, @last_processed_message_timestamp.to_s)
@node_metadata.set(:last_processed_message_timestamp, @last_processed_message_timestamp.to_s)
end
logger.trace("output_last_processed_message_timestamp: done")
end
Expand All @@ -147,13 +147,13 @@ def effective_message?(message)
return false if effective_timestamp >= message_timestamp

logger.trace("deleting obsolete effective_message_timestamp: start")
node_metadata.delete(:effective_message_timestamp)
@node_metadata.delete(:effective_message_timestamp)
logger.trace("deleting obsolete effective_message_timestamp: done")
true
end

def effective_message_timestamp
timestamp = node_metadata.get(:effective_message_timestamp)
timestamp = @node_metadata.get(:effective_message_timestamp)
return nil unless timestamp

begin
Expand Down
10 changes: 3 additions & 7 deletions lib/droonga/engine_node.rb
Expand Up @@ -23,10 +23,10 @@ class EngineNode

attr_reader :name

def initialize(name, state, loop, options={})
def initialize(name, state, loop, params)
@name = name
@state = state
@sender_node_metadata ||= options[:metadata]
@sender_node_metadata = params[:metadata]

parsed_name = parse_node_name(@name)
@sender = FluentMessageSender.new(loop,
Expand Down Expand Up @@ -134,11 +134,7 @@ def absorb_destination?
end

def sender_role
sender_node_metadata.role
end

def sender_node_metadata
@sender_node_metadata ||= NodeMetadata.new
@sender_node_metadata.role
end

def output(message, destination)
Expand Down
10 changes: 3 additions & 7 deletions lib/droonga/engine_state.rb
Expand Up @@ -36,7 +36,7 @@ class EngineState
attr_accessor :catalog
attr_accessor :on_finish

def initialize(loop, name, internal_name, options={})
def initialize(loop, name, internal_name, params)
@loop = loop
@name = name
@internal_name = internal_name
Expand All @@ -47,7 +47,7 @@ def initialize(loop, name, internal_name, options={})
@on_ready = nil
@on_finish = nil
@catalog = nil
@node_metadata = options[:metadata]
@node_metadata = params[:metadata]
end

def start
Expand Down Expand Up @@ -107,18 +107,14 @@ def have_session?
end

def role
node_metadata.role
@node_metadata.role
end

def on_ready
@on_ready.call if @on_ready
end

private
def node_metadata
@node_metadata ||= NodeMetadata.new
end

def log_tag
"engine_state"
end
Expand Down
9 changes: 3 additions & 6 deletions lib/droonga/serf.rb
Expand Up @@ -40,6 +40,7 @@ def initialize(name)
@serf = nil
@name = name
@service_installation = ServiceInstallation.new
@node_metadata = NodeMetadata.new
end

def run_agent(loop)
Expand Down Expand Up @@ -142,13 +143,13 @@ def update_cluster_id
end

def role
node_metadata.role
@node_metadata.role
end

def role=(new_role)
new_role ||= NodeMetadata::Role::SERVICE_PROVIDER
set_tag("role", new_role)
node_metadata.role = new_role
@node_metadata.role = new_role
end

def cluster_id
Expand Down Expand Up @@ -206,10 +207,6 @@ def rpc_port
7373
end

def node_metadata
@node_metadata ||= NodeMetadata.new
end

def agent_port
Agent::PORT
end
Expand Down

0 comments on commit 869df87

Please sign in to comment.