Skip to content
This repository has been archived by the owner on Jan 2, 2023. It is now read-only.

Commit

Permalink
Changed Connection to use futures for async methods
Browse files Browse the repository at this point in the history
  • Loading branch information
iconara committed Jan 29, 2013
1 parent 3d73db9 commit d52c0b4
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 129 deletions.
4 changes: 2 additions & 2 deletions bin/cqlexec
Expand Up @@ -14,7 +14,7 @@ class CqlExecutor
end

def run(io)
@connection = Cql::Io::Connection.new(host: @options[:host], port: @options[:port]).connect
@connection = Cql::Io::Connection.new(host: @options[:host], port: @options[:port]).connect!

begin
start
Expand All @@ -32,7 +32,7 @@ class CqlExecutor
rescue Interrupt
exit
ensure
@connection.close if @connection && !@connection.closed?
@connection.close! if @connection && !@connection.closed?
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/cql/io.rb
Expand Up @@ -4,6 +4,7 @@ module Cql
module Io
IoError = Class.new(CqlError)
ConnectionError = Class.new(IoError)
IllegalStateError = Class.new(IoError)
end
end

Expand Down
119 changes: 63 additions & 56 deletions lib/cql/io/connection.rb
Expand Up @@ -14,30 +14,45 @@ def initialize(options={})
end

def connect
return if @io_reactor
@io_reactor = IoReactor.new
@io_reactor.add_connection(@host, @port, @timeout)
@reactor_thread = Thread.start do
Thread.current.abort_on_exception = true
@io_reactor.run
unless @io_reactor
@io_reactor = IoReactor.new
f1 = @io_reactor.start
f2 = @io_reactor.add_connection(@host, @port, @timeout)
@connect_future = Future.combine(f1, f2)
end
self
@connect_future
end

def connect!
connect.get
end

def connected?
!!@io_reactor && @io_reactor.connected?
end

def close
@io_reactor && @io_reactor.close
raise IllegalStateError, 'Cannot close a connection that has not connected!' unless @io_reactor
@io_reactor.close
end

def close!
close.get
end

def closed?
@io_reactor && @io_reactor.closed?
!!@io_reactor && @io_reactor.closed?
end

def on_event(&listener)
@io_reactor.add_event_listener(listener)
end

def execute(request, &handler)
@io_reactor.add_request(request, handler)
response_future = Future.new
response_future.on_complete(&handler) if handler
@io_reactor.add_request(request, response_future)
response_future
end

def execute!(request)
Expand Down Expand Up @@ -109,7 +124,7 @@ def handle_write
@write_buffer.slice!(0, bytes_written)
end

def close
def close!
@io.close
end

Expand Down Expand Up @@ -156,7 +171,7 @@ def handle_read
def handle_write
end

def close
def close!
@io.close
end

Expand All @@ -173,48 +188,67 @@ def next_command
end

class IoReactor
def initialize
def initialize()
@lock = Mutex.new
@streams = []
@command_queue = []
@queue_signal_receiver, @queue_signal_sender = IO.pipe
@connected = false
end

def close
raise IllegalStateError, 'IO reactor not running' unless @connected
@closed = true
@closed_future
end

def closed?
@closed
end

def run
@lock.synchronize do
def connected?
@connected
end

def start
unless @connected_future
@connected_future = Future.new
@closed_future = Future.new
@streams << CommandDispatcher.new(@queue_signal_receiver, @command_queue, @lock, @streams)
@closed = false
@connected = true
@reactor_thread = Thread.start do
Thread.current.abort_on_exception = true
@connected_future.complete!
io_loop
@closed_future.complete!
end
end
@closed = false
io_loop
@connected_future
end

def add_connection(host, port, timeout)
@lock.synchronize do
begin
@streams = @streams + [NodeConnection.new(connect(host, port, timeout))]
rescue Errno::EHOSTUNREACH, Errno::EBADF, Errno::EINVAL, SystemCallError, SocketError => e
raise ConnectionError, "Could not connect to #@host:#@port: #{e.message} (#{e.class.name})", e.backtrace
future = Future.new
begin
socket = connect!(host, port, timeout)
@lock.synchronize do
@streams << NodeConnection.new(socket)
end
future.complete!
rescue Errno::EHOSTUNREACH, Errno::EBADF, Errno::EINVAL, SystemCallError, SocketError => e
error = ConnectionError.new("Could not connect to #@host:#@port: #{e.message} (#{e.class.name})")
error.set_backtrace(e.backtrace)
future.fail!(error)
end
future
end

def add_event_listener(listener)
command_queue_push(:event_listener, listener)
end

def add_request(request, listener)
future = ResponseFuture.new
future.on_complete(&listener) if listener
def add_request(request, future)
command_queue_push(:request, request, future)
future
end

private
Expand All @@ -238,15 +272,16 @@ def io_loop
rescue Errno::ECONNRESET, IOError => e
close
ensure
@connected = false
@streams.each do |stream|
begin
stream.close
stream.close!
rescue IOError
end
end
end

def connect(host, port, timeout)
def connect!(host, port, timeout)
socket = nil
exception = nil
addrinfo = Socket.getaddrinfo(host, port, nil, Socket::Constants::SOCK_STREAM)
Expand Down Expand Up @@ -281,34 +316,6 @@ def connect(host, port, timeout)
raise exception
end
end

class ResponseFuture
def initialize
@listeners = []
end

def on_complete(&listener)
if @response
listener.call(@response)
else
@listeners << listener
end
end

def complete!(response)
@response = response
@lock << :ping if @lock
@listeners.each { |l| l.call(response) }
@listeners.clear
end

def get
return @response if @response
@lock = Queue.new
@lock.pop
@response
end
end
end
end
end

0 comments on commit d52c0b4

Please sign in to comment.