/
web_socket.cr
193 lines (171 loc) · 6.09 KB
/
web_socket.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
require "./client"
require "./headers"
# NOTE: To use `WebSocket`, you must explicitly import it with `require "http/web_socket"`
class HTTP::WebSocket
getter? closed = false
# :nodoc:
def initialize(io : IO, sync_close = true)
initialize(Protocol.new(io, sync_close: sync_close))
end
# :nodoc:
def initialize(@ws : Protocol)
@buffer = Bytes.new(4096)
@current_message = IO::Memory.new
end
# Opens a new websocket using the information provided by the URI. This will also handle the handshake
# and will raise an exception if the handshake did not complete successfully. This method will also raise
# an exception if the URI is missing the host and/or the path.
#
# Please note that the scheme will only be used to identify if TLS should be used or not. Therefore, schemes
# apart from `wss` and `https` will be treated as the default which is `ws`.
#
# ```
# require "http/web_socket"
#
# HTTP::WebSocket.new(URI.parse("ws://websocket.example.com/chat")) # Creates a new WebSocket to `websocket.example.com`
# HTTP::WebSocket.new(URI.parse("wss://websocket.example.com/chat")) # Creates a new WebSocket with TLS to `websocket.example.com`
# HTTP::WebSocket.new(URI.parse("http://websocket.example.com:8080/chat")) # Creates a new WebSocket to `websocket.example.com` on port `8080`
# HTTP::WebSocket.new(URI.parse("ws://websocket.example.com/chat"), # Creates a new WebSocket to `websocket.example.com` with an Authorization header
# HTTP::Headers{"Authorization" => "Bearer authtoken"})
# HTTP::WebSocket.new(
# URI.parse("ws://user:password@websocket.example.com/chat")) # Creates a new WebSocket to `websocket.example.com` with an HTTP basic auth Authorization header
# ```
def self.new(uri : URI | String, headers = HTTP::Headers.new)
new(Protocol.new(uri, headers: headers))
end
# Opens a new websocket to the target host. This will also handle the handshake
# and will raise an exception if the handshake did not complete successfully.
#
# ```
# require "http/web_socket"
#
# HTTP::WebSocket.new("websocket.example.com", "/chat") # Creates a new WebSocket to `websocket.example.com`
# HTTP::WebSocket.new("websocket.example.com", "/chat", tls: true) # Creates a new WebSocket with TLS to `ẁebsocket.example.com`
# ```
def self.new(host : String, path : String, port = nil, tls : HTTP::Client::TLSContext = nil, headers = HTTP::Headers.new)
new(Protocol.new(host, path, port, tls, headers))
end
# Called when a PING frame is received.
def on_ping(&@on_ping : String ->)
end
# Called when a PONG frame is received.
#
# An unsolicited PONG frame should not be responded to.
def on_pong(&@on_pong : String ->)
end
# Called when a text message is received.
def on_message(&@on_message : String ->)
end
# Called when a binary message is received.
def on_binary(&@on_binary : Bytes ->)
end
# Called when the connection is closed by the other party.
def on_close(&@on_close : CloseCode, String ->)
end
protected def check_open
raise IO::Error.new "Closed socket" if closed?
end
# Sends a message payload (message).
def send(message) : Nil
check_open
@ws.send(message)
end
# Sends a PING frame. Received pings will call `#on_ping`.
#
# The receiving party must respond with a PONG.
def ping(message = nil)
check_open
@ws.ping(message)
end
# Sends a PONG frame, which must be in response to a previously received PING frame from `#on_ping`.
def pong(message = nil) : Nil
check_open
@ws.pong(message)
end
def stream(binary = true, frame_size = 1024, &)
check_open
@ws.stream(binary: binary, frame_size: frame_size) do |io|
yield io
end
end
# Sends a close frame, and closes the connection.
# The close frame may contain a body (message) that indicates the reason for closing.
def close(code : CloseCode | Int? = nil, message = nil) : Nil
return if closed?
@closed = true
@ws.close(code, message)
end
# Continuously receives messages and calls previously set callbacks until the websocket is closed.
# Ping and pong messages are automatically handled.
#
# ```
# # Open websocket connection
# ws = HTTP::WebSocket.new("websocket.example.com", "/chat")
#
# # Set callback
# ws.on_message do |msg|
# ws.send "response"
# end
#
# # Start infinite loop
# ws.run
# ```
def run : Nil
loop do
begin
info = @ws.receive(@buffer)
rescue
@on_close.try &.call(CloseCode::AbnormalClosure, "")
@closed = true
break
end
case info.opcode
in .ping?
@current_message.write @buffer[0, info.size]
if info.final
message = @current_message.to_s
@on_ping.try &.call(message)
pong(message) unless closed?
@current_message.clear
end
in .pong?
@current_message.write @buffer[0, info.size]
if info.final
@on_pong.try &.call(@current_message.to_s)
@current_message.clear
end
in .text?
@current_message.write @buffer[0, info.size]
if info.final
@on_message.try &.call(@current_message.to_s)
@current_message.clear
end
in .binary?
@current_message.write @buffer[0, info.size]
if info.final
@on_binary.try &.call(@current_message.to_slice)
@current_message.clear
end
in .close?
@current_message.write @buffer[0, info.size]
if info.final
@current_message.rewind
if @current_message.size >= 2
code = @current_message.read_bytes(UInt16, IO::ByteFormat::NetworkEndian).to_i
code = CloseCode.new(code)
else
code = CloseCode::NoStatusReceived
end
message = @current_message.gets_to_end
@on_close.try &.call(code, message)
close
@current_message.clear
break
end
in .continuation?
# TODO: (asterite) I think this is good, but this case wasn't originally handled
end
end
end
end
require "./web_socket/*"