Navigation Menu

Skip to content

Commit

Permalink
Output target node for each log from fluent message sender
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 6, 2015
1 parent 0025451 commit 04f9d20
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions lib/droonga/fluent_message_sender.rb
Expand Up @@ -39,30 +39,30 @@ def initialize(loop, host, port, options={})
end

def start
logger.trace("start: start")
logger.trace("start: done")
logger.trace("start: start for #{target_node}")
logger.trace("start: done for #{target_node}")
end

def shutdown
logger.trace("shutdown: start")
logger.trace("shutdown: start for #{target_node}")
shutdown_socket
logger.trace("shutdown: done")
logger.trace("shutdown: done for #{target_node}")
end

def send(tag, data)
logger.trace("send: start")
logger.trace("send: start for #{target_node}")
packed_fluent_message = create_packed_fluent_message(tag, data)
unless connected?
logger.trace("send: reconnect to #{@host}:#{@port}")
logger.trace("send: reconnect to #{target_node}")
connect
end
@socket.write(packed_fluent_message)
logger.trace("send: done")
logger.trace("send: done for #{target_node}")
end

def resume
unless connected?
logger.trace("resume: reconnect to #{@host}:#{@port}")
logger.trace("resume: reconnect to #{target_node}")
connect
end
end
Expand All @@ -73,25 +73,25 @@ def connected?
end

def connect
logger.trace("connect: start")
logger.trace("connect: start for #{target_node}")

log_write_complete = lambda do
logger.trace("write completed")
logger.trace("write completed for #{target_node}")
end
log_connect = lambda do
logger.trace("connected to #{@host}:#{@port}")
logger.trace("connected to #{target_node}")
end
log_failed = lambda do
logger.error("failed to connect to #{@host}:#{@port}")
logger.error("failed to connect to #{target_node}")
@socket = nil
end
on_close = lambda do
logger.trace("connection to #{@host}:#{@port} is closed by someone")
logger.trace("connection to #{target_node} is closed by someone")
@socket = nil
end

if @buffering
data_directory = Path.accidental_buffer + "#{@host}:#{@port}"
data_directory = Path.accidental_buffer + "#{target_node}"
FileUtils.mkdir_p(data_directory.to_s)
@socket = BufferedTCPSocket.connect(@host, @port, data_directory)
@socket.resume
Expand All @@ -116,7 +116,7 @@ def connect
:host => @host,
:port => @port)

logger.trace("connect: done")
logger.trace("connect: done for #{target_node}")
end

def shutdown_socket
Expand All @@ -137,6 +137,10 @@ def create_packed_fluent_message(tag, data)
packed_fluent_message
end

def target_node
"#{@host}:#{@port}"
end

def log_tag
"[#{Process.ppid}] fluent-message-sender"
end
Expand Down

0 comments on commit 04f9d20

Please sign in to comment.