Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serializable_hash: Jobs should partially define how to as serialize and de-serialize themselves #9924

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
84 changes: 76 additions & 8 deletions activesupport/lib/active_support/queueing.rb
@@ -1,7 +1,54 @@
require 'delegate'
require 'thread'
require 'active_support/hash_with_indifferent_access'

module ActiveSupport

class JobPayloadizer

def self.payload_from_job(job)
if job.is_a?(Class)
return job.name
end
if job.nil?
return job
end
unless job.respond_to?(:to_serializable_hash)
raise TypeError, "Please implement to_serializable_hash on #{job.inspect}"
end
unless job.class.respond_to?(:from_serializable_hash)
raise TypeError, "Please implement #{job.class}.from_serializable_hash"
end
{'job_class' => job.class.name, 'object_hash' => job.to_serializable_hash}
end

def self.job_from_payload(payload)
if payload.nil?
return payload
end
unless payload.is_a?(Hash)
return payload.constantize
end
job_class = payload["job_class"].constantize
object_hash = HashWithIndifferentAccess.new(payload["object_hash"])
job_class.from_serializable_hash(object_hash)
end

def self.unconstantize
self.name.to_s
end

end

class MarshalEncoder
def self.encode(object)
Marshal.dump(object)
end
def self.decode(encoded_object)
Marshal.load(encoded_object)
end
end

# A Queue that simply inherits from STDLIB's Queue. When this
# queue is used, Rails automatically starts a job runner in a
# background thread.
Expand All @@ -17,13 +64,41 @@ def consumer
@consumer ||= ThreadedQueueConsumer.new(self, @consumer_options)
end

def default_payloadizer
@payloadizer ||= JobPayloadizer
end

def encoder
# but perhaps ActiveSupport::JSON would be a better default
@encoder ||= MarshalEncoder
end

# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
# run the jobs in a separate thread so assumptions of synchronous
# jobs are caught in test mode.
consumer.drain
end

def pop
joberize(super)
end

def push(job)
#Jobs must define their own payloadizer, or implement to_serializable_hash / from_serializable_hash
payloadizer = job.respond_to?(:payloadizer) ? job.payloadizer : default_payloadizer
payload = payloadizer.payload_from_job(job)
super encoder.encode({'payloadizer' => payloadizer.unconstantize, 'payload' => payload})
end

protected

def joberize(payload)
job_data = encoder.decode(payload)
job_data["payloadizer"].constantize.job_from_payload(job_data["payload"])
end

end

class SynchronousQueue < Queue
Expand All @@ -45,14 +120,7 @@ class TestQueue < Queue
# Get a list of the jobs off this queue. This method may not be
# available on production queues.
def jobs
@que.dup
end

# Marshal and unmarshal job before pushing it onto the queue. This will
# raise an exception on any attempts in tests to push jobs that can't (or
# shouldn't) be marshalled.
def push(job)
super Marshal.load(Marshal.dump(job))
@que.dup.map{ |job_data| joberize(job_data) }
end
end

Expand Down
22 changes: 19 additions & 3 deletions activesupport/test/queueing/synchronous_queue_test.rb
Expand Up @@ -3,11 +3,27 @@

class SynchronousQueueTest < ActiveSupport::TestCase
class Job
attr_reader :ran
def run; @ran = true end
class << self
attr_accessor :ran
end
def to_serializable_hash
{}
end
def self.from_serializable_hash(hash)
Job.new
end
def run
self.class.ran = true
end
end

class ExceptionRaisingJob
def to_serializable_hash
{}
end
def self.from_serializable_hash(hash)
ExceptionRaisingJob.new
end
def run; raise end
end

Expand All @@ -18,7 +34,7 @@ def setup
def test_runs_jobs_immediately
job = Job.new
@queue.push job
assert job.ran
assert Job.ran

assert_raises RuntimeError do
@queue.push ExceptionRaisingJob.new
Expand Down
65 changes: 47 additions & 18 deletions activesupport/test/queueing/test_queue_test.rb
Expand Up @@ -10,6 +10,12 @@ class ExceptionRaisingJob
def run
raise
end
def to_serializable_hash
{}
end
def self.from_serializable_hash(hash)
ExceptionRaisingJob.new
end
end

def test_drain_raises_exceptions_from_running_jobs
Expand All @@ -18,14 +24,21 @@ def test_drain_raises_exceptions_from_running_jobs
end

def test_jobs
@queue.push 1
@queue.push 2
assert_equal [1,2], @queue.jobs
@queue.push Fixnum
@queue.push String
assert_equal [Fixnum,String], @queue.jobs
end

class EquivalentJob
def initialize
@initial_id = self.object_id
def initialize(id = nil)
@initial_id = id || self.object_id
end

def to_serializable_hash
{:initial_id => @initial_id}
end
def self.from_serializable_hash(hash)
EquivalentJob.new(hash[:initial_id])
end

def run
Expand Down Expand Up @@ -61,6 +74,13 @@ def initialize(object)
@object = object
end

def to_serializable_hash
{:object => @object}
end
def self.from_serializable_hash(hash)
ProcessingJob.new(hash[:object])
end

def run
self.class.processed << @object
end
Expand All @@ -79,32 +99,47 @@ def test_order
end

class ThreadTrackingJob
attr_reader :thread_id
class << self
attr_accessor :thread_id
end

def to_serializable_hash
{}
end
def self.from_serializable_hash(hash)
ThreadTrackingJob.new
end

def run
@thread_id = Thread.current.object_id
self.class.thread_id = Thread.current.object_id
end

def ran?
@thread_id
def self.ran?
thread_id
end
end

def test_drain
@queue.push ThreadTrackingJob.new
job = @queue.jobs.last
@queue.drain

assert @queue.empty?
assert job.ran?, "The job runs synchronously when the queue is drained"
assert_equal job.thread_id, Thread.current.object_id
assert ThreadTrackingJob.ran?, "The job runs synchronously when the queue is drained"
assert_equal ThreadTrackingJob.thread_id, Thread.current.object_id
end

class IdentifiableJob
def initialize(id)
@id = id
end

def to_serializable_hash
{:id => @id}
end
def self.from_serializable_hash(hash)
IdentifiableJob.new(hash[:id])
end

def ==(other)
other.same_id?(@id)
end
Expand Down Expand Up @@ -137,10 +172,4 @@ def test_adding_an_unmarshallable_job
end
end

def test_attempting_to_add_a_reference_to_itself
job = {reference: @queue}
assert_raises TypeError do
@queue.push job
end
end
end
15 changes: 14 additions & 1 deletion activesupport/test/queueing/threaded_consumer_test.rb
Expand Up @@ -4,12 +4,24 @@

class TestThreadConsumer < ActiveSupport::TestCase
class Job
class << self; attr_accessor :blocks; end
self.blocks = {}

attr_reader :id
def initialize(id = 1, &block)
@id = id
@block = block
end

def to_serializable_hash
Job.blocks[id] = @block
{:id => @id}
end
def self.from_serializable_hash(hash)
id = hash[:id]
Job.new(id, &Job.blocks[id])
end

def run
@block.call if @block
end
Expand Down Expand Up @@ -68,7 +80,8 @@ def teardown
consume_queue @queue

assert_equal 1, @logger.logged(:error).size
assert_match "Job Error: #{job.inspect}\nRuntimeError: Error!", @logger.logged(:error).last
assert_match "Job Error: #<TestThreadConsumer::Job", @logger.logged(:error).last
assert_match "RuntimeError: Error!", @logger.logged(:error).last
end

test "logger defaults to stderr" do
Expand Down