Skip to content

Commit

Permalink
Reduce number of reads in ruby reaper by caching some data
Browse files Browse the repository at this point in the history
  • Loading branch information
francesmcmullin committed Mar 7, 2022
1 parent 17e2ccc commit 82c439f
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def initialize(conn)
@digests = SidekiqUniqueJobs::Digests.new
@scheduled = Redis::SortedSet.new(SCHEDULE)
@retried = Redis::SortedSet.new(RETRY)
check_expiry!
end

#
Expand Down Expand Up @@ -137,7 +138,7 @@ def enqueued?(digest)

def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a
procs = processes(conn)
return false if procs.empty?

procs.sort.each do |key|
Expand Down Expand Up @@ -188,12 +189,30 @@ def considered_active?(time_f)
# @yield queues one at a time
#
def queues(conn, &block)
conn.sscan_each("queues", &block)
check_expiry!
@saved_sizes["queues"] ||= conn.scard("queues")
if @saved_sizes["queues"] < reaper_count # never cache anything bigger than the reaper count
@saved_elems["queues"] ||= conn.smembers("queues")
@saved_elems["queues"].each(&block)
else
conn.sscan_each("queues", &block)
end
end

def processes(conn)
check_expiry!
@saved_sizes["processes"] ||= conn.scard("processes")
if @saved_sizes["processes"] < reaper_count # never cache anything bigger than the reaper count
@saved_elems["processes"] ||= conn.sscan_each("processes").to_a
@saved_elems["processes"]
else
conn.sscan_each("processes").to_a
end
end

def entries(conn, queue, &block) # rubocop:disable Metrics/MethodLength
queue_key = "queue:#{queue}"
initial_size = conn.llen(queue_key)
initial_size = queue_len(conn, queue_key)
deleted_size = 0
page = 0
page_size = 50
Expand Down Expand Up @@ -225,6 +244,20 @@ def entries(conn, queue, &block) # rubocop:disable Metrics/MethodLength
def in_sorted_set?(key, digest)
conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any?
end

def queue_len(conn, queue_key)
check_expiry!
@saved_sizes[queue_key] ||= conn.llen(queue_key)
end

# a very clumsy local cache for frequently accessed data
def check_expiry!
if (Time.now - (@last_expired || 0)).to_f > 0.5 # only cache for max 500ms
@saved_sizes = {}
@saved_elems = {}
@last_expired = Time.now
end
end
end
# rubocop:enable Metrics/ClassLength
end
Expand Down

0 comments on commit 82c439f

Please sign in to comment.