Skip to content

Commit

Permalink
Merge pull request sidekiq#2142 from ismaelga/patch-2
Browse files Browse the repository at this point in the history
Load stats with pipeline
  • Loading branch information
mperham committed Jan 17, 2015
2 parents 483a376 + 7cb9f42 commit 585bec7
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 48 deletions.
131 changes: 100 additions & 31 deletions lib/sidekiq/api.rb
Expand Up @@ -3,12 +3,89 @@

module Sidekiq
class Stats
def initialize
fetch_stats!
end

def processed
Sidekiq.redis { |conn| conn.get("stat:processed") }.to_i
stat :processed
end

def failed
Sidekiq.redis { |conn| conn.get("stat:failed") }.to_i
stat :failed
end

def scheduled_size
stat :scheduled_size
end

def retry_size
stat :retry_size
end

def dead_size
stat :dead_size
end

def enqueued
stat :enqueued
end

def processes_size
stat :processes_size
end

def workers_size
stat :workers_size
end

def default_queue_latency
stat :default_queue_latency
end

def fetch_stats!
pipe1_res = Sidekiq.redis do |conn|
conn.pipelined do
conn.get('stat:processed')
conn.get('stat:failed')
conn.zcard('schedule')
conn.zcard('retry')
conn.zcard('dead')
conn.scard('processes')
conn.lrange("queue:default", -1, -1)
conn.smembers('processes')
conn.smembers('queues'.freeze)
end
end

pipe2_res = Sidekiq.redis do |conn|
conn.pipelined do
pipe1_res[7].each {|key| conn.hget(key, 'busy') }
pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") }
end
end

s = pipe1_res[7].size
workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)

default_queue_latency = if (entry = pipe1_res[6].first)
Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at']
else
0
end
@stats = {
processed: pipe1_res[0].to_i,
failed: pipe1_res[1].to_i,
scheduled_size: pipe1_res[2],
retry_size: pipe1_res[3],
dead_size: pipe1_res[4],
processes_size: pipe1_res[5],

default_queue_latency: default_queue_latency,
workers_size: workers_size,
enqueued: enqueued
}
end

def reset(*stats)
Expand All @@ -25,41 +102,33 @@ def reset(*stats)
end
end

def queues
Sidekiq.redis do |conn|
queues = conn.smembers('queues'.freeze)

lengths = conn.pipelined do
queues.each do |queue|
conn.llen("queue:#{queue}")
end
end

i = 0
array_of_arrays = queues.inject({}) do |memo, queue|
memo[queue] = lengths[i]
i += 1
memo
end.sort_by { |_, size| size }
private

Hash[array_of_arrays.reverse]
end
def stat(s)
@stats[s]
end

def enqueued
queues.values.inject(&:+) || 0
end
class Queues
def lengths
Sidekiq.redis do |conn|
queues = conn.smembers('queues'.freeze)

def scheduled_size
Sidekiq.redis {|c| c.zcard('schedule') }
end
lengths = conn.pipelined do
queues.each do |queue|
conn.llen("queue:#{queue}")
end
end

def retry_size
Sidekiq.redis {|c| c.zcard('retry') }
end
i = 0
array_of_arrays = queues.inject({}) do |memo, queue|
memo[queue] = lengths[i]
i += 1
memo
end.sort_by { |_, size| size }

def dead_size
Sidekiq.redis {|c| c.zcard('dead') }
Hash[array_of_arrays.reverse]
end
end
end

class History
Expand Down
23 changes: 11 additions & 12 deletions lib/sidekiq/web.rb
Expand Up @@ -210,32 +210,31 @@ def custom_tabs

get '/stats' do
sidekiq_stats = Sidekiq::Stats.new
queue = Sidekiq::Queue.new
redis_stats = redis_info.select { |k, v| REDIS_KEYS.include? k }

content_type :json
Sidekiq.dump_json(
sidekiq: {
processed: sidekiq_stats.processed,
failed: sidekiq_stats.failed,
busy: workers.size,
processes: processes.size,
enqueued: sidekiq_stats.enqueued,
scheduled: sidekiq_stats.scheduled_size,
retries: sidekiq_stats.retry_size,
dead: sidekiq_stats.dead_size,
default_latency: queue.latency
processed: sidekiq_stats.processed,
failed: sidekiq_stats.failed,
busy: sidekiq_stats.workers_size,
processes: sidekiq_stats.processes_size,
enqueued: sidekiq_stats.enqueued,
scheduled: sidekiq_stats.scheduled_size,
retries: sidekiq_stats.retry_size,
dead: sidekiq_stats.dead_size,
default_latency: sidekiq_stats.default_queue_latency
},
redis: redis_stats
)
end

get '/stats/queues' do
stats = Sidekiq::Stats.new
queue_stats = Sidekiq::Stats::Queues.new

content_type :json
Sidekiq.dump_json(
stats.queues
queue_stats.lengths
)
end

Expand Down
10 changes: 5 additions & 5 deletions test/test_api.rb
Expand Up @@ -77,8 +77,8 @@ class TestApi < Sidekiq::Test

describe "queues" do
it "is initially empty" do
s = Sidekiq::Stats.new
assert_equal 0, s.queues.size
s = Sidekiq::Stats::Queues.new
assert_equal 0, s.lengths.size
end

it "returns a hash of queue and size in order" do
Expand All @@ -90,9 +90,9 @@ class TestApi < Sidekiq::Test
conn.sadd 'queues', 'bar'
end

s = Sidekiq::Stats.new
assert_equal ({ "foo" => 1, "bar" => 3 }), s.queues
assert_equal "bar", s.queues.first.first
s = Sidekiq::Stats::Queues.new
assert_equal ({ "foo" => 1, "bar" => 3 }), s.lengths
assert_equal "bar", s.lengths.first.first
end
end

Expand Down

0 comments on commit 585bec7

Please sign in to comment.