Skip to content

Commit

Permalink
Abstracted the idea of a connection.
Browse files Browse the repository at this point in the history
Added EventMachine+Fibers support.
  • Loading branch information
cjbottaro committed Jul 20, 2011
1 parent 2960f3b commit 68824d7
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 16 deletions.
37 changes: 21 additions & 16 deletions lib/qrack/client.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8

require "qrack/amq-client-url"
require "qrack/connection"

module Qrack

Expand Down Expand Up @@ -53,6 +54,7 @@ def initialize(connection_string_or_opts = Hash.new, opts = Hash.new)
@channel = create_channel()
@exchanges ||= {}
@queues ||= {}
@connection_type = opts[:connection_type] || default_connection_type
end


Expand Down Expand Up @@ -189,32 +191,35 @@ def send_command(cmd, *args)
def socket
return @socket if @socket and (@status == :connected) and not @socket.closed?

begin
# Attempt to connect.
@socket = Bunny::Timer::timeout(@connect_timeout, ConnectionTimeout) do
TCPSocket.new(host, port)
end
# The following line takes a lowercased, underscored symbol and converts it to a camel cased string, Ex:
# :fibered_em => FiberedEm
# :socket => Socket
connection_class_name = @connection_type.to_s.split("_").collect{ |s| s.capitalize }.join

if Socket.constants.include?('TCP_NODELAY') || Socket.constants.include?(:TCP_NODELAY)
@socket.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
end
# Get the connection class.
connection_class = Qrack::Connection.const_get(connection_class_name)

if @ssl
require 'openssl' unless defined? OpenSSL::SSL
@socket = OpenSSL::SSL::SSLSocket.new(@socket)
@socket.sync_close = true
@socket.connect
@socket.post_connection_check(host) if @verify_ssl
@socket
end
begin
@socket = connection_class.new host, port, :connect_timeout => @connect_timeout,
:ssl => @ssl,
:verify_ssl => @verify_ssl
rescue => e
raise
@status = :not_connected
raise Bunny::ServerDownError, e.message
end

@socket
end

def default_connection_type
if defined?(EM) and defined?(Fiber) and EM.reactor_running? and Fiber.respond_to?(:current)
:fibered_em
else
:socket
end
end

end

end
6 changes: 6 additions & 0 deletions lib/qrack/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module Qrack
module Connection
autoload :Socket, "qrack/connection/socket"
autoload :FiberedEm, "qrack/connection/fibered_em"
end
end
27 changes: 27 additions & 0 deletions lib/qrack/connection/abstract_base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module Qrack
module Connection
class AbstractBase

def initialize(host, port)
raise "implement me"
end

def read(len)
raise "implement me"
end

def write(data)
raise "implement me"
end

def close
raise "implement me"
end

def closed?
raise "implement me"
end

end
end
end
81 changes: 81 additions & 0 deletions lib/qrack/connection/fibered_em.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
require "eventmachine"
require "fiber"
require "qrack/connection/abstract_base"

module Qrack
module Connection
class FiberedEm < AbstractBase

module EmHandler

def post_init
@buffer = ""
@closed = false
end

def connection_completed
fiber_resume
end

def receive_data(data)
@buffer += data
fiber_resume
end

def unbind
@closed = true
end

def fiber_yield
@fiber = Fiber.current
Fiber.yield
end

def fiber_resume
@fiber.tap{ @fiber = nil }.resume if @fiber
end

def read(len)
fiber_yield while @buffer.length < len
@buffer[0,len].tap{ @buffer = @buffer[len..-1] || "" }
end

def write(data)
send_data(data)
end

def close
close_connection
@closed = true
end

def closed?
@closed
end

end

def initialize(host, port, options = {})
@connection = EM.connect(host, port, EmHandler)
@connection.fiber_yield
end

def read(len)
@connection.read(len)
end

def write(data)
@connection.write(data)
end

def closed?
@connection.closed?
end

def close
@connection.close
end

end
end
end
44 changes: 44 additions & 0 deletions lib/qrack/connection/socket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require "socket"
require "qrack/connection/abstract_base"

module Qrack
module Connection
class Socket < AbstractBase

def initialize(host, port, options = {})
Bunny::Timer::timeout(options[:connect_timeout], ConnectionTimeout) do
@socket = TCPSocket.new(host, port)
end

if Socket.constants.include?('TCP_NODELAY') || Socket.constants.include?(:TCP_NODELAY)
@socket.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
end

if options[:ssl]
require 'openssl' unless defined? OpenSSL::SSL
@socket = OpenSSL::SSL::SSLSocket.new(@socket)
@socket.sync_close = true
@socket.connect
@socket.post_connection_check(host) if options[:verify_ssl]
end
end

def read(*args)
@socket.read(*args)
end

def write(*args)
@socket.write(*args)
end

def close
@socket.close
end

def closed?
@socket.closed?
end

end
end
end

0 comments on commit 68824d7

Please sign in to comment.