Permalink
Browse files

argument serialization into redis

ActiveRecord, classes, modules and other parameters are serialized to strings such as "AR:User:2" and "CLASS:SomeClass" in order to have minimum concern about deserialization on the worker methods (used w/ #delay) and classes.
  • Loading branch information...
1 parent 254c204 commit 3a09556054b8f3e5ceab9f1b38e18b9f243a22b5 @elado committed Aug 18, 2012
View
@@ -5,6 +5,7 @@
require 'sidekiq/redis_connection'
require 'sidekiq/util'
+require 'sidekiq/extensions/args_serializer'
require 'sidekiq/extensions/class_methods'
require 'sidekiq/extensions/action_mailer'
require 'sidekiq/extensions/active_record'
View
@@ -1,4 +1,5 @@
require 'sidekiq/middleware/chain'
+require 'active_support'
module Sidekiq
class Client
@@ -14,8 +14,8 @@ class DelayedMailer
# than 30 seconds to send.
sidekiq_options :timeout => 30
- def perform(yml)
- (target, method_name, args) = YAML.load(yml)
+ def perform(*msg)
+ (target, method_name, args) = ArgsSerializer.deserialize_message(*msg)
target.send(method_name, *args).deliver
end
end
@@ -14,8 +14,8 @@ module Extensions
class DelayedModel
include Sidekiq::Worker
- def perform(yml)
- (target, method_name, args) = YAML.load(yml)
+ def perform(*msg)
+ (target, method_name, args) = ArgsSerializer.deserialize_message(*msg)
target.send(method_name, *args)
end
end
@@ -0,0 +1,39 @@
+module Sidekiq
+ module Extensions
+ class ArgsSerializer
+ # inspired by DelayedJob
+ CLASS_STRING_FORMAT = /^CLASS\:([A-Z][\w\:]+)$/
+ AR_STRING_FORMAT = /^AR\:([A-Z][\w\:]+)\:(\d+)$/
+ YAML_STRING_FORMAT = /\A---/
+
+ def self.serialize(obj)
+ case obj
+ when Array then obj.map { |o| serialize(o) }
+ when Hash then obj.inject({}) { |memo, (k, v)| memo[k] = serialize(v); memo }
+ when ::ActiveRecord::Base then "AR:#{obj.class.name}:#{obj.id}"
+ when Class, Module then "CLASS:#{obj.name}"
+ else obj.to_yaml
+ end
+ end
+
+ def self.deserialize(obj)
+ case obj
+ when CLASS_STRING_FORMAT then $1.constantize
+ when AR_STRING_FORMAT then $1.constantize.where(id: $2).first
+ when Array then obj.map { |item| deserialize(item) }
+ when Hash then obj.inject({}) { |memo, (k, v)| memo[k] = deserialize(v); memo }
+ else YAML.load(obj)
+ end
+ end
+
+
+ def self.serialize_message(target, method_name, *args)
+ [ serialize(target), method_name, serialize(args) ]
+ end
+
+ def self.deserialize_message(*msg)
+ [ deserialize(msg[0]), msg[1], deserialize(msg[2]) ]
+ end
+ end
+ end
+end
@@ -12,8 +12,8 @@ module Extensions
class DelayedClass
include Sidekiq::Worker
- def perform(yml)
- (target, method_name, args) = YAML.load(yml)
+ def perform(*msg)
+ (target, method_name, args) = ArgsSerializer.deserialize_message(*msg)
target.send(method_name, *args)
end
end
@@ -13,11 +13,12 @@ def method_missing(name, *args)
# serialize the objects to a String. The YAML will be converted
# to JSON and then deserialized on the other side back into a
# Ruby object.
- obj = [@target, name, args]
+ serialized_args = ArgsSerializer.serialize_message(@target, name, *args)
+
if @at
- @performable.perform_at(@at, ::YAML.dump(obj))
+ @performable.perform_at(@at, *serialized_args)
else
- @performable.perform_async(::YAML.dump(obj))
+ @performable.perform_async(*serialized_args)
end
end
end
@@ -29,7 +29,11 @@ module Worker
module ClassMethods
alias_method :perform_async_old, :perform_async
def perform_async(*args)
- new.perform(*Sidekiq.load_json(Sidekiq.dump_json(args)))
+ new.perform(*
+ Sidekiq::Extensions::ArgsSerializer.deserialize(
+ Sidekiq::Extensions::ArgsSerializer.serialize(args)
+ )
+ )
true
end
end
View
Binary file not shown.
View
Binary file not shown.
@@ -0,0 +1,67 @@
+require 'helper'
+require 'sidekiq'
+require 'active_record'
+require 'action_mailer'
+
+class TestArgsSerializer < MiniTest::Unit::TestCase
+ describe 'args parser' do
+ before do
+ ActiveRecord::Base.establish_connection adapter: "sqlite3", database: File.join(File.dirname(__FILE__), "db/test.sqlite3")
+ end
+
+ def ser(o)
+ Sidekiq::Extensions::ArgsSerializer.serialize(o)
+ end
+
+ def deser(s)
+ Sidekiq::Extensions::ArgsSerializer.deserialize(s)
+ end
+
+ class User < ActiveRecord::Base
+ end
+
+ it 'serializes active record class' do
+ assert_equal "CLASS:TestArgsSerializer::User", ser(User)
+ assert_equal TestArgsSerializer::User, deser(ser(User))
+ end
+
+ it 'serializes active record instance' do
+ user = User.create!
+ assert_equal "AR:TestArgsSerializer::User:#{user.id}", ser(user)
+ assert_equal user, deser(ser(user))
+ end
+
+ class SomeClass
+ end
+
+ module SomeModule
+ end
+
+ it 'serializes class' do
+ assert_equal "CLASS:TestArgsSerializer::SomeClass", ser(SomeClass)
+ assert_equal SomeClass, deser(ser(SomeClass))
+ end
+
+ it 'serializes module' do
+ assert_equal "CLASS:TestArgsSerializer::SomeModule", ser(SomeModule)
+ assert_equal SomeModule, deser(ser(SomeModule))
+ end
+
+ it 'serializes array' do
+ assert_equal [1, 2, 3], deser(ser([1, 2, 3]))
+ end
+
+ it 'serializes complex object' do
+ user = User.create!
+ user_2 = User.create!
+ user_3 = User.create!
+ obj = [user, [user_2], { user_3: user_3, number: 1, string: "s" }]
+ assert_equal obj, deser(ser(obj))
+ end
+
+ it 'serializes date' do
+ today = Date.today
+ assert_equal today, deser(ser(today))
+ end
+ end
+end
View
@@ -6,47 +6,86 @@
require 'sidekiq/extensions/active_record'
require 'sidekiq/rails'
+require 'sidekiq/processor'
+
Sidekiq.hook_rails!
class TestExtensions < MiniTest::Unit::TestCase
describe 'sidekiq extensions' do
before do
Sidekiq.redis = REDIS
Sidekiq.redis {|c| c.flushdb }
+
+ # overwrite db with blank one
+ dir = File.join(File.dirname(__FILE__), 'db')
+
+ old_db = File.join(dir, 'test.sqlite3')
+ FileUtils.rm(old_db) if File.exists?(old_db)
+ FileUtils.cp(File.join(dir, '.blank.sqlite3'), File.join(dir, 'test.sqlite3'))
+
+ ActiveRecord::Base.establish_connection adapter: "sqlite3", database: File.join(File.dirname(__FILE__), "db/test.sqlite3")
end
- class MyModel < ActiveRecord::Base
- def self.long_class_method
- raise "Should not be called!"
+ class User < ActiveRecord::Base
+ def self.long_class_method(arg)
+ "done long_class_method #{arg}"
+ end
+
+ def long_instance_method(arg)
+ "done long_instance_method #{arg}"
end
end
+ def perform_last_job!(performer)
+ msg = JSON.parse(Sidekiq.redis {|c| c.lrange "queue:default", 0, -1 }[0])
+ performer.new.perform(*msg['args'])
+ end
+
it 'allows delayed execution of ActiveRecord class methods' do
assert_equal [], Sidekiq::Client.registered_queues
assert_equal 0, Sidekiq.redis {|c| c.llen('queue:default') }
- MyModel.delay.long_class_method
+ User.delay.long_class_method("with_argument")
+ assert_equal ['default'], Sidekiq::Client.registered_queues
+ assert_equal 1, Sidekiq.redis {|c| c.llen('queue:default') }
+
+ assert_equal "done long_class_method with_argument", perform_last_job!(Sidekiq::Extensions::DelayedModel)
+ end
+
+ it 'allows delayed execution of ActiveRecord instance methods' do
+ assert_equal [], Sidekiq::Client.registered_queues
+ assert_equal 0, Sidekiq.redis {|c| c.llen('queue:default') }
+ user = User.create
+ user.delay.long_instance_method("with_argument")
assert_equal ['default'], Sidekiq::Client.registered_queues
assert_equal 1, Sidekiq.redis {|c| c.llen('queue:default') }
+
+ assert_equal "done long_instance_method with_argument", perform_last_job!(Sidekiq::Extensions::DelayedModel)
end
it 'allows delayed scheduling of AR class methods' do
assert_equal 0, Sidekiq.redis {|c| c.zcard('schedule') }
- MyModel.delay_for(5.days).long_class_method
+ User.delay_for(5.days).long_class_method
assert_equal 1, Sidekiq.redis {|c| c.zcard('schedule') }
end
+ ActionMailer::Base.perform_deliveries = false
+
class UserMailer < ActionMailer::Base
- def greetings(a, b)
- raise "Should not be called!"
+ def greetings(email, name)
+ mail from: "test@domain.com", to: email, subject: "Hello #{name}"
end
end
it 'allows delayed delivery of ActionMailer mails' do
assert_equal [], Sidekiq::Client.registered_queues
assert_equal 0, Sidekiq.redis {|c| c.llen('queue:default') }
- UserMailer.delay.greetings(1, 2)
+ UserMailer.delay.greetings("user@domain.com", "John Doe")
assert_equal ['default'], Sidekiq::Client.registered_queues
assert_equal 1, Sidekiq.redis {|c| c.llen('queue:default') }
+
+ mail_message = perform_last_job!(Sidekiq::Extensions::DelayedMailer)
+ assert_equal mail_message.to, ["user@domain.com"]
+ assert_equal mail_message.subject, "Hello John Doe"
end
it 'allows delayed scheduling of AM mails' do
@@ -57,11 +96,15 @@ def greetings(a, b)
class SomeClass
def self.doit(arg)
+ ["done", arg]
end
end
it 'allows delay of any ole class method' do
+ today = Date.today
SomeClass.delay.doit(Date.today)
+
+ assert_equal ["done", today], perform_last_job!(Sidekiq::Extensions::DelayedClass)
end
end
View
@@ -1,5 +1,6 @@
require 'helper'
require 'sidekiq/scheduled'
+require 'active_support/time'
class TestScheduling < MiniTest::Unit::TestCase
describe 'middleware' do
@@ -12,7 +12,7 @@
class TestInline < MiniTest::Unit::TestCase
describe 'sidekiq inline testing' do
class InlineError < RuntimeError; end
- class ParameterIsNotString < RuntimeError; end
+ class ParameterIsNotTime < RuntimeError; end
class InlineWorker
include Sidekiq::Worker
@@ -24,7 +24,7 @@ def perform(pass)
class InlineWorkerWithTimeParam
include Sidekiq::Worker
def perform(time)
- raise ParameterIsNotString unless time.is_a?(String)
+ raise ParameterIsNotTime unless time.is_a?(Time)
end
end

0 comments on commit 3a09556

Please sign in to comment.