Skip to content

Commit

Permalink
[feature] Add bulk_operations invalidation limit
Browse files Browse the repository at this point in the history
Pull Request Branch: donaldong/feature_add_bulk_operat97f3f74
Pull Request Link: #31
  • Loading branch information
donaldong committed Feb 11, 2021
1 parent 5bb1f32 commit 2c80ca1
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 31 deletions.
6 changes: 4 additions & 2 deletions lib/redis_memo/memoize_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ def self.invalidate_all(model_class)
end
end

def self.invalidate(record)
RedisMemo::Memoizable.invalidate(to_memos(record))
def self.invalidate(*records)
RedisMemo::Memoizable.invalidate(
records.map { |record| to_memos(record) }.flatten,
)
end

def self.to_memos(record)
Expand Down
89 changes: 64 additions & 25 deletions lib/redis_memo/memoize_query/invalidation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ def self.rewrite_import_method(model_class, method_name)
define_method method_name do |*args, &blk|
options = args.last.is_a?(Hash) ? args.last : {}
records = args[args.last.is_a?(Hash) ? -2 : -1]
unique_by = options[:on_duplicate_key_update]
if unique_by.is_a?(Hash)
unique_by = unique_by[:columns]
columns_to_update = options[:on_duplicate_key_update]
if columns_to_update.is_a?(Hash)
columns_to_update = columns_to_update[:columns]
end

if records.last.is_a?(Hash)
Expand All @@ -123,50 +123,89 @@ def self.rewrite_import_method(model_class, method_name)
# - default values filled by the database
# - updates on conflict conditions
records_to_invalidate =
if unique_by
if columns_to_update
RedisMemo::MemoizeQuery::Invalidation.send(
:select_by_uniq_index,
:select_by_columns,
records,
unique_by,
columns_to_update,
)
else
[]
end

result = send(:"#{method_name}_without_redis_memo_invalidation", *args, &blk)

records_to_invalidate += RedisMemo.without_memo do
# Not all databases support "RETURNING", which is useful when
# invaldating records after bulk creation
model_class.where(model_class.primary_key => result.ids).to_a
# Offload the records to invalidate while selecting the next set of
# records to invalidate
case records_to_invalidate
when Array
RedisMemo::MemoizeQuery.invalidate(*records_to_invalidate) unless records_to_invalidate.empty?

RedisMemo::MemoizeQuery.invalidate(*RedisMemo::MemoizeQuery::Invalidation.send(
:select_by_id,
model_class,
# Not all databases support "RETURNING", which is useful when
# invaldating records after bulk creation
result.ids,
))
else
RedisMemo::MemoizeQuery.invalidate_all(model_class)
end

memos_to_invalidate = records_to_invalidate.map do |record|
RedisMemo::MemoizeQuery.to_memos(record)
end
RedisMemo::Memoizable.invalidate(memos_to_invalidate.flatten)

result
end
end
end

def self.select_by_uniq_index(records, unique_by)
def self.select_by_columns(records, columns_to_update)
model_class = records.first.class
or_chain = nil

records.each do |record|
conditions = {}
unique_by.each do |column|
conditions[column] = record.send(column)
columns_to_select = columns_to_update & RedisMemo::MemoizeQuery
.memoized_columns(model_class)
.to_a.flatten.uniq

# Nothing to invalidate here
return [] if columns_to_select.empty?

RedisMemo::Tracer.trace(
'redis_memo.memoize_query.invalidation',
"#{__method__}##{model_class.name}",
) do
records.each do |record|
conditions = {}
columns_to_select.each do |column|
conditions[column] = record.send(column)
end
if or_chain
or_chain = or_chain.or(model_class.where(conditions))
else
or_chain = model_class.where(conditions)
end
end
if or_chain
or_chain = or_chain.or(model_class.where(conditions))

record_count = RedisMemo.without_memo { or_chain.count }
if record_count > bulk_operations_invalidation_limit
nil
else
or_chain = model_class.where(conditions)
RedisMemo.without_memo { or_chain.to_a }
end
end
end

def self.select_by_id(model_class, ids)
RedisMemo::Tracer.trace(
'redis_memo.memoize_query.invalidation',
"#{__method__}##{model_class.name}",
) do
RedisMemo.without_memo do
model_class.where(model_class.primary_key => ids).to_a
end
end
end

RedisMemo.without_memo { or_chain.to_a }
def self.bulk_operations_invalidation_limit
ENV['REDIS_MEMO_BULK_OPERATIONS_INVALIDATION_LIMIT']&.to_i ||
RedisMemo::DefaultOptions.bulk_operations_invalidation_limit ||
10000
end
end
9 changes: 5 additions & 4 deletions lib/redis_memo/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ def global_cache_key_version(&blk)
end

attr_accessor :async
attr_accessor :bulk_operations_invalidation_limit
attr_accessor :cache_out_of_date_handler
attr_accessor :cache_validation_sampler
attr_accessor :compress
attr_accessor :compress_threshold
attr_accessor :redis_error_handler
attr_accessor :expires_in
attr_accessor :cache_validation_sampler
attr_accessor :cache_out_of_date_handler
attr_accessor :connection_pool
attr_accessor :expires_in
attr_accessor :redis_error_handler

attr_writer :global_cache_key_version
attr_writer :redis
Expand Down
32 changes: 32 additions & 0 deletions spec/memoize_query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,38 @@ def expect_to_eq_with_or_without_redis
end
end

it 'invalidates all records if there are too many records to invalidate' do
allow(RedisMemo::DefaultOptions).to receive(:bulk_operations_invalidation_limit).and_return(2)
expect(RedisMemo::MemoizeQuery).to receive(:invalidate_all).once.and_call_original

records = 3.times.map { Site.create!(a: 0) }
new_records = [Site.create!(a: 1), Site.create!(a: 1)]
new_records.each do |record|
record.a = 0
# it does not access this field for querying records to invalidate
expect(record).not_to receive(:not_memoized)
end

RedisMemo::Cache.with_local_cache do
records.each do |record|
Site.find(record.id)

# Cached locally
expect_not_to_use_redis do
Site.find(record.id)
end
end

Site.import(new_records, on_duplicate_key_update: [:a, :not_memoized])

records.each do |record|
expect_to_eq_with_or_without_redis do
Site.find(record.id)
end
end
end
end

it 'recalculates after update_all' do
expect(Site.a_count(0)).to eq(0)

Expand Down

0 comments on commit 2c80ca1

Please sign in to comment.