Navigation Menu

Skip to content

Commit

Permalink
Remoev obsolete codes to confrol buffering by forwarder
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 6, 2015
1 parent f6587af commit 165dd0f
Showing 1 changed file with 1 addition and 37 deletions.
38 changes: 1 addition & 37 deletions lib/droonga/forwarder.rb
Expand Up @@ -17,8 +17,6 @@

require "droonga/loggable"
require "droonga/path"
require "droonga/event_loop"
require "droonga/buffered_tcp_socket"
require "droonga/fluent_message_sender"

module Droonga
Expand All @@ -27,13 +25,11 @@ class Forwarder

def initialize(loop, options={})
@loop = loop
@buffering = options[:buffering]
@senders = {}
end

def start
logger.trace("start: start")
resume
logger.trace("start: done")
end

Expand All @@ -54,35 +50,6 @@ def forward(message, destination)
logger.trace("forward: done")
end

def resume
return unless Path.buffer.exist?
Pathname.glob("#{Path.buffer}/*") do |path|
next unless path.directory?

destination = path.basename.to_s
sender = @senders[destination]
if sender
sender.resume
next
end

chunk_loader = BufferedTCPSocket::ChunkLoader.new(path)
unless chunk_loader.have_any_chunk?
#FileUtils.rm_rf(path.to_s) # TODO re-enable this
next
end

components = destination.split(":")
port = components.pop.to_i
next if port.zero?
host = components.join(":")

sender = create_sender(host, port)
sender.resume
@senders[destination] = sender
end
end

private
def output(receiver, message, command, arguments, options={})
logger.trace("output: start")
Expand Down Expand Up @@ -139,10 +106,7 @@ def extract_connection_id(params)
end

def create_sender(host, port)
options = {
:buffering => @buffering,
}
sender = FluentMessageSender.new(@loop, host, port, options)
sender = FluentMessageSender.new(@loop, host, port)
sender.start
sender
end
Expand Down

0 comments on commit 165dd0f

Please sign in to comment.