Publish a DomainEventMessage outside of an aggregate #510

Closed
RobWin opened this issue Feb 20, 2018 · 7 comments

Comments

@RobWin

RobWin commented Feb 20, 2018

Hello,

it's more a question or feature request than an issue.
I'm working in the Smart Home/IoT domain, and our system consumes events (messages) from gateways/devices via AMQP. The events represent domain events and we want to store them in the event store (keyword: event sourcing). Gateways/devices are our aggregate roots and we want to be able to reconstruct their state based on events (keyword: digital twins).
I need a way to apply a GenericDomainEventMessage outside of an aggregate. The simplest solution would be to use the EventBus to publish the GenericDomainEventMessage. The event message could have a field with a @TargetAggregateIdentifier annotation. The EventBus would then know how to set the aggregateIdentifier in the event store and how to increase the sequenceNumber.

What do you think?

@smcvb
Member

smcvb commented Feb 20, 2018

Hi @RobWin,

Interesting use case you've got there. Typically I'd not be overly enthusiastic to let outside sources influence your aggregates, but this seems a little different from the other use cases I've come across.
I've discussed it with @abuijze, and we haven't reached the point where we think it should be a feature in the framework yet, but creating this yourself should be fairly easy.

So, what I'd suggest to do is the following:

  • You'll need the latest sequence number to set your domain events at the right spot in the store. To grab the latest sequence id for a given aggregate, you can use the EventStore#lastSequenceNumberFor(String aggregateIdentifier).
  • To instantiate the GenericDomainEventMessage you're going to append, you can just call its constructor, providing the aggregate type, the aggregateIdentifier (which you've previously used to pull the sequence number), the sequenceNumber you've retrieved in the previous operation, and the payload, being the event you've received. You can additionally provide your own meta data as a Map, which could contain the correlation id of the AMQP message, as well as a message identifier and a timestamp. I'd suggest leaving the latter two to the GenericDomainEventMessage constructor, but you could provide your own if necessary.
  • After you've created your GenericDomainEventMessage, you can call the EventBus#publish(List<? extends EventMessage<?>> events) function to publish your event. If you're using the EmbeddedEventStore (which I guess you will), the event will be stored, and after that handled by any Event Listeners you have.
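
The three steps above could be sketched roughly as follows. To keep the example runnable without the framework on the classpath, it uses small stand-in classes: SketchDomainEvent, SketchEventStore and DeviceEventAppender are hypothetical names, not Axon types. In a real application you would use EventStore#lastSequenceNumberFor, GenericDomainEventMessage and EventBus#publish as described. The sketch also assumes that the first event of an aggregate gets sequence number 0 and that the aggregate type is called "Device".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Stand-in for a domain event message; NOT Axon's GenericDomainEventMessage.
final class SketchDomainEvent {
    final String type;
    final String aggregateIdentifier;
    final long sequenceNumber;
    final Object payload;
    final Map<String, Object> metaData;

    SketchDomainEvent(String type, String aggregateIdentifier, long sequenceNumber,
                      Object payload, Map<String, Object> metaData) {
        this.type = type;
        this.aggregateIdentifier = aggregateIdentifier;
        this.sequenceNumber = sequenceNumber;
        this.payload = payload;
        this.metaData = metaData;
    }
}

// Stand-in for an event store that also acts as the event bus.
final class SketchEventStore {
    private final Map<String, List<SketchDomainEvent>> streams = new HashMap<>();

    // Step 1: last used sequence number, empty if the aggregate has no events yet.
    Optional<Long> lastSequenceNumberFor(String aggregateIdentifier) {
        List<SketchDomainEvent> stream = streams.get(aggregateIdentifier);
        if (stream == null || stream.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(stream.get(stream.size() - 1).sequenceNumber);
    }

    // Step 3: store the events in the stream of their aggregate.
    void publish(List<SketchDomainEvent> events) {
        for (SketchDomainEvent event : events) {
            streams.computeIfAbsent(event.aggregateIdentifier, k -> new ArrayList<>()).add(event);
        }
    }

    List<SketchDomainEvent> readEvents(String aggregateIdentifier) {
        return streams.getOrDefault(aggregateIdentifier, new ArrayList<>());
    }
}

// Hypothetical component that receives device events (e.g. from AMQP) and appends them.
final class DeviceEventAppender {
    private final SketchEventStore store;

    DeviceEventAppender(SketchEventStore store) {
        this.store = store;
    }

    SketchDomainEvent append(String aggregateIdentifier, Object payload, String correlationId) {
        // Step 1: look up the last sequence number (assumption: the first event gets 0).
        long next = store.lastSequenceNumberFor(aggregateIdentifier).map(n -> n + 1).orElse(0L);
        // Step 2: build the message, carrying the AMQP correlation id as meta data.
        Map<String, Object> metaData = new HashMap<>();
        metaData.put("correlationId", correlationId);
        SketchDomainEvent event =
                new SketchDomainEvent("Device", aggregateIdentifier, next, payload, metaData);
        // Step 3: publish it.
        store.publish(Collections.singletonList(event));
        return event;
    }
}
```

Note that looking up the last sequence number and publishing are two separate steps in this sketch, so nothing here prevents two concurrent writers from picking the same number.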

The only caveat to this is that EventStore#lastSequenceNumberFor(String aggregateIdentifier) is not in a release yet. It's already part of 3.2 though, which should be released around the end of the month.

Would this solution work out for you @RobWin?

@RobWin
Author

RobWin commented Feb 20, 2018

Yes, but how can I make sure that the sequenceNumber is not increased in parallel by another node/thread?
I need a way to increase the sequenceNumber atomically so that this number can solely be used by my thread.
I assume EventStore#lastSequenceNumberFor does not increase the sequence number for me?

@RobWin
Author

RobWin commented Feb 20, 2018

What is the reason that EventStore#publish does not increase the sequenceNumber?
As a User of an API I don't want to handle incremental sequence numbers :(

@RobWin
Author

RobWin commented Feb 20, 2018

Since atomic counters (sequence numbers) in distributed systems are slow and difficult to implement, wouldn't it be possible that EventStore#readEvents(String aggregateIdentifier) orders events by creation timestamp and uses the event identifier or hash of the timestamp as an offset (sequence number)?

Isn't the sequence number an implementation detail of the event store anyway? The event producer should not know anything about this. Only event consumers are interested in the offset.

@abuijze
Member

abuijze commented Feb 20, 2018

Hi Robert,

I think you may be mixing up two different things here. The sequence number is used by an Aggregate as it emits events to ensure its internal consistency. It is not used by the event store to order events globally. Since an Aggregate is a unit of consistency (by definition of Domain Driven Design), there shouldn't normally be any conflicts here. An Aggregate is responsible for maintaining its own state.

Consumers don't need this sequence number. They either subscribe to Events (using a SubscribingEventProcessor) or they pull Events in (using a TrackingEventProcessor). In the latter case, the Events are ordered as closely as possible to the order in which they were published, depending on what the underlying implementation is able to do. This ordering has nothing to do with the sequence number.

You said "As a User of an API I don't want to handle incremental sequence numbers :(". Good news: you don't, normally. If you implement an aggregate, all you need to do is apply() your events, and Axon will update your sequence number for that specific Aggregate automatically.
If you don't use Aggregates, but just have a Command handling component that emits events, then you probably don't need sequence numbers at all. These are only necessary if you have Aggregates you want to source from their past events.
If you do want to be able to track events to a specific (conceptual) Aggregate, but without using an Aggregate itself, then as of Axon 3.2, it will be possible to retrieve the last used sequence number for that Aggregate. Again, there shouldn't be any conflicts here, because the Aggregate should not be concurrently accessed. If it is, then you should be glad a Duplicate Key conflict (or whatever the underlying store reports) is thrown to notify you of a potential consistency breach.
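
To illustrate that last point, here is a minimal sketch of how a uniqueness constraint on the combination of aggregate identifier and sequence number surfaces a concurrent write. UniqueKeyEventLog and DuplicateSequenceException are hypothetical stand-ins, not Axon classes, and the actual exception you'd see depends on the underlying store.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical exception standing in for whatever the underlying store reports.
final class DuplicateSequenceException extends RuntimeException {
    DuplicateSequenceException(String key) {
        super("Sequence number already used: " + key);
    }
}

// Stand-in for an event table with a unique key on (aggregateIdentifier, sequenceNumber).
final class UniqueKeyEventLog {
    private final Set<String> usedKeys = new HashSet<>();

    // Rejects the append if another writer already used this sequence number
    // for the same aggregate; synchronized so the check-and-insert is atomic.
    synchronized void append(String aggregateIdentifier, long sequenceNumber) {
        String key = aggregateIdentifier + "#" + sequenceNumber;
        if (!usedKeys.add(key)) {
            throw new DuplicateSequenceException(key);
        }
    }

    synchronized int size() {
        return usedKeys.size();
    }
}
```

If two writers both read the same last sequence number and both try to append the next one, the second append fails instead of silently interleaving events, and the caller can decide whether to reload and retry or to report the conflict.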

Hope this clarifies things.

@RobWin
Author

RobWin commented Feb 20, 2018

Hi,

thank you for your clarification.

Our scenario looks as follows.
A client sends a command to a real device and the real device emits an event when the command has been processed. We want to update the cloud representation (digital twin) of the device.

I guess I have to translate incoming event messages from devices into commands which are handled by a device aggregate (digital twin) which itself emits/applies a similar internal event. It's like simulating the real communication. Seems fine to me.
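
A rough sketch of what I mean, using plain stand-in classes rather than Axon's aggregate support. ReportSwitchState, SwitchStateChanged and DeviceTwin are hypothetical names, and the switch state is just an invented example payload:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical command created from an incoming device message.
final class ReportSwitchState {
    final String deviceId;
    final boolean on;

    ReportSwitchState(String deviceId, boolean on) {
        this.deviceId = deviceId;
        this.on = on;
    }
}

// Hypothetical internal event that the digital twin applies in response.
final class SwitchStateChanged {
    final String deviceId;
    final boolean on;
    final long sequenceNumber;

    SwitchStateChanged(String deviceId, boolean on, long sequenceNumber) {
        this.deviceId = deviceId;
        this.on = on;
        this.sequenceNumber = sequenceNumber;
    }
}

// Stand-in for the device aggregate (digital twin); not an Axon aggregate.
final class DeviceTwin {
    private final String deviceId;
    private boolean on;
    private long nextSequenceNumber = 0L;
    private final List<SwitchStateChanged> appliedEvents = new ArrayList<>();

    DeviceTwin(String deviceId) {
        this.deviceId = deviceId;
    }

    // Command handler: checks the command and emits a matching internal event.
    void handle(ReportSwitchState command) {
        if (!deviceId.equals(command.deviceId)) {
            throw new IllegalArgumentException("Command targets another device");
        }
        apply(new SwitchStateChanged(deviceId, command.on, nextSequenceNumber));
    }

    // Event handler: the only place where the twin's state changes.
    private void apply(SwitchStateChanged event) {
        on = event.on;
        nextSequenceNumber = event.sequenceNumber + 1;
        appliedEvents.add(event);
    }

    // Rebuilds a twin by replaying previously stored events.
    static DeviceTwin replay(String deviceId, List<SwitchStateChanged> history) {
        DeviceTwin twin = new DeviceTwin(deviceId);
        for (SwitchStateChanged event : history) {
            twin.apply(event);
        }
        return twin;
    }

    boolean isOn() {
        return on;
    }

    List<SwitchStateChanged> appliedEvents() {
        return appliedEvents;
    }
}
```

The aggregate keeps track of the sequence number itself, so the component that receives the AMQP messages only has to build and send the command.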

@abuijze abuijze closed this as completed Mar 5, 2018
@metalpalo

Hi guys

I think that I have a similar problem.

Concretely, I use sagas to handle payment transactions. My scenario is as follows:

  1. Clients send start/end payment commands, PaymentStartCommand and PaymentEndCommand, to a command handler, which is the same class as my aggregate BankAccount
    with aggregate identifier uid
  2. The command handler (within my aggregate) just applies PaymentStartedEvent and PaymentEndEvent, which are handled by the saga instance
  3. The saga handles PaymentStartedEvent (@EndSaga) by scheduling another event, PaymentExpiredEvent, to fire after 30 seconds; it signals that something went wrong and the payment is aborted
  4. The saga also handles PaymentEndEvent (@EndSaga) and cancels the scheduled PaymentExpiredEvent

When PaymentExpiredEvent occurs, it is stored in the db with the aggregate identifier column set to the same value as the event identifier. That means that such an event cannot be used during event sourcing of my aggregate.

Q: Am I right? Is this correct behaviour?

If so, what I can do, as RobWin mentioned, is to trigger another PaymentExpiredCommand from the saga and create a similar ExpiredEvent which will be joined with the aggregate. But I want to avoid this if possible.

Q: What can I do?

thank you very much
