Skip to content

Commit

Permalink
Merge pull request #70 from rasmus/read-model-populate
Browse files Browse the repository at this point in the history
Read model populate
  • Loading branch information
rasmus committed May 29, 2015
2 parents 6ea40d9 + 532eaf0 commit 7b5cf02
Show file tree
Hide file tree
Showing 52 changed files with 1,617 additions and 465 deletions.
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
* New: `IEventStore.DeleteAggregateAsync` to delete an entire aggregate
stream. Please consider carefully if you really want to use it. Storage
might be cheaper than the historic knowledge within your events
* New: `IReadModelPopulator` is new and enables you to both purge and
populate read models by going though the entire event store. Currently
its only basic functionality, but more will be added
* New: `IEventStore` now has `LoadAllEventsAsync` and `LoadAllEvents` that
enables you to load all events in the event store a few at a time.
* New: `IMetadata.TimestampEpoch` contains the Unix timestamp version
of `IMetadata.Timestamp`. Also, an additional metadata key
`timestamp_epoch` is added to events containing the same data. Note,
Expand All @@ -31,6 +36,7 @@
* Fixed: `AggregateRoot<>` now reads the aggregate version from
domain events applied during aggregate load. This resolves an issue
for when an `IEventUpgrader` removed events from the event stream
* Fixed: `InMemoryReadModelStore<,>` is now thread safe

### New in 0.7.481 (released 2015-05-22)

Expand Down
31 changes: 31 additions & 0 deletions Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,37 @@ public class EventDataModel : ICommittedDomainEvent
_connection = connection;
}

protected override async Task<AllCommittedEventsPage> LoadAllCommittedDomainEvents(
long startPostion,
long endPosition,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
GlobalSequenceNumber >= @FromId AND GlobalSequenceNumber <= @ToId
ORDER BY
GlobalSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("mssql-fetch-events"),
cancellationToken,
sql,
new
{
FromId = startPostion,
ToId = endPosition,
})
.ConfigureAwait(false);

var nextPosition = eventDataModels.Any()
? eventDataModels.Max(e => e.GlobalSequenceNumber) + 1
: startPostion;

return new AllCommittedEventsPage(nextPosition, eventDataModels);
}

protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> CommitEventsAsync<TAggregate, TIdentity>(
TIdentity id,
IReadOnlyCollection<SerializedEvent> serializedEvents,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MsSqlIntegrationTestConfiguration : IntegrationTestConfiguration
protected ITestDatabase TestDatabase { get; private set; }
protected IMsSqlConnection MsSqlConnection { get; private set; }
protected IReadModelSqlGenerator ReadModelSqlGenerator { get; private set; }
protected IReadModelPopulator ReadModelPopulator { get; private set; }

public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions)
{
Expand All @@ -52,11 +53,12 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio
var resolver = eventFlowOptions
.ConfigureMsSql(MsSqlConfiguration.New.SetConnectionString(TestDatabase.ConnectionString))
.UseEventStore<MsSqlEventStore>()
.UseMssqlReadModel<MsSqlTestAggregateReadModel, ILocateByAggregateId>()
.UseMssqlReadModel<MsSqlTestAggregateReadModel>()
.CreateResolver();

MsSqlConnection = resolver.Resolve<IMsSqlConnection>();
ReadModelSqlGenerator = resolver.Resolve<IReadModelSqlGenerator>();
ReadModelPopulator = resolver.Resolve<IReadModelPopulator>();

var databaseMigrator = resolver.Resolve<IMsSqlDatabaseMigrator>();
EventFlowEventStoresMsSql.MigrateDatabase(databaseMigrator);
Expand All @@ -65,7 +67,7 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio
return resolver;
}

public override async Task<ITestAggregateReadModel> GetTestAggregateReadModel(IIdentity id)
public override async Task<ITestAggregateReadModel> GetTestAggregateReadModelAsync(IIdentity id)
{
var sql = ReadModelSqlGenerator.CreateSelectSql<MsSqlTestAggregateReadModel>();
var readModels = await MsSqlConnection.QueryAsync<MsSqlTestAggregateReadModel>(
Expand All @@ -77,6 +79,16 @@ public override async Task<ITestAggregateReadModel> GetTestAggregateReadModel(II
return readModels.SingleOrDefault();
}

public override Task PurgeTestAggregateReadModelAsync()
{
return ReadModelPopulator.PurgeAsync<MsSqlTestAggregateReadModel>(CancellationToken.None);
}

public override Task PopulateTestAggregateReadModelAsync()
{
return ReadModelPopulator.PopulateAsync<MsSqlTestAggregateReadModel>(CancellationToken.None);
}

public override void TearDown()
{
TestDatabase.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
<Compile Include="MssqlReadModel.cs" />
<Compile Include="MssqlReadModelStore.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Queries\MsSqlReadModelByIdQueryHandler.cs" />
<Compile Include="ReadModelSqlGenerator.cs" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,47 @@
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Configuration.Registrations;
using EventFlow.Queries;
using EventFlow.ReadStores.MsSql.Queries;
using EventFlow.Extensions;

namespace EventFlow.ReadStores.MsSql.Extensions
{
public static class EventFlowOptionsExtensions
{
public static EventFlowOptions UseMssqlReadModel<TReadModel, TReadModelLocator>(this EventFlowOptions eventFlowOptions)
where TReadModel : IMssqlReadModel, new()
public static EventFlowOptions UseMssqlReadModel<TReadModel, TReadModelLocator>(
this EventFlowOptions eventFlowOptions)
where TReadModel : class, IMssqlReadModel, new()
where TReadModelLocator : IReadModelLocator
{
eventFlowOptions.RegisterServices(f =>
eventFlowOptions
.RegisterServices(f =>
{
if (!f.HasRegistrationFor<IReadModelSqlGenerator>())
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton);
}
f.Register<IMssqlReadModelStore<TReadModel>, MssqlReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IMssqlReadModelStore<TReadModel>>());
})
.UseReadStoreFor<IMssqlReadModelStore<TReadModel>, TReadModel, TReadModelLocator>();

return eventFlowOptions;
}

public static EventFlowOptions UseMssqlReadModel<TReadModel>(
this EventFlowOptions eventFlowOptions)
where TReadModel : class, IMssqlReadModel, new()
{
eventFlowOptions
.RegisterServices(f =>
{
if (!f.HasRegistrationFor<IReadModelSqlGenerator>())
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton);
}
f.Register<IReadModelStore, MssqlReadModelStore<TReadModel, TReadModelLocator>>();
f.Register<IQueryHandler<ReadModelByIdQuery<TReadModel>, TReadModel>, MsSqlReadModelByIdQueryHandler<TReadModel>>();
});
f.Register<IMssqlReadModelStore<TReadModel>, MssqlReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IMssqlReadModelStore<TReadModel>>());
})
.UseReadStoreFor<IMssqlReadModelStore<TReadModel>, TReadModel>();

return eventFlowOptions;
}
Expand Down
4 changes: 2 additions & 2 deletions Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

namespace EventFlow.ReadStores.MsSql
{
public interface IMssqlReadModelStore<TReadModel> : IReadModelStore
where TReadModel : IMssqlReadModel, new()
public interface IMssqlReadModelStore<TReadModel> : IReadModelStore<TReadModel>
where TReadModel : class, IMssqlReadModel, new()
{
}
}
3 changes: 3 additions & 0 deletions Source/EventFlow.ReadStores.MsSql/IReadModelSqlGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ string CreateSelectSql<TReadModel>()

string CreateUpdateSql<TReadModel>()
where TReadModel : IMssqlReadModel;

string CreatePurgeSql<TReadModel>()
where TReadModel : IReadModel;
}
}
124 changes: 72 additions & 52 deletions Source/EventFlow.ReadStores.MsSql/MssqlReadModelStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -32,90 +33,109 @@

namespace EventFlow.ReadStores.MsSql
{
public class MssqlReadModelStore<TReadModel, TReadModelLocator> :
ReadModelStore<TReadModel, TReadModelLocator>,
public class MssqlReadModelStore<TReadModel> :
ReadModelStore<TReadModel>,
IMssqlReadModelStore<TReadModel>
where TReadModel : IMssqlReadModel, new()
where TReadModelLocator : IReadModelLocator
where TReadModel : class, IMssqlReadModel, new()
{
private readonly IMsSqlConnection _connection;
private readonly IQueryProcessor _queryProcessor;
private readonly IReadModelSqlGenerator _readModelSqlGenerator;

public MssqlReadModelStore(
ILog log,
TReadModelLocator readModelLocator,
IReadModelFactory readModelFactory,
IMsSqlConnection connection,
IQueryProcessor queryProcessor,
IReadModelSqlGenerator readModelSqlGenerator)
: base(log, readModelLocator, readModelFactory)
: base(log)
{
_connection = connection;
_queryProcessor = queryProcessor;
_readModelSqlGenerator = readModelSqlGenerator;
}

private async Task UpdateReadModelAsync(
string id,
IReadOnlyCollection<IDomainEvent> domainEvents,
public override async Task UpdateAsync(
IReadOnlyCollection<ReadModelUpdate> readModelUpdates,
IReadModelContext readModelContext,
Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken, Task<ReadModelEnvelope<TReadModel>>> updateReadModel,
CancellationToken cancellationToken)
{
var readModelNameLowerCased = typeof (TReadModel).Name.ToLowerInvariant();
var readModel = await GetByIdAsync(id, cancellationToken).ConfigureAwait(false);
var isNew = false;
if (readModel == null)
{
isNew = true;
readModel = new TReadModel
{
AggregateId = id,
CreateTime = domainEvents.First().Timestamp,
};
}
// TODO: Transaction

var appliedAny = await ReadModelFactory.UpdateReadModelAsync(
readModel,
domainEvents,
readModelContext,
cancellationToken)
.ConfigureAwait(false);
if (!appliedAny)
foreach (var readModelUpdate in readModelUpdates)
{
return;
}
var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant();
var readModelEnvelope = await GetAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false);
var readModel = readModelEnvelope.ReadModel;
var isNew = readModel == null;
if (readModel == null)
{
readModel = new TReadModel
{
AggregateId = readModelUpdate.ReadModelId,
CreateTime = readModelUpdate.DomainEvents.First().Timestamp,
};
}

var lastDomainEvent = domainEvents.Last();
readModel.UpdatedTime = lastDomainEvent.Timestamp;
readModel.LastAggregateSequenceNumber = lastDomainEvent.AggregateSequenceNumber;
readModelEnvelope = await updateReadModel(
readModelContext,
readModelUpdate.DomainEvents,
ReadModelEnvelope<TReadModel>.With(readModel, readModel.LastAggregateSequenceNumber),
cancellationToken)
.ConfigureAwait(false);

var sql = isNew
? _readModelSqlGenerator.CreateInsertSql<TReadModel>()
: _readModelSqlGenerator.CreateUpdateSql<TReadModel>();
readModel.UpdatedTime = DateTimeOffset.Now;
readModel.LastAggregateSequenceNumber = (int) readModelEnvelope.Version.GetValueOrDefault();

await _connection.ExecuteAsync(
Label.Named(string.Format("mssql-store-read-model-{0}", readModelNameLowerCased)),
var sql = isNew
? _readModelSqlGenerator.CreateInsertSql<TReadModel>()
: _readModelSqlGenerator.CreateUpdateSql<TReadModel>();

await _connection.ExecuteAsync(
Label.Named("mssql-store-read-model", readModelNameLowerCased),
cancellationToken,
sql,
readModel).ConfigureAwait(false);
}
}

public override async Task<ReadModelEnvelope<TReadModel>> GetAsync(string id, CancellationToken cancellationToken)
{
var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant();
var selectSql = _readModelSqlGenerator.CreateSelectSql<TReadModel>();
var readModels = await _connection.QueryAsync<TReadModel>(
Label.Named(string.Format("mssql-fetch-read-model-{0}", readModelNameLowerCased)),
cancellationToken,
sql,
readModel).ConfigureAwait(false);
selectSql,
new { AggregateId = id })
.ConfigureAwait(false);
var readModel = readModels.SingleOrDefault();

return readModel == null
? ReadModelEnvelope<TReadModel>.Empty
: ReadModelEnvelope<TReadModel>.With(readModel, readModel.LastAggregateSequenceNumber);
}

public override Task<TReadModel> GetByIdAsync(
string id,
CancellationToken cancellationToken)
public override Task DeleteAsync(string id, CancellationToken cancellationToken)
{
return _queryProcessor.ProcessAsync(new ReadModelByIdQuery<TReadModel>(id), cancellationToken);
throw new NotImplementedException();
}

protected override Task UpdateReadModelsAsync(
IReadOnlyCollection<ReadModelUpdate> readModelUpdates,
IReadModelContext readModelContext,
CancellationToken cancellationToken)
public override async Task DeleteAllAsync(CancellationToken cancellationToken)
{
var updateTasks = readModelUpdates
.Select(rmu => UpdateReadModelAsync(rmu.ReadModelId, rmu.DomainEvents, readModelContext, cancellationToken));
return Task.WhenAll(updateTasks);
var sql = _readModelSqlGenerator.CreatePurgeSql<TReadModel>();
var readModelName = typeof(TReadModel).Name;

var rowsAffected = await _connection.ExecuteAsync(
Label.Named("mssql-purge-read-model", readModelName),
cancellationToken,
sql)
.ConfigureAwait(false);

Log.Verbose(
"Purge {0} read models of type '{1}'",
rowsAffected,
readModelName);
}
}
}
Loading

0 comments on commit 7b5cf02

Please sign in to comment.