diff --git a/lib/reel/request.rb b/lib/reel/request.rb index 15eb0d3..63c164b 100644 --- a/lib/reel/request.rb +++ b/lib/reel/request.rb @@ -110,10 +110,10 @@ def websocket?; @request_info.websocket_request?; end # Return a Reel::WebSocket for this request, hijacking the socket from # the underlying connection - def websocket + def websocket(&block) @websocket ||= begin raise StateError, "can't upgrade this request to a websocket" unless websocket? - WebSocket.new(@request_info, @connection.hijack_socket) + WebSocket.new(@connection, &block) end end diff --git a/lib/reel/websocket.rb b/lib/reel/websocket.rb index 6f0968c..3943b64 100644 --- a/lib/reel/websocket.rb +++ b/lib/reel/websocket.rb @@ -1,105 +1,63 @@ require 'forwardable' -require 'websocket_parser' +require 'websocket/driver' module Reel class WebSocket extend Forwardable - include ConnectionMixin - include RequestMixin - attr_reader :socket - def_delegators :@socket, :addr, :peeraddr + NO_PREFIX_HEADERS = %w(CONTENT_TYPE CONTENT_LENGTH).freeze - def initialize(info, socket) - @request_info = info - @socket = socket + attr_reader :env, :url, :socket + def_delegators :driver, :text, :binary, :ping, :close + def_delegators :socket, :write - handshake = ::WebSocket::ClientHandshake.new(:get, url, headers) + def initialize(connection) + @connection = connection + @socket = @connection.socket - if handshake.valid? - response = handshake.accept_response - response.render(socket) - else - error = handshake.errors.first + @connection.detach + @connection.hijack_socket - response = Response.new(400) - response.reason = handshake.errors.first - response.render(@socket) + driver.on(:close) { close } + driver.on(:connect) { driver.start } - raise HandshakeError, "error during handshake: #{error}" - end - - @parser = ::WebSocket::Parser.new - - @parser.on_close do |status, reason| - # According to the spec the server must respond with another - # close message before closing the connection - @socket << ::WebSocket::Message.close.to_data - close - end + yield driver if block_given? - @parser.on_ping do |payload| - @socket << ::WebSocket::Message.pong(payload).to_data - end + start_listening end - [:next_message, :next_messages, :on_message, :on_error, :on_close, :on_ping, :on_pong].each do |meth| - define_method meth do |&proc| - @parser.send __method__, &proc - end + def close + socket.close unless socket.closed? end - def read_every(n, unit = :s) - cancel_timer! # only one timer allowed per stream - seconds = case unit.to_s - when /\Am/ - n * 60 - when /\Ah/ - n * 3600 - else - n + def start_listening + loop do + break if socket.closed? + buffer = socket.readpartial(@connection.buffer_size) + driver.parse(buffer) end - @timer = Celluloid.every(seconds) { read } + rescue EOFError + close end - alias read_interval read_every - alias read_frequency read_every - def read - @parser.append @socket.readpartial(Connection::BUFFER_SIZE) until msg = @parser.next_message - msg - rescue - cancel_timer! - raise - end + private - def body - nil + def driver + @driver ||= ::WebSocket::Driver.server(self) end - def write(msg) - @socket << ::WebSocket::Message.new(msg).to_data - msg - rescue IOError, Errno::ECONNRESET, Errno::EPIPE - cancel_timer! - raise SocketError, "error writing to socket" - rescue - cancel_timer! - raise - end - alias_method :<<, :write - - def closed? - @socket.closed? - end + def convert_headers(headers) + prefixed_headers = headers.map do |key, value| + header = key.upcase.gsub('-', '_') - def close - cancel_timer! - @socket.close unless closed? - end + if NO_PREFIX_HEADERS.member?(header) + [header, value] + else + ['HTTP_' + header, value] + end + end - def cancel_timer! - @timer && @timer.cancel + Hash[prefixed_headers] end - end end diff --git a/reel.gemspec b/reel.gemspec index 35f4005..a963400 100644 --- a/reel.gemspec +++ b/reel.gemspec @@ -19,7 +19,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency 'celluloid-io', '>= 0.15.0' gem.add_runtime_dependency 'http', '>= 0.6.0.pre' gem.add_runtime_dependency 'http_parser.rb', '>= 0.6.0' - gem.add_runtime_dependency 'websocket_parser', '>= 0.1.6' + gem.add_runtime_dependency 'websocket-driver', '>= 0.3.0' gem.add_development_dependency 'rake' gem.add_development_dependency 'rspec', '>= 2.11.0' diff --git a/spec/reel/websocket_spec.rb b/spec/reel/websocket_spec.rb index 7271971..ba6bd18 100644 --- a/spec/reel/websocket_spec.rb +++ b/spec/reel/websocket_spec.rb @@ -1,128 +1,188 @@ require 'spec_helper' describe Reel::WebSocket do - include WebSocketHelpers - - let(:example_message) { "Hello, World!" } - let(:another_message) { "What's going on?" } + let(:host) { '127.0.0.1' } + let(:port) { 10101 } + let(:hello) { "\x81\x85\xEF\xE1$\x92\x87\x84H\xFE\x80" } + let(:another_message) { "\x81\x8F\xCBd\x14\xC9\xAA\n{\xBD\xA3\x01f\xE9\xA6\x01g\xBA\xAA\x03q" } + let(:ping_frame) { "\x89\x80t\xE1\xD8\x17" } + let(:pong_opcode) { 10 } + let(:close_frame) { "\x88\x80\xB7\b\xEA\x14" } + let(:queue) { Queue.new } + + let(:handshake_request) do + txt = <<-TXT + GET /example HTTP/1.1\r + Host: www.example.com\r + Upgrade: websocket\r + Connection: Upgrade\r + Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r + Sec-WebSocket-Protocol: chat, superchat\r + Sec-WebSocket-Version: 13\r + Origin: http://example.com\r\n\r + TXT + + strip_heredoc(txt) + end - it "performs websocket handshakes" do - with_socket_pair do |client, peer| - connection = Reel::Connection.new(peer) - client << handshake.to_data + let(:handshake_response) do + txt = <<-TXT + HTTP/1.1 101 Switching Protocols\r + Upgrade: websocket\r + Connection: Upgrade\r + Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk=\r\n\r + TXT - request = connection.request - request.should be_websocket + strip_heredoc(txt) + end - websocket = request.websocket - websocket.should be_a Reel::WebSocket + before(:each) do + @server = Celluloid::IO::TCPServer.new(host, port) + @client = Celluloid::IO::TCPSocket.new(host, port) + @peer = @server.accept + @connection = Reel::Connection.new(@peer) + end - handshake.errors.should be_empty - end + after(:each) do + @client.close + @server.close end - it "raises an error if trying to close a connection upgraded to socket" do - with_socket_pair do |client, peer| - connection = Reel::Connection.new(peer) - client << handshake.to_data + it 'performs the websocket handshake' do + Thread.new { Reel::WebSocket.new(@connection) } - websocket = connection.request.websocket - websocket.should be_a Reel::WebSocket - expect { connection.close }.to raise_error(Reel::StateError) - end + @client.send(handshake_request, 0) + response = @client.read + + expect(response).to eq(handshake_response) end - it "knows its URL" do - with_websocket_pair do |_, websocket| - websocket.url.should == example_path + it 'responds to pings with a pong' do + with_websocket do + @client.send(ping_frame, 0) + + response = @client.read + opcode = extract_opcode(response) + + expect(opcode).to eq(pong_opcode) end end - it "knows its headers" do - with_websocket_pair do |_, websocket| - websocket['Host'].should == example_host + it 'raises an error if trying to close a connection upgraded to socket' do + with_websocket do + expect { @connection.close }.to raise_error(Reel::StateError) end end - it "reads frames" do - with_websocket_pair do |client, websocket| - client << WebSocket::Message.new(example_message).to_data - client << WebSocket::Message.new(another_message).to_data + it 'reads incoming frames' do + message_handler = ->(message, _ws) { queue << message } - websocket.read.should == example_message - websocket.read.should == another_message + with_websocket(message: message_handler) do + @client.send(hello, 0) + @client.send(another_message, 0) end + + expect(queue.pop).to eq('hello') + expect(queue.pop).to eq('another message') end - it "writes messages" do - with_websocket_pair do |client, websocket| - websocket.write example_message - websocket.write another_message + it 'writes outgoing data to its socket' do + test_msg = 'TEST MESSAGE' + test_msg2 = 'message received' - parser = WebSocket::Parser.new + open_handler = lambda do |ws| + ws.text(test_msg) + queue << :opened + end - parser.append client.readpartial(4096) until first_message = parser.next_message - first_message.should == example_message + message_handler = ->(_msg, ws) { ws.text(test_msg2) } - parser.append client.readpartial(4096) until next_message = parser.next_message - next_message.should == another_message - end - end + with_websocket(open: open_handler, message: message_handler) do |responses| + if responses.size > 1 + encoded_message = responses[1] + else + queue.pop + encoded_message = @client.read + end + + message1 = decode_message(encoded_message) + @client.send(hello, 0) + encoded_message2 = @client.read + message2 = decode_message(encoded_message2) - it "closes" do - with_websocket_pair do |_, websocket| - websocket.should_not be_closed - websocket.close - websocket.should be_closed + expect(message1).to eq(test_msg) + expect(message2).to eq(test_msg2) end end - it "raises a RequestError when connection used after it was upgraded" do - with_socket_pair do |client, peer| - connection = Reel::Connection.new(peer) - client << handshake.to_data + it 'closes the socket when it receives a close frame' do + close_handler = ->(_ws) { queue << :closed } - remote_host = connection.remote_host + with_websocket(close: close_handler) do + expect(@peer).not_to be_closed - request = connection.request - request.should be_websocket - websocket = request.websocket - websocket.should be_a Reel::WebSocket + @client.send(close_frame, 0) + queue.pop - expect { connection.remote_host }.to raise_error(Reel::StateError) - websocket.remote_host.should == remote_host + expect(@peer).to be_closed end end - it "performs websocket handshakes with header key case-insensitivity" do - with_socket_pair do |client, peer| - connection = Reel::Connection.new(peer) - client << case_handshake.to_data + it 'raises a RequestError when connection used after it was upgraded' do + Thread.new do + Reel::WebSocket.new(@connection) do |ws| + ws.on :open do + queue << :opened + end + end + end - request = connection.request - request.should be_websocket + @client.send(handshake_request, 0) + queue.pop + expect { @connection.remote_host }.to raise_error(Reel::StateError) + end - websocket = request.websocket - websocket.should be_a Reel::WebSocket + def strip_heredoc(str) + indent = str.scan(/^[ \t]*(?=\S)/).min.size || 0 + str.gsub(/^[ \t]{#{indent}}/, '') + end - case_handshake.errors.should be_empty + def with_websocket(handlers = {}) + message_handler = handlers[:message] + open_handler = handlers[:open] + close_handler = handlers[:close] + + Thread.new do + Reel::WebSocket.new(@connection) do |ws| + ws.on :message do |event| + message_handler.call(event.data, ws) if message_handler + end + + ws.on :open do + open_handler.call(ws) if open_handler + end + + ws.on :close do + close_handler.call(ws) if close_handler + end + end end - end - def with_websocket_pair - with_socket_pair do |client, peer| - connection = Reel::Connection.new(peer) - client << handshake.to_data - request = connection.request + @client.send(handshake_request, 0) + responses = @client.read.split("\r\n\r\n") - request.should be_websocket - websocket = request.websocket - websocket.should be_a Reel::WebSocket + yield(responses) + end - # Discard handshake - client.readpartial(4096) + def extract_opcode(message) + opcode_translation = 0b00001111 + bytes = message.each_byte.to_a + bytes[0] & opcode_translation + end - yield client, websocket - end + def decode_message(encoded_message) + encoded_bytes = encoded_message.each_byte.to_a + message_bytes = encoded_bytes[2..-1] + WebSocket::Driver.encode(message_bytes) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 31680f5..f7d2063 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -42,41 +42,3 @@ def with_socket_pair peer.close rescue nil end end - -module WebSocketHelpers - def self.included(spec) - spec.instance_eval do - let(:example_host) { "www.example.com" } - let(:example_path) { "/example"} - let(:example_url) { "ws://#{example_host}#{example_path}" } - let :handshake_headers do - { - "Host" => example_host, - "Upgrade" => "websocket", - "Connection" => "Upgrade", - "Sec-WebSocket-Key" => "dGhlIHNhbXBsZSBub25jZQ==", - "Origin" => "http://example.com", - "Sec-WebSocket-Protocol" => "chat, superchat", - "Sec-WebSocket-Version" => "13" - } - end - - let :case_handshake_headers do - { - "HoSt" => example_host, - "UpgRAde" => "websocket", - "ConnECTion" => "Upgrade", - "Sec-WebsOCket-Key" => "dGhlIHNhbXBsZSBub25jZQ==", - "Origin" => "http://example.com", - "Sec-WEBsOCKET-pROTOCol" => "chat, superchat", - "Sec-WEBsOCKET-vERsion" => "13" - } - end - - let(:handshake) { WebSocket::ClientHandshake.new(:get, example_url, handshake_headers) } - let(:case_handshake) do - WebSocket::ClientHandshake.new(:get, example_url, case_handshake_headers) - end - end - end -end