Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmus committed Jul 5, 2018
2 parents 0bba057 + 3795a67 commit c765ba2
Show file tree
Hide file tree
Showing 56 changed files with 1,317 additions and 226 deletions.
23 changes: 23 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Contributers

List of notable contributers to EventFlow sorted alphabetically. For a
complete list of all contributions to EventFlow, have look at the
[contributors](https://github.com/eventflow/EventFlow/graphs/contributors)
graph.

If you think your name is missing from the list, the create a pull-request.

### [Carlos Eduardo Ferrari](https://github.com/ceferrari)

* Converted EventFlow to Visual Studio 2017 project format

### [Frank Ebersoll](https://github.com/frankebersoll)

* Created following packages
* `EventFlow.DependencyInjection`
* Several key contributions and bug fixes

### [Rasmus Mikkelsen](https://github.com/rasmus)

* Original creator and author of EventFlow

19 changes: 18 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
### New in 0.61 (not released yet)
### New in 0.62 (not released yet)

* New: Created `AggregateReadStoreManager<,,,>` which is a new read store manager
for read models that have a 1-to-1 relation with an aggregate. If read models get
out of sync, or events are applied in different order, events are either fecthed
or skipped. Added extensions to allow registration.
- `UseInMemoryReadStoreFor<,,>`
- `UseElasticsearchReadModelFor<,,>`
- `UseMssqlReadModelFor<,,>`
- `UseSQLiteReadModelFor<,,>`
* New: Added `ReadModelId` and `IsNew` properties to the context object that is
available to a read model inside the `Apply` methods in order to better support
scenarios where a single event affects multiple read model instances.
* Minor: Applying events to a snapshot will now have the correct `Version` set
inside the `Apply` methods.
* Minor: Trying to apply events in the wrong order will now throw an exception.

### New in 0.61.3524 (released 2018-06-26)

* New: Support for `Microsoft.Extensions.DependencyInjection` (`IServiceProvider`
and `IServiceCollection`) using the `EventFlow.DependencyInjection` NuGet package.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
using EventFlow.Extensions;
using EventFlow.ReadStores;
using EventFlow.TestHelpers;
using EventFlow.TestHelpers.Aggregates;
using EventFlow.TestHelpers.Aggregates.Entities;
using EventFlow.TestHelpers.Suites;
using Nest;
Expand Down Expand Up @@ -93,7 +94,7 @@ protected override IRootResolver CreateRootResolver(IEventFlowOptions eventFlowO
sr.Register<IReadModelDescriptionProvider>(c => testReadModelDescriptionProvider);
})
.ConfigureElasticsearch(_elasticsearchInstance.Uri)
.UseElasticsearchReadModel<ElasticsearchThingyReadModel>()
.UseElasticsearchReadModelFor<ThingyAggregate, ThingyId, ElasticsearchThingyReadModel>()
.UseElasticsearchReadModel<ElasticsearchThingyMessageReadModel, ThingyMessageLocator>()
.AddQueryHandlers(
typeof(ElasticsearchThingyGetQueryHandler),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System.Linq;
using EventFlow.Aggregates;
using EventFlow.ReadStores;
using EventFlow.TestHelpers.Aggregates;
Expand All @@ -32,7 +33,8 @@ namespace EventFlow.Elasticsearch.Tests.IntegrationTests.ReadModels
{
[ElasticsearchType(IdProperty = "Id", Name = "message")]
public class ElasticsearchThingyMessageReadModel : IReadModel,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyMessageAddedEvent>
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyMessageAddedEvent>,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyMessageHistoryAddedEvent>
{
public string Id { get; set; }

Expand All @@ -55,6 +57,16 @@ public void Apply(IReadModelContext context, IDomainEvent<ThingyAggregate, Thing
Message = thingyMessage.Message;
}

public void Apply(IReadModelContext context, IDomainEvent<ThingyAggregate, ThingyId, ThingyMessageHistoryAddedEvent> domainEvent)
{
ThingyId = domainEvent.AggregateIdentity.Value;

var messageId = new ThingyMessageId(context.ReadModelId);
var thingyMessage = domainEvent.AggregateEvent.ThingyMessages.Single(m => m.Id == messageId);
Id = messageId.Value;
Message = thingyMessage.Message;
}

public ThingyMessage ToThingyMessage()
{
return new ThingyMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
using System;
using System.Linq;
using Elasticsearch.Net;
using EventFlow.Aggregates;
using EventFlow.Configuration;
using EventFlow.Core;
using EventFlow.Elasticsearch.ReadStores;
using EventFlow.Extensions;
using EventFlow.ReadStores;
Expand Down Expand Up @@ -79,11 +81,7 @@ public static class EventFlowOptionsExtensions
where TReadModel : class, IReadModel
{
return eventFlowOptions
.RegisterServices(f =>
{
f.Register<IElasticsearchReadModelStore<TReadModel>, ElasticsearchReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IElasticsearchReadModelStore<TReadModel>>());
})
.RegisterServices(RegisterElasticsearchReadStore<TReadModel>)
.UseReadStoreFor<IElasticsearchReadModelStore<TReadModel>, TReadModel>();
}

Expand All @@ -93,12 +91,27 @@ public static class EventFlowOptionsExtensions
where TReadModelLocator : IReadModelLocator
{
return eventFlowOptions
.RegisterServices(f =>
{
f.Register<IElasticsearchReadModelStore<TReadModel>, ElasticsearchReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IElasticsearchReadModelStore<TReadModel>>());
})
.RegisterServices(RegisterElasticsearchReadStore<TReadModel>)
.UseReadStoreFor<IElasticsearchReadModelStore<TReadModel>, TReadModel, TReadModelLocator>();
}

public static IEventFlowOptions UseElasticsearchReadModelFor<TAggregate, TIdentity, TReadModel>(
this IEventFlowOptions eventFlowOptions)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
where TReadModel : class, IReadModel
{
return eventFlowOptions
.RegisterServices(RegisterElasticsearchReadStore<TReadModel>)
.UseReadStoreFor<TAggregate, TIdentity, IElasticsearchReadModelStore<TReadModel>, TReadModel>();
}

private static void RegisterElasticsearchReadStore<TReadModel>(
IServiceRegistration serviceRegistration)
where TReadModel : class, IReadModel
{
serviceRegistration.Register<IElasticsearchReadModelStore<TReadModel>, ElasticsearchReadModelStore<TReadModel>>();
serviceRegistration.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IElasticsearchReadModelStore<TReadModel>>());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ public class ElasticsearchReadModelStore<TReadModel> :
return ReadModelEnvelope<TReadModel>.Empty(id);
}

return ReadModelEnvelope<TReadModel>.With(id, getResponse.Source, getResponse.Version);
return ReadModelEnvelope<TReadModel>.With(
id,
getResponse.Source,
getResponse.Version);
}

public async Task DeleteAsync(
Expand Down Expand Up @@ -118,18 +121,17 @@ public class ElasticsearchReadModelStore<TReadModel> :
}

public async Task UpdateAsync(IReadOnlyCollection<ReadModelUpdate> readModelUpdates,
Func<IReadModelContext> readModelContextFactory,
IReadModelContextFactory readModelContextFactory,
Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken,
Task<ReadModelEnvelope<TReadModel>>> updateReadModel,
Task<ReadModelUpdateResult<TReadModel>>> updateReadModel,
CancellationToken cancellationToken)
{
var readModelDescription = _readModelDescriptionProvider.GetReadModelDescription<TReadModel>();
var readModelContext = readModelContextFactory();

foreach (var readModelUpdate in readModelUpdates)
{
await _transientFaultHandler.TryAsync(
c => UpdateReadModelAsync(readModelDescription, readModelUpdate, readModelContext, updateReadModel, c),
c => UpdateReadModelAsync(readModelDescription, readModelUpdate, readModelContextFactory, updateReadModel, c),
Label.Named("elasticsearch-read-model-update"),
cancellationToken)
.ConfigureAwait(false);
Expand All @@ -139,33 +141,44 @@ public class ElasticsearchReadModelStore<TReadModel> :
private async Task UpdateReadModelAsync(
ReadModelDescription readModelDescription,
ReadModelUpdate readModelUpdate,
IReadModelContext readModelContext,
Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken, Task<ReadModelEnvelope<TReadModel>>> updateReadModel,
IReadModelContextFactory readModelContextFactory,
Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken, Task<ReadModelUpdateResult<TReadModel>>> updateReadModel,
CancellationToken cancellationToken)
{
var readModelId = readModelUpdate.ReadModelId;

var response = await _elasticClient.GetAsync<TReadModel>(
readModelUpdate.ReadModelId,
readModelId,
d => d
.RequestConfiguration(c => c
.AllowedStatusCodes((int)HttpStatusCode.NotFound))
.Index(readModelDescription.IndexName.Value),
cancellationToken)
.ConfigureAwait(false);

var readModelEnvelope = response.Found
? ReadModelEnvelope<TReadModel>.With(readModelUpdate.ReadModelId, response.Source, response.Version)
: ReadModelEnvelope<TReadModel>.Empty(readModelUpdate.ReadModelId);
var isNew = !response.Found;

var readModelEnvelope = isNew
? ReadModelEnvelope<TReadModel>.Empty(readModelId)
: ReadModelEnvelope<TReadModel>.With(readModelUpdate.ReadModelId, response.Source, response.Version);

var context = readModelContextFactory.Create(readModelId, isNew);

readModelEnvelope = await updateReadModel(
readModelContext,
var readModelUpdateResult = await updateReadModel(
context,
readModelUpdate.DomainEvents,
readModelEnvelope,
cancellationToken)
.ConfigureAwait(false);
if (!readModelUpdateResult.IsModified)
{
return;
}

if (readModelContext.IsMarkedForDeletion)
readModelEnvelope = readModelUpdateResult.Envelope;
if (context.IsMarkedForDeletion)
{
await DeleteAsync(readModelUpdate.ReadModelId, cancellationToken);
await DeleteAsync(readModelId, cancellationToken).ConfigureAwait(false);
return;
}

Expand All @@ -177,11 +190,11 @@ public class ElasticsearchReadModelStore<TReadModel> :
{
d = d
.RequestConfiguration(c => c)
.Id(readModelUpdate.ReadModelId)
.Id(readModelId)
.Index(readModelDescription.IndexName.Value);
d = response.Found
? d.VersionType(VersionType.ExternalGte).Version(readModelEnvelope.Version.GetValueOrDefault())
: d.OpType(OpType.Create);
d = isNew
? d.OpType(OpType.Create)
: d.VersionType(VersionType.ExternalGte).Version(readModelEnvelope.Version.GetValueOrDefault());
return d;
},
cancellationToken)
Expand All @@ -191,7 +204,7 @@ public class ElasticsearchReadModelStore<TReadModel> :
when (e.Response?.HttpStatusCode == (int)HttpStatusCode.Conflict)
{
throw new OptimisticConcurrencyException(
$"Read model '{readModelUpdate.ReadModelId}' updated by another",
$"Read model '{readModelId}' updated by another",
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
using EventFlow.MsSql.Tests.IntegrationTests.ReadStores.QueryHandlers;
using EventFlow.MsSql.Tests.IntegrationTests.ReadStores.ReadModels;
using EventFlow.TestHelpers;
using EventFlow.TestHelpers.Aggregates;
using EventFlow.TestHelpers.Aggregates.Entities;
using EventFlow.TestHelpers.MsSql;
using EventFlow.TestHelpers.Suites;
Expand All @@ -50,7 +51,7 @@ protected override IRootResolver CreateRootResolver(IEventFlowOptions eventFlowO
var resolver = eventFlowOptions
.RegisterServices(sr => sr.RegisterType(typeof(ThingyMessageLocator)))
.ConfigureMsSql(MsSqlConfiguration.New.SetConnectionString(_testDatabase.ConnectionString.Value))
.UseMssqlReadModel<MsSqlThingyReadModel>()
.UseMssqlReadModelFor<ThingyAggregate, ThingyId, MsSqlThingyReadModel>()
.UseMssqlReadModel<MsSqlThingyMessageReadModel, ThingyMessageLocator>()
.AddQueryHandlers(
typeof(MsSqlThingyGetQueryHandler),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using EventFlow.Aggregates;
using EventFlow.MsSql.ReadStores.Attributes;
using EventFlow.ReadStores;
Expand All @@ -33,7 +34,8 @@ namespace EventFlow.MsSql.Tests.IntegrationTests.ReadStores.ReadModels
{
[Table("ReadModel-ThingyMessage")]
public class MsSqlThingyMessageReadModel : IReadModel,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyMessageAddedEvent>
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyMessageAddedEvent>,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyMessageHistoryAddedEvent>
{
public string ThingyId { get; set; }

Expand All @@ -51,6 +53,16 @@ public void Apply(IReadModelContext context, IDomainEvent<ThingyAggregate, Thing
Message = thingyMessage.Message;
}

public void Apply(IReadModelContext context, IDomainEvent<ThingyAggregate, ThingyId, ThingyMessageHistoryAddedEvent> domainEvent)
{
ThingyId = domainEvent.AggregateIdentity.Value;

var messageId = new ThingyMessageId(context.ReadModelId);
var thingyMessage = domainEvent.AggregateEvent.ThingyMessages.Single(m => m.Id == messageId);
MessageId = messageId.Value;
Message = thingyMessage.Message;
}

public ThingyMessage ToThingyMessage()
{
return new ThingyMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Aggregates;
using EventFlow.Configuration;
using EventFlow.Core;
using EventFlow.Extensions;
using EventFlow.MsSql.ReadStores;
using EventFlow.ReadStores;
Expand All @@ -37,12 +39,7 @@ public static class EventFlowOptionsMsSqlReadStoreExtensions
where TReadModelLocator : IReadModelLocator
{
return eventFlowOptions
.RegisterServices(f =>
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton, true);
f.Register<IMssqlReadModelStore<TReadModel>, MssqlReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IMssqlReadModelStore<TReadModel>>());
})
.RegisterServices(RegisterMssqlReadStore<TReadModel>)
.UseReadStoreFor<IMssqlReadModelStore<TReadModel>, TReadModel, TReadModelLocator>();
}

Expand All @@ -51,13 +48,28 @@ public static class EventFlowOptionsMsSqlReadStoreExtensions
where TReadModel : class, IReadModel
{
return eventFlowOptions
.RegisterServices(f =>
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton, true);
f.Register<IMssqlReadModelStore<TReadModel>, MssqlReadModelStore<TReadModel>>();
f.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IMssqlReadModelStore<TReadModel>>());
})
.RegisterServices(RegisterMssqlReadStore<TReadModel>)
.UseReadStoreFor<IMssqlReadModelStore<TReadModel>, TReadModel>();
}

public static IEventFlowOptions UseMssqlReadModelFor<TAggregate, TIdentity, TReadModel>(
this IEventFlowOptions eventFlowOptions)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
where TReadModel : class, IReadModel
{
return eventFlowOptions
.RegisterServices(RegisterMssqlReadStore<TReadModel>)
.UseReadStoreFor<TAggregate, TIdentity, IMssqlReadModelStore<TReadModel>, TReadModel>();
}

private static void RegisterMssqlReadStore<TReadModel>(
IServiceRegistration serviceRegistration)
where TReadModel : class, IReadModel
{
serviceRegistration.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton, true);
serviceRegistration.Register<IMssqlReadModelStore<TReadModel>, MssqlReadModelStore<TReadModel>>();
serviceRegistration.Register<IReadModelStore<TReadModel>>(r => r.Resolver.Resolve<IMssqlReadModelStore<TReadModel>>());
}
}
}
Loading

0 comments on commit c765ba2

Please sign in to comment.