AggregateRoot versioning causes ConcurrencyException #34

Closed
andrewmurray opened this issue Jun 28, 2017 · 5 comments

Comments

@andrewmurray

andrewmurray commented Jun 28, 2017

Completely open to the possibility that this is just my code, as I'm new to CqrsLite and to CQRS in general.

I'm using Event Store (https://geteventstore.com/) to persist the change events for my aggregate root. My aggregate is a Booking, containing date, time, customer, etc. I create one stream in Event Store per aggregate instance, so a stream only contains events related to a single Booking. When a stream doesn't exist, Event Store reports the expected version as -1, so when I create my entity I have to set the version to -1 in the Apply method for my BookingCreatedEvent (ultimately called by the Booking constructor). As long as I specify -1 as the version, the creation event is persisted to Event Store.

I now have an issue when trying to persist a BookingDateTimeUpdated event, which should result in a version of 1. The stream already exists and its version is 0. The aggregate is loaded with version 0 as expected, and it appears to be entered into _trackedAggregates (on the Session object) with the version it had when loaded.

So, all good until now. Next I apply the BookingDateTimeUpdated event, and when it comes to saving the event, the Save method in the Repository first does this:

if (expectedVersion != null && (await _eventStore.Get(aggregate.Id, expectedVersion.Value)).Any())
{
    throw new ConcurrencyException(aggregate.Id);
}

My understanding of that code is that it checks whether the stream already contains the version we are trying to save. If it does, we didn't have the full state when our event was applied to the aggregate, so it throws an exception to prevent corrupting the data.

The expected version comes down from the _trackedAggregates of Session as 0 (that's what was set at load time) and because version 0 already exists, a ConcurrencyException is thrown. I would expect the version that we are trying to save after a new event is applied to be version 1, not 0.

I can't work out whether this is a bug in CqrsLite (i.e. the expected version that Repository.Save uses should come from somewhere else) or whether I'm not setting things up correctly.

@gautema
Owner

gautema commented Jun 28, 2017

What does _eventStore.Get return in the method you are asking about, and what is the input? The implementation in the sample looks like this: events?.Where(x => x.Version > fromVersion) ?? new List<IEvent>(). So it only returns the events saved after the current version.

@andrewmurray
Author

OK, so IEventStore.Get, in the sample, only loads those events that haven't already been loaded?

My implementation of IEventStore.Get currently reads all events in the stream for that aggregate:

        public async Task<IEnumerable<IEvent>> Get(Guid aggregateId, int fromVersion)
        {
            var streamEvents = new List<ResolvedEvent>();

            StreamEventsSlice currentSlice;
            var nextSliceStart = (long)StreamPosition.Start;
            do
            {
                currentSlice = await _connection.ReadStreamEventsForwardAsync(
                    StreamName($"booking-{aggregateId}"), nextSliceStart, 200, false);

                nextSliceStart = currentSlice.NextEventNumber;

                streamEvents.AddRange(currentSlice.Events);
            } while (!currentSlice.IsEndOfStream);

            return streamEvents.Select(e => (BaseEvent)RebuildEvent(e));
        }

This is pretty much just a lift and drop from the Event Store documentation (http://docs.geteventstore.com/dotnet-api/4.0.0/reading-events/).

The input for fromVersion will be 0 given that that's the expected version.

@gautema
Owner

gautema commented Jun 28, 2017

Can you try to just add .Where(x => x.Version > fromVersion) to the bottom of your method?

@andrewmurray
Author

Yes I can! That fixes it.

OK, understanding adjusted, I think. IEventStore.Get MUST only return those events that have a version greater than the one passed in. I had assumed I could ignore fromVersion for the time being, thinking it was just a convenience provided to improve performance, but it's actually tied to the successful saving of new events.
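
The interaction between the filter and the concurrency check can be sketched in memory. This is a minimal illustration, not CqrsLite's code: `StoredEvent`, `ConcurrencyCheckSketch`, `GetFiltered` and `WouldThrow` are hypothetical names, and the guard is reduced to the `.Any()` test quoted from Repository.Save earlier in this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a persisted event; only Version matters here.
public sealed class StoredEvent
{
    public StoredEvent(int version) { Version = version; }
    public int Version { get; }
}

public static class ConcurrencyCheckSketch
{
    // Same filter as the sample store: keep only events newer than fromVersion.
    public static IEnumerable<StoredEvent> GetFiltered(
        IEnumerable<StoredEvent> stream, int fromVersion)
        => stream.Where(e => e.Version > fromVersion);

    // Reduced form of the Save guard: any event returned by Get
    // is treated as a conflicting write.
    public static bool WouldThrow(IEnumerable<StoredEvent> returnedByGet)
        => returnedByGet.Any();
}
```

With a stream holding only version 0 and an expected version of 0, `GetFiltered` returns nothing and the guard passes. Handing the guard the whole stream instead makes it report a conflict, which matches the behaviour described in this issue.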

Thanks for your help and the prompt replies!

@gautema
Owner

gautema commented Jun 28, 2017

Yes, that's the way it is now. The filter could possibly be moved into the repository as well, but it was put in the event store for performance reasons, so that the event store does not necessarily need to load all events every time.
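
The performance point can be illustrated with a paged read that starts just after fromVersion instead of at the beginning of the stream. This is a rough in-memory sketch under stated assumptions: `PagedReadSketch` and `ReadFrom` are hypothetical names, the list index stands in for the event number, and the event number is assumed to equal the aggregate version, which may not hold for every store.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PagedReadSketch
{
    // streamVersions[i] is the version of the event at position i
    // (assumption: position and version are the same number).
    public static List<int> ReadFrom(
        IReadOnlyList<int> streamVersions, int fromVersion, int pageSize)
    {
        if (pageSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(pageSize));

        var result = new List<int>();
        var next = fromVersion + 1; // first event the caller has not seen yet

        // Read page by page until the end of the stream is reached.
        while (next < streamVersions.Count)
        {
            var page = streamVersions.Skip(next).Take(pageSize).ToList();
            result.AddRange(page);
            next += page.Count;
        }
        return result;
    }
}
```

Starting the read at `fromVersion + 1` means earlier events are never fetched, so no separate `Where` filter is needed afterwards. A fromVersion of -1 reads the whole stream.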

@gautema gautema closed this as completed Jun 28, 2017