Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

RCBCEM-1: Handle not-my-vbucket repsonses with a retry

  • Loading branch information...
commit ea08ee3215be36d40a1738fafc6f725c60302a66 1 parent 73944ad
@avsej authored
Showing with 52 additions and 8 deletions.
  1. +41 −4 lib/em-couchbase/client.rb
  2. +11 −4 lib/em-couchbase/node.rb
View
45 lib/em-couchbase/client.rb
@@ -30,7 +30,8 @@ class Client
def initialize
@opaque = 0
@nodes = []
- @handlers = {}
+ @packets = {}
+ @upgrade_queue = EM::Queue.new
end
# @param options [Hash]
@@ -51,14 +52,50 @@ def connect(options = {})
nodes[ii] = Node.connect(options)
end
end
+ do_retry = lambda do |packet|
+ opaque, key, handler, raw = packet.values_at(:opaque, :key, :handler, :raw)
+ register_handler(opaque, key, handler)
+ vbucket, node = locate(key)
+ raw[6...7] = [vbucket].pack("n")
+ register_packet(opaque, raw)
+ node.callback do
+ node.send_data(raw)
+ end
+ @upgrade_queue.pop(&do_retry)
+ end
+ @upgrade_queue.pop(&do_retry)
succeed
end
@config_listener.listen(options)
self
end
+ def register_packet(opaque, packet)
+ if packet.respond_to?(:force_encoding)
+ packet.force_encoding(Encoding::BINARY)
+ end
+ (@packets[opaque] ||= {})[:raw] = packet
+ end
+
+ def register_handler(opaque, key, handler)
+ packet = (@packets[opaque] ||= {})
+ packet[:key] = key
+ packet[:handler] = handler
+ end
+
+ def retry(reason, opaque)
+ packet = @packets.delete(opaque)
+ if packet
+ case reason
+ when :not_my_vbucket
+ @upgrade_queue.push([opaque, packet])
+ end
+ end
+ end
+
def run_callback(opaque, result)
- key, handler = @handlers.delete(opaque)
+ packet = @packets.delete(opaque)
+ key, handler = packet.values_at(:key, :handler)
if handler.respond_to?(:call)
result.key = key
handler.call(result)
@@ -77,7 +114,7 @@ def locate(key)
def set(key, val, options = {}, &block)
callback do
opaque = opaque_inc
- @handlers[opaque] = [key, block]
+ register_handler(opaque, key, block)
vbucket, node = locate(key)
node.callback do
node.set(opaque, vbucket, key, val, options)
@@ -92,7 +129,7 @@ def get(*keys, &block)
end
groups = keys.inject({}) do |acc, key|
opaque = opaque_inc
- @handlers[opaque] = [key, block]
+ register_handler(opaque, key, block)
vbucket, node = locate(key)
acc[node] ||= []
acc[node] << [opaque, vbucket, key]
View
15 lib/em-couchbase/node.rb
@@ -49,10 +49,14 @@ def self.connect(options)
def receive_data(data)
@data << data
Packet.parse(@data) do |op, opaque, result|
- if op == :sasl_auth
- raise result.error unless result.success?
+ if result.error.class == Error::NotMyVbucket
+ client.retry(:not_my_vbucket, opaque)
else
- client.run_callback(opaque, result)
+ if op == :sasl_auth
+ raise result.error unless result.success?
+ else
+ client.run_callback(opaque, result)
+ end
end
end
end
@@ -63,6 +67,7 @@ def set(opaque, vbucket, key, value, options = {})
options[:flags],
options[:expiration],
options[:cas])
+ client.register_packet(opaque, packet)
send_data(packet)
end
@@ -72,7 +77,9 @@ def set(opaque, vbucket, key, value, options = {})
def get(tuples, options = {})
packets = ""
tuples.each do |opaque, vbucket, key|
- packets << Packet.build(opaque, vbucket, :get, key)
+ packet = Packet.build(opaque, vbucket, :get, key)
+ client.register_packet(opaque, packet)
+ packets << packet
end
send_data(packets)
end
Please sign in to comment.
Something went wrong with that request. Please try again.