Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change SocketCache to be shared between nodes #2501

Merged
merged 6 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 19 additions & 27 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,28 +203,23 @@ def configure(conf)
end
end

socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@connection_manager = ConnectionManager.new(
log: @log,
secure: !!@security,
connection_factory: method(:create_transfer_socket),
socket_cache: socket_cache,
)

@servers.each do |server|
failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
name = server.name || "#{server.host}:#{server.port}"

socket_cache =
if @keepalive
SocketCache.new(@keepalive_timeout, @log)
else
nil
end
connection_manager = ConnectionManager.new(
log: @log,
secure: !!@security,
connection_factory: method(:create_transfer_socket),
socket_cache: socket_cache,
)

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: connection_manager)
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager)
else
node = Node.new(self, server, failure: failure, connection_manager: connection_manager)
node = Node.new(self, server, failure: failure, connection_manager: @connection_manager)
begin
node.validate_host_resolution!
rescue => e
Expand Down Expand Up @@ -315,12 +310,17 @@ def close
@usock.close rescue nil
end

if @keepalive && @keepalive_timeout
@nodes.each(&:clear)
end
super
end

def stop
super

if @keepalive
@connection_manager.stop
end
end

def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag
Expand Down Expand Up @@ -425,7 +425,7 @@ def on_heartbeat(sockaddr, msg)
end

def on_purge_obsolete_socks
@nodes.each(&:purge_obsolete_socks)
@connection_manager.purge_obsolete_socks
end

# return chunk id to be committed
Expand Down Expand Up @@ -670,14 +670,6 @@ def send_data(tag, chunk)
nil
end

def clear
@connection_manager.stop
end

def purge_obsolete_socks
@connection_manager.purge_obsolete_socks
end

def close(sock)
@connection_manager.close(sock)
end
Expand Down
12 changes: 8 additions & 4 deletions lib/fluent/plugin/out_forward/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class ForwardOutput < Output
class ConnectionManager
RequestInfo = Struct.new(:state, :shared_key_nonce, :auth)

# @param log [Logger]
# @param secure [Boolean]
# @param connection_factory [Proc]
# @param SocketCache [Fluent::ForwardOutput::SocketCache]
def initialize(log:, secure:, connection_factory:, socket_cache:)
@log = log
@secure = secure
Expand Down Expand Up @@ -64,7 +68,7 @@ def purge_obsolete_socks

def close(sock)
if @socket_cache
@socket_cache.dec_ref_by_value(sock)
@socket_cache.checkin(sock)
else
sock.close_write rescue nil
sock.close rescue nil
Expand All @@ -75,7 +79,7 @@ def close(sock)

def connect_keepalive(host:, port:, hostname:, require_ack:)
request_info = RequestInfo.new(:established)
socket = @socket_cache.fetch_or do
socket = @socket_cache.checkout_or([host, port, hostname]) do
s = @connection_factory.call(host, port, hostname)
request_info = RequestInfo.new(@secure ? :helo : :established) # overwrite if new connection
s
Expand All @@ -89,11 +93,11 @@ def connect_keepalive(host:, port:, hostname:, require_ack:)
begin
ret = yield(socket, request_info)
rescue
@socket_cache.revoke
@socket_cache.revoke(socket)
raise
else
unless require_ack
@socket_cache.dec_ref
@socket_cache.checkin(socket)
end
end

Expand Down
153 changes: 68 additions & 85 deletions lib/fluent/plugin/out_forward/socket_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,135 +19,118 @@
module Fluent::Plugin
class ForwardOutput < Output
class SocketCache
TimedSocket = Struct.new(:timeout, :sock, :ref)
TimedSocket = Struct.new(:timeout, :key, :sock)

def initialize(timeout, log)
@log = log
@timeout = timeout
@active_socks = {}
@inactive_socks = {}
@available_sockets = Hash.new { |obj, k| obj[k] = [] }
@inflight_sockets = {}
@inactive_sockets = []
@mutex = Mutex.new
end

def revoke(key = Thread.current.object_id)
def checkout_or(key)
@mutex.synchronize do
if @active_socks[key]
@inactive_socks[key] = @active_socks.delete(key)
@inactive_socks[key].ref = 0
tsock = pick_socket(key)

unless tsock
val = yield
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sock or new_sock is better than val

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed 14985b3

new_tsock = TimedSocket.new(timeout, key, val)
@log.debug("connect new socket #{new_tsock}")

@inflight_sockets[val] = new_tsock
return new_tsock.sock
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reduce return by if

if tsock
  tsock.sock
else
  # same
end

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed 5dd10e6

end
tsock.sock
end
end

def clear
def checkin(sock)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... checkin is unclear for me. After checkin, we are inflight for airplane but this method changes inflight to available. checkout too. Maybe, we don't use checkout for flight.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I referred connection_pool gem to determine the name of them.
https://github.com/mperham/connection_pool/blob/99b81aa0a9dccb3b423260740ce2d603f1479356/lib/connection_pool.rb#L80

Any name is okay for me if it's clear for readers. Do you have any suggestions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.... I see. If this naming is popular in ruby, it is okay for me.

@mutex.synchronize do
@inactive_socks.values.each do |s|
s.sock.close rescue nil
if (s = @inflight_sockets.delete(sock))
@available_sockets[s.key] << s
else
@log.debug("there is no socket #{sock}")
end
@inactive_socks.clear
end
end

@active_socks.values.each do |s|
s.sock.close rescue nil
def revoke(sock)
@mutex.synchronize do
if (s = @inflight_sockets.delete(sock))
@inactive_sockets << s
else
@log.debug("there is no socket #{sock}")
end
@active_socks.clear
end
end

def purge_obsolete_socks
sockets = []

@mutex.synchronize do
@inactive_socks.keys.each do |k|
# 0 means sockets stored in this class received all acks
if @inactive_socks[k].ref <= 0
s = @inactive_socks.delete(k)
s.sock.close rescue nil
@log.debug("purged obsolete socket #{s.sock}")
# don't touch @inflight_sockets

@available_sockets.each do |_, socks|
socks.each do |sock|
if expired_socket?(sock)
sockets << sock
socks.delete(sock)
end
end
end
@available_sockets = @available_sockets.select { |_, v| !v.empty? }

@active_socks.keys.each do |k|
if expired?(k) && @active_socks[k].ref <= 0
@inactive_socks[k] = @active_socks.delete(k)
end
end
sockets += @inactive_sockets
@inactive_sockets.clear
end

while (s = sockets.pop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

each is more lightweight due to no sockets update.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right……… fixed eda267c

s.sock.close rescue nil
end
end

# We expect that `yield` returns a unique object in this class
def fetch_or(key = Thread.current.object_id)
def clear
sockets = []
@mutex.synchronize do
unless @active_socks[key]
@active_socks[key] = TimedSocket.new(timeout, yield, 1)
@log.debug("connect new socket #{@active_socks[key]}")
return @active_socks[key].sock
end

if expired?(key)
# Do not close this socket here in case of it will be used by other place (e.g. wait for receiving ack)
@inactive_socks[key] = @active_socks.delete(key)
@log.debug("connection #{@inactive_socks[key]} is expired. reconnecting...")
@active_socks[key] = TimedSocket.new(timeout, yield, 0)
end
sockets += @available_sockets.values.flat_map { |v| v }
sockets += @inflight_sockets.values
sockets += @inactive_sockets

@active_socks[key].ref += 1
@active_socks[key].sock
@available_sockets.clear
@inflight_sockets.clear
@inactive_sockets.clear
end
end

def dec_ref(key = Thread.current.object_id)
@mutex.synchronize do
if @active_socks[key]
@active_socks[key].ref -= 1
elsif @inactive_socks[key]
@inactive_socks[key].ref -= 1
else
@log.warn("Not found key for dec_ref: #{key}")
end
while (s = sockets.pop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

s.sock.close rescue nil
end
end

# This method is expected to be called in class which doesn't call #inc_ref
def dec_ref_by_value(val)
@mutex.synchronize do
sock = @active_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@active_socks[key].ref -= 1
return
end
private

sock = @inactive_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@inactive_socks[key].ref -= 1
return
else
@log.warn("Not found key for dec_ref_by_value: #{key}")
end
# this method is not thread safe
def pick_socket(key)
if @available_sockets[key].empty?
return nil
end
end

# This method is expected to be called in class which doesn't call #fetch_or
def revoke_by_value(val)
@mutex.synchronize do
sock = @active_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@inactive_socks[key] = @active_socks.delete(key)
@inactive_socks[key].ref = 0
else
@log.debug("Not found for revoke_by_value :#{val}")
end
t = Time.now
if (s = @available_sockets[key].find { |sock| !expired_socket?(sock, time: t) })
@inflight_sockets[s.sock] = @available_sockets[key].delete(s)
s
else
nil
end
end

private

def timeout
@timeout && Time.now + @timeout
end

# This method is thread unsafe
def expired?(key = Thread.current.object_id)
@active_socks[key].timeout ? @active_socks[key].timeout < Time.now : false
def expired_socket?(sock, time: Time.now)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is private so normal argument seems enough for time. Not important comment.

sock.timeout ? sock.timeout < time : false
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions test/plugin/out_forward/test_connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ConnectionManager < Test::Unit::TestCase
sub_test_case 'when socket_cache exists' do
test 'calls connect_keepalive' do
cache = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
mock(cache).dec_ref.never
mock(cache).checkin('sock').never

cm = Fluent::Plugin::ForwardOutput::ConnectionManager.new(
log: $log,
Expand All @@ -100,7 +100,7 @@ class ConnectionManager < Test::Unit::TestCase

test 'calls connect_keepalive and closes socket with block' do
cache = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
mock(cache).dec_ref.once
mock(cache).checkin('sock').once

cm = Fluent::Plugin::ForwardOutput::ConnectionManager.new(
log: $log,
Expand All @@ -119,7 +119,7 @@ class ConnectionManager < Test::Unit::TestCase

test 'does not call dec_ref when require_ack is true' do
cache = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
mock(cache).dec_ref.never
mock(cache).checkin('sock').never

cm = Fluent::Plugin::ForwardOutput::ConnectionManager.new(
log: $log,
Expand Down