Skip to content

Commit

Permalink
Aggregates can now be deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmus committed May 22, 2015
1 parent 173b579 commit 2fc8011
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 0 deletions.
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
- `IDomainEvent.GlobalSequenceNumber` removed
- `IEventStore.LoadEventsAsync` and `IEventStore.LoadEvents` taking
a `GlobalSequenceNumberRange` removed
* New: `IEventStore.DeleteAggregateAsync` to delete an entire aggregate
stream. Please consider carefully if you really want to use it. Storage
might be cheaper than the historic knowledge within your events
* Fixed: `AggregateRoot<>` now reads the aggregate version from
domain events applied during aggregate load. This resolves an issue
for when an `IEventUpgrader` removed events from the event stream
Expand Down
19 changes: 19 additions & 0 deletions Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,24 @@ ORDER BY
.ConfigureAwait(false);
return eventDataModels;
}

public override async Task DeleteAggregateAsync<TAggregate, TIdentity>(
TIdentity id,
CancellationToken cancellationToken)
{
const string sql = @"DELETE FROM EventFlow WHERE AggregateId = @AggregateId";
var affectedRows = await _connection.ExecuteAsync(
Label.Named("mssql-delete-aggregate"),
cancellationToken,
sql,
new {AggregateId = id.Value})
.ConfigureAwait(false);

Log.Verbose(
"Deleted aggregate '{0}' with ID '{1}' by deleting all of its {2} events",
typeof(TAggregate).Name,
id,
affectedRows);
}
}
}
24 changes: 24 additions & 0 deletions Source/EventFlow.TestHelpers/Suites/EventStoreSuite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,30 @@ public async Task AggregateEventStreamsAreSeperate()
aggregate2.Version.Should().Be(2);
}

[Test]
public async Task AggregateEventStreamsCanBeDeleted()
{
// Arrange
var id1 = TestId.New;
var id2 = TestId.New;
var aggregate1 = await EventStore.LoadAggregateAsync<TestAggregate, TestId>(id1, CancellationToken.None).ConfigureAwait(false);
var aggregate2 = await EventStore.LoadAggregateAsync<TestAggregate, TestId>(id2, CancellationToken.None).ConfigureAwait(false);
aggregate1.Ping(PingId.New);
aggregate2.Ping(PingId.New);
aggregate2.Ping(PingId.New);
await aggregate1.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false);
await aggregate2.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false);

// Act
await EventStore.DeleteAggregateAsync<TestAggregate, TestId>(id2, CancellationToken.None).ConfigureAwait(false);

// Assert
aggregate1 = await EventStore.LoadAggregateAsync<TestAggregate, TestId>(id1, CancellationToken.None).ConfigureAwait(false);
aggregate2 = await EventStore.LoadAggregateAsync<TestAggregate, TestId>(id2, CancellationToken.None).ConfigureAwait(false);
aggregate1.Version.Should().Be(1);
aggregate2.Version.Should().Be(0);
}

[Test]
public async Task NoEventsEmittedIsOk()
{
Expand Down
6 changes: 6 additions & 0 deletions Source/EventFlow/EventStores/EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,11 @@ public abstract class EventStore : IEventStore
}
return aggregate;
}

public abstract Task DeleteAggregateAsync<TAggregate, TIdentity>(
TIdentity id,
CancellationToken cancellationToken)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity;
}
}
14 changes: 14 additions & 0 deletions Source/EventFlow/EventStores/Files/FilesEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,20 @@ public class FileEventData : ICommittedDomainEvent
}
}

public override Task DeleteAggregateAsync<TAggregate, TIdentity>(
TIdentity id,
CancellationToken cancellationToken)
{
var aggregateType = typeof (TAggregate);
Log.Verbose(
"Deleting aggregate '{0}' with ID '{1}'",
aggregateType.Name,
id);
var path = GetAggregatePath(aggregateType, id);
Directory.Delete(path, true);
return Task.FromResult(0);
}

private async Task<FileEventData> LoadFileEventDataFile(string eventPath)
{
using (var streamReader = File.OpenText(eventPath))
Expand Down
6 changes: 6 additions & 0 deletions Source/EventFlow/EventStores/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,11 @@ public interface IEventStore
CancellationToken cancellationToken)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity;

Task DeleteAggregateAsync<TAggregate, TIdentity>(
TIdentity id,
CancellationToken cancellationToken)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity;
}
}
18 changes: 18 additions & 0 deletions Source/EventFlow/EventStores/InMemory/InMemoryEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ public override string ToString()
}
}

public override Task DeleteAggregateAsync<TAggregate, TIdentity>(
TIdentity id,
CancellationToken cancellationToken)
{
if (_eventStore.ContainsKey(id.Value))
{
List<ICommittedDomainEvent> committedDomainEvents;
_eventStore.TryRemove(id.Value, out committedDomainEvents);
Log.Verbose(
"Deleted aggregate '{0}' with ID '{1}' by deleting all of its {2} events",
typeof(TAggregate).Name,
id,
committedDomainEvents.Count);
}

return Task.FromResult(0);
}

public void Dispose()
{
_asyncLock.Dispose();
Expand Down

0 comments on commit 2fc8011

Please sign in to comment.