Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul websockets code #843

Merged
merged 14 commits into from Jun 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -36,6 +36,15 @@ jobs:
version: 'nightly'
steps:
- uses: actions/checkout@v2
- name: setup python
if: ${{ matrix.arch == 'x64' && matrix.os != 'windows-latest'}}
uses: actions/setup-python@v4
with:
python-version: "2.x"
architecture: ${{ matrix.arch }}
- name: setup autobahn wstest
if: ${{ matrix.arch == 'x64' && matrix.os != 'windows-latest'}}
run: python -m pip install --upgrade pip autobahntestsuite
- uses: julia-actions/setup-julia@v1
with:
version: ${{ matrix.version }}
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -10,4 +10,5 @@
/docs/Manifest.toml
/Manifest.toml
/test/coverage/Manifest.toml
.idea/*
/test/websockets/reports/*
.idea/*
1 change: 1 addition & 0 deletions Project.toml
Expand Up @@ -14,6 +14,7 @@ MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
IniFile = "0.5"
Expand Down
2 changes: 1 addition & 1 deletion src/HTTP.jl
@@ -1,7 +1,7 @@
module HTTP

export startwrite, startread, closewrite, closeread,
@logfmt_str, common_logfmt, combined_logfmt
@logfmt_str, common_logfmt, combined_logfmt, WebSockets

const DEBUG_LEVEL = Ref(0)

Expand Down
33 changes: 21 additions & 12 deletions src/Servers.jl
Expand Up @@ -232,10 +232,11 @@ function listen(f,
rate_limit::Union{Rational{Int}, Nothing}=nothing,
reuse_limit::Int=nolimit,
readtimeout::Int=0,
verbose::Bool=false,
verbose=false,
ready_to_accept::Ref{Bool}=Ref(false),
access_log::Union{Function,Nothing}=nothing,
on_shutdown::Union{Function, Vector{<:Function}, Nothing}=nothing)

ready_to_accept[] = false
inet = getinet(host, port)
if server !== nothing
tcpserver = server
Expand All @@ -244,7 +245,7 @@ function listen(f,
tcpserver = Sockets.TCPServer(; delay=false)
if Sys.isunix()
if Sys.isapple()
verbose && @warn "note that `reuseaddr=true` allows multiple processes to bind to the same addr/port, but only one process will accept new connections (if that process exits, another process listening will start accepting)"
verbose > 0 && @warn "note that `reuseaddr=true` allows multiple processes to bind to the same addr/port, but only one process will accept new connections (if that process exits, another process listening will start accepting)"
end
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Cvoid},), tcpserver.handle)
Sockets.bind(tcpserver, inet.host, inet.port; reuseaddr=true)
Expand All @@ -256,48 +257,56 @@ function listen(f,
else
tcpserver = Sockets.listen(inet)
end
verbose && @info "Listening on: $host:$port"
verbose > 0 && @info "Listening on: $host:$port"

tcpisvalid = let f=tcpisvalid
x -> f(x) && check_rate_limit(x, rate_limit)
end

s = Server(sslconfig, tcpserver, string(host), string(port), on_shutdown, access_log)
return listenloop(f, s, tcpisvalid, connection_count, max_connections,
reuse_limit, readtimeout, verbose)
if verbose > 0
LoggingExtras.withlevel(Logging.Debug; verbosity=verbose) do
listenloop(f, s, tcpisvalid, connection_count, max_connections,
reuse_limit, readtimeout, ready_to_accept, verbose)
end
else
return listenloop(f, s, tcpisvalid, connection_count, max_connections,
reuse_limit, readtimeout, ready_to_accept, verbose)
end
end

""""
Main server loop.
Accepts new tcp connections and spawns async tasks to handle them."
"""
function listenloop(f, server, tcpisvalid, connection_count,
max_connections, reuse_limit, readtimeout, verbose)
max_connections, reuse_limit, readtimeout, ready_to_accept, verbose)
sem = Base.Semaphore(max_connections)
count = 1
ready_to_accept[] = true
while isopen(server)
try
Base.acquire(sem)
io = accept(server)
if io === nothing
verbose && @warn "unable to accept new connection"
verbose > 0 && @warn "unable to accept new connection"
continue
elseif !tcpisvalid(io)
verbose && @info "Accept-Reject: $io"
verbose > 0 && @info "Accept-Reject: $io"
close(io)
continue
end
connection_count[] += 1
conn = Connection(io)
conn.host, conn.port = server.hostname, server.hostport
@async try
# verbose && @info "Accept ($count): $conn"
# verbose > 0 && @info "Accept ($count): $conn"
handle_connection(f, conn, server, reuse_limit, readtimeout)
# verbose && @info "Closed ($count): $conn"
# verbose > 0 && @info "Closed ($count): $conn"
catch e
if e isa Base.IOError &&
(e.code == -54 || e.code == -4077 || e.code == -104 || e.code == -131 || e.code == -232)
verbose && @warn "connection reset by peer (ECONNRESET)"
verbose > 0 && @warn "connection reset by peer (ECONNRESET)"
else
@error "" exception=(e, stacktrace(catch_backtrace()))
end
Expand Down