Skip to content

Commit

Permalink
Implement incr/decr commands
Browse files Browse the repository at this point in the history
  • Loading branch information
avsej committed Jul 23, 2012
1 parent ae0c46e commit 905c9e0
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 1 deletion.
9 changes: 8 additions & 1 deletion README.markdown
Expand Up @@ -13,5 +13,12 @@ Like any Deferrable eventmachine-based protocol implementation, using EM-Couchba
end
end
end
couchbase.incr "bar", :initial => 100500 do |response|
if response.success?
couchbase.get "bar" do |response|
puts response.inspect
EM.stop
end
end
end
end

22 changes: 22 additions & 0 deletions lib/em-couchbase/client.rb
Expand Up @@ -164,6 +164,28 @@ def get(*keys, &block)
end
end

def incr(key, options = {}, &block)
callback do
opaque = opaque_inc
register_handler(opaque, key, block)
vbucket, node = locate(key)
node.callback do
node.arithm(:incr, opaque, vbucket, key, options)
end
end
end

def decr(key, options = {}, &block)
callback do
opaque = opaque_inc
register_handler(opaque, key, block)
vbucket, node = locate(key)
node.callback do
node.arithm(:decr, opaque, vbucket, key, options)
end
end
end

protected

def unbind
Expand Down
14 changes: 14 additions & 0 deletions lib/em-couchbase/node.rb
Expand Up @@ -71,6 +71,20 @@ def set(opaque, vbucket, key, value, options = {})
send_data(packet)
end

def arithm(opcode, opaque, vbucket, key, options = {})
if options.is_a?(Fixnum)
options = {:delta => options}
end
packet = Packet.build(opaque, vbucket,
opcode, key,
options[:delta],
options[:initial],
options[:expiration],
options[:cas])
client.register_packet(opaque, packet)
send_data(packet)
end

# @param opaque [Fixnum]
# @param pairs [Array] array of tuples +[opaque, vbucket, key]+
# @param options [Hash]
Expand Down
31 changes: 31 additions & 0 deletions lib/em-couchbase/packet.rb
Expand Up @@ -117,6 +117,29 @@ def self.build(opaque, vbucket, opcode, *args)
0, # uint64_t cas
key
].pack("#{REQUEST_HEADER_FMT}a*")
when :incr, :decr
cmd_id = opcode == :incr ? CMD_INCREMENT : CMD_DECREMENT
key, delta, initial, expiration, cas = args.shift(5)
delta ||= 1
initial ||= 0
bodylen = key.size + 20
[
0x80, # uint8_t magic
cmd_id, # uint8_t opcode
key.size, # uint16_t keylen
20, # uint8_t extlen (delta + initial + expiration)
0, # uint8_t datatype
vbucket, # uint16_t vbucket
bodylen, # uint32_t bodylen
opaque || 0, # uint32_t opaque
cas || 0, # uint64_t cas
delta >> 32, # uint64_t
delta & 0xffffffff, #
initial >> 32, # uint64_t
initial & 0xffffffff, #
expiration || 0, # uint32_t
key
].pack("#{REQUEST_HEADER_FMT}NNNNNa*")
when :sasl_auth
mech, username, password = args.shift(3)
value = "\0#{username}\0#{password}"
Expand Down Expand Up @@ -185,6 +208,14 @@ def self.parse(data)
when CMD_GET
result.operation = :get
result.flags, _ = ext.unpack("N")
when CMD_INCREMENT
result.operation = :incr
hi, lo = result.value.unpack("NN")
result.value = hi << 32 | lo
when CMD_DECREMENT
result.operation = :decr
hi, lo = result.value.unpack("NN")
result.value = hi << 32 | lo
else
raise Couchbase::Error::UnknownCommand, header.inspect
end
Expand Down

0 comments on commit 905c9e0

Please sign in to comment.