Skip to content
This repository has been archived by the owner on Jan 2, 2023. It is now read-only.

Commit

Permalink
Merge fd50dd5 into cdb51a4
Browse files Browse the repository at this point in the history
  • Loading branch information
grddev committed May 10, 2016
2 parents cdb51a4 + fd50dd5 commit 5eb4b3a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 57 deletions.
33 changes: 17 additions & 16 deletions lib/cql/client/connection_manager.rb
Expand Up @@ -7,48 +7,49 @@ class ConnectionManager
include Enumerable

def initialize
@connections = []
@connections = [].freeze
@lock = Mutex.new
end

def add_connections(connections)
@lock.synchronize do
@connections.concat(connections)
@connections = (@connections + connections).freeze
connections.each do |connection|
connection.on_closed do
@lock.synchronize do
@connections.delete(connection)
@connections = (@connections - [connection]).freeze
end
end
end
end
end

def connected?
@lock.synchronize do
@connections.any?
end
!snapshot.empty?
end

def snapshot
@lock.synchronize do
@connections.dup
connections = nil
@lock.lock
begin
connections = @connections
ensure
@lock.unlock
end
connections
end

def random_connection
raise NotConnectedError unless connected?
@lock.synchronize do
@connections.sample
end
connections = snapshot
raise NotConnectedError if connections.empty?
connections.sample
end

def each_connection(&callback)
return self unless block_given?
raise NotConnectedError unless connected?
@lock.synchronize do
@connections.each(&callback)
end
connections = snapshot
raise NotConnectedError if connections.empty?
connections.each(&callback)
end
alias_method :each, :each_connection
end
Expand Down
38 changes: 20 additions & 18 deletions lib/cql/protocol/cql_protocol_handler.rb
Expand Up @@ -26,6 +26,7 @@ def initialize(connection, scheduler, protocol_version, compressor=nil)
@connection.on_data(&method(:receive_data))
@connection.on_closed(&method(:socket_closed))
@promises = Array.new(128) { nil }
@free_promises = (0...@promises.size).to_a
@read_buffer = CqlByteBuffer.new
@frame_encoder = FrameEncoder.new(protocol_version, @compressor)
@frame_decoder = FrameDecoder.new(@compressor)
Expand Down Expand Up @@ -128,7 +129,7 @@ def send_request(request, timeout=nil)
id = nil
@lock.lock
begin
if (id = next_stream_id)
if (id = @free_promises.pop)
@promises[id] = promise
end
ensure
Expand Down Expand Up @@ -195,28 +196,37 @@ def encode_frame
def receive_data(data)
@read_buffer << data
@current_frame = @frame_decoder.decode_frame(@read_buffer, @current_frame)
promise_responses = []
while @current_frame.complete?
id = @current_frame.stream_id
body = @current_frame.body
if id == -1
notify_event_listeners(@current_frame.body)
notify_event_listeners(body)
else
complete_request(id, @current_frame.body)
promise_responses << complete_request(id, body)
end
@current_frame = @frame_decoder.decode_frame(@read_buffer)
end
unless promise_responses.empty?
flush_request_queue
promise_responses.each do |(promise, response)|
unless promise.timed_out?
promise.fulfill(response)
end
end
end
end

def notify_event_listeners(event_response)
event_listeners = nil
@lock.lock
begin
event_listeners = @event_listeners
return if event_listeners.empty?
ensure
@lock.unlock
end
event_listeners.each do |listener|
listener.call(@current_frame.body) rescue nil
listener.call(event_response) rescue nil
end
end

Expand All @@ -226,16 +236,14 @@ def complete_request(id, response)
begin
promise = @promises[id]
@promises[id] = nil
@free_promises << id
ensure
@lock.unlock
end
if response.is_a?(Protocol::SetKeyspaceResultResponse)
@keyspace = response.keyspace
end
flush_request_queue
unless promise.timed_out?
promise.fulfill(response)
end
[promise, response]
end

def flush_request_queue
Expand All @@ -253,9 +261,10 @@ def flush_request_queue
frame = nil
@lock.lock
begin
if @request_queue_out.any? && (id = next_stream_id)
if @request_queue_out.any? && (id = @free_promises.pop)
promise = @request_queue_out.shift
if promise.timed_out?
@free_promises << id
next
else
frame = promise.frame
Expand All @@ -282,6 +291,7 @@ def socket_closed(cause)
promises_to_fail.concat(@request_queue_in)
promises_to_fail.concat(@request_queue_out)
@promises.fill(nil)
@free_promises = (0...@promises.size).to_a
@request_queue_in.clear
@request_queue_out.clear
end
Expand All @@ -294,14 +304,6 @@ def socket_closed(cause)
@closed_promise.fulfill
end
end

def next_stream_id
if (stream_id = @promises.index(nil))
stream_id
else
nil
end
end
end
end
end
83 changes: 60 additions & 23 deletions spec/cql/protocol/cql_protocol_handler_spec.rb
Expand Up @@ -69,20 +69,40 @@ module Protocol

describe '#send_request' do
before do
connection.stub(:write).and_yield(buffer)
connection.stub(:write) do |*args, &block|
if block
block.call(buffer)
else
buffer.append(*args)
end
end
connection.stub(:closed?).and_return(false)
connection.stub(:connected?).and_return(true)
end

it 'encodes a request frame and writes to the socket handler' do
protocol_handler.send_request(request)
buffer.to_s.should == [1, 0, 0, 5, 0].pack('C4N')
version, flags, stream_id, opcode, size, body = *buffer.to_s.unpack('C4Na*')
version.should eq(1)
flags.should eq(0)
stream_id.should be_between(0, 127)
opcode.should eq(5)
size.should eq(0)
body.should be_empty
end

it 'encodes a frame with the next available stream ID' do
protocol_handler.send_request(request)
protocol_handler.send_request(request)
buffer.to_s.should == [1, 0, 0, 5, 0].pack('C4N') + [1, 0, 1, 5, 0].pack('C4N')
version1, flags1, stream_id1, opcode1, size1,
version2, flags2, stream_id2, opcode2, size2, body2 = *buffer.to_s.unpack('C4NC4Na*')
version2.should eq(version1)
flags2.should eq(flags1)
stream_id2.should be_between(0, 127)
stream_id2.should_not eq(stream_id1)
opcode2.should eq(opcode1)
size2.should eq(size1)
body2.should be_empty
end

it 'returns a future' do
Expand All @@ -91,34 +111,40 @@ module Protocol

it 'succeeds the future when it receives a response frame with the corresponding stream ID' do
3.times { protocol_handler.send_request(request) }
buffer.discard(buffer.size)
future = protocol_handler.send_request(request)
connection.data_listener.call([0x81, 0, 3, 2, 0].pack('C4N'))
stream_id = buffer.to_s.getbyte(2)
connection.data_listener.call([0x81, 0, stream_id, 2, 0].pack('C4N'))
await(0.1) { future.resolved? }
end

it 'handles multiple response frames in the same data packet' do
futures = Array.new(4) { protocol_handler.send_request(request) }
connection.data_listener.call([0x81, 0, 2, 2, 0].pack('C4N') + [0x81, 0, 3, 2, 0].pack('C4N'))
stream_ids = buffer.to_s.unpack('C4NC4NC4NC4N').values_at(2, 7, 12, 17)
connection.data_listener.call([0x81, 0, stream_ids[2], 2, 0].pack('C4N') + [0x81, 0, stream_ids[3], 2, 0].pack('C4N'))
await(0.1) { futures[2].resolved? && futures[3].resolved? }
end

it 'queues the request when there are too many in flight, sending it as soon as a stream is available' do
connection.stub(:write)
futures = Array.new(130) { protocol_handler.send_request(request) }
128.times { |i| connection.data_listener.call([0x81, 0, i, 2, 0].pack('C4N')) }
futures[127].should be_resolved
futures[128].should_not be_resolved
2.times { |i| connection.data_listener.call([0x81, 0, i, 2, 0].pack('C4N')) }
futures[128].should be_resolved
futures = Array.new(128) { protocol_handler.send_request(request) }
buffer.discard(buffer.size)
delayed = Array.new(2) { protocol_handler.send_request(request) }
futures.size.times { |i| connection.data_listener.call([0x81, 0, i, 2, 0].pack('C4N')) }
futures.each { |future| future.should be_resolved }
delayed.each { |future| future.should_not be_resolved }
stream_ids = buffer.to_s.unpack('C4NC4N').values_at(2, 7)
stream_ids.each { |id| connection.data_listener.call([0x81, 0, id, 2, 0].pack('C4N')) }
delayed.each { |future| future.should be_resolved }
end

it 'flushes the request queue before it resolves the future of the just completed request' do
connection.stub(:write)
futures = Array.new(130) { protocol_handler.send_request(request) }
f = futures[0].map do
f = protocol_handler.send_request(request)
stream_id = buffer.to_s.getbyte(2)
129.times { protocol_handler.send_request(request) }
f = f.map do
connection.should have_received(:write).exactly(129).times
end
connection.data_listener.call([0x81, 0, 0, 2, 0].pack('C4N'))
connection.data_listener.call([0x81, 0, stream_id, 2, 0].pack('C4N'))
f.value
end

Expand All @@ -142,7 +168,13 @@ module Protocol

it 'compresses request frames' do
protocol_handler.send_request(request)
buffer.to_s.should == [1, 1, 0, 9, 18].pack('C4N') + 'FAKECOMPRESSEDBODY'
version, flags, stream_id, opcode, size, body = *buffer.to_s.unpack('C4Na*')
version.should eq(1)
flags.should eq(1)
stream_id.should be_between(0, 127)
opcode.should eq(9)
size.should eq(18)
body.should eq('FAKECOMPRESSEDBODY')
end

it 'compresses queued request frames' do
Expand All @@ -155,8 +187,10 @@ module Protocol
compressor.stub(:decompress).with('FAKECOMPRESSEDBODY').and_return("\x00\x00\x00\x04" + "\x00\x10" + id + "\x00\x00\x00\x01\x00\x00\x00\x01\x00\ncql_rb_911\x00\x05users\x00\tuser_name\x00\r")
f1 = protocol_handler.send_request(request)
f2 = protocol_handler.send_request(request)
connection.data_listener.call("\x81\x01\x00\x08\x00\x00\x00\x12FAKECOMPRESSEDBODY")
connection.data_listener.call("\x81\x01\x01\x08\x00\x00\x00\x12FAKECOMPRESSEDBODY")
stream_ids = buffer.to_s.unpack('C4Na18C4N').values_at(2, 8)
stream_ids.each do |id|
connection.data_listener.call([0x81, 1, id, 8, 18, 'FAKECOMPRESSEDBODY'].pack('C4Na*'))
end
f1.value.should == Protocol::PreparedResultResponse.new(id, [["cql_rb_911", "users", "user_name", :varchar]], nil, nil)
f2.value.should == Protocol::PreparedResultResponse.new(id, [["cql_rb_911", "users", "user_name", :varchar]], nil, nil)
end
Expand Down Expand Up @@ -189,7 +223,8 @@ module Protocol
it 'fails all requests with ConnectionClosedError if there is no specific error' do
protocol_handler.send_request(request)
future = protocol_handler.send_request(request)
connection.data_listener.call([0x81, 0, 0, 2, 0].pack('C4N'))
stream_id = buffer.to_s.getbyte(2)
connection.data_listener.call([0x81, 0, stream_id, 2, 0].pack('C4N'))
connection.closed_listener.call(nil)
begin
future.value
Expand Down Expand Up @@ -234,8 +269,9 @@ module Protocol

it 'does not attempt to fulfill the promise when the request has already timed out' do
f = protocol_handler.send_request(request, 3)
stream_id = buffer.to_s.getbyte(2)
timer_promise.fulfill
expect { connection.data_listener.call([0x81, 0, 0, 2, 0].pack('C4N')) }.to_not raise_error
expect { connection.data_listener.call([0x81, 0, stream_id, 2, 0].pack('C4N')) }.to_not raise_error
end

it 'never sends a request when it has already timed out' do
Expand Down Expand Up @@ -300,7 +336,7 @@ module Protocol
before do
connection.stub(:closed?).and_return(false)
connection.stub(:connected?).and_return(true)
connection.stub(:write)
connection.stub(:write).and_yield(buffer)
end

it 'is not in a keyspace initially' do
Expand All @@ -309,7 +345,8 @@ module Protocol

it 'registers the keyspace it has changed to' do
f = protocol_handler.send_request(Protocol::QueryRequest.new('USE hello', nil, nil, :one))
connection.data_listener.call([0x81, 0, 0, 8, 4 + 2 + 5, 3, 5].pack('C4N2n') + 'hello')
stream_id = buffer.to_s.getbyte(2)
connection.data_listener.call([0x81, 0, stream_id, 8, 4 + 2 + 5, 3, 5].pack('C4N2n') + 'hello')
f.value
protocol_handler.keyspace.should == 'hello'
end
Expand Down

0 comments on commit 5eb4b3a

Please sign in to comment.