Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriptions redesigned #665

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
be31df8
Redefine subscriptions - breaking changes
mpraglowski Aug 9, 2019
39bc7ad
Refactor subscriptions
mpraglowski Aug 9, 2019
8ff0e36
Subscription storage is optional
mpraglowski Aug 9, 2019
972be0a
All dispatchers & schedulers receive now subscription object instead …
mpraglowski Aug 10, 2019
b693e49
Single subscriptions store is enough
mpraglowski Aug 16, 2019
33f34cb
Subscription global or assigned to specific type - it's just an imple…
mpraglowski Aug 16, 2019
2c9e3aa
Minor improvement & fixes
mpraglowski Aug 16, 2019
554cdbe
Docs & lint/specs for subscriptions store
mpraglowski Aug 16, 2019
4f76f27
Docs & specs for Subscription object
mpraglowski Aug 16, 2019
e18ceb3
No more local & global subscriptions, there are main subscription sto…
mpraglowski Aug 16, 2019
c8dedab
Do not need that anymore
mpraglowski Aug 16, 2019
29e62b4
Simplify subscriptions store API
mpraglowski Aug 16, 2019
acc8184
Must be excluded from mutation tests as test where it could be trigge…
mpraglowski Aug 16, 2019
2bd6c6d
Kill mutant
mpraglowski Aug 16, 2019
9aefcdf
Typo fix
mpraglowski Aug 16, 2019
636cafb
It does not make sense to require subscription store to be chainable
mpraglowski Aug 16, 2019
ff945e7
Fix subscriptions store lint namespace
mpraglowski Aug 16, 2019
cec639c
Add instrumented subscriptions store
mpraglowski Aug 16, 2019
b82767b
Use store factory for thread store instead of just a class
mpraglowski Aug 16, 2019
570590f
Again, keep Ruby 2.4 compatibility for now
mpraglowski Sep 6, 2019
7140093
Kill mutations
mpraglowski Sep 6, 2019
25dd989
Add frozen_string_literal: true in all changed files
mpraglowski Sep 6, 2019
4ba25e3
Document subscriptions provider class
mpraglowski Sep 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

module RailsEventStore
class ActiveJobScheduler
def call(klass, serialized_event)
def call(subscription, serialized_event)
klass = subscription.subscriber
klass.perform_later(serialized_event.to_h)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ def initialize(scheduler:)
@scheduler = scheduler
end

def call(subscriber, _, serialized_event)
def call(subscription, _, serialized_event)
run do
@scheduler.call(subscriber, serialized_event)
@scheduler.call(subscription.subscriber, serialized_event)
end
end

Expand Down
3 changes: 2 additions & 1 deletion rails_event_store/spec/active_job_scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ module RailsEventStore
describe "#call" do
specify do
scheduler = ActiveJobScheduler.new
scheduler.call(MyAsyncHandler, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
scheduler.call(subscription, serialized_event)

enqueued_jobs = ActiveJob::Base.queue_adapter.enqueued_jobs
expect(enqueued_jobs.size).to eq(1)
Expand Down
36 changes: 24 additions & 12 deletions rails_event_store/spec/after_commit_async_dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class DummyRecord < ActiveRecord::Base

it "dispatch job immediately when no transaction is open" do
expect_to_have_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
expect(MyAsyncHandler.received).to be_nil
MyAsyncHandler.perform_enqueued_jobs
Expand All @@ -48,7 +49,8 @@ class DummyRecord < ActiveRecord::Base
expect_to_have_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end
end
Expand All @@ -61,7 +63,8 @@ class DummyRecord < ActiveRecord::Base
it "does not dispatch job" do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
raise ActiveRecord::Rollback
end
end
Expand All @@ -84,7 +87,8 @@ class DummyRecord < ActiveRecord::Base
it "does not dispatch job" do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
raise ActiveRecord::Rollback
end
end
Expand All @@ -99,7 +103,8 @@ class DummyRecord < ActiveRecord::Base
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction(requires_new: false) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end
end
Expand All @@ -114,7 +119,8 @@ class DummyRecord < ActiveRecord::Base
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction(requires_new: true) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end
end
Expand All @@ -129,7 +135,8 @@ class DummyRecord < ActiveRecord::Base
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction(requires_new: true) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
raise ActiveRecord::Rollback
end
end
Expand All @@ -150,7 +157,8 @@ class DummyRecord < ActiveRecord::Base
ActiveRecord::Base.transaction do
DummyRecord.new.save!
expect_no_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end
rescue DummyError
Expand Down Expand Up @@ -181,7 +189,8 @@ class DummyRecord < ActiveRecord::Base
ActiveRecord::Base.transaction do
DummyRecord.new.save!
expect_no_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end
rescue DummyError
Expand All @@ -205,7 +214,8 @@ class DummyRecord < ActiveRecord::Base

it "dispatches the job" do
expect_to_have_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end

Expand All @@ -218,7 +228,8 @@ class DummyRecord < ActiveRecord::Base
expect_no_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end
end
Expand All @@ -234,7 +245,8 @@ class DummyRecord < ActiveRecord::Base
expect_to_have_enqueued_job(MyAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyAsyncHandler) do
dispatcher.call(MyAsyncHandler, event, serialized_event)
subscription = RubyEventStore::Subscription.new(MyAsyncHandler)
dispatcher.call(subscription, event, serialized_event)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion rails_event_store/spec/rails_event_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def perform(_event)
end

class CustomSidekiqScheduler
def call(klass, serialized_event)
def call(subscription, serialized_event)
klass = subscription.subscriber
klass.perform_async(serialized_event.to_h)
end

Expand Down
2 changes: 2 additions & 0 deletions ruby_event_store/lib/ruby_event_store.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

require 'ruby_event_store/dispatcher'
require 'ruby_event_store/subscription'
require 'ruby_event_store/in_memory_subscriptions_store'
require 'ruby_event_store/subscriptions'
require 'ruby_event_store/broker'
require 'ruby_event_store/in_memory_repository'
Expand Down
2 changes: 1 addition & 1 deletion ruby_event_store/lib/ruby_event_store/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def call
unsubs += add_thread_subscribers
@block.call
ensure
unsubs.each(&:call) if unsubs
unsubs.each(&:unsubscribe) if unsubs
end

private
Expand Down
6 changes: 3 additions & 3 deletions ruby_event_store/lib/ruby_event_store/composed_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ def initialize(*dispatchers)
@dispatchers = dispatchers
end

def call(subscriber, event, serialized_event)
def call(subscription, event, serialized_event)
@dispatchers.each do |dispatcher|
if dispatcher.verify(subscriber)
dispatcher.call(subscriber, event, serialized_event)
if dispatcher.verify(subscription.subscriber)
dispatcher.call(subscription, event, serialized_event)
break
end
end
Expand Down
1 change: 1 addition & 0 deletions ruby_event_store/lib/ruby_event_store/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

module RubyEventStore
GLOBAL_STREAM = Object.new
GLOBAL_SUBSCRIPTION = Object.new
PAGE_SIZE = 100.freeze
end
5 changes: 2 additions & 3 deletions ruby_event_store/lib/ruby_event_store/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

module RubyEventStore
class Dispatcher
def call(subscriber, event, _)
subscriber = subscriber.new if Class === subscriber
subscriber.call(event)
def call(subscription, event, _)
subscription.call(event)
end

def verify(subscriber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ def initialize(scheduler:)
@scheduler = scheduler
end

def call(subscriber, _, serialized_event)
@scheduler.call(subscriber, serialized_event)
def call(subscription, _, serialized_event)
@scheduler.call(subscription, serialized_event)
end

def verify(subscriber)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module RubyEventStore
class InMemorySubscriptionsStore
def initialize
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
end

def add(subscription, type = GLOBAL_SUBSCRIPTION)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

I'd rather call it ANY_EVENT

@subscriptions[type.to_s] << subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to_s? Aren't we expecting Event#type after all?

end

def delete(subscription, type = GLOBAL_SUBSCRIPTION)
@subscriptions.fetch(type.to_s).delete(subscription)
end

def all_for(event_type)
@subscriptions[event_type.to_s]
end

def value
self
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ def initialize(dispatcher, instrumentation)
@instrumentation = instrumentation
end

def call(subscriber, event, serialized_event)
instrumentation.instrument("call.dispatcher.rails_event_store", event: event, subscriber: subscriber) do
dispatcher.call(subscriber, event, serialized_event)
def call(subscription, event, serialized_event)
instrumentation.instrument("call.dispatcher.rails_event_store", event: event, subscription: subscription) do
dispatcher.call(subscription, event, serialized_event)
end
end

Expand Down
47 changes: 23 additions & 24 deletions ruby_event_store/lib/ruby_event_store/spec/subscriptions_lint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ def call(event)
subscriptions.add_subscription(handler, [Test1DomainEvent, Test3DomainEvent])
subscriptions.add_subscription(another_handler, [Test2DomainEvent])
subscriptions.add_global_subscription(global_handler)

expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, global_handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([another_handler, global_handler])
expect(subscriptions.all_for('Test3DomainEvent')).to eq([handler, global_handler])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler, global_handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([another_handler, global_handler])
expect(subscriptions.all_for('Test3DomainEvent').map(&:subscriber)).to eq([handler, global_handler])
end

it 'returns subscribed thread handlers' do
Expand All @@ -40,9 +39,9 @@ def call(event)
subscriptions.add_thread_subscription(another_handler, [Test2DomainEvent])
subscriptions.add_thread_global_subscription(global_handler)

expect(subscriptions.all_for('Test1DomainEvent')).to eq([global_handler, handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([global_handler, another_handler])
expect(subscriptions.all_for('Test3DomainEvent')).to eq([global_handler, handler])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([global_handler, handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([global_handler, another_handler])
expect(subscriptions.all_for('Test3DomainEvent').map(&:subscriber)).to eq([global_handler, handler])
end

it 'returns lambda as an output of global subscribe methods' do
Expand All @@ -60,43 +59,43 @@ def call(event)
it 'revokes global subscription' do
handler = TestHandler.new

revoke = subscriptions.add_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end

it 'revokes subscription' do
handler = TestHandler.new

revoke = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end

it 'revokes thread global subscription' do
handler = TestHandler.new

revoke = subscriptions.add_thread_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_thread_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end

it 'revokes thread subscription' do
handler = TestHandler.new

revoke = subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end
Expand All @@ -106,6 +105,6 @@ def call(event)
subscriptions.add_subscription(handler, ["Test1DomainEvent"])
subscriptions.add_thread_subscription(handler, ["Test1DomainEvent"])

expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, handler])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler, handler])
end
end
Loading