Skip to content

Commit

Permalink
allow passing options to #delay to control queue, retry etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
elado committed Aug 18, 2012
1 parent 0559df1 commit 3bb3148
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -4,3 +4,5 @@ Gemfile.lock
dump.rdb dump.rdb
.rbx .rbx
coverage/ coverage/

/test/db/test.sqlite3
9 changes: 5 additions & 4 deletions lib/sidekiq/extensions/action_mailer.rb
Expand Up @@ -21,11 +21,12 @@ def perform(*msg)
end end


module ActionMailer module ActionMailer
def delay def delay(options={})
Proxy.new(DelayedMailer, self) Proxy.new(DelayedMailer, self, options)
end end
def delay_for(interval) def delay_for(interval, options={})
Proxy.new(DelayedMailer, self, Time.now.to_f + interval.to_f) options = options.reverse_merge(at: Time.now.to_f + interval.to_f)
delay(options)
end end
end end


Expand Down
9 changes: 5 additions & 4 deletions lib/sidekiq/extensions/active_record.rb
Expand Up @@ -28,11 +28,12 @@ def sidekiq_deserialize(string)
end end


module InstanceMethods module InstanceMethods
def delay def delay(options={})
Proxy.new(DelayedModel, self) Proxy.new(DelayedModel, self, options)
end end
def delay_for(interval) def delay_for(interval, options={})
Proxy.new(DelayedModel, self, Time.now.to_f + interval.to_f) options = options.reverse_merge(at: Time.now.to_f + interval.to_f)
delay(options)
end end


def sidekiq_serialize def sidekiq_serialize
Expand Down
9 changes: 5 additions & 4 deletions lib/sidekiq/extensions/class_methods.rb
Expand Up @@ -19,11 +19,12 @@ def perform(*msg)
end end


module Klass module Klass
def delay def delay(options={})
Proxy.new(DelayedClass, self) Proxy.new(DelayedClass, self, options)
end end
def delay_for(interval) def delay_for(interval, options={})
Proxy.new(DelayedClass, self, Time.now.to_f + interval.to_f) options = options.reverse_merge(at: Time.now.to_f + interval.to_f)
delay(options)
end end


def sidekiq_serialize def sidekiq_serialize
Expand Down
10 changes: 3 additions & 7 deletions lib/sidekiq/extensions/generic_proxy.rb
@@ -1,10 +1,10 @@
module Sidekiq module Sidekiq
module Extensions module Extensions
class Proxy < BasicObject class Proxy < BasicObject
def initialize(performable, target, at=nil) def initialize(performable, target, options={})
@performable = performable @performable = performable
@target = target @target = target
@at = at @options = options
end end


def method_missing(name, *args) def method_missing(name, *args)
Expand All @@ -15,11 +15,7 @@ def method_missing(name, *args)
# Ruby object. # Ruby object.
serialized_args = ArgsSerializer.serialize_message(@target, name, *args) serialized_args = ArgsSerializer.serialize_message(@target, name, *args)


if @at @performable.perform_async_with_options(@options, *serialized_args)
@performable.perform_at(@at, *serialized_args)
else
@performable.perform_async(*serialized_args)
end
end end
end end


Expand Down
9 changes: 9 additions & 0 deletions lib/sidekiq/testing/inline.rb
Expand Up @@ -36,6 +36,15 @@ def perform_async(*args)
) )
true true
end end

def perform_async_with_options(options, *args)
new.perform(*
Sidekiq::Extensions::ArgsSerializer.deserialize(
Sidekiq::Extensions::ArgsSerializer.serialize(args)
)
)
true
end
end end
end end
end end
9 changes: 9 additions & 0 deletions lib/sidekiq/worker.rb
Expand Up @@ -35,6 +35,15 @@ def perform_async(*args)
client_push('class' => self, 'args' => args) client_push('class' => self, 'args' => args)
end end


def perform_async_with_options(options, *args)
if options[:at]
int = options[:at].to_f
options[:at] = (int < 1_000_000_000 ? Time.now.to_f + int : int)
end
options = stringify_keys(options)
client_push(options.merge('class' => self, 'args' => args))
end

def perform_in(interval, *args) def perform_in(interval, *args)
int = interval.to_f int = interval.to_f
ts = (int < 1_000_000_000 ? Time.now.to_f + int : int) ts = (int < 1_000_000_000 ? Time.now.to_f + int : int)
Expand Down
Binary file removed test/db/test.sqlite3
Binary file not shown.
11 changes: 10 additions & 1 deletion test/test_extensions.rb
Expand Up @@ -55,7 +55,7 @@ def perform_last_job!(performer)
it 'allows delayed execution of ActiveRecord instance methods' do it 'allows delayed execution of ActiveRecord instance methods' do
assert_equal [], Sidekiq::Client.registered_queues assert_equal [], Sidekiq::Client.registered_queues
assert_equal 0, Sidekiq.redis {|c| c.llen('queue:default') } assert_equal 0, Sidekiq.redis {|c| c.llen('queue:default') }
user = User.create user = User.create!
user.delay.long_instance_method("with_argument") user.delay.long_instance_method("with_argument")
assert_equal ['default'], Sidekiq::Client.registered_queues assert_equal ['default'], Sidekiq::Client.registered_queues
assert_equal 1, Sidekiq.redis {|c| c.llen('queue:default') } assert_equal 1, Sidekiq.redis {|c| c.llen('queue:default') }
Expand All @@ -69,6 +69,15 @@ def perform_last_job!(performer)
assert_equal 1, Sidekiq.redis {|c| c.zcard('schedule') } assert_equal 1, Sidekiq.redis {|c| c.zcard('schedule') }
end end


it 'allows setting queue from options' do
assert_equal [], Sidekiq::Client.registered_queues
assert_equal 0, Sidekiq.redis {|c| c.llen('queue:custom_queue') }
user = User.create!
user.delay(queue: :custom_queue).long_instance_method("with_argument")
assert_equal ['custom_queue'], Sidekiq::Client.registered_queues
assert_equal 1, Sidekiq.redis {|c| c.llen('queue:custom_queue') }
end

ActionMailer::Base.perform_deliveries = false ActionMailer::Base.perform_deliveries = false


class UserMailer < ActionMailer::Base class UserMailer < ActionMailer::Base
Expand Down

0 comments on commit 3bb3148

Please sign in to comment.