Skip to content

Commit

Permalink
Add support for consistent hashing, using the libketama algorithm.
Browse files Browse the repository at this point in the history
We build a weighted continuum of server mappings.  Each key hashes
to a value and we find the closest server in the continuum to that
value.  Now if we add a server, only 1/total keys will map to a
different server, dramatically reducing the impact of the addition.
  • Loading branch information
mperham committed Jan 14, 2009
1 parent 482bfe7 commit 7e07443
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 24 deletions.
99 changes: 78 additions & 21 deletions lib/memcache.rb
Expand Up @@ -5,6 +5,7 @@
require 'timeout'
require 'rubygems'
require 'zlib'
require 'digest/sha1'

##
# A Ruby client library for memcached.
Expand All @@ -18,7 +19,7 @@ class MemCache
##
# The version of MemCache you are using.

VERSION = '1.5.0.1'
VERSION = '1.5.0.7'

##
# Default options for the cache object.
Expand Down Expand Up @@ -128,7 +129,7 @@ def readonly?
# Set the servers that the requests will be distributed between. Entries
# can be either strings of the form "hostname:port" or
# "hostname:port:weight" or MemCache::Server objects.

#
def servers=(servers)
# Create the server objects.
@servers = Array(servers).collect do |server|
Expand All @@ -139,7 +140,7 @@ def servers=(servers)
weight ||= DEFAULT_WEIGHT
Server.new self, host, port, weight
when Server
if server.memcache.multithread != @multithread then
if server.multithread != @multithread then
raise ArgumentError, "can't mix threaded and non-threaded servers"
end
server
Expand All @@ -148,11 +149,10 @@ def servers=(servers)
end
end

# Create an array of server buckets for weight selection of servers.
@buckets = []
@servers.each do |server|
server.weight.times { @buckets.push(server) }
end
# There's no point in doing this if there's only one server
@continuum = create_continuum_for(@servers) if @servers.size > 1

@servers
end

##
Expand Down Expand Up @@ -443,6 +443,14 @@ def make_cache_key(key)
end
end

##
# Returns an interoperable hash value for +key+. (I think, docs are
# sketchy for down servers).

def hash_for(key)
Zlib.crc32(key)
end

##
# Pick a server to handle the request based on a hash of the key.

Expand All @@ -453,25 +461,17 @@ def get_server_for_key(key)
raise MemCacheError, "No servers available" if @servers.empty?
return @servers.first if @servers.length == 1

hkey = hash_for key
hkey = hash_for(key)

20.times do |try|
server = @buckets[hkey % @buckets.compact.size]
server = binary_search(@continuum, hkey) { |e| e.value }.server
return server if server.alive?
hkey += hash_for "#{try}#{key}"
hkey = hash_for "#{try}#{key}"
end

raise MemCacheError, "No servers available"
end

##
# Returns an interoperable hash value for +key+. (I think, docs are
# sketchy for down servers).

def hash_for(key)
(Zlib.crc32(key) >> 16) & 0x7fff
end

##
# Performs a raw decr for +cache_key+ from +server+. Returns nil if not
# found.
Expand Down Expand Up @@ -632,6 +632,41 @@ def raise_on_error_response!(response)
end
end

def create_continuum_for(servers)
total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
continuum = []

servers.each do |server|
entry_count_for(server, servers.size, total_weight).times do |idx|
hash = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
value = Integer("0x#{hash[0..7]}")
continuum << ContinuumEntry.new(value, server)
end
end

continuum.sort { |a, b| a.value <=> b.value }
end

def entry_count_for(server, total_servers, total_weight)
((total_servers * ContinuumEntry::POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
end

class ContinuumEntry
POINTS_PER_SERVER = 160 # this is the default in libmemcached

attr_reader :value
attr_reader :server

def initialize(val, srv)
@value = val
@server = srv
end

def inspect
"<#{value}, #{server.host}:#{server.port}>"
end
end

##
# This class represents a memcached server instance.

Expand Down Expand Up @@ -675,6 +710,8 @@ class Server

attr_reader :status

attr_reader :multithread

##
# Create a new MemCache::Server object for the memcached instance
# listening on the given host and port, weighted by the given weight.
Expand All @@ -683,12 +720,11 @@ def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT)
raise ArgumentError, "No host specified" if host.nil? or host.empty?
raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero?

@memcache = memcache
@host = host
@port = port.to_i
@weight = weight.to_i

@multithread = @memcache.multithread
@multithread = memcache.multithread
@mutex = Mutex.new

@sock = nil
Expand Down Expand Up @@ -779,5 +815,26 @@ def mark_dead(reason = "Unknown error")

class MemCacheError < RuntimeError; end


# Find the closest element in Array less than or equal to value.
def binary_search(ary, value, &block)
upper = ary.size - 1
lower = 0
idx = 0

result = while(lower <= upper) do
idx = (lower + upper) / 2
comp = block.call(ary[idx]) <=> value

if comp == 0
break idx
elsif comp > 0
upper = idx - 1
else
lower = idx + 1
end
end
result ? ary[result] : ary[upper]
end
end

35 changes: 32 additions & 3 deletions test/test_mem_cache.rb
@@ -1,6 +1,8 @@
# encoding: utf-8
require 'stringio'
require 'test/unit'
require 'rubygems'
require 'flexmock/test_unit'

$TESTING = true

Expand Down Expand Up @@ -62,6 +64,30 @@ def setup
@cache = MemCache.new 'localhost:1', :namespace => 'my_namespace'
end

def test_consistent_hashing
flexmock(MemCache::Server).new_instances.should_receive(:alive?).and_return(true)

# Setup a continuum of two servers
@cache.servers = ['mike1', 'mike2', 'mike3']

keys = []
1000.times do |idx|
keys << idx.to_s
end

before_continuum = keys.map {|key| @cache.get_server_for_key(key) }

@cache.servers = ['mike1', 'mike2', 'mike3', 'mike4']

after_continuum = keys.map {|key| @cache.get_server_for_key(key) }

cdiff = before_continuum.zip(after_continuum).find_all {|a| a[0].host == a[1].host }.size

# With continuum, we should see about 75% of the keys map to the same server
# With modulo, we would see about 25%.
assert cdiff > 700
end

def test_cache_get
server = util_setup_fake_server

Expand Down Expand Up @@ -212,7 +238,7 @@ def test_initialize_multiple_servers
assert_equal 'my_namespace', cache.namespace
assert_equal true, cache.readonly?
assert_equal false, cache.servers.empty?
assert !cache.instance_variable_get(:@buckets).empty?
assert !cache.instance_variable_get(:@continuum).empty?
end

def test_initialize_too_many_args
Expand Down Expand Up @@ -386,12 +412,15 @@ def test_get_server_for_key
def test_get_server_for_key_multiple
s1 = util_setup_server @cache, 'one.example.com', ''
s2 = util_setup_server @cache, 'two.example.com', ''
@cache.instance_variable_set :@servers, [s1, s2]
@cache.instance_variable_set :@buckets, [s1, s2]
@cache.servers = [s1, s2]

server = @cache.get_server_for_key 'keya'
assert_equal 'two.example.com', server.host
server = @cache.get_server_for_key 'keyb'
assert_equal 'two.example.com', server.host
server = @cache.get_server_for_key 'keyc'
assert_equal 'two.example.com', server.host
server = @cache.get_server_for_key 'keyd'
assert_equal 'one.example.com', server.host
end

Expand Down

0 comments on commit 7e07443

Please sign in to comment.