Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets: implement reading fragmented message [WIP] #638

Merged
merged 2 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions src/WebSockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,18 @@ Base.isopen(ws::WebSocket) = !ws.rxclosed

Base.eof(ws::WebSocket) = ws.rxclosed || eof(ws.io)

Base.readavailable(ws::WebSocket) = collect(readframe(ws))
Base.readavailable(ws::WebSocket) = readmessage(ws)

function readmessage(ws::WebSocket)
payload, header = _readframe(ws)
bytes = collect(payload)
while !(header.final)
payload, header = _readframe(ws)
@assert header.opcode == WS_CONTINUATION
append!(bytes, payload)
end
return bytes
end

function readheader(io::IO)
b = UInt8[0,0]
Expand All @@ -265,40 +276,45 @@ function readheader(io::IO)
b[2] & WS_MASK > 0 ? read(io, UInt32) : UInt32(0))
end

function readframe(ws::WebSocket)
readframe(ws::WebSocket) = first(_readframe(ws))

function _readframe(ws::WebSocket)
h = readheader(ws.io)
@debug 1 "WebSocket ➡️ $h"

if h.length > 0
if length(ws.rxpayload) < h.length
resize!(ws.rxpayload, h.length)
len = Int(h.length)

if len > 0
if length(ws.rxpayload) < len
resize!(ws.rxpayload, len)
end
unsafe_read(ws.io, pointer(ws.rxpayload), h.length)
@debug 2 " ➡️ \"$(String(ws.rxpayload[1:h.length]))\""
unsafe_read(ws.io, pointer(ws.rxpayload), len)
@debug 2 " ➡️ \"$(String(ws.rxpayload[1:len]))\""
end
l = Int(h.length)

if h.hasmask
mask!(ws.rxpayload, ws.rxpayload, l, reinterpret(UInt8, [h.mask]))
mask!(ws.rxpayload, ws.rxpayload, len, reinterpret(UInt8, [h.mask]))
end

if h.opcode == WS_CLOSE
ws.rxclosed = true
if l >= 2
if len >= 2
status = UInt16(ws.rxpayload[1]) << 8 | ws.rxpayload[2]
if status != 1000
message = String(ws.rxpayload[3:l])
message = String(ws.rxpayload[3:len])
status_descr = get(STATUS_CODE_DESCRIPTION, Int(status), "")
msg = "Status: $(status_descr), Internal Code: $(message)"
throw(WebSocketError(status, msg))
end
end
return UInt8[]
return view(ws.rxpayload, 1:0), h
elseif h.opcode == WS_PING
wswrite(ws, WS_FINAL | WS_PONG, ws.rxpayload[1:len])
return _readframe(ws)
elseif h.opcode == WS_PONG
return _readframe(ws)
else
if h.opcode == WS_PING
wswrite(ws, WS_FINAL | WS_PONG, ws.rxpayload[1:l])
return readframe(ws)
end
return view(ws.rxpayload, 1:l)
return view(ws.rxpayload, 1:len), h
end
end

Expand Down
73 changes: 45 additions & 28 deletions test/websockets.jl
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
using Test
using HTTP
using HTTP.IOExtras, HTTP.Sockets
using HTTP.IOExtras, HTTP.Sockets, HTTP.WebSockets
using Sockets

@testset "websockets.jl" begin
@testset "WebSockets" begin
p = 8085 # rand(8000:8999)
socket_type = ["wss", "ws"]

function listen_localhost()
@async HTTP.listen(Sockets.localhost, p) do http
if HTTP.WebSockets.is_upgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
if WebSockets.is_upgrade(http.message)
WebSockets.upgrade(http) do ws
while !eof(ws)
data = readavailable(ws)
write(ws, data)
Expand All @@ -21,51 +21,68 @@ using Sockets
end

@testset "External Host - $s" for s in socket_type
HTTP.WebSockets.open("$s://echo.websocket.org") do io
write(io, "Foo")
@test !eof(io)
@test String(readavailable(io)) == "Foo"
WebSockets.open("$s://echo.websocket.org") do ws
write(ws, "Foo")
@test !eof(ws)
@test String(readavailable(ws)) == "Foo"

write(ws, "Foo"," Bar")
@test !eof(ws)
@test String(readavailable(ws)) == "Foo Bar"

write(io, "Hello")
write(io, " There")
write(io, " World", "!")
closewrite(io)
# send fragmented message manually with ping in between frames
WebSockets.wswrite(ws, ws.frame_type, "Hello ")
WebSockets.wswrite(ws, WebSockets.WS_FINAL | WebSockets.WS_PING, "things")
WebSockets.wswrite(ws, WebSockets.WS_FINAL, "again!")
@test String(readavailable(ws)) == "Hello again!"

write(ws, "Hello")
write(ws, " There")
write(ws, " World", "!")
closewrite(ws)

buf = IOBuffer()
write(buf, io)
write(buf, ws)
@test String(take!(buf)) == "Hello There World!"
end
end

@testset "Localhost" begin
listen_localhost()
listen_localhost()

HTTP.WebSockets.open("ws://127.0.0.1:$(p)") do ws
WebSockets.open("ws://127.0.0.1:$(p)") do ws
write(ws, "Foo")
@test String(readavailable(ws)) == "Foo"

write(ws, "Bar")
@test String(readavailable(ws)) == "Bar"

write(ws, "This", " is", " a", " fragmented", " message.")
@test String(readavailable(ws)) == "This is a fragmented message."

# send fragmented message manually with ping in between frames
WebSockets.wswrite(ws, ws.frame_type, "Ping ")
WebSockets.wswrite(ws, WebSockets.WS_FINAL | WebSockets.WS_PING, "stuff")
WebSockets.wswrite(ws, WebSockets.WS_FINAL, "pong!")
@test String(readavailable(ws)) == "Ping pong!"
end
end

@testset "extened feautre support for listen" begin
@testset "Extended feature support for listen" begin
port=UInt16(8086)
tcpserver = listen(port)
target = "/query?k1=v1&k2=v2"

servertask = @async HTTP.WebSockets.listen("127.0.0.1", port; server=tcpserver) do ws
@testset "request access" begin
@test ws.request isa HTTP.Request
write(ws, ws.request.target)
while !eof(ws)
write(ws, readavailable(ws))
end
close(ws)

servertask = @async WebSockets.listen("127.0.0.1", port; server=tcpserver) do ws
@test ws.request isa HTTP.Request
write(ws, ws.request.target)
while !eof(ws)
write(ws, readavailable(ws))
end
close(ws)
end

HTTP.WebSockets.open("ws://127.0.0.1:$(port)$(target)") do ws
WebSockets.open("ws://127.0.0.1:$(port)$(target)") do ws
@test String(readavailable(ws)) == target
@test write(ws, "Bye!") == 4
@test String(readavailable(ws)) == "Bye!"
Expand All @@ -74,6 +91,6 @@ using Sockets

close(tcpserver)
@test timedwait(()->servertask.state === :failed, 5.0) === :ok
@test_throws Exception wait(servertask)
@test_throws Exception wait(servertask)
end
end
end