Skip to content

Commit

Permalink
Change socket shareable between threads
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Jul 16, 2019
1 parent 1a2d3c6 commit c684e2e
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 175 deletions.
7 changes: 1 addition & 6 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,11 @@ def configure(conf)
end
end

socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@servers.each do |server|
failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
name = server.name || "#{server.host}:#{server.port}"

socket_cache =
if @keepalive
SocketCache.new(@keepalive_timeout, @log)
else
nil
end
connection_manager = ConnectionManager.new(
log: @log,
secure: !!@security,
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/out_forward/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def purge_obsolete_socks

def close(sock)
if @socket_cache
@socket_cache.dec_ref_by_value(sock)
@socket_cache.checkin(sock)
else
sock.close_write rescue nil
sock.close rescue nil
Expand All @@ -75,7 +75,7 @@ def close(sock)

def connect_keepalive(host:, port:, hostname:, require_ack:)
request_info = RequestInfo.new(:established)
socket = @socket_cache.fetch_or do
socket = @socket_cache.checkout_or([host, port, hostname]) do
s = @connection_factory.call(host, port, hostname)
request_info = RequestInfo.new(@secure ? :helo : :established) # overwrite if new connection
s
Expand All @@ -89,11 +89,11 @@ def connect_keepalive(host:, port:, hostname:, require_ack:)
begin
ret = yield(socket, request_info)
rescue
@socket_cache.revoke
@socket_cache.revoke(socket)
raise
else
unless require_ack
@socket_cache.dec_ref
@socket_cache.checkin(socket)
end
end

Expand Down
153 changes: 68 additions & 85 deletions lib/fluent/plugin/out_forward/socket_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,135 +19,118 @@
module Fluent::Plugin
class ForwardOutput < Output
class SocketCache
TimedSocket = Struct.new(:timeout, :sock, :ref)
TimedSocket = Struct.new(:timeout, :key, :sock)

def initialize(timeout, log)
@log = log
@timeout = timeout
@active_socks = {}
@inactive_socks = {}
@available_sockets = Hash.new { |obj, k| obj[k] = [] }
@inflight_sockets = {}
@inactive_sockets = []
@mutex = Mutex.new
end

def revoke(key = Thread.current.object_id)
def checkout_or(key)
@mutex.synchronize do
if @active_socks[key]
@inactive_socks[key] = @active_socks.delete(key)
@inactive_socks[key].ref = 0
tsock = pick_socket(key)

unless tsock
val = yield
new_tsock = TimedSocket.new(timeout, key, val)
@log.debug("connect new socket #{new_tsock}")

@inflight_sockets[val] = new_tsock
return new_tsock.sock
end
tsock.sock
end
end

def clear
def checkin(sock)
@mutex.synchronize do
@inactive_socks.values.each do |s|
s.sock.close rescue nil
if (s = @inflight_sockets.delete(sock))
@available_sockets[s.key] << s
else
@log.debug("there is no socket #{sock}")
end
@inactive_socks.clear
end
end

@active_socks.values.each do |s|
s.sock.close rescue nil
def revoke(sock)
@mutex.synchronize do
if (s = @inflight_sockets.delete(sock))
@inactive_sockets << s
else
@log.debug("there is no socket #{sock}")
end
@active_socks.clear
end
end

def purge_obsolete_socks
sockets = []

@mutex.synchronize do
@inactive_socks.keys.each do |k|
# 0 means sockets stored in this class received all acks
if @inactive_socks[k].ref <= 0
s = @inactive_socks.delete(k)
s.sock.close rescue nil
@log.debug("purged obsolete socket #{s.sock}")
# don't touch @inflight_sockets

@available_sockets.each do |_, socks|
socks.each do |sock|
if expired_socket?(sock)
sockets << sock
socks.delete(sock)
end
end
end
@available_sockets = @available_sockets.select { |_, v| !v.empty? }

@active_socks.keys.each do |k|
if expired?(k) && @active_socks[k].ref <= 0
@inactive_socks[k] = @active_socks.delete(k)
end
end
sockets += @inactive_sockets
@inactive_sockets.clear
end

while (s = sockets.pop)
s.sock.close rescue nil
end
end

# We expect that `yield` returns a unique object in this class
def fetch_or(key = Thread.current.object_id)
def clear
sockets = []
@mutex.synchronize do
unless @active_socks[key]
@active_socks[key] = TimedSocket.new(timeout, yield, 1)
@log.debug("connect new socket #{@active_socks[key]}")
return @active_socks[key].sock
end

if expired?(key)
# Do not close this socket here in case of it will be used by other place (e.g. wait for receiving ack)
@inactive_socks[key] = @active_socks.delete(key)
@log.debug("connection #{@inactive_socks[key]} is expired. reconnecting...")
@active_socks[key] = TimedSocket.new(timeout, yield, 0)
end
sockets += @available_sockets.values.flat_map { |v| v }
sockets += @inflight_sockets.values
sockets += @inactive_sockets

@active_socks[key].ref += 1
@active_socks[key].sock
@available_sockets.clear
@inflight_sockets.clear
@inactive_sockets.clear
end
end

def dec_ref(key = Thread.current.object_id)
@mutex.synchronize do
if @active_socks[key]
@active_socks[key].ref -= 1
elsif @inactive_socks[key]
@inactive_socks[key].ref -= 1
else
@log.warn("Not found key for dec_ref: #{key}")
end
while (s = sockets.pop)
s.sock.close rescue nil
end
end

# This method is expected to be called in class which doesn't call #inc_ref
def dec_ref_by_value(val)
@mutex.synchronize do
sock = @active_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@active_socks[key].ref -= 1
return
end
private

sock = @inactive_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@inactive_socks[key].ref -= 1
return
else
@log.warn("Not found key for dec_ref_by_value: #{key}")
end
# this method is not thread safe
def pick_socket(key)
if @available_sockets[key].empty?
return nil
end
end

# This method is expected to be called in class which doesn't call #fetch_or
def revoke_by_value(val)
@mutex.synchronize do
sock = @active_socks.detect { |_, v| v.sock == val }
if sock
key = sock.first
@inactive_socks[key] = @active_socks.delete(key)
@inactive_socks[key].ref = 0
else
@log.debug("Not found for revoke_by_value :#{val}")
end
t = Time.now
if (s = @available_sockets[key].find { |sock| !expired_socket?(sock, time: t) })
@inflight_sockets[s.sock] = @available_sockets[key].delete(s)
s
else
nil
end
end

private

def timeout
@timeout && Time.now + @timeout
end

# This method is thread unsafe
def expired?(key = Thread.current.object_id)
@active_socks[key].timeout ? @active_socks[key].timeout < Time.now : false
def expired_socket?(sock, time: Time.now)
sock.timeout ? sock.timeout < time : false
end
end
end
Expand Down

0 comments on commit c684e2e

Please sign in to comment.