Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

send blank message as heartbeat over TCP instead of UDP, when configured... #74

Merged
merged 4 commits into from

3 participants

@tagomoris
Owner

To solve issue#73, send TCP blank msgpack tag/event-array as heartbeat

lib/fluent/plugin/out_forward.rb
@@ -29,6 +29,15 @@ def initialize
end
config_param :send_timeout, :time, :default => 60
+ config_param :heartbeat, :default => :udp do |val|
+ if val == 'tcp' or val == 'TCP'
@repeatedly Owner

What do you think following code?

case val.downcase
when 'tcp'
  :tcp
...
@tagomoris Owner

I see, 'val.downcase' is more simple.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
lib/fluent/plugin/out_forward.rb
@@ -29,6 +29,15 @@ def initialize
end
config_param :send_timeout, :time, :default => 60
+ config_param :heartbeat, :default => :udp do |val|
+ if val == 'tcp' or val == 'TCP'
+ :tcp
+ elsif val == 'udp' or val == 'UDP'
+ :udp
+ else
+ rasie ConfigError, "forward output heartbeat type is 'tcp' or 'udp'"
@repeatedly Owner

typo

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
lib/fluent/plugin/out_forward.rb
@@ -193,6 +204,21 @@ def rebuild_weight_array
# MessagePack FixArray length = 2
FORWARD_HEADER = [0x92].pack('C')
+ FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + MessagePack.pack([])
@repeatedly Owner

Why do you use two pack methods, to_msgpack and MessagePack.pack?

@tagomoris Owner

Maybe, these are result of copy&paste from other lines..... I'll fix it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@tagomoris
Owner

Modify branch as:

  • rebase from HEAD (IPv6 support)
  • fix in_forward not to emit blank tag-and-message (assumed as tcp heartbeat)
@repeatedly
Owner

How's your fluentd cluster going?

@tagomoris
Owner

Now (this 5 hours), all of our fluentd (124) processes with 'heartbeat tcp' option, seems to be working well...

@frsyuki frsyuki referenced this pull request
Closed

tcp heartbeat #91

@frsyuki
Owner

This change has a backward compatibility problem...
Could you see this change?: #91

@repeatedly
Owner

Okay. I merge this pull request soon.

@repeatedly repeatedly merged commit 68fe451 into fluent:master
@tagomoris
Owner

:+1:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
7 lib/fluent/plugin/in_forward.rb
@@ -113,6 +113,13 @@ def on_message(msg)
elsif entries.class == Array
# Forward
+
+ if tag == '' && entries.length == 0
+ # "{tag:'',entries:[]}" is TCP heartbeat message: ignored
+ $log.trace "TCP heartbeat message received"
+ return
+ end
+
es = MultiEventStream.new
entries.each {|e|
time = e[0].to_i
View
47 lib/fluent/plugin/out_forward.rb
@@ -30,6 +30,16 @@ def initialize
end
config_param :send_timeout, :time, :default => 60
+ config_param :heartbeat, :default => :udp do |val|
+ case val.downcase
+ when 'tcp'
+ :tcp
+ when 'udp'
+ :udp
+ else
+ raise ConfigError, "forward output heartbeat type is 'tcp' or 'udp'"
+ end
+ end
config_param :heartbeat_interval, :time, :default => 1
config_param :recover_wait, :time, :default => 10
config_param :hard_timeout, :time, :default => 60
@@ -89,11 +99,12 @@ def start
@loop = Coolio::Loop.new
- # Assume all hosts are same protocol.
- @usock = SocketUtil.create_udp_socket(@nodes.first.host)
- @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
- @loop.attach(@hb)
-
+ if @heartbeat == :udp
+ # Assume all hosts are same protocol.
+ @usock = SocketUtil.create_udp_socket(@nodes.first.host)
+ @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
+ @loop.attach(@hb)
+ end
@timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
@loop.attach(@timer)
@@ -105,7 +116,7 @@ def shutdown
@loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join
- @usock.close
+ @usock.close if @usock
end
def run
@@ -196,6 +207,21 @@ def rebuild_weight_array
# MessagePack FixArray length = 2
FORWARD_HEADER = [0x92].pack('C')
+ FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
+ def send_heartbeat_tcp(node)
+ sock = connect(node)
+ begin
+ opt = [1, @send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
+ sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
+ opt = [@send_timeout.to_i, 0].pack('L!L!') # struct timeval
+ sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
+ sock.write FORWARD_TCP_HEARTBEAT_DATA
+ node.heartbeat(true)
+ ensure
+ sock.close
+ end
+ end
+
def send_data(node, tag, es)
sock = connect(node)
begin
@@ -258,8 +284,12 @@ def on_timer
rebuild_weight_array
end
begin
- #$log.trace "sending heartbeat #{n.host}:#{n.port}"
- @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host)
+ #$log.trace "sending heartbeat #{n.host}:#{n.port} on #{@heartbeat}"
+ if @heartbeat == :tcp
+ send_heartbeat_tcp(n)
+ else
+ @usock.send "\0", 0, Socket.pack_sockaddr_in(n.port, n.resolved_host)
+ end
rescue
# TODO log
$log.debug "failed to send heartbeat packet to #{n.host}:#{n.port}", :error=>$!.to_s
@@ -496,4 +526,3 @@ def clear
end
-
Something went wrong with that request. Please try again.