Navigation Menu

Skip to content

Commit

Permalink
Don't forward messages older than the timestamp given as "process_mes…
Browse files Browse the repository at this point in the history
…sages_newer_than"
  • Loading branch information
piroor committed Apr 21, 2015
1 parent 010616d commit 8df8014
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions lib/droonga/forward_buffer.rb
Expand Up @@ -64,8 +64,13 @@ def add(message, destination)

def start_forward
logger.trace("start_forward: start")
fowarded = false
Pathname.glob("#{@data_directory}/*#{SUFFIX}").collect do |buffered_message_path|
forward(buffered_message_path)
fowarded = forward(buffered_message_path) || fowarded
end
if @process_messages_newer_than_timestamp and fowarded
logger.info("New message is detected and forwarded. The boundary is now cleared.")
@process_messages_newer_than_timestamp = nil
end
@serf.reset_have_unprocessed_messages_for(@target)
logger.trace("start_forward: done")
Expand All @@ -90,6 +95,8 @@ def forward(buffered_message_path)
message = buffered_message["message"]
destination = buffered_message["destination"]

forwarded = false

if @process_messages_newer_than_timestamp
message_timestamp = Time.parse(message["date"])
logger.trace("Checking boundary of obsolete message",
Expand All @@ -98,8 +105,9 @@ def forward(buffered_message_path)
if @process_messages_newer_than_timestamp >= message_timestamp
buffered_message = nil
else
logger.info("New message is detected. The boundary is now cleared.")
@process_messages_newer_than_timestamp = nil
logger.info("New message is detected.")
# Don't clear the boundary for now, because older messages
# forwarded by the dispatcher can be still buffered.
end
end

Expand All @@ -109,10 +117,13 @@ def forward(buffered_message_path)
:destination => destination)
message["xSender"] = "forward-buffer"
on_forward(message, destination)
forwarded = true
end

FileUtils.rm_f(buffered_message_path.to_s)
logger.trace("forward: done (#{buffered_message_path})")

forwarded
end

def file_path(time_stamp=Time.now)
Expand Down

0 comments on commit 8df8014

Please sign in to comment.