Skip to content

Commit

Permalink
Add support for fill lock with lock wait to avoid thundering herd pro…
Browse files Browse the repository at this point in the history
…blem (#373)
  • Loading branch information
dylanahsmith committed Jan 15, 2021
1 parent d0452f1 commit c33861d
Show file tree
Hide file tree
Showing 16 changed files with 549 additions and 97 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ inherit_from:

AllCops:
TargetRubyVersion: 2.4

Layout/BeginEndAlignment:
EnforcedStyleAlignWith: start_of_line
2 changes: 1 addition & 1 deletion dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ up:
- mysql-client@5.7:
or: [mysql@5.7]
conflicts: [mysql-connector-c, mysql, mysql-client]
- ruby: 2.6.5
- ruby: 2.7.2
- railgun
- bundler

Expand Down
9 changes: 7 additions & 2 deletions lib/identity_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class UnsupportedAssociationError < StandardError; end

class DerivedModelError < StandardError; end

class LockWaitTimeout < StandardError; end

mattr_accessor :cache_namespace
self.cache_namespace = "IDC:#{CACHE_VERSION}:"

Expand Down Expand Up @@ -141,10 +143,13 @@ def should_use_cache? # :nodoc:
#
# == Parameters
# +key+ A cache key string
# +cache_fetcher_options+ A hash of options to pass to the cache backend
#
def fetch(key)
def fetch(key, cache_fetcher_options = {})
if should_use_cache?
unmap_cached_nil_for(cache.fetch(key) { map_cached_nil_for(yield) })
unmap_cached_nil_for(cache.fetch(key, cache_fetcher_options) do
map_cached_nil_for(yield)
end)
else
yield
end
Expand Down
248 changes: 232 additions & 16 deletions lib/identity_cache/cache_fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,52 @@
# frozen_string_literal: true

require 'securerandom'

module IdentityCache
class CacheFetcher
attr_accessor :cache_backend

EMPTY_HASH = {}.freeze

# Value object for the marker that is stored in the cache while a client is
# filling a key.
#
# In the cache it is represented as a three-element array,
# +[FILL_LOCKED, client_id, data_version]+ (see #cache_value / .from_cache).
class FillLock
  # First element of the cached array; identifies the value as a fill lock.
  FILL_LOCKED = :fill_locked
  # Client id written into the lock by #mark_failed.
  FAILED_CLIENT_ID = 'fill_failed'

  class << self
    # Rebuild a lock from the elements of a cached array (usually splatted).
    #
    # @raise [ArgumentError] if +marker+ is not FILL_LOCKED
    def from_cache(marker, client_id, data_version)
      unless marker == FILL_LOCKED
        raise ArgumentError, "expected fill lock marker #{FILL_LOCKED.inspect}, got #{marker.inspect}"
      end

      new(client_id: client_id, data_version: data_version)
    end

    # @return [Boolean] whether +cache_value+ has the shape produced by #cache_value
    def cache_value?(cache_value)
      cache_value.is_a?(Array) && cache_value.length == 3 && cache_value.first == FILL_LOCKED
    end
  end

  attr_reader :client_id, :data_version

  def initialize(client_id:, data_version:)
    @client_id = client_id
    @data_version = data_version
  end

  # @return [Array] the array representation that is written to the cache
  def cache_value
    [FILL_LOCKED, client_id, data_version]
  end

  # Replace the client id with FAILED_CLIENT_ID (mutates this lock).
  def mark_failed
    @client_id = FAILED_CLIENT_ID
  end

  # @return [Boolean] whether the lock carries FAILED_CLIENT_ID
  def fill_failed?
    @client_id == FAILED_CLIENT_ID
  end

  # Two locks are equal when class, client id and data version all match.
  def ==(other)
    self.class == other.class && client_id == other.client_id && data_version == other.data_version
  end
  # Keep eql?/hash consistent with == so equal locks also match as Hash keys
  # and Set members.
  alias_method :eql?, :==

  def hash
    [self.class, client_id, data_version].hash
  end
end

def initialize(cache_backend)
@cache_backend = cache_backend
end
Expand All @@ -25,27 +69,198 @@ def fetch_multi(keys, &block)
results
end

def fetch(key)
result = nil
yielded = false
@cache_backend.cas(key) do |value|
yielded = true
unless IdentityCache::DELETED == value
result = value
break
def fetch(key, fill_lock_duration: nil, lock_wait_tries: 2)
if fill_lock_duration && IdentityCache.should_fill_cache?
fetch_with_fill_lock(key, fill_lock_duration, lock_wait_tries) do
yield
end
else
fetch_without_fill_lock(key) { yield }
end
end

private

def fetch_without_fill_lock(key)
data = nil
upsert(key) do |value|
value = nil if value == IdentityCache::DELETED || FillLock.cache_value?(value)
unless value.nil?
return value
end
result = yield
data = yield
break unless IdentityCache.should_fill_cache?
result
data
end
data
end

# Fetch +key+, coordinating with other clients through a fill lock stored in
# the cache so that a miss is filled by one client while the others wait.
#
# Each loop iteration calls fetch_or_take_lock and dispatches on its result:
# * a FillLock carrying our client_id: we hold the lock, so yield to load the
#   data, try to store it with fill_with_lock and return it;
# * a FillLock carrying another client_id: wait +fill_lock_duration+ seconds and
#   retry (or, if that lock is marked failed, fall back to
#   fetch_without_fill_lock immediately);
# * IdentityCache::DELETED: switch to a versioned fallback key when a lock was
#   already seen, otherwise yield and return without caching here;
# * nil: yield and return;
# * anything else: a cache hit, returned as is.
#
# +key+:: cache key string
# +fill_lock_duration+:: seconds to sleep per lock wait; also used to derive the
#                        fallback key TTL. Must be greater than 0.0.
# +lock_wait_tries+:: number of lock waits allowed. Must be greater than 0.
#
# Yields with no arguments to load the value on a miss.
# Returns the cached or freshly loaded value.
# Raises ArgumentError for out-of-range arguments, LockWaitTimeout when no lock
# wait tries are left, and re-raises any error raised by the block after
# marking the lock as failed.
def fetch_with_fill_lock(key, fill_lock_duration, lock_wait_tries)
raise ArgumentError, 'fill_lock_duration must be greater than 0.0' unless fill_lock_duration > 0.0
raise ArgumentError, 'lock_wait_tries must be greater than 0' unless lock_wait_tries > 0
# Last FillLock returned by fetch_or_take_lock (ours or another client's).
lock = nil
using_fallback_key = false
expiration_options = EMPTY_HASH
(lock_wait_tries + 2).times do # +2 is for first attempt and retry with fallback key
result = fetch_or_take_lock(key, old_lock: lock, **expiration_options)
case result
when FillLock
lock = result
if lock.client_id == client_id # have lock
data = begin
yield
rescue
# Record the failure on the lock before propagating the error.
mark_fill_failure_on_lock(key, expiration_options)
raise
end

if !fill_with_lock(key, data, lock, expiration_options) && !using_fallback_key
# fallback to storing data in the fallback key so it is available to clients waiting on the lock
expiration_options = fallback_key_expiration_options(fill_lock_duration)
@cache_backend.write(lock_fill_fallback_key(key, lock), data, expiration_options)
end
return data
else
raise LockWaitTimeout if lock_wait_tries <= 0
lock_wait_tries -= 1

# If fill failed in the other client, then it might be failing fast
# so avoid waiting the typical amount of time for a lock wait. The
# semian gem can be used to handle failing fast when the database is slow.
if lock.fill_failed?
return fetch_without_fill_lock(key) { yield }
end

# lock wait
sleep(fill_lock_duration)
# loop around to retry fetch_or_take_lock
end
when IdentityCache::DELETED # interrupted by cache invalidation
if using_fallback_key
raise "unexpected cache invalidation of versioned fallback key"
elsif lock
# Cache invalidated during lock wait, use a versioned fallback key
# to avoid further cache invalidation interruptions.
using_fallback_key = true
key = lock_fill_fallback_key(key, lock)
expiration_options = fallback_key_expiration_options(fill_lock_duration)
# loop around to retry with fallback key
else
# Cache invalidation prevented lock from being taken or read, so we don't
# have a data version to use to build a shared fallback key. In the future
# we could add the data version to the cache invalidation value so a fallback
# key could be used here. For now, we assume that a cache invalidation occurring
# just after the cache wasn't filled is more likely a sign of a key that is
# written more than read (which this cache isn't a good fit for), rather than
# a thundering herd of reads.
return yield
end
when nil # Errors talking to memcached
return yield
else # hit
return result
end
end
# Every branch above either returns, raises or is bounded by the iteration
# count, so falling out of the loop is treated as a bug.
raise "unexpected number of loop iterations"
end

# Rewrite the fill lock stored under +key+ as failed, but only when the cached
# value is a fill lock that carries this fetcher's client id. In every other
# case the compare-and-swap is abandoned and the cached value is left alone.
def mark_fill_failure_on_lock(key, expiration_options)
  @cache_backend.cas(key, expiration_options) do |cached|
    # Not a lock (hit, invalidation marker, ...): nothing to mark.
    break unless FillLock.cache_value?(cached)

    held_lock = FillLock.from_cache(*cached)
    # The lock belongs to a different client.
    break unless held_lock.client_id == client_id

    held_lock.mark_failed
    held_lock.cache_value
  end
end

def upsert(key, expiration_options = EMPTY_HASH)
yielded = false
upserted = @cache_backend.cas(key, expiration_options) do |value|
yielded = true
yield value
end
unless yielded
result = yield
add(key, result)
data = yield nil
upserted = add(key, data, expiration_options)
end
result
upserted
end

private
# Read +key+ and, when appropriate, write a fill lock owned by this client in
# the same upsert.
#
# +key+:: cache key string
# +old_lock+:: the FillLock seen on the previous attempt, or nil on the first one
# +expiration_options+:: options forwarded to the upsert
#
# Returns one of:
# * a FillLock with our client_id when the lock was written successfully;
# * a FillLock read from the cache when another client's lock is kept;
# * nil or IdentityCache::DELETED when +old_lock+ is given and the key is now
#   empty or invalidated, or IdentityCache::DELETED when the cached lock has a
#   different data version than +old_lock+;
# * the cached value on a hit;
# * when the upsert did not succeed, whatever a fresh read of +key+ returns
#   (converted to a FillLock if it has the lock shape).
#
# Note that the `return` statements inside the block exit this method directly,
# skipping the write of the upsert.
def fetch_or_take_lock(key, old_lock:, **expiration_options)
new_lock = nil
upserted = upsert(key, expiration_options) do |value|
if value.nil? || value == IdentityCache::DELETED
if old_lock # cache invalidated
return value
else
# First attempt on an empty/invalidated key: take the lock with a new data version.
new_lock = FillLock.new(client_id: client_id, data_version: SecureRandom.uuid)
end
elsif FillLock.cache_value?(value)
fetched_lock = FillLock.from_cache(*value)
if old_lock == fetched_lock
# preserve data version since there hasn't been any cache invalidations
new_lock = FillLock.new(client_id: client_id, data_version: old_lock.data_version)
elsif old_lock && fetched_lock.data_version != old_lock.data_version
# Cache was invalidated, then another lock was taken during a lock wait.
# Treat it as any other cache invalidation, where the caller will switch
# to the fallback key.
return IdentityCache::DELETED
else
# Either no lock was seen before, or the same data version is now locked
# by a different client id: report the lock found in the cache.
return fetched_lock
end
else # hit
return value
end
new_lock.cache_value # take lock
end

return new_lock if upserted

# The upsert did not report success, so re-read the key to see what is there now.
value = @cache_backend.read(key)
if FillLock.cache_value?(value)
FillLock.from_cache(*value)
else
value
end
end

# Try to replace the fill lock stored under +key+ with +data+.
#
# Returns false when the key is empty, invalidated, or locked under a data
# version other than +my_lock+'s; true when the key already holds a non-lock
# value; otherwise the result of the upsert that writes +data+.
def fill_with_lock(key, data, my_lock, expiration_options)
  upsert(key, expiration_options) do |cached|
    return false if cached.nil? || cached == IdentityCache::DELETED
    # already filled
    return true unless FillLock.cache_value?(cached)

    # invalidated then relocked
    return false unless FillLock.from_cache(*cached).data_version == my_lock.data_version

    data
  end
end

# Build the fallback cache key for +key+, namespaced by the data version of +lock+.
def lock_fill_fallback_key(key, lock)
  version = lock.data_version
  "lock_fill:#{version}:#{key}"
end

# Build the write options for a lock-fill fallback key.
#
# The fallback key won't be used for very long, so the default TTL is
# overridden with one derived from +fill_lock_duration+:
# * twice the lock duration,
# * rounded up to whole seconds, since memcached uses an integer number of
#   seconds and the cache store would otherwise round down with `to_i`,
# * plus one second, since a memcached TTL only gets part of the first second
#   (https://github.com/memcached/memcached/issues/307).
def fallback_key_expiration_options(fill_lock_duration)
  ttl_seconds = (fill_lock_duration * 2).ceil + 1
  { expires_in: ttl_seconds }
end

# Identifier written into the fill locks taken by this fetcher. It is a random
# UUID generated on first use and memoized for later calls.
def client_id
  return @client_id if @client_id

  @client_id = SecureRandom.uuid
end

def cas_multi(keys)
result = nil
Expand Down Expand Up @@ -81,8 +296,9 @@ def add_multi(keys)
result.each { |k, v| add(k, v) }
end

def add(key, value)
@cache_backend.write(key, value, unless_exist: true) if IdentityCache.should_fill_cache?
def add(key, value, expiration_options = EMPTY_HASH)
return false unless IdentityCache.should_fill_cache?
@cache_backend.write(key, value, { unless_exist: true, **expiration_options })
end
end
end
4 changes: 2 additions & 2 deletions lib/identity_cache/cache_key_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ class << self
# @param cache_fetcher [_CacheFetcher]
# @param db_key Reference to what to load from the database.
# @return The database value corresponding to the database key.
def load(cache_fetcher, db_key)
def load(cache_fetcher, db_key, cache_fetcher_options = {})
cache_key = cache_fetcher.cache_key(db_key)

db_value = nil

cache_value = IdentityCache.fetch(cache_key) do
cache_value = IdentityCache.fetch(cache_key, cache_fetcher_options) do
db_value = cache_fetcher.load_one_from_db(db_key)
cache_fetcher.cache_encode(db_value)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/identity_cache/cached/primary_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ def initialize(model)
@model = model
end

def fetch(id)
def fetch(id, cache_fetcher_options)
id = cast_id(id)
return unless id
record = if model.should_use_cache?
object = CacheKeyLoader.load(self, id)
object = CacheKeyLoader.load(self, id, cache_fetcher_options)
if object && object.id != id
IdentityCache.logger.error(
<<~MSG.squish
Expand Down
5 changes: 4 additions & 1 deletion lib/identity_cache/fallback_fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def fetch_multi(keys)
results
end

def fetch(key)
def fetch(key, **cache_fetcher_options)
unless cache_fetcher_options.empty?
raise ArgumentError, "unsupported cache_fetcher options: #{cache_fetcher_options.keys.join(', ')}"
end
result = @cache_backend.read(key)
if result.nil?
result = yield
Expand Down
4 changes: 2 additions & 2 deletions lib/identity_cache/memoized_cache_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def delete(key)
end
end

def fetch(key)
def fetch(key, cache_fetcher_options = {})
memo_misses = 0
cache_misses = 0

Expand All @@ -78,7 +78,7 @@ def fetch(key)

value = fetch_memoized(key) do
memo_misses = 1
@cache_fetcher.fetch(key) do
@cache_fetcher.fetch(key, **cache_fetcher_options) do
cache_misses = 1
instrument_duration(payload, :resolve_miss_time) do
yield
Expand Down
Loading

0 comments on commit c33861d

Please sign in to comment.