Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmus committed May 29, 2015
2 parents fa87cc5 + 7b5cf02 commit 42509e5
Show file tree
Hide file tree
Showing 84 changed files with 1,785 additions and 1,210 deletions.
9 changes: 9 additions & 0 deletions Documentation/EventUpgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ EventFlow event upgraders are invoked whenever the event stream is loaded from
the event store. Each event upgrader receives the entire event stream one event
at a time.

A new instance of a event upgrader is created each time an aggregate is loaded.
This enables you to store information from previous events on the upgrader
instance to be used later, e.g. to determine an action to take on a event
or provide additional information for a new event.

Note that the _ordering_ of event upgraders is important as you might implement
two upgraders, one upgrade a event from V1 to V2 and then another upgrading V2
to V3. EventFlow orders the event upgraders by name before starting the event
upgrade.

**Be careful** if working with event upgraders that return zero or more than one
event, as this have an influence on the aggregate version and you need to make
sure that the aggregate sequence number on upgraded events have a valid value.

## Example - removing a damaged event

To remove an event, simply check and only return the event if its no the event
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,4 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```

42 changes: 41 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,44 @@
### New in 0.7 (not released yet)
### New in 0.8 (not released yet)

* Breaking: Remove _all_ functionality related to global sequence
numbers as it proved problematic to maintain. It also matches this
quote:

> Order is only assured per a handler within an aggregate root
> boundary. There is no assurance of order between handlers or
> between aggregates. Trying to provide those things leads to
> the dark side.
>> Greg Young
- If you use a MSSQL read store, be sure to delete the
`LastGlobalSequenceNumber` column during update, or set it to
default `NULL`
- `IDomainEvent.GlobalSequenceNumber` removed
- `IEventStore.LoadEventsAsync` and `IEventStore.LoadEvents` taking
a `GlobalSequenceNumberRange` removed
* Breaking: Remove the concept of event caches. If you really need this
then implement it by registering a decorator for `IEventStore`
* Breaking: Moved `IDomainEvent.BatchId` to metadata and created
`MetadataKeys.BatchId` to help access it
* New: `IEventStore.DeleteAggregateAsync` to delete an entire aggregate
stream. Please consider carefully if you really want to use it. Storage
might be cheaper than the historic knowledge within your events
* New: `IReadModelPopulator` is new and enables you to both purge and
populate read models by going though the entire event store. Currently
its only basic functionality, but more will be added
* New: `IEventStore` now has `LoadAllEventsAsync` and `LoadAllEvents` that
enables you to load all events in the event store a few at a time.
* New: `IMetadata.TimestampEpoch` contains the Unix timestamp version
of `IMetadata.Timestamp`. Also, an additional metadata key
`timestamp_epoch` is added to events containing the same data. Note,
the `TimestampEpoch` on `IMetadata` handles cases in which the
`timestamp_epoch` is not present by using the existing timestamp
* Fixed: `AggregateRoot<>` now reads the aggregate version from
domain events applied during aggregate load. This resolves an issue
for when an `IEventUpgrader` removed events from the event stream
* Fixed: `InMemoryReadModelStore<,>` is now thread safe

### New in 0.7.481 (released 2015-05-22)

* New: EventFlow now includes a `IQueryProcessor` that enables you to implement
queries and query handlers in a structure manner. EventFlow ships with two
Expand Down
71 changes: 46 additions & 25 deletions Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Core;
using EventFlow.EventCaches;
using EventFlow.Exceptions;
using EventFlow.Logs;
using EventFlow.MsSql;
Expand Down Expand Up @@ -56,13 +55,43 @@ public class EventDataModel : ICommittedDomainEvent
IEventJsonSerializer eventJsonSerializer,
IEventUpgradeManager eventUpgradeManager,
IEnumerable<IMetadataProvider> metadataProviders,
IEventCache eventCache,
IMsSqlConnection connection)
: base(log, aggregateFactory, eventJsonSerializer, eventCache, eventUpgradeManager, metadataProviders)
: base(log, aggregateFactory, eventJsonSerializer, eventUpgradeManager, metadataProviders)
{
_connection = connection;
}

protected override async Task<AllCommittedEventsPage> LoadAllCommittedDomainEvents(
long startPostion,
long endPosition,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
GlobalSequenceNumber >= @FromId AND GlobalSequenceNumber <= @ToId
ORDER BY
GlobalSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("mssql-fetch-events"),
cancellationToken,
sql,
new
{
FromId = startPostion,
ToId = endPosition,
})
.ConfigureAwait(false);

var nextPosition = eventDataModels.Any()
? eventDataModels.Max(e => e.GlobalSequenceNumber) + 1
: startPostion;

return new AllCommittedEventsPage(nextPosition, eventDataModels);
}

protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> CommitEventsAsync<TAggregate, TIdentity>(
TIdentity id,
IReadOnlyCollection<SerializedEvent> serializedEvents,
Expand All @@ -73,15 +102,13 @@ public class EventDataModel : ICommittedDomainEvent
return new ICommittedDomainEvent[] {};
}

var batchId = Guid.NewGuid();
var aggregateType = typeof(TAggregate);
var aggregateName = aggregateType.Name.Replace("Aggregate", string.Empty);
var eventDataModels = serializedEvents
.Select((e, i) => new EventDataModel
{
AggregateId = id.Value,
AggregateName = aggregateName,
BatchId = batchId,
AggregateName = e.Metadata[MetadataKeys.AggregateName],
BatchId = Guid.Parse(e.Metadata[MetadataKeys.BatchId]),
Data = e.Data,
Metadata = e.Meta,
AggregateSequenceNumber = e.AggregateSequenceNumber,
Expand Down Expand Up @@ -166,29 +193,23 @@ ORDER BY
return eventDataModels;
}

protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
GlobalSequenceNumberRange globalSequenceNumberRange,
public override async Task DeleteAggregateAsync<TAggregate, TIdentity>(
TIdentity id,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
GlobalSequenceNumber >= @FromId AND GlobalSequenceNumber <= @ToId
ORDER BY
GlobalSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("mssql-fetch-events"),
const string sql = @"DELETE FROM EventFlow WHERE AggregateId = @AggregateId";
var affectedRows = await _connection.ExecuteAsync(
Label.Named("mssql-delete-aggregate"),
cancellationToken,
sql,
new
{
FromId = globalSequenceNumberRange.From,
ToId = globalSequenceNumberRange.To,
})
new {AggregateId = id.Value})
.ConfigureAwait(false);
return eventDataModels;

Log.Verbose(
"Deleted aggregate '{0}' with ID '{1}' by deleting all of its {2} events",
typeof(TAggregate).Name,
id,
affectedRows);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MsSqlIntegrationTestConfiguration : IntegrationTestConfiguration
protected ITestDatabase TestDatabase { get; private set; }
protected IMsSqlConnection MsSqlConnection { get; private set; }
protected IReadModelSqlGenerator ReadModelSqlGenerator { get; private set; }
protected IReadModelPopulator ReadModelPopulator { get; private set; }

public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions)
{
Expand All @@ -52,11 +53,12 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio
var resolver = eventFlowOptions
.ConfigureMsSql(MsSqlConfiguration.New.SetConnectionString(TestDatabase.ConnectionString))
.UseEventStore<MsSqlEventStore>()
.UseMssqlReadModel<MsSqlTestAggregateReadModel, ILocateByAggregateId>()
.UseMssqlReadModel<MsSqlTestAggregateReadModel>()
.CreateResolver();

MsSqlConnection = resolver.Resolve<IMsSqlConnection>();
ReadModelSqlGenerator = resolver.Resolve<IReadModelSqlGenerator>();
ReadModelPopulator = resolver.Resolve<IReadModelPopulator>();

var databaseMigrator = resolver.Resolve<IMsSqlDatabaseMigrator>();
EventFlowEventStoresMsSql.MigrateDatabase(databaseMigrator);
Expand All @@ -65,7 +67,7 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio
return resolver;
}

public override async Task<ITestAggregateReadModel> GetTestAggregateReadModel(IIdentity id)
public override async Task<ITestAggregateReadModel> GetTestAggregateReadModelAsync(IIdentity id)
{
var sql = ReadModelSqlGenerator.CreateSelectSql<MsSqlTestAggregateReadModel>();
var readModels = await MsSqlConnection.QueryAsync<MsSqlTestAggregateReadModel>(
Expand All @@ -77,6 +79,16 @@ public override async Task<ITestAggregateReadModel> GetTestAggregateReadModel(II
return readModels.SingleOrDefault();
}

public override Task PurgeTestAggregateReadModelAsync()
{
return ReadModelPopulator.PurgeAsync<MsSqlTestAggregateReadModel>(CancellationToken.None);
}

public override Task PopulateTestAggregateReadModelAsync()
{
return ReadModelPopulator.PopulateAsync<MsSqlTestAggregateReadModel>(CancellationToken.None);
}

public override void TearDown()
{
TestDatabase.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
[CreateTime] [datetimeoffset](7) NOT NULL,
[UpdatedTime] [datetimeoffset](7) NOT NULL,
[LastAggregateSequenceNumber] [int] NOT NULL,
[LastGlobalSequenceNumber] [bigint] NOT NULL,
CONSTRAINT [PK_ReadModel-TestAggregate] PRIMARY KEY CLUSTERED
(
[Id] ASC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public void CreateInsertSql_ProducesCorrectSql()
// Assert
sql.Should().Be(
"INSERT INTO [ReadModel-TestAggregate] " +
"(AggregateId, CreateTime, DomainErrorAfterFirstReceived, LastAggregateSequenceNumber, LastGlobalSequenceNumber, PingsReceived, UpdatedTime) " +
"(AggregateId, CreateTime, DomainErrorAfterFirstReceived, LastAggregateSequenceNumber, PingsReceived, UpdatedTime) " +
"VALUES " +
"(@AggregateId, @CreateTime, @DomainErrorAfterFirstReceived, @LastAggregateSequenceNumber, @LastGlobalSequenceNumber, @PingsReceived, @UpdatedTime)");
"(@AggregateId, @CreateTime, @DomainErrorAfterFirstReceived, @LastAggregateSequenceNumber, @PingsReceived, @UpdatedTime)");
}

[Test]
Expand All @@ -58,7 +58,7 @@ public void CreateUpdateSql_ProducesCorrectSql()
sql.Should().Be(
"UPDATE [ReadModel-TestAggregate] SET " +
"CreateTime = @CreateTime, DomainErrorAfterFirstReceived = @DomainErrorAfterFirstReceived, " +
"LastAggregateSequenceNumber = @LastAggregateSequenceNumber, LastGlobalSequenceNumber = @LastGlobalSequenceNumber, " +
"LastAggregateSequenceNumber = @LastAggregateSequenceNumber, " +
"PingsReceived = @PingsReceived, UpdatedTime = @UpdatedTime " +
"WHERE AggregateId = @AggregateId");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
<Compile Include="MssqlReadModel.cs" />
<Compile Include="MssqlReadModelStore.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Queries\MsSqlReadModelByIdQueryHandler.cs" />
<Compile Include="ReadModelSqlGenerator.cs" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,47 @@
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Configuration.Registrations;
using EventFlow.Queries;
using EventFlow.ReadStores.MsSql.Queries;
using EventFlow.Extensions;

namespace EventFlow.ReadStores.MsSql.Extensions
{
public static class EventFlowOptionsExtensions
{
public static EventFlowOptions UseMssqlReadModel<TReadModel, TReadModelLocator>(this EventFlowOptions eventFlowOptions)
where TReadModel : IMssqlReadModel, new()
public static EventFlowOptions UseMssqlReadModel<TReadModel, TReadModelLocator>(
this EventFlowOptions eventFlowOptions)
where TReadModel : class, IMssqlReadModel, new()
where TReadModelLocator : IReadModelLocator
{
eventFlowOptions.RegisterServices(f =>
eventFlowOptions
.RegisterServices(f =>
{
if (!f.HasRegistrationFor<IReadModelSqlGenerator>())
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton);
}
f.Register<IMssqlReadModelStore<TReadModel>, MssqlReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IMssqlReadModelStore<TReadModel>>());
})
.UseReadStoreFor<IMssqlReadModelStore<TReadModel>, TReadModel, TReadModelLocator>();

return eventFlowOptions;
}

public static EventFlowOptions UseMssqlReadModel<TReadModel>(
this EventFlowOptions eventFlowOptions)
where TReadModel : class, IMssqlReadModel, new()
{
eventFlowOptions
.RegisterServices(f =>
{
if (!f.HasRegistrationFor<IReadModelSqlGenerator>())
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton);
}
f.Register<IReadModelStore, MssqlReadModelStore<TReadModel, TReadModelLocator>>();
f.Register<IQueryHandler<ReadModelByIdQuery<TReadModel>, TReadModel>, MsSqlReadModelByIdQueryHandler<TReadModel>>();
});
f.Register<IMssqlReadModelStore<TReadModel>, MssqlReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IMssqlReadModelStore<TReadModel>>());
})
.UseReadStoreFor<IMssqlReadModelStore<TReadModel>, TReadModel>();

return eventFlowOptions;
}
Expand Down
1 change: 0 additions & 1 deletion Source/EventFlow.ReadStores.MsSql/IMssqlReadModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ public interface IMssqlReadModel : IReadModel
DateTimeOffset CreateTime { get; set; }
DateTimeOffset UpdatedTime { get; set; }
int LastAggregateSequenceNumber { get; set; }
long LastGlobalSequenceNumber { get; set; }
}
}
4 changes: 2 additions & 2 deletions Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

namespace EventFlow.ReadStores.MsSql
{
public interface IMssqlReadModelStore<TReadModel> : IReadModelStore
where TReadModel : IMssqlReadModel, new()
public interface IMssqlReadModelStore<TReadModel> : IReadModelStore<TReadModel>
where TReadModel : class, IMssqlReadModel, new()
{
}
}
3 changes: 3 additions & 0 deletions Source/EventFlow.ReadStores.MsSql/IReadModelSqlGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ string CreateSelectSql<TReadModel>()

string CreateUpdateSql<TReadModel>()
where TReadModel : IMssqlReadModel;

string CreatePurgeSql<TReadModel>()
where TReadModel : IReadModel;
}
}
4 changes: 1 addition & 3 deletions Source/EventFlow.ReadStores.MsSql/MssqlReadModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@ public abstract class MssqlReadModel : IMssqlReadModel
public DateTimeOffset CreateTime { get; set; }
public DateTimeOffset UpdatedTime { get; set; }
public int LastAggregateSequenceNumber { get; set; }
public long LastGlobalSequenceNumber { get; set; }

public override string ToString()
{
return string.Format(
"Read model '{0}' for '{1} ({2}/{3}'",
"Read model '{0}' for '{1} v{2}'",
GetType().Name,
AggregateId,
LastGlobalSequenceNumber,
LastAggregateSequenceNumber);
}
}
Expand Down
Loading

0 comments on commit 42509e5

Please sign in to comment.