This repository has been archived by the owner on Aug 6, 2018. It is now read-only.
/
web_socket.rb
150 lines (125 loc) · 3.82 KB
/
web_socket.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
require "set"
require "thread"

require "nio"
require "websocket"
# Flip the websocket gem's process-wide `should_raise` switch.
# NOTE(review): presumably this makes the gem raise on handshake/frame errors
# instead of only recording them on the object — confirm against the gem docs.
# TODO: Global flag could interfere with other code.
::WebSocket.should_raise = true
module LivereloadRails
  # Embodies a WebSocket connection as a separate thread.
  #
  # Each instance spawns one thread (see #initialize) which hijacks the rack
  # socket, performs the WebSocket handshake and then hands the socket over to
  # a LivereloadRails::Stream whose loop runs until the connection ends.
  class WebSocket
    PING_TIMEOUT = 1

    class << self
      # Same as #initialize, but first checks if the request is a websocket upgrade.
      #
      # The Upgrade header is compared case-insensitively, since RFC 6455
      # defines the "websocket" token as an ASCII case-insensitive value.
      #
      # @example
      #   WebSocket.from_rack(env) do |ws|
      #     …
      #     ws.on(:open) { … }
      #     ws.on(:message) { … }
      #     ws.on(:close) { … }
      #   end
      #
      # @param env a rack environment hash
      # @return [WebSocket, nil] a websocket instance, or nil if request was not a websocket.
      def from_rack(env, &block)
        new(env, &block) if env["HTTP_UPGRADE"].to_s.downcase == "websocket"
      end
    end

    # Starts the connection thread and blocks until the given block has run
    # inside it (or until the thread failed before getting that far).
    #
    # The block is yielded the instance after the socket has been hijacked but
    # before the handshake is performed, so it is the place to register
    # handlers with #on.
    #
    # @param env a rack environment hash
    # @yield [WebSocket] self, from within the connection thread
    # @raise [ArgumentError] if no block is given
    # @raise [Exception] any error raised in the thread before the block completed
    def initialize(env)
      raise ArgumentError, "no block given" unless block_given?

      @env = env
      @handlers = { open: Set.new, close: Set.new, message: Set.new }
      @handshake = ::WebSocket::Handshake::Server.new(secure: false)

      # Used to report the outcome of the setup phase back to the caller.
      queue = Queue.new

      @thread = Thread.new do
        begin
          # One-shot notifier: clears itself so the ensure below does not
          # report a second time once setup has been signalled.
          finish_initialize = proc do |event|
            finish_initialize = nil
            queue << event
          end

          hijack do
            yield self
            finish_initialize[:connected]
          end
        ensure
          # Setup never completed: hand the current error (if any) to the caller.
          finish_initialize[$!] if finish_initialize
        end
      end

      message = queue.pop
      raise message if message.is_a?(Exception)
    end

    # @return [Thread] the thread running this connection
    attr_reader :thread

    # Register an event handler.
    #
    # @example
    #   handler = websocket.on(:open) { … }
    #
    # @note If an event handler raises an error, handlers after it will not run.
    # @param [Symbol] event (one of :open, :close, :message)
    # @return [Proc] the registered handler
    # @raise [ArgumentError] if the event name is not known
    def on(event, &handler)
      raise ArgumentError, "no event named #{event.inspect}" unless @handlers.key?(event)

      @handlers[event].add(handler)
      handler
    end

    # Queues data for writing. It is not guaranteed that client will receive message.
    #
    # @param [#to_s] data
    # @param [Symbol] type frame type handed to the outgoing frame (e.g. :text, :pong)
    def write(data, type: :text)
      frame = ::WebSocket::Frame::Outgoing::Server.new(data: data, type: type, version: @handshake.version)
      @stream.write(frame.to_s)
    end

    # Close the connection.
    #
    # Can safely be called multiple times.
    def close
      @stream.close if @stream
    end

    private

    # Trigger all handlers for the given event with the given arguments.
    #
    # @param [Symbol] event
    def trigger(event, *args)
      @handlers[event].each { |handler| handler.call(*args) }
    end

    # Main loop of the WebSocket thread.
    #
    # Hijacks the rack socket, yields once (before the handshake), performs the
    # handshake, fires :open and then runs the stream loop. On the way out —
    # normally or via an exception — the connection is closed and :close is
    # fired with the current exception, if there is one.
    #
    # @raise [HijackingNotSupported] if the server does not offer rack hijacking
    def hijack
      unless @env["rack.hijack?"]
        raise HijackingNotSupported, "server does not support hijacking"
      end

      # See http://www.rubydoc.info/github/rack/rack/file/SPEC
      @env["rack.hijack"].call
      @io = @env["rack.hijack_io"]

      yield

      @handshake.from_rack(@env)
      raise @handshake.error unless @handshake.valid?

      frame_parser = ::WebSocket::Frame::Incoming::Server.new(version: @handshake.version)
      @stream = LivereloadRails::Stream.new(@io) do |input|
        handle_frames(frame_parser, input)
      end

      @stream.write(@handshake.to_s)
      trigger :open

      handle_frames(frame_parser, @handshake.leftovers)
      @stream.loop
    ensure
      close
      # If we failed before the stream was created, nothing else owns the
      # hijacked socket, so close it here instead of leaking it.
      @io.close if @io && @stream.nil? && !@io.closed?
      trigger :close, *$!
    end

    # Feeds raw bytes to the frame parser and dispatches every complete frame.
    #
    # @param frame_parser incoming frame parser (responds to #<< and #next)
    # @param [String, nil] data raw bytes read from the socket
    # @raise [RuntimeError] on a frame type this class does not know
    def handle_frames(frame_parser, data)
      # NOTE(review): handshake leftovers may be nil when no bytes followed the
      # handshake — confirm against the websocket gem; the guard is a no-op otherwise.
      frame_parser << data unless data.nil?

      while (frame = frame_parser.next)
        case frame.type
        when :text, :binary
          trigger :message, frame
        when :ping
          # A pong carries the same application data as the ping it answers.
          write(frame.data, type: :pong)
        when :pong
          # TODO: reset timeout timer.
        when :close
          close
        else
          raise "unknown frame type #{frame.type}"
        end
      end
    end
  end
end