Skip to content

Commit

Permalink
Refactor sidekiq#1984
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Oct 6, 2014
1 parent 24c6615 commit d54dc1c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 85 deletions.
39 changes: 30 additions & 9 deletions lib/sidekiq/api.rb
Expand Up @@ -545,32 +545,53 @@ def retry_all
class ProcessSet
include Enumerable

def initialize(clean_plz=true)
self.class.cleanup if clean_plz
end

# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
def self.cleanup
count = 0
Sidekiq.redis do |conn|
procs = conn.smembers('processes').sort
heartbeats = conn.pipelined do
procs.each do |key|
conn.hget(key, 'info')
end
end

# the hash named key has an expiry of 60 seconds.
# if it's not found, that means the process has not reported
# in to Redis and probably died.
to_prune = []
heartbeats.each_with_index do |beat, i|
to_prune << procs[i] if beat.nil?
end
count = conn.srem('processes', to_prune) unless to_prune.empty?
end
count
end

def each(&block)
procs = Sidekiq.redis { |conn| conn.smembers('processes') }
procs = Sidekiq.redis { |conn| conn.smembers('processes') }.sort

to_prune = []
sorted = procs.sort
Sidekiq.redis do |conn|
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
result = conn.pipelined do
sorted.each do |key|
procs.each do |key|
conn.hmget(key, 'info', 'busy', 'beat')
end
end

result.each_with_index do |(info, busy, at_s), i|
# the hash named key has an expiry of 60 seconds.
# if it's not found, that means the process has not reported
# in to Redis and probably died.
(to_prune << sorted[i]; next) if info.nil?
hash = Sidekiq.load_json(info)
yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f))
end
end

Sidekiq.redis {|conn| conn.srem('processes', to_prune) } unless to_prune.empty?
nil
end

Expand Down
3 changes: 2 additions & 1 deletion lib/sidekiq/scheduled.rb
@@ -1,6 +1,7 @@
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/actor'
require 'sidekiq/api'

module Sidekiq
module Scheduled
Expand Down Expand Up @@ -75,7 +76,7 @@ def poll(first_time=false)
# We only do this if poll_interval is unset (the default).
def poll_interval
Sidekiq.options[:poll_interval] ||= begin
cleanup_dead_process_records
Sidekiq::ProcessSet.cleanup
pcount = Sidekiq.redis {|c| c.scard('processes') } || 1
pcount * 15
end
Expand Down
16 changes: 0 additions & 16 deletions lib/sidekiq/util.rb
Expand Up @@ -44,21 +44,5 @@ def fire_event(event)
end
end

# Cleans up dead processes recorded in Redis.
def cleanup_dead_process_records
Sidekiq.redis do |conn|
procs = conn.smembers('processes').sort
heartbeats = conn.pipelined do
procs.each do |key|
conn.hget(key, 'beat')
end
end

heartbeats.each_with_index do |beat, i|
conn.srem('processes', procs[i]) if beat.nil?
end
end
end

end
end
2 changes: 1 addition & 1 deletion test/test_api.rb
Expand Up @@ -469,7 +469,7 @@ class ApiWorker
end

ps = Sidekiq::ProcessSet.new
assert_equal 3, ps.size
assert_equal 1, ps.size
assert_equal 1, ps.to_a.size
end

Expand Down
58 changes: 0 additions & 58 deletions test/test_util.rb

This file was deleted.

0 comments on commit d54dc1c

Please sign in to comment.