Skip to content

Commit

Permalink
calculate bytes when write to socket
Browse files Browse the repository at this point in the history
  • Loading branch information
Fivell committed Sep 26, 2017
1 parent 3679470 commit 1e9007b
Showing 1 changed file with 35 additions and 24 deletions.
59 changes: 35 additions & 24 deletions lib/net/tcp_client/tcp_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ class TCPClient
include SemanticLogger::Loggable if defined?(SemanticLogger::Loggable)

attr_accessor :connect_timeout, :read_timeout, :write_timeout,
:connect_retry_count, :connect_retry_interval, :retry_count,
:policy, :close_on_error, :buffered, :ssl, :buffered,
:proxy_server
:connect_retry_count, :connect_retry_interval, :retry_count,
:policy, :close_on_error, :buffered, :ssl, :buffered,
:proxy_server
attr_reader :servers, :address, :socket, :ssl_handshake_timeout

# Supports embedding user supplied data along with this connection
Expand Down Expand Up @@ -243,22 +243,22 @@ def self.connect(params={})
# }
# )
def initialize(parameters={})
params = parameters.dup
@read_timeout = (params.delete(:read_timeout) || 60.0).to_f
@write_timeout = (params.delete(:write_timeout) || 60.0).to_f
@connect_timeout = (params.delete(:connect_timeout) || 10).to_f
buffered = params.delete(:buffered)
@buffered = buffered.nil? ? true : buffered
@connect_retry_count = params.delete(:connect_retry_count) || 10
@retry_count = params.delete(:retry_count) || 3
params = parameters.dup
@read_timeout = (params.delete(:read_timeout) || 60.0).to_f
@write_timeout = (params.delete(:write_timeout) || 60.0).to_f
@connect_timeout = (params.delete(:connect_timeout) || 10).to_f
buffered = params.delete(:buffered)
@buffered = buffered.nil? ? true : buffered
@connect_retry_count = params.delete(:connect_retry_count) || 10
@retry_count = params.delete(:retry_count) || 3
@connect_retry_interval = (params.delete(:connect_retry_interval) || 0.5).to_f
@on_connect = params.delete(:on_connect)
@proxy_server = params.delete(:proxy_server)
@policy = params.delete(:policy) || :ordered
@close_on_error = params.delete(:close_on_error)
@close_on_error = true if @close_on_error.nil?
@on_connect = params.delete(:on_connect)
@proxy_server = params.delete(:proxy_server)
@policy = params.delete(:policy) || :ordered
@close_on_error = params.delete(:close_on_error)
@close_on_error = true if @close_on_error.nil?
if @ssl = params.delete(:ssl)
@ssl = {} if @ssl == true
@ssl = {} if @ssl == true
@ssl_handshake_timeout = (@ssl.delete(:handshake_timeout) || @connect_timeout).to_f
end

Expand Down Expand Up @@ -305,7 +305,7 @@ def initialize(parameters={})
# and create a new connection
def connect
start_time = Time.now
retries = 0
retries = 0
close

# Number of times to try
Expand Down Expand Up @@ -352,7 +352,7 @@ def connect
def write(data, timeout = write_timeout)
data = data.to_s
if respond_to?(:logger)
payload = {timeout: timeout}
payload = { timeout: timeout }
# With trace level also log the sent data
payload[:data] = data if logger.trace?
logger.benchmark_debug('#write', payload: payload) do
Expand Down Expand Up @@ -404,9 +404,9 @@ def write(data, timeout = write_timeout)
# a new connection
def read(length, buffer = nil, timeout = read_timeout)
if respond_to?(:logger)
payload = {bytes: length, timeout: timeout}
payload = { bytes: length, timeout: timeout }
logger.benchmark_debug('#read', payload: payload) do
data = socket_read(length, buffer, timeout)
data = socket_read(length, buffer, timeout)
# With trace level also log the received data
payload[:data] = data if logger.trace?
data
Expand Down Expand Up @@ -487,7 +487,7 @@ def retry_on_connection_failure
# Logs a warning if an error occurs trying to close the socket
def close
socket.close if socket && !socket.closed?
@socket = nil
@socket = nil
@address = nil
true
rescue IOError => exception
Expand Down Expand Up @@ -612,8 +612,19 @@ def socket_write(data, timeout)
socket.write(data)
else
deadline = Time.now.utc + timeout
length = data.bytesize
total_count = 0
non_blocking(socket, deadline) do
socket.write_nonblock(data)
loop do
begin
count = socket.write_nonblock(data)
rescue Errno::EWOULDBLOCK
retry
end
total_count += count
return total_count if total_count >= length
data = data.byteslice(count..-1)
end
end
end
rescue NonBlockingTimeout
Expand Down Expand Up @@ -681,7 +692,7 @@ def ssl_connect(socket, address, timeout)
ssl_context = OpenSSL::SSL::SSLContext.new
ssl_context.set_params(ssl.is_a?(Hash) ? ssl : {})

ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
ssl_socket.sync_close = true

begin
Expand Down

0 comments on commit 1e9007b

Please sign in to comment.