Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion railseventstore.org/source/docs/correlation_causation.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,37 @@ new_event.metadata[:correlation_id]
new_event.metadata[:causation_id]
```

This is however not necessary for sync handlers. Events published from sync handlers are by default correlated with events that caused them.

## Correlating events published from async handlers

Events published from async handlers are not correlated with events that caused them by default. To enable that functionality you need to prepend `RailsEventStore::CorrelatedHandler`

```ruby
class SendOrderEmail < ActiveJob::Base
prepend RailsEventStore::CorrelatedHandler
prepend RailsEventStore::AsyncHandler

def perform(event)
event_store.publish(HappenedLater.new(data:{
user_id: event.data.fetch(:user_id),
}))
end

private

def event_store
Rails.configuration.event_store
end
end
```

## Correlating an event with a command

If your command responds to `correlation_id` (can even always be `nil`) and `message_id` you can correlate your events also with commands.

```ruby
class ApproveOrder = < Struct.new(:order_id, :message_id, :correlation_id)
class ApproveOrder < Struct.new(:order_id, :message_id, :correlation_id)
end

command = ApproveOrder.new("KTXBN123", SecureRandom.uuid, nil)
Expand Down Expand Up @@ -126,6 +151,39 @@ class AddProductCommand < Struct.new(:message_id, :product_id)
end
```

## Building streams based on correlation id and causation id

You can use `RailsEventStore::LinkByCorrelationId` (`RubyEventStore::LinkByCorrelationId`) and `RailsEventStore::LinkByCausationId` (`RubyEventStore::LinkByCausationId`) to build streams of all events with certain correlation or causation id. This makes debugging and making sense of a large process easier to see.

```ruby
Rails.application.configure do
config.to_prepare do
Rails.configuration.event_store = event_store = RailsEventStore::Client.new
event_store.subscribe_to_all_events(RailsEventStore::LinkByCorrelationId.new)
event_store.subscribe_to_all_events(RailsEventStore::LinkByCausationId.new)
end
end
```

After publishing an event:

```ruby
event = OrderPlaced.new
event_store.publish(event)
```

you can read events caused by it:

```ruby
event_store.read.stream("$by_causation_id_#{event.event_id}")
```

and events correlated with it:

```ruby
event_store.read.stream("$by_correlation_id_#{event.correlation_id || event.event_id}")
```

## Thanks

Image thanks to [Arkency blog](https://blog.arkency.com/correlation-id-and-causation-id-in-evented-systems/)
10 changes: 8 additions & 2 deletions railseventstore.org/source/docs/protobuf.html.md.erb
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,16 @@ event_store.subscribe(->(ev){ }, to: [MyApp::OrderPlaced.descriptor.name])
class SendOrderEmailHandler < ActiveJob::Base
self.queue_adapter = :inline

def perform(event)
event = YAML.load(event)
def perform(payload)
event = event_store.deserialize(payload)
# do something
end

private

def event_store
Rails.configuration.event_store
end
end

event_store.subscribe(SendOrderEmailHandler, to: [MyApp::OrderPlaced.descriptor.name])
Expand Down
27 changes: 25 additions & 2 deletions railseventstore.org/source/docs/subscribe.html.md.erb
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,30 @@ Async handlers are just background jobs implemented with `ActiveJob`.

```ruby
class SendOrderEmail < ActiveJob::Base
def perform(payload)
event = event_store.deserialize(payload)
email = event.data.fetch(:customer_email)
OrderMailer.notify_customer(email).deliver_now!
end

private

def event_store
Rails.configuration.event_store
end
end

event_store = RailsEventStore::Client.new
event_store.subscribe(SendOrderEmail, to: [OrderPlaced])
```

You can also use `RailsEventStore::AsyncHandler` module that will deserialize the event for you:

```ruby
class SendOrderEmail < ActiveJob::Base
prepend RailsEventStore::AsyncHandler

def perform(event)
event = YAML.load(event)
email = event.data.fetch(:customer_email)
OrderMailer.notify_customer(email).deliver_now!
end
Expand Down Expand Up @@ -349,8 +371,9 @@ You can configure your dispatcher slightly different, to schedule async handlers

```ruby
class SendOrderEmail < ActiveJob::Base
prepend RailsEventStore::AsyncHandler

def perform(event)
event = YAML.load(event)
email = event.data.fetch(:customer_email)
OrderMailer.notify_customer(email).deliver_now!
end
Expand Down