Navigation Menu

Skip to content

Commit

Permalink
Prepare required information on demand
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 30, 2015
1 parent 1710f65 commit a832ca7
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions lib/droonga/forward_buffer.rb
Expand Up @@ -41,16 +41,11 @@ def initialize(node_name)
@unpacker = MessagePack::Unpacker.new

@target = node_name
@serf = Serf.new(ENV["DROONGA_ENGINE_NAME"] || node_name)

dirname = node_name.gsub("/", ":")
@data_directory = Path.intentional_buffer + dirname
FileUtils.mkdir_p(@data_directory.to_s)
end

def add(message, destination)
logger.trace("add: start")
@serf.set_have_unprocessed_messages_for(@target)
serf.set_have_unprocessed_messages_for(@target)
buffered_message = {
"message" => message,
"destination" => destination,
Expand Down Expand Up @@ -81,25 +76,40 @@ def start_forward
# (ex. messages generated by Dispatcher)
@process_messages_newer_than_timestamp = nil
end
@serf.reset_have_unprocessed_messages_for(@target)
serf.reset_have_unprocessed_messages_for(@target)
logger.trace("start_forward: done")
end

def buffered_messages
Pathname.glob("#{@data_directory}/*#{SUFFIX}").sort_by do |path|
Pathname.glob("#{data_directory}/*#{SUFFIX}").sort_by do |path|
path
end
end

def empty?
@data_directory.children.empty?
data_directory.children.empty?
end

def process_messages_newer_than(timestamp)
@process_messages_newer_than_timestamp = timestamp
end

private
def data_directory
@data_directory ||= resolve_data_directory
end

def resolve_data_directory
dirname = @target.gsub("/", ":")
data_directory = Path.intentional_buffer + dirname
FileUtils.mkdir_p(data_directory.to_s)
data_directory
end

def serf
@serf ||= Serf.new(ENV["DROONGA_ENGINE_NAME"] || @target)
end

def forward(buffered_message_path)
logger.trace("forward: start (#{buffered_message_path})")

Expand Down Expand Up @@ -146,7 +156,7 @@ def forward(buffered_message_path)

def create_buffered_message_path(time_stamp=Time.now)
basename = Timestamp.stringify(time_stamp)
Path.unique_file_path(@data_directory, basename, SUFFIX)
Path.unique_file_path(data_directory, basename, SUFFIX)
end

def on_forward(message, destination)
Expand Down

0 comments on commit a832ca7

Please sign in to comment.