From 6b277de4d593073af998749abc8b5dd8c87d96ee Mon Sep 17 00:00:00 2001 From: Brandon Hilkert Date: Tue, 17 Nov 2015 13:07:30 -0500 Subject: [PATCH] Revert "Revert "Merge pull request #2659 from mperham/queue-testing-api"" This reverts commit 56ebcfd161e1b32dca835399ce7fd877a73a79c3. --- 4.0-Upgrade.md | 10 ++++ Changes.md | 10 ++++ lib/sidekiq/testing.rb | 98 +++++++++++++++++++++++++++++++++------ test/test_testing_fake.rb | 65 +++++++++++++++++++++++++- 4 files changed, 168 insertions(+), 15 deletions(-) diff --git a/4.0-Upgrade.md b/4.0-Upgrade.md index 58945edac..1a35f0bb2 100644 --- a/4.0-Upgrade.md +++ b/4.0-Upgrade.md @@ -25,6 +25,16 @@ gem 'redis-namespace' `concurrency + 2` connections in your pool or Sidekiq will exit. When in doubt, let Sidekiq size the connection pool for you. +* There's a new testing API based off the `Sidekiq::Queues` namespace. All + assertions made against the Worker class still work as expected. +```ruby +assert_equal 0, Sidekiq::Queues["default"].size +HardWorker.perform_async("log") +assert_equal 1, Sidekiq::Queues["default"].size +assert_equal "log", Sidekiq::Queues["default"].first['args'][0] +Sidekiq::Queues.clear_all +``` + ## Upgrade First, make sure you are using Redis 2.8 or greater. Next: diff --git a/Changes.md b/Changes.md index 370a0afb3..fe7542c0a 100644 --- a/Changes.md +++ b/Changes.md @@ -13,6 +13,16 @@ and to remove dependencies. This has resulted in major speedups, as [detailed on my blog](http://www.mikeperham.com/2015/10/14/optimizing-sidekiq/). - See the [4.0 upgrade notes](4.0-Upgrade.md) for more detail. +- There's a new testing API based off the `Sidekiq::Queues` namespace. All + assertions made against the Worker class still work as expected. + [#2659, brandonhilkert] +```ruby +assert_equal 0, Sidekiq::Queues["default"].size +HardWorker.perform_async("log") +assert_equal 1, Sidekiq::Queues["default"].size +assert_equal "log", Sidekiq::Queues["default"].first['args'][0] +Sidekiq::Queues.clear_all +``` 3.5.3 ----------- diff --git a/lib/sidekiq/testing.rb b/lib/sidekiq/testing.rb index cfc253248..b08f1a373 100644 --- a/lib/sidekiq/testing.rb +++ b/lib/sidekiq/testing.rb @@ -68,15 +68,15 @@ class Client def raw_push(payloads) if Sidekiq::Testing.fake? payloads.each do |job| - job['class'].constantize.jobs << Sidekiq.load_json(Sidekiq.dump_json(job)) + Queues.jobs[job['queue']] << Sidekiq.load_json(Sidekiq.dump_json(job)) end true elsif Sidekiq::Testing.inline? payloads.each do |job| - job['jid'] ||= SecureRandom.hex(12) klass = job['class'].constantize - klass.jobs.unshift Sidekiq.load_json(Sidekiq.dump_json(job)) - klass.perform_one + job['id'] ||= SecureRandom.hex(12) + job_hash = Sidekiq.load_json(Sidekiq.dump_json(job)) + klass.process_job(job_hash) end true else @@ -85,6 +85,64 @@ def raw_push(payloads) end end + module Queues + ## + # The Queues class is only for testing the fake queue implementation. + # The data is structured as a hash with queue name as hash key and array + # of job data as the value. + # + # { + # "default"=>[ + # { + # "class"=>"TestTesting::QueueWorker", + # "args"=>[1, 2], + # "retry"=>true, + # "queue"=>"default", + # "jid"=>"abc5b065c5c4b27fc1102833", + # "created_at"=>1447445554.419934 + # } + # ] + # } + # + # Example: + # + # require 'sidekiq/testing' + # + # assert_equal 0, Sidekiq::Queues["default"].size + # HardWorker.perform_async(:something) + # assert_equal 1, Sidekiq::Queues["default"].size + # assert_equal :something, Sidekiq::Queues["default"].first['args'][0] + # + # You can also clear all workers' jobs: + # + # assert_equal 0, Sidekiq::Queues["default"].size + # HardWorker.perform_async(:something) + # Sidekiq::Queues.clear_all + # assert_equal 0, Sidekiq::Queues["default"].size + # + # This can be useful to make sure jobs don't linger between tests: + # + # RSpec.configure do |config| + # config.before(:each) do + # Sidekiq::Queues.clear_all + # end + # end + # + class << self + def [](queue) + jobs[queue] + end + + def jobs + @jobs ||= Hash.new { |hash, key| hash[key] = [] } + end + + def clear_all + jobs.clear + end + end + end + module Worker ## # The Sidekiq testing infrastructure overrides perform_async @@ -143,28 +201,36 @@ module Worker # module ClassMethods + # Queue for this worker + def queue + self.sidekiq_options["queue"] + end + # Jobs queued for this worker def jobs - Worker.jobs[self] + Queues.jobs[queue].select { |job| job["class"] == self.to_s } end # Clear all jobs for this worker def clear - jobs.clear + Queues.jobs[queue].clear end # Drain and run all jobs for this worker def drain - while job = jobs.shift do - process_job(job) + while jobs.any? + next_job = jobs.first + Queues.jobs[queue].delete_if { |job| job["jid"] == next_job["jid"] } + process_job(next_job) end end # Pop out a single job and perform it def perform_one raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty? - job = jobs.shift - process_job(job) + next_job = jobs.first + Queues.jobs[queue].delete_if { |job| job["jid"] == next_job["jid"] } + process_job(next_job) end def process_job(job) @@ -183,18 +249,22 @@ def execute_job(worker, args) class << self def jobs # :nodoc: - @jobs ||= Hash.new { |hash, key| hash[key] = [] } + Queues.jobs.values.flatten end # Clear all queued jobs across all workers def clear_all - jobs.clear + Queues.clear_all end # Drain all queued jobs across all workers def drain_all - until jobs.values.all?(&:empty?) do - jobs.keys.each(&:drain) + while jobs.any? + worker_classes = jobs.map { |job| job["class"] }.uniq + + worker_classes.each do |worker_class| + worker_class.constantize.drain + end end end end diff --git a/test/test_testing_fake.rb b/test/test_testing_fake.rb index 4084ea140..bffd29813 100644 --- a/test/test_testing_fake.rb +++ b/test/test_testing_fake.rb @@ -54,6 +54,7 @@ def bar(str) after do Sidekiq::Testing.disable! + Sidekiq::Queues.clear_all end it 'stubs the async call' do @@ -93,7 +94,7 @@ def self.foo(x) it 'stubs the enqueue_to call' do assert_equal 0, EnqueuedWorker.jobs.size assert Sidekiq::Client.enqueue_to('someq', EnqueuedWorker, 1, 2) - assert_equal 1, EnqueuedWorker.jobs.size + assert_equal 1, Sidekiq::Queues['someq'].size end it 'executes all stored jobs' do @@ -263,6 +264,68 @@ def perform it 'can execute a job' do DirectWorker.execute_job(DirectWorker.new, [2, 3]) end + end + + describe 'queue testing' do + before do + require 'sidekiq/testing' + Sidekiq::Testing.fake! + end + + after do + Sidekiq::Testing.disable! + Sidekiq::Queues.clear_all + end + class QueueWorker + include Sidekiq::Worker + def perform(a, b) + a + b + end + end + + class AltQueueWorker + include Sidekiq::Worker + sidekiq_options queue: :alt + def perform(a, b) + a + b + end + end + + it 'finds enqueued jobs' do + assert_equal 0, Sidekiq::Queues["default"].size + + QueueWorker.perform_async(1, 2) + QueueWorker.perform_async(1, 2) + AltQueueWorker.perform_async(1, 2) + + assert_equal 2, Sidekiq::Queues["default"].size + assert_equal [1, 2], Sidekiq::Queues["default"].first["args"] + + assert_equal 1, Sidekiq::Queues["alt"].size + end + + it 'clears out all queues' do + assert_equal 0, Sidekiq::Queues["default"].size + + QueueWorker.perform_async(1, 2) + QueueWorker.perform_async(1, 2) + AltQueueWorker.perform_async(1, 2) + + Sidekiq::Queues.clear_all + + assert_equal 0, Sidekiq::Queues["default"].size + assert_equal 0, Sidekiq::Queues["alt"].size + end + + it 'finds jobs enqueued by client' do + Sidekiq::Client.push( + 'class' => 'NonExistentWorker', + 'queue' => 'missing', + 'args' => [1] + ) + + assert_equal 1, Sidekiq::Queues["missing"].size + end end end