Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions app/jobs/event_created_job.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
class EventCreatedJob
include Sidekiq::Job

def self.perform_async(*args)
can_handle?(type: args[0]["type"]) ? super : nil
def self.perform_async(event_id, event_type)
can_handle?(event_type: event_type) ? super : nil
end

def perform(args)
job_class_name = to_job_class_name(type: args["type"])
def perform(event_id, event_type)
job_class_name = to_job_class_name(event_type: event_type)

if job_class_name.nil?
Sidekiq.logger.debug("Could not get job class name from event type: #{args["type"]}")
Sidekiq.logger.debug("Could not get job class name from event type: #{event_type}")

return
end
Expand All @@ -22,16 +22,16 @@ def perform(args)
return
end

job_class.perform_async(args["id"])
job_class.perform_async(event_id, event_type)
end

TYPE_REGEX = /\A(::)?([A-Z][\w]*::)*[A-Z][\w]*Event\z/

def to_job_class_name(type:)
self.class.can_handle?(type: type) ? type.sub(/Event\z/, "Job") : nil
def to_job_class_name(event_type:)
self.class.can_handle?(event_type: event_type) ? event_type.sub(/Event\z/, "Job") : nil
end

def self.can_handle?(type:)
def self.can_handle?(event_type:)
# \A => start of string
# (::)? => optional leading ::
# ([A-Z]\w*::)* => zero or more namespace segments like Foo:: or FooBar::
Expand All @@ -52,6 +52,6 @@ def self.can_handle?(type:)
# ✘ 123::InvalidEvent # starts with digit
# ✘ Foo:: # incomplete constant

/\A(::)?([A-Z][\w]*::)*[A-Z][\w]*Event\z/.match?(type)
/\A(::)?([A-Z][\w]*::)*[A-Z][\w]*Event\z/.match?(event_type)
end
end
4 changes: 2 additions & 2 deletions app/jobs/outboxer_integration/test_started_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ module OutboxerIntegration
class TestStartedJob
include Sidekiq::Job

def perform(event_id)
test_started_event = TestStartedEvent.find(event_id)
def perform(event_id, event_type)
test_started_event = event_type.safe_constantize.find(event_id)

TestCompletedEvent.create!(body: { "test_id" => test_started_event.body["test_id"] })
end
Expand Down
7 changes: 1 addition & 6 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ begin
heartbeat: options[:heartbeat],
logger: logger
) do |message|
if EventCreatedJob.can_handle?(type: message[:messageable_type])
EventCreatedJob.perform_async({
"id" => message[:messageable_id],
"type" => message[:messageable_type]
})
end
EventCreatedJob.perform_async(message[:messageable_id], message[:messageable_type])
end
ensure
Outboxer::Database.disconnect(logger: logger)
Expand Down
123 changes: 78 additions & 45 deletions spec/jobs/event_created_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,38 @@
it "enqueues ContactCreatedJob" do
stub_const("ContactCreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.new.perform({ "id" => 1, "type" => "ContactCreatedEvent" })
EventCreatedJob.new.perform(1, "ContactCreatedEvent")

expect(ContactCreatedJob.jobs).to match([
hash_including("class" => "ContactCreatedJob", "args" => [1])
hash_including(
"class" => "ContactCreatedJob",
"args" => [1, "ContactCreatedEvent"])
])
end

it "enqueues Accountify::ContactCreatedJob" do
stub_const("Accountify", Module.new)
stub_const("Accountify::ContactCreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.new.perform({ "id" => 2, "type" => "Accountify::ContactCreatedEvent" })
EventCreatedJob.new.perform(2, "Accountify::ContactCreatedEvent")

expect(Accountify::ContactCreatedJob.jobs).to match([
hash_including("class" => "Accountify::ContactCreatedJob", "args" => [2])
hash_including(
"class" => "Accountify::ContactCreatedJob",
"args" => [2, "Accountify::ContactCreatedEvent"])
])
end

it "enqueues Accountify::InvoiceUpdatedJob" do
stub_const("Accountify", Module.new) unless defined?(Accountify)
stub_const("Accountify::InvoiceUpdatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.new.perform({ "id" => 3, "type" => "Accountify::InvoiceUpdatedEvent" })
EventCreatedJob.new.perform(3, "Accountify::InvoiceUpdatedEvent")

expect(Accountify::InvoiceUpdatedJob.jobs).to match([
hash_including("class" => "Accountify::InvoiceUpdatedJob", "args" => [3])
hash_including(
"class" => "Accountify::InvoiceUpdatedJob",
"args" => [3, "Accountify::InvoiceUpdatedEvent"])
])
end

Expand All @@ -46,79 +52,87 @@
stub_const("Accountify::Invoice", Module.new)
stub_const("Accountify::Invoice::CreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.new.perform({ "id" => 4, "type" => "Accountify::Invoice::CreatedEvent" })
EventCreatedJob.new.perform(4, "Accountify::Invoice::CreatedEvent")

expect(Accountify::Invoice::CreatedJob.jobs).to match([
hash_including("class" => "Accountify::Invoice::CreatedJob", "args" => [4])
hash_including(
"class" => "Accountify::Invoice::CreatedJob",
"args" => [4, "Accountify::Invoice::CreatedEvent"])
])
end

it "enqueues MyApp::Domain::Event::UserSignedUpJob" do
stub_const("MyApp", Module.new)
stub_const("MyApp::Domain", Module.new)
stub_const("MyApp::Domain::Event", Module.new)
stub_const("MyApp::Domain::Event::UserSignedUpJob", Class.new { include Sidekiq::Job })
stub_const("MyApp::Domain::Event::UserSignedUpJob",
Class.new { include Sidekiq::Job })

EventCreatedJob.new.perform(
{ "id" => 5, "type" => "MyApp::Domain::Event::UserSignedUpEvent" })
5,
"MyApp::Domain::Event::UserSignedUpEvent")

expect(MyApp::Domain::Event::UserSignedUpJob.jobs).to match([
hash_including("class" => "MyApp::Domain::Event::UserSignedUpJob", "args" => [5])
hash_including(
"class" => "MyApp::Domain::Event::UserSignedUpJob",
"args" => [5, "MyApp::Domain::Event::UserSignedUpEvent"])
])
end

it "handles leading :: in type" do
stub_const("ContactCreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.new.perform({ "id" => 6, "type" => "::ContactCreatedEvent" })
EventCreatedJob.new.perform(6, "::ContactCreatedEvent")

expect(ContactCreatedJob.jobs).to match([
hash_including("class" => "ContactCreatedJob", "args" => [6])
hash_including(
"class" => "ContactCreatedJob",
"args" => [6, "::ContactCreatedEvent"])
])
end

it "does not enqueue for invalid type format" do
EventCreatedJob.new.perform({ "id" => 7, "type" => "contact_created_event" })
EventCreatedJob.new.perform(7, "contact_created_event")
expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "does not enqueue for empty string" do
EventCreatedJob.new.perform({ "id" => 8, "type" => "" })
EventCreatedJob.new.perform(8, "")
expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "does not enqueue if type does not end in Event" do
EventCreatedJob.new.perform({ "id" => 9, "type" => "SomethingCreated" })
EventCreatedJob.new.perform(9, "SomethingCreated")
expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "does not enqueue for missing job class" do
EventCreatedJob.new.perform({ "id" => 10, "type" => "ImaginaryThingEvent" })
EventCreatedJob.new.perform(10, "ImaginaryThingEvent")
expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "does not enqueue for invalid type constant path" do
EventCreatedJob.new.perform({ "id" => 11, "type" => "123::@@@Invalid" })
EventCreatedJob.new.perform(11, "123::@@@Invalid")
expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "does not enqueue if type is exactly 'Event'" do
EventCreatedJob.new.perform({ "id" => 12, "type" => "Event" })
EventCreatedJob.new.perform(12, "Event")
expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "logs debug message when type is invalid" do
expect(Sidekiq.logger).to receive(:debug).with(
"Could not get job class name from event type: bad_type")

EventCreatedJob.new.perform({ "id" => 13, "type" => "bad_type" })
EventCreatedJob.new.perform(13, "bad_type")
end

it "logs debug message when job class name is not constantizable" do
expect(Sidekiq.logger).to receive(:debug).with(
"Could not constantize job class name: ImaginaryThingJob")

EventCreatedJob.new.perform({ "id" => 14, "type" => "ImaginaryThingEvent" })
EventCreatedJob.new.perform(14, "ImaginaryThingEvent")
end
end

Expand All @@ -128,35 +142,41 @@
it "enqueues ContactCreatedJob through perform_async" do
stub_const("ContactCreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.perform_async({ "id" => 1, "type" => "ContactCreatedEvent" })
EventCreatedJob.perform_async(1, "ContactCreatedEvent")
EventCreatedJob.drain

expect(ContactCreatedJob.jobs).to match([
hash_including("class" => "ContactCreatedJob", "args" => [1])
hash_including(
"class" => "ContactCreatedJob",
"args" => [1, "ContactCreatedEvent"])
])
end

it "enqueues Accountify::ContactCreatedJob through perform_async" do
stub_const("Accountify", Module.new)
stub_const("Accountify::ContactCreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.perform_async({ "id" => 2, "type" => "Accountify::ContactCreatedEvent" })
EventCreatedJob.perform_async(2, "Accountify::ContactCreatedEvent")
EventCreatedJob.drain

expect(Accountify::ContactCreatedJob.jobs).to match([
hash_including("class" => "Accountify::ContactCreatedJob", "args" => [2])
hash_including(
"class" => "Accountify::ContactCreatedJob",
"args" => [2, "Accountify::ContactCreatedEvent"])
])
end

it "enqueues Accountify::InvoiceUpdatedJob through perform_async" do
stub_const("Accountify", Module.new) unless defined?(Accountify)
stub_const("Accountify::InvoiceUpdatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.perform_async({ "id" => 3, "type" => "Accountify::InvoiceUpdatedEvent" })
EventCreatedJob.perform_async(3, "Accountify::InvoiceUpdatedEvent")
EventCreatedJob.drain

expect(Accountify::InvoiceUpdatedJob.jobs).to match([
hash_including("class" => "Accountify::InvoiceUpdatedJob", "args" => [3])
hash_including(
"class" => "Accountify::InvoiceUpdatedJob",
"args" => [3, "Accountify::InvoiceUpdatedEvent"])
])
end

Expand All @@ -165,91 +185,104 @@
stub_const("Accountify::Invoice", Module.new)
stub_const("Accountify::Invoice::CreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.perform_async({ "id" => 4, "type" => "Accountify::Invoice::CreatedEvent" })
EventCreatedJob.perform_async(4, "Accountify::Invoice::CreatedEvent")
EventCreatedJob.drain

expect(Accountify::Invoice::CreatedJob.jobs).to match([
hash_including("class" => "Accountify::Invoice::CreatedJob", "args" => [4])
hash_including(
"class" => "Accountify::Invoice::CreatedJob",
"args" => [4, "Accountify::Invoice::CreatedEvent"])
])
end

it "enqueues MyApp::Domain::Event::UserSignedUpJob through perform_async" do
stub_const("MyApp", Module.new)
stub_const("MyApp::Domain", Module.new)
stub_const("MyApp::Domain::Event", Module.new)
stub_const("MyApp::Domain::Event::UserSignedUpJob", Class.new { include Sidekiq::Job })
stub_const("MyApp::Domain::Event::UserSignedUpJob",
Class.new { include Sidekiq::Job })

EventCreatedJob.perform_async({
"id" => 5,
"type" => "MyApp::Domain::Event::UserSignedUpEvent"
})
EventCreatedJob.perform_async(
5,
"MyApp::Domain::Event::UserSignedUpEvent")
EventCreatedJob.drain

expect(MyApp::Domain::Event::UserSignedUpJob.jobs).to match([
hash_including("class" => "MyApp::Domain::Event::UserSignedUpJob", "args" => [5])
hash_including(
"class" => "MyApp::Domain::Event::UserSignedUpJob",
"args" => [5, "MyApp::Domain::Event::UserSignedUpEvent"])
])
end

it "handles leading :: in type through perform_async" do
stub_const("ContactCreatedJob", Class.new { include Sidekiq::Job })

EventCreatedJob.perform_async({ "id" => 6, "type" => "::ContactCreatedEvent" })
EventCreatedJob.perform_async(6, "::ContactCreatedEvent")
EventCreatedJob.drain

expect(ContactCreatedJob.jobs).to match([
hash_including("class" => "ContactCreatedJob", "args" => [6])
hash_including(
"class" => "ContactCreatedJob",
"args" => [6, "::ContactCreatedEvent"])
])
end

it "does not enqueue job for invalid type format" do
expect do
EventCreatedJob.perform_async({ "id" => 7, "type" => "contact_created_event" })
EventCreatedJob.perform_async(7, "contact_created_event")
end.not_to change(EventCreatedJob.jobs, :size)
end

it "does not enqueue job for empty string" do
expect do
EventCreatedJob.perform_async({ "id" => 8, "type" => "" })
EventCreatedJob.perform_async(8, "")
end.not_to change(EventCreatedJob.jobs, :size)
end

it "does not enqueue job if type does not end in Event" do
expect do
EventCreatedJob.perform_async({ "id" => 9, "type" => "SomethingCreated" })
EventCreatedJob.perform_async(9, "SomethingCreated")
end.not_to change(EventCreatedJob.jobs, :size)
end

it "skips perform_async for missing job class" do
EventCreatedJob.perform_async({ "id" => 10, "type" => "ImaginaryThingEvent" })
EventCreatedJob.perform_async(10, "ImaginaryThingEvent")
EventCreatedJob.drain

expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "skips perform_async for invalid constant path" do
EventCreatedJob.perform_async({ "id" => 11, "type" => "123::@@@Invalid" })
EventCreatedJob.perform_async(11, "123::@@@Invalid")
EventCreatedJob.drain

expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "skips perform_async if type is exactly 'Event'" do
EventCreatedJob.perform_async({ "id" => 12, "type" => "Event" })
EventCreatedJob.perform_async(12, "Event")
EventCreatedJob.drain

expect(Sidekiq::Worker.jobs.size).to eq(0)
end

it "returns nil for invalid type" do
result = EventCreatedJob.perform_async({ "id" => 13, "type" => "bad_type" })
result = EventCreatedJob.perform_async(13, "bad_type")
expect(result).to be_nil
end

it "returns job ID when enqueued" do
stub_const("ContactCreatedJob", Class.new { include Sidekiq::Job })

result = EventCreatedJob.perform_async({ "id" => 14, "type" => "ContactCreatedEvent" })
result = EventCreatedJob.perform_async(14, "ContactCreatedEvent")
EventCreatedJob.drain

expect(result).to be_a(String)
expect(ContactCreatedJob.jobs).to match([
hash_including(
"class" => "ContactCreatedJob",
"args" => [14, "ContactCreatedEvent"])
])
end
end
end