Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 20 additions & 23 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -501,40 +501,37 @@ function getconnection(::Type{TCPSocket},
# alive in the face of heavy workloads where Julia's task scheduler might take a while to
# keep up with midflight requests
keepalive::Bool=true,
connect_timeout::Int=10,
readtimeout::Int=0,
kw...)::TCPSocket

p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)
@debugv 2 "TCP connect: $host:$p..."
addrs = Sockets.getalladdrinfo(host)
connect_timeout = connect_timeout == 0 && readtimeout > 0 ? readtimeout : connect_timeout
lasterr = ErrorException("unknown connection error")
ch = Channel{TCPSocket}(1)
n = Ref(length(addrs))
for addr in addrs
try
if connect_timeout > 0
tcp = Sockets.TCPSocket()
Sockets.connect!(tcp, addr, p)
try
try_with_timeout(connect_timeout) do _
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
end
catch
close(tcp)
rethrow()
end
else
tcp = Sockets.connect(addr, p)
Threads.@spawn begin
try
isready(ch) && return
tcp = Sockets.connect($addr, p)
isready(ch) && return
keepalive && keepalive!(tcp)
Base.@lock ch begin
isready(ch) && return
put!(ch, tcp)
end
catch e
Base.@lock ch begin
# if we're the last task to fail, and assuming
# all other tasks also failed, then we close
# the channel w/ our exception so the fetch call throws and
# our error propagates
(n[] -= 1) == 0 && close(ch, e)
end
end
return tcp
catch e
lasterr = e isa ConcurrentUtilities.TimeoutException ? ConnectTimeout(host, port) : e
end
end
# If no connetion could be set up, to any address, throw last error
throw(lasterr)
return fetch(ch)
end

const nosslconfig = SSLConfig()
Expand Down
11 changes: 9 additions & 2 deletions src/clientlayers/ConnectionRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Close the connection if the request throws an exception.
Otherwise leave it open so that it can be reused.
"""
function connectionlayer(handler)
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...)
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, connect_timeout::Int=10, logerrors::Bool=false, logtag=nothing, kw...)
local io, stream
if proxy !== nothing
target_url = req.url
Expand All @@ -73,10 +73,17 @@ function connectionlayer(handler)
url = target_url = req.url
end

connect_timeout = connect_timeout == 0 && readtimeout > 0 ? readtimeout : connect_timeout
IOType = sockettype(url, socket_type, socket_type_tls)
start_time = time()
try
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
io = if connect_timeout > 0
try_with_timeout(connect_timeout) do _
newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
end
else
newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
end
catch e
if logerrors
err = current_exceptions_to_string()
Expand Down