Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get uncommitted events without empty the aggregate #441

Open
matiux opened this issue Jun 23, 2023 · 4 comments
Open

Get uncommitted events without empty the aggregate #441

matiux opened this issue Jun 23, 2023 · 4 comments
Labels

Comments

@matiux
Copy link
Contributor

matiux commented Jun 23, 2023

Sometimes there is a need to obtain the uncommitted events from the aggregate without emptying it. Today, the implementation of the EventSourcedAggregateRoot::getUncommittedEvents() method, works by emptying the array:

public function getUncommittedEvents(): DomainEventStream
{
    $stream = new DomainEventStream($this->uncommittedEvents);

    $this->uncommittedEvents = [];

    return $stream;
}

A current scenario where I need to obtain the event stream is in the implementation of the outbox pattern, in which I need to send the events to a db recording system and then send them to an external microservice. Let's consider this example:

class Users extends EventSourcingRepository {

    public function __construct(EventStore $eventStore, EventBus $eventBus, private EventOutboxer $outboxer)
    {
        parent::__construct(
            $eventStore,
            $eventBus,
            User::class,
            new PublicConstructorAggregateFactory(),
        );
    }

   public function add(User $user): void {

      $events = array_map(
         fn(DomainMessage $dm) => $dm->getPayload(),
         iterator_to_array($user->getUncommittedEvents()),
      );

      parent::save($user);

      $this->outboxer->dispatch($events);

   }
}

I need to dispatch the events after saving to be sure that saving has worked well, as well as all projectors have done their job. However, this snippet of code cannot run because the call to $user->getUncommittedEvents() will empty the events inside the aggregates.

So, I wonder if this kind of behavior is mandatory. Or if you have ever thought about a different approach.

I have considered two possible solutions without modifying the current implementation:

Solution 1
Using PublicConstructorAggregateFactory to recreate aggregate passing to it the extracted event stream:

public function add(User $user): void
{
    $ue = $user->getUncommittedEvents();
    
    $recreatedUser = $this->publicConstructorAggregateFactory->create(
        User::class,
        $ue
    );

    parent::save($recreatedUser);
    
    $this->outboxer->dispatch(
        array_map(
            fn(DomainMessage $dm) => $dm->getPayload(),
            iterator_to_array($ue),
        )
    );
}

Solution 2
Implement the outbox pattern through a traditional Broadway\EventHandling\EventListener but this solution has a big problem: I can't guarantee that the "outbox event listener" be performed last. So this solution is automatically trashed.

What do you think about my situation? Can I use the solution 1? Or can we do something in the current implementation?

@wjzijderveld
Copy link
Member

In my opinion neither should be preferred. For a couple reasons:

  1. When you're performing an action on your entity, it should be an atomic entity. So if you're persisting the action (the events), there shouldn't be anything left to do for your entity. The internal state should already be updated, so if you would want to expose that to return a response or something, you could.
  2. Your domain events are internal to your bounded context there. If you want to let other bounded contexts know that something have change, you could for sure use the outbox pattern for that, but I would strongly recommend putting an anti-corruption layer in between and to not directly tie it to the persistence of your domain model. Of course there are situations where your context determines the language, and thus that your events will be leading and won't need translation, but personally I would still avoid coupling that directly to the persistence of your model itself.
  3. The dependency on "all projectors should have done their job" sounds a bit sketchy to me, it sounds like you're trying to avoid dealing with eventual consistency. Although understandable you're trying to minimize it, there's no way to actually guarantee that. For example, there's still a gap between the persistence of the model (and thus other requests can already read that) and the projectors finishing.

That being said, if you really feel like you need to be able to read the events, it should be pretty trivial to maybe add an interface to your app, something like CanPeekUncommittedEvents with an implementation that simply returns the events and put that on your aggregates.

@matiux
Copy link
Contributor Author

matiux commented Jun 23, 2023

Thanks @wjzijderveld

  1. Sure, in fact I want expose occurred action from bc A to bc B after to have persisted the aggregate state.
  2. I have an anti-corruption layer, of course, but for the purpose of my doubt, it wasn't of interest to speak about it. In my implementation of the outbox pattern, I perform a transformation from domain events to integration events.
  3. I'm certainly considering the eventual consistency. I have written, based on this reasoning, 'So, this solution is automatically discarded.'

But that being said, to implement outbox pattern:

  1. must do it after the state of my aggregate has been persisted.
  2. I can't rely on projection, of corse

Solution 3
Another idea that comes to mind is to rely on the event store table. I could periodically scan the table, keeping track of the last event id and bring new events to my outbox pattern

  1. transformation from domain event to integration event through the anti corruption layer
  2. persistence in a notification table
  3. async sending, maybe through another cron

@wjzijderveld
Copy link
Member

I'm still not sure why projectors are relevant here. If your outbox happens first or last in the event listener chain shouldn't make any difference right?

Your 3rd solution sounds like a good option for what you want, besides a bit convoluted for what you need.

Two other options to consider: wrapping either the event store or the event bus and add your outbox logic in there.
With the event store you could even put it in a single (nested) transaction if you require that kind of consistency.
And as you mentioned "after projectors" a couple times, with the event bus option you could guarantee to execute your outbox logic to happen after the regular event bus has finished.

@stale
Copy link

stale bot commented Aug 12, 2023

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Aug 12, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants