Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

pass websockets through rack environment #17

Merged
merged 10 commits into from Nov 20, 2012
92 changes: 92 additions & 0 deletions examples/websocket.ru
@@ -0,0 +1,92 @@
require 'rubygems'
require 'bundler/setup'
require 'reel'

class TimeServer
include Celluloid
include Celluloid::Notifications

def initialize
run!
end

def run
now = Time.now.to_f
sleep now.ceil - now + 0.001

every(1) { publish 'time_change', Time.now }
end
end

class TimeClient
include Celluloid
include Celluloid::Notifications
include Celluloid::Logger

def initialize(websocket)
info "Streaming time changes to client"
@socket = websocket
subscribe('time_change', :notify_time_change)
end

def notify_time_change(topic, new_time)
@socket << new_time.inspect
rescue Reel::SocketError
info "Time client disconnected"
terminate
end
end

class Web
include Celluloid::Logger

def render_index
info "200 OK: /"
<<-HTML
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Reel WebSockets time server example</title>
<style>
body {
font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
font-weight: 300;
text-align: center;
}

#content {
width: 800px;
margin: 0 auto;
background: #EEEEEE;
padding: 1em;
}
</style>
</head>
<script>
var SocketKlass = "MozWebSocket" in window ? MozWebSocket : WebSocket;
var ws = new SocketKlass('ws://' + window.location.host + '/timeinfo');
ws.onmessage = function(msg){
document.getElementById('current-time').innerHTML = msg.data;
}
</script>
<body>
<div id="content">
<h1>Time Server Example</h1>
<div>The time is now: <span id="current-time">...</span></div>
</div>
</body>
</html>
HTML
end
end

TimeServer.supervise_as :time_server

run Rack::URLMap.new(
"/" => Proc.new{ [200, {"Content-Type" => "text/html"}, [Web.new.render_index]]},
"/timeinfo" => Proc.new{ |env|
TimeClient.new(env["websocket.rack"])
[200, {}, []] # Fake response for middleware.
}
)
1 change: 1 addition & 0 deletions examples/websocket_rack.sh
@@ -0,0 +1 @@
rackup -s reel websocket.ru -Enone
3 changes: 3 additions & 0 deletions lib/reel.rb
Expand Up @@ -4,6 +4,9 @@

require 'reel/version'

require 'reel/remote_connection'
require 'reel/uri_parts'

require 'reel/connection'
require 'reel/logger'
require 'reel/request'
Expand Down
14 changes: 2 additions & 12 deletions lib/reel/connection.rb
@@ -1,6 +1,8 @@
module Reel
# A connection to the HTTP server
class Connection
include RemoteConnection

class StateError < RuntimeError; end # wrong state for a given operation

CONNECTION = 'Connection'.freeze
Expand Down Expand Up @@ -39,18 +41,6 @@ def detach
self
end

# Obtain the IP address of the remote connection
def remote_ip
@socket.peeraddr(false)[3]
end
alias_method :remote_addr, :remote_ip

# Obtain the hostname of the remote connection
def remote_host
# NOTE: Celluloid::IO does not yet support non-blocking reverse DNS
@socket.peeraddr(true)[2]
end

# Reset the current request state
def reset_request(state = :header)
@request_state = state
Expand Down
44 changes: 33 additions & 11 deletions lib/reel/rack_worker.rb
@@ -1,6 +1,7 @@
module Reel
class RackWorker
include Celluloid
include Celluloid::Logger

INITIAL_BODY = ''

Expand Down Expand Up @@ -41,6 +42,7 @@ class RackWorker
RACK_URL_SCHEME = 'rack.url_scheme'.freeze
ASYNC_CALLBACK = 'async.callback'.freeze
ASYNC_CLOSE = 'async.close'.freeze
RACK_WEBSOCKET = 'rack.websocket'.freeze

PROTO_RACK_ENV = {
RACK_VERSION => Rack::VERSION,
Expand All @@ -61,19 +63,31 @@ def initialize(handler)

def handle(connection)
while request = connection.request
begin
env = rack_env(request, connection)
status, headers, body_parts = @app.call(env)
body = response_body(body_parts)

connection.respond Response.new(status, headers, body)
ensure
body.close if body.respond_to?(:close)
body_parts.close if body_parts.respond_to?(:close)
case request
when Request
handle_request(request, connection)
when WebSocket
handle_websocket(request, connection)
end
end
end

def handle_request(request, connection)
env = rack_env(request, connection)
status, headers, body_parts = @app.call(env)
body = response_body(body_parts)

connection.respond Response.new(status, headers, body)
ensure
body.close if body.respond_to?(:close)
body_parts.close if body_parts.respond_to?(:close)
end

def handle_websocket(request, connection)
env = rack_env(request, connection)
@app.call(env)
end

def response_body(body_parts)
if body_parts.respond_to?(:to_path)
File.new(body_parts.to_path)
Expand All @@ -90,8 +104,16 @@ def rack_env(request, connection)
env[SERVER_NAME] = request[HOST].to_s.split(':').first || @handler[:Host]
env[SERVER_PORT] = @handler[:port].to_s

env[REMOTE_ADDR] = connection.remote_ip
env[REMOTE_HOST] = connection.remote_host
case request
when WebSocket
remote_connection = request
env[RACK_WEBSOCKET] = request
when Request
remote_connection = connection
end

env[REMOTE_ADDR] = remote_connection.remote_ip
env[REMOTE_HOST] = remote_connection.remote_host

env[PATH_INFO] = request.path
env[REQUEST_METHOD] = request.method.to_s.upcase
Expand Down
13 changes: 13 additions & 0 deletions lib/reel/remote_connection.rb
@@ -0,0 +1,13 @@
module RemoteConnection
# Obtain the IP address of the remote connection
def remote_ip
@socket.peeraddr(false)[3]
end
alias_method :remote_addr, :remote_ip

# Obtain the hostname of the remote connection
def remote_host
# NOTE: Celluloid::IO does not yet support non-blocking reverse DNS
@socket.peeraddr(true)[2]
end
end
20 changes: 3 additions & 17 deletions lib/reel/request.rb
Expand Up @@ -2,6 +2,8 @@

module Reel
class Request
include URIParts

attr_accessor :method, :version, :url, :headers

UPGRADE = 'Upgrade'.freeze
Expand All @@ -22,7 +24,7 @@ def self.read(connection)

upgrade = headers[UPGRADE]
if upgrade && upgrade.downcase == WEBSOCKET
WebSocket.new(connection.socket, parser.url, headers)
WebSocket.new(connection.socket, parser.http_method, parser.url, headers)
else
Request.new(parser.http_method, parser.url, parser.http_version, headers, connection)
end
Expand All @@ -39,22 +41,6 @@ def [](header)
@headers[header]
end

def uri
@uri ||= URI(url)
end

def path
uri.path
end

def query_string
uri.query
end

def fragment
uri.fragment
end

def body
@body ||= begin
raise "no connection given" unless @connection
Expand Down
17 changes: 17 additions & 0 deletions lib/reel/uri_parts.rb
@@ -0,0 +1,17 @@
module URIParts
def uri
@uri ||= URI(url)
end

def path
uri.path
end

def query_string
uri.query
end

def fragment
uri.fragment
end
end
13 changes: 10 additions & 3 deletions lib/reel/websocket.rb
Expand Up @@ -2,10 +2,13 @@

module Reel
class WebSocket
attr_reader :url, :headers
include RemoteConnection
include URIParts

def initialize(socket, url, headers)
@socket, @url, @headers = socket, url, headers
attr_reader :url, :headers, :method

def initialize(socket, method, url, headers)
@socket, @method, @url, @headers = socket, method, url, headers

handshake = ::WebSocket::ClientHandshake.new(:get, url, headers)

Expand Down Expand Up @@ -45,6 +48,10 @@ def read
msg
end

def body
nil
end

def write(msg)
@socket << ::WebSocket::Message.new(msg).to_data
msg
Expand Down
15 changes: 14 additions & 1 deletion spec/reel/rack_worker_spec.rb
@@ -1,7 +1,6 @@
require 'spec_helper'

describe Reel::RackWorker do

let(:endpoint) { URI(example_url) }

let(:worker) do
Expand Down Expand Up @@ -45,6 +44,20 @@
end
end

context "WebSocket" do
include WebSocketHelpers

it "places websocket into rack env" do
with_socket_pair do |client, connection|
client << handshake.to_data
request = connection.request
env = worker.rack_env(request, connection)

env["rack.websocket"].should == request
end
end
end

it "delegates web requests to the rack app" do
ex = nil

Expand Down
19 changes: 2 additions & 17 deletions spec/reel/websocket_spec.rb
@@ -1,26 +1,11 @@
require 'spec_helper'

describe Reel::WebSocket do
let(:example_host) { "www.example.com" }
let(:example_path) { "/example"}
let(:example_url) { "ws://#{example_host}#{example_path}" }
include WebSocketHelpers

let(:example_message) { "Hello, World!" }
let(:another_message) { "What's going on?" }

let :handshake_headers do
{
"Host" => example_host,
"Upgrade" => "websocket",
"Connection" => "Upgrade",
"Sec-WebSocket-Key" => "dGhlIHNhbXBsZSBub25jZQ==",
"Origin" => "http://example.com",
"Sec-WebSocket-Protocol" => "chat, superchat",
"Sec-WebSocket-Version" => "13"
}
end

let(:handshake) { WebSocket::ClientHandshake.new(:get, example_url, handshake_headers) }

it "performs websocket handshakes" do
with_socket_pair do |client, connection|
client << handshake.to_data
Expand Down