Skip to content

Commit

Permalink
Merge pull request #1008 from SeWaS/issue/919/snapshot-upgrader-metadata
Browse files Browse the repository at this point in the history
Issue #919: Enable IEventStore to load events up to a given sequence number
  • Loading branch information
rasmus committed Mar 12, 2024
2 parents 8e25a04 + 8a3625b commit 1df3e47
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 5 deletions.
11 changes: 11 additions & 0 deletions Source/EventFlow.MongoDB/EventStore/MongoDbEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEvent
.ConfigureAwait(continueOnCapturedContext: false);
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(IIdentity id, int fromEventSequenceNumber, int toEventSequenceNumber,
CancellationToken cancellationToken)
{
return await MongoDbEventStoreCollection
.Find(model => model.AggregateId == id.Value &&
model.AggregateSequenceNumber >= fromEventSequenceNumber &&
model.AggregateSequenceNumber <= toEventSequenceNumber)
.ToListAsync(cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);
}

public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
DeleteResult affectedRows = await MongoDbEventStoreCollection
Expand Down
31 changes: 31 additions & 0 deletions Source/EventFlow.MsSql/EventStores/MsSqlEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,37 @@ ORDER BY
return eventDataModels;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
AggregateId = @AggregateId AND
AggregateSequenceNumber >= @FromEventSequenceNumber AND
AggregateSequenceNumber <= @ToEventSequenceNumber
ORDER BY
AggregateSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("mssql-fetch-events"),
null,
cancellationToken,
sql,
new
{
AggregateId = id.Value,
FromEventSequenceNumber = fromEventSequenceNumber,
ToEventSequenceNumber = toEventSequenceNumber
})
.ConfigureAwait(false);
return eventDataModels;
}

public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
const string sql = @"DELETE FROM EventFlow WHERE AggregateId = @AggregateId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,38 @@ INSERT INTO
return eventDataModels;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
AggregateId = @AggregateId AND
AggregateSequenceNumber >= @FromEventSequenceNumber AND
AggregateSequenceNumber <= @ToSequenceNumber
ORDER BY
AggregateSequenceNumber ASC;";

var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("postgresql-fetch-events"),
null,
cancellationToken,
sql,
new
{
AggregateId = id.Value,
FromEventSequenceNumber = fromEventSequenceNumber,
ToSequenceNumber = toEventSequenceNumber
})
.ConfigureAwait(false);
return eventDataModels;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
Expand All @@ -184,6 +216,7 @@ FROM EventFlow
AggregateSequenceNumber >= @FromEventSequenceNumber
ORDER BY
AggregateSequenceNumber ASC;";

var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("postgresql-fetch-events"),
null,
Expand Down
16 changes: 16 additions & 0 deletions Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,22 @@ public async Task LoadingOfEventsCanStartLater()
domainEvents.ElementAt(1).AggregateSequenceNumber.Should().Be(4);
domainEvents.ElementAt(2).AggregateSequenceNumber.Should().Be(5);
}

[Test]
public async Task LoadingOfEventsCanStartLaterAndStopEarlier()
{
// Arrange
var id = ThingyId.New;
await PublishPingCommandsAsync(id, 5);

// Act
var domainEvents = await EventStore.LoadEventsAsync<ThingyAggregate, ThingyId>(id, 3, 4, CancellationToken.None);

// Assert
domainEvents.Should().HaveCount(2);
domainEvents.ElementAt(0).AggregateSequenceNumber.Should().Be(3);
domainEvents.ElementAt(1).AggregateSequenceNumber.Should().Be(4);
}

[Test]
public async Task AggregateCanHaveMultipleCommits()
Expand Down
10 changes: 10 additions & 0 deletions Source/EventFlow.Tests/IntegrationTests/CancellationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,16 @@ public ManualEventPersistence(IEventPersistence inner)
return result;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
var result = await _inner.LoadCommittedEventsAsync(id, fromEventSequenceNumber, toEventSequenceNumber, cancellationToken);
await LoadCompletionSource.Task;
return result;
}

public Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
return _inner.DeleteEventsAsync(id, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,27 @@ public async Task Test_Arrange_EventStore(int eventInStore, int fromEventSequenc
// Assert
domainEvents.Should().HaveCount(expectedNumberOfEvents);
}

[Description("Mock test")]
[TestCase(5, 3, 5, 3)]
[TestCase(5, 0, 5, 5)]
[TestCase(5, 1, 2, 2)]
[TestCase(0, 1, 2, 0)]
public async Task Test_Arrange_EventStore_SequenceRange(int eventInStore, int fromEventSequenceNumber, int toEventSequenceNumber, int expectedNumberOfEvents)
{
// Arrange
Arrange_EventStore(ManyDomainEvents<ThingyPingEvent>(eventInStore));

// Act
var domainEvents = await _eventStoreMock.Object.LoadEventsAsync<ThingyAggregate, ThingyId>(
A<ThingyId>(),
fromEventSequenceNumber,
toEventSequenceNumber,
CancellationToken.None);

// Assert
domainEvents.Should().HaveCount(expectedNumberOfEvents);
}

private void Arrange_EventStore(IEnumerable<IDomainEvent<ThingyAggregate, ThingyId>> domainEvents)
{
Expand All @@ -123,6 +144,10 @@ private void Arrange_EventStore(IEnumerable<IDomainEvent<ThingyAggregate, Thingy
_eventStoreMock
.Setup(e => e.LoadEventsAsync<ThingyAggregate, ThingyId>(It.IsAny<ThingyId>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<ThingyId, int, CancellationToken>((id, seq, c) => Task.FromResult<IReadOnlyCollection<IDomainEvent<ThingyAggregate, ThingyId>>>(domainEventList.Skip(Math.Max(seq - 1, 0)).ToList()));

_eventStoreMock
.Setup(e => e.LoadEventsAsync<ThingyAggregate, ThingyId>(It.IsAny<ThingyId>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<ThingyId, int, int, CancellationToken>((id, from, to, c) => Task.FromResult<IReadOnlyCollection<IDomainEvent<ThingyAggregate, ThingyId>>>(domainEventList.Take(to).Skip(Math.Max(from - 1, 0)).ToList()));
}

private void Arrange_Snapshot(ThingySnapshot thingySnapshot)
Expand Down
32 changes: 29 additions & 3 deletions Source/EventFlow/EventStores/EventStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,25 @@ public class EventStoreBase : IEventStore
cancellationToken);
}

public async Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken) where TAggregate : IAggregateRoot<TIdentity> where TIdentity : IIdentity
{
if (fromEventSequenceNumber < 1) throw new ArgumentOutOfRangeException(nameof(fromEventSequenceNumber), "Event sequence numbers start at 1");
if (toEventSequenceNumber <= fromEventSequenceNumber) throw new ArgumentOutOfRangeException(nameof(toEventSequenceNumber), "Event sequence numbers end at start");

var committedDomainEvents = await _eventPersistence.LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
toEventSequenceNumber,
cancellationToken)
.ConfigureAwait(false);

return await MapToDomainEvents<TAggregate, TIdentity>(id, cancellationToken, committedDomainEvents);
}

public virtual async Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
int fromEventSequenceNumber,
Expand All @@ -164,10 +183,17 @@ public class EventStoreBase : IEventStore
if (fromEventSequenceNumber < 1) throw new ArgumentOutOfRangeException(nameof(fromEventSequenceNumber), "Event sequence numbers start at 1");

var committedDomainEvents = await _eventPersistence.LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
cancellationToken)
id,
fromEventSequenceNumber,
cancellationToken)
.ConfigureAwait(false);

return await MapToDomainEvents<TAggregate, TIdentity>(id, cancellationToken, committedDomainEvents);
}

private async Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> MapToDomainEvents<TAggregate, TIdentity>(TIdentity id, CancellationToken cancellationToken,
IReadOnlyCollection<ICommittedDomainEvent> committedDomainEvents) where TAggregate : IAggregateRoot<TIdentity> where TIdentity : IIdentity
{
var domainEvents = (IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>)committedDomainEvents
.Select(e => _eventJsonSerializer.Deserialize<TAggregate, TIdentity>(id, e))
.ToList();
Expand Down
25 changes: 25 additions & 0 deletions Source/EventFlow/EventStores/Files/FilesEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,31 @@ private StreamWriter CreateNewTextFile(string path, FileEventData fileEventData)
}
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false))
{
var committedDomainEvents = new List<ICommittedDomainEvent>();
for (var i = fromEventSequenceNumber; i <= toEventSequenceNumber ; i++)
{
var eventPath = _filesEventLocator.GetEventPath(id, i);
if (!File.Exists(eventPath))
{
return committedDomainEvents;
}

var committedDomainEvent = await LoadFileEventDataFile(eventPath).ConfigureAwait(false);
committedDomainEvents.Add(committedDomainEvent);
}

return committedDomainEvents;
}
}

public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
_logger.LogTrace("Deleting entity with ID {EventFilePath}", id);
Expand Down
6 changes: 6 additions & 0 deletions Source/EventFlow/EventStores/IEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public interface IEventPersistence
IIdentity id,
int fromEventSequenceNumber,
CancellationToken cancellationToken);

Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken);

Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken);
}
Expand Down
8 changes: 8 additions & 0 deletions Source/EventFlow/EventStores/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ public interface IEventStore
CancellationToken cancellationToken)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity;

Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
int fromSequenceNumber,
int toSequenceNumber,
CancellationToken cancellationToken)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity;

Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
Expand Down
24 changes: 22 additions & 2 deletions Source/EventFlow/EventStores/InMemory/InMemoryEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,33 @@ IEnumerator IEnumerable.GetEnumerator()
IIdentity id,
int fromEventSequenceNumber,
CancellationToken cancellationToken)
{
return LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
e => e.AggregateSequenceNumber >= fromEventSequenceNumber);
}

public Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
return LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
e => e.AggregateSequenceNumber >= fromEventSequenceNumber && e.AggregateSequenceNumber <= toEventSequenceNumber);
}

private Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(IIdentity id,int fromEventSequenceNumber, Func<InMemoryCommittedDomainEvent, bool> filter)
{
IReadOnlyCollection<ICommittedDomainEvent> result;

if (_eventStore.TryGetValue(id.Value, out var committedDomainEvent))
result = fromEventSequenceNumber <= 1
? (IReadOnlyCollection<ICommittedDomainEvent>) committedDomainEvent
: committedDomainEvent.Where(e => e.AggregateSequenceNumber >= fromEventSequenceNumber).ToList();
? (IReadOnlyCollection<ICommittedDomainEvent>)committedDomainEvent
: committedDomainEvent.Where(filter).ToList();
else
result = new List<InMemoryCommittedDomainEvent>();

Expand Down

0 comments on commit 1df3e47

Please sign in to comment.