Permalink
Fetching contributors…
Cannot retrieve contributors at this time
332 lines (284 sloc) 7.62 KB
module EventMachine
module Protocols
# Implements the Memcache protocol (http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt).
# Requires memcached >= 1.2.4 w/ noreply support
#
# == Usage example
#
# EM.run{
# cache = EM::P::Memcache.connect 'localhost', 11211
#
# cache.set :a, 'hello'
# cache.set :b, 'hi'
# cache.set :c, 'how are you?'
# cache.set :d, ''
#
# cache.get(:a){ |v| p v }
# cache.get_hash(:a, :b, :c, :d){ |v| p v }
# cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
#
# cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }
#
# cache.get(:missing){ |m| p [:missing=, m] }
# cache.set(:missing, 'abc'){ p :stored }
# cache.get(:missing){ |m| p [:missing=, m] }
# cache.del(:missing){ p :deleted }
# cache.get(:missing){ |m| p [:missing=, m] }
# }
#
module Memcache
include EM::Deferrable
##
# constants
unless defined? Cempty
# @private
Cstored = 'STORED'.freeze
# @private
Cend = 'END'.freeze
# @private
Cdeleted = 'DELETED'.freeze
# @private
Cunknown = 'NOT_FOUND'.freeze
# @private
Cerror = 'ERROR'.freeze
# @private
Cempty = ''.freeze
# @private
Cdelimiter = "\r\n".freeze
end
##
# commands
# Get the value associated with one or multiple keys
#
# cache.get(:a){ |v| p v }
# cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
#
def get *keys
raise ArgumentError unless block_given?
callback{
keys = keys.map{|k| k.to_s.gsub(/\s/,'_') }
send_data "get #{keys.join(' ')}\r\n"
@get_cbs << [keys, proc{ |values|
yield *keys.map{ |k| values[k] }
}]
}
end
# Set the value for a given key
#
# cache.set :a, 'hello'
# cache.set(:missing, 'abc'){ puts "stored the value!" }
#
def set key, val, exptime = 0, &cb
callback{
val = val.to_s
send_cmd :set, key, 0, exptime, val.respond_to?(:bytesize) ? val.bytesize : val.size, !block_given?
send_data val
send_data Cdelimiter
@set_cbs << cb if cb
}
end
# Gets multiple values as a hash
#
# cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
#
def get_hash *keys
raise ArgumentError unless block_given?
get *keys do |*values|
yield keys.inject({}){ |hash, k| hash.update k => values[keys.index(k)] }
end
end
# Delete the value associated with a key
#
# cache.del :a
# cache.del(:b){ puts "deleted the value!" }
#
def delete key, expires = 0, &cb
callback{
send_data "delete #{key} #{expires}#{cb ? '' : ' noreply'}\r\n"
@del_cbs << cb if cb
}
end
alias del delete
# Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4)
def self.connect host = 'localhost', port = 11211
EM.connect host, port, self, host, port
end
def send_cmd cmd, key, flags = 0, exptime = 0, bytes = 0, noreply = false
send_data "#{cmd} #{key} #{flags} #{exptime} #{bytes}#{noreply ? ' noreply' : ''}\r\n"
end
private :send_cmd
##
# errors
# @private
class ParserError < StandardError
end
##
# em hooks
# @private
def initialize host, port = 11211
@host, @port = host, port
end
# @private
def connection_completed
@get_cbs = []
@set_cbs = []
@del_cbs = []
@values = {}
@reconnecting = false
@connected = true
succeed
# set_delimiter "\r\n"
# set_line_mode
end
#--
# 19Feb09 Switched to a custom parser, LineText2 is recursive and can cause
# stack overflows when there is too much data.
# include EM::P::LineText2
# @private
def receive_data data
(@buffer||='') << data
while index = @buffer.index(Cdelimiter)
begin
line = @buffer.slice!(0,index+2)
process_cmd line
rescue ParserError
@buffer[0...0] = line
break
end
end
end
#--
# def receive_line line
# @private
def process_cmd line
case line.strip
when /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes>
bytes = Integer($3)
# set_binary_mode bytes+2
# @cur_key = $1
if @buffer.size >= bytes + 2
@values[$1] = @buffer.slice!(0,bytes)
@buffer.slice!(0,2) # \r\n
else
raise ParserError
end
when Cend # END
if entry = @get_cbs.shift
keys, cb = entry
cb.call(@values)
end
@values = {}
when Cstored # STORED
if cb = @set_cbs.shift
cb.call(true)
end
when Cdeleted # DELETED
if cb = @del_cbs.shift
cb.call(true)
end
when Cunknown # NOT_FOUND
if cb = @del_cbs.shift
cb.call(false)
end
else
p [:MEMCACHE_UNKNOWN, line]
end
end
#--
# def receive_binary_data data
# @values[@cur_key] = data[0..-3]
# end
# @private
def unbind
if @connected or @reconnecting
EM.add_timer(1){ reconnect @host, @port }
@connected = false
@reconnecting = true
@deferred_status = nil
else
raise 'Unable to connect to memcached server'
end
end
end
end
end
if __FILE__ == $0
# ruby -I ext:lib -r eventmachine -rubygems lib/protocols/memcache.rb
require 'em/spec'
# @private
class TestConnection
include EM::P::Memcache
def send_data data
sent_data << data
end
def sent_data
@sent_data ||= ''
end
def initialize
connection_completed
end
end
EM.describe EM::Protocols::Memcache do
before{
@c = TestConnection.new
}
should 'send get requests' do
@c.get('a'){}
@c.sent_data.should == "get a\r\n"
done
end
should 'send set requests' do
@c.set('a', 1){}
@c.sent_data.should == "set a 0 0 1\r\n1\r\n"
done
end
should 'use noreply on set without block' do
@c.set('a', 1)
@c.sent_data.should == "set a 0 0 1 noreply\r\n1\r\n"
done
end
should 'send delete requests' do
@c.del('a')
@c.sent_data.should == "delete a 0 noreply\r\n"
done
end
should 'work when get returns no values' do
@c.get('a'){ |a|
a.should.be.nil
done
}
@c.receive_data "END\r\n"
end
should 'invoke block on set' do
@c.set('a', 1){
done
}
@c.receive_data "STORED\r\n"
end
should 'invoke block on delete' do
@c.delete('a'){ |found|
found.should.be.false
}
@c.delete('b'){ |found|
found.should.be.true
done
}
@c.receive_data "NOT_FOUND\r\n"
@c.receive_data "DELETED\r\n"
end
should 'parse split responses' do
@c.get('a'){ |a|
a.should == 'abc'
done
}
@c.receive_data "VAL"
@c.receive_data "UE a 0 "
@c.receive_data "3\r\n"
@c.receive_data "ab"
@c.receive_data "c"
@c.receive_data "\r\n"
@c.receive_data "EN"
@c.receive_data "D\r\n"
end
end
end