Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Commit

Permalink
Merge ffc1fe2 into 907126a
Browse files Browse the repository at this point in the history
  • Loading branch information
robertjpayne committed Oct 13, 2013
2 parents 907126a + ffc1fe2 commit ab2720a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 85 deletions.
42 changes: 42 additions & 0 deletions examples/websockets_new.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
require 'reel'

class Server < Reel::Server
include Celluloid::Logger

def initialize(host = '0.0.0.0', port = (ENV['PORT'] || 5000).to_i)
super(host, port, &method(:on_connection))
end

def on_connection(connection)
connection.each_request do |request|
if request.websocket?
handle_websocket_request(request, connection)
else
handle_http_request(request, connection)
end
end
end

def handle_http_request(request, connection)
request.respond :ok, "Hello Lame"
end

def handle_websocket_request(request, connection)
debug("[handle_websocket_request] method: #{request.method}, url: #{request.url}, uri: #{request.uri}, query_string: #{request.query_string}, fragment: #{request.fragment}, headers: #{request.headers}")
request.websocket.on :open do |event|
debug('[ws] open')
end
request.websocket.on :message do |event|
debug('[ws] message')
end
request.websocket.on :close do |event|
debug('[ws] close')
end
request.websocket.on :error do |event|
debug('[ws] error')
end
request.websocket.run
debug("[handle_websocket_request] finished")
end

end
4 changes: 2 additions & 2 deletions lib/reel/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ def websocket?; @request_info.websocket_request?; end
# the underlying connection
def websocket
@websocket ||= begin
raise StateError, "can't upgrade this request to a websocket" unless websocket?
WebSocket.new(@request_info, @connection.hijack_socket)
raise StateError, "can't upgrade this request to a websocket" unless websocket?
WebSocket.new(self, @connection)
end
end

Expand Down
134 changes: 52 additions & 82 deletions lib/reel/websocket.rb
Original file line number Diff line number Diff line change
@@ -1,104 +1,74 @@
require 'forwardable'
require 'websocket_parser'
require 'websocket/driver'

module Reel
class WebSocket
include Celluloid::Logger
extend Forwardable
include ConnectionMixin
include RequestMixin

attr_reader :socket
def_delegators :@socket, :addr, :peeraddr

def initialize(info, socket)
@request_info = info
@socket = socket

handshake = ::WebSocket::ClientHandshake.new(:get, url, headers)

if handshake.valid?
response = handshake.accept_response
response.render(socket)
else
error = handshake.errors.first

response = Response.new(400)
response.reason = handshake.errors.first
response.render(@socket)

raise HandshakeError, "error during handshake: #{error}"
end

@parser = ::WebSocket::Parser.new

@parser.on_close do |status, reason|
# According to the spec the server must respond with another
# close message before closing the connection
@socket << ::WebSocket::Message.close.to_data
close
end

@parser.on_ping do |payload|
@socket << ::WebSocket::Message.pong(payload).to_data
end
def initialize(request, connection)
@request = request
@connection = connection
@socket = nil
end

[:next_message, :next_messages, :on_message, :on_error, :on_close, :on_ping, :on_pong].each do |meth|
define_method meth do |&proc|
@parser.send __method__, &proc
end
end

def read_every(n, unit = :s)
cancel_timer! # only one timer allowed per stream
seconds = case unit.to_s
when /\Am/
n * 60
when /\Ah/
n * 3600
else
n

def_delegators :driver, :on, :text, :binary, :ping, :close

def run
# detach the connection and manage it ourselves
@connection.detach

# grab socket
@socket = @connection.socket

# start the driver
driver.start

# hook into close message from client
driver.on(:close) { @connection.close }

begin
loop do
break unless @connection.alive?
buffer = @socket.readpartial(@connection.buffer_size)
driver.parse(buffer)
end
ensure
@connection.close
end
@timer = Celluloid.every(seconds) { read }
rescue EOFError
@connection.close
end
alias read_interval read_every
alias read_frequency read_every

def read
@parser.append @socket.readpartial(Connection::BUFFER_SIZE) until msg = @parser.next_message
msg
rescue
cancel_timer!
raise
def url
@request.url
end

def body
nil
end

def write(msg)
@socket << ::WebSocket::Message.new(msg).to_data
msg
rescue IOError, Errno::ECONNRESET, Errno::EPIPE
cancel_timer!
raise SocketError, "error writing to socket"
rescue
cancel_timer!
raise
def env
@env ||= begin
e = {
:method => @request.method,
:input => @request.body.to_s,
'REMOTE_ADDR' => @request.remote_addr
}.merge(Hash[@request.headers.map { |key, value| ['HTTP_' + key.upcase.gsub('-','_'),value ] }])
::Rack::MockRequest.env_for(url, e)
end
end
alias_method :<<, :write

def closed?
@socket.closed?
def write(buffer)
# should probably raise an error here if
# writing to socket that has not been started up yet
@socket.write(buffer)
end

def close
cancel_timer!
@socket.close unless closed?
@connection.close if @connection.alive? && !@connection.attached?
end

def cancel_timer!
@timer && @timer.cancel
protected

def driver
@driver ||= ::WebSocket::Driver.rack(self)
end

end
Expand Down
2 changes: 1 addition & 1 deletion reel.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency 'celluloid-io', '>= 0.15.0'
gem.add_runtime_dependency 'http', '>= 0.5.0'
gem.add_runtime_dependency 'http_parser.rb', '>= 0.6.0.beta.2'
gem.add_runtime_dependency 'websocket_parser', '>= 0.1.4'
gem.add_runtime_dependency 'websocket-driver', '>= 0.3.0'

gem.add_development_dependency 'rake'
gem.add_development_dependency 'rspec'
Expand Down

0 comments on commit ab2720a

Please sign in to comment.