Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rasmus Mikkelsen committed Jun 18, 2018
2 parents ae74439 + 182d3e4 commit 4f2c3ee
Show file tree
Hide file tree
Showing 42 changed files with 711 additions and 246 deletions.
55 changes: 30 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ to the documentation.
* In-memory - only for test
* Files - only for test
* Microsoft SQL Server
* EventStore - only for test (for now) [home page](https://geteventstore.com/)
* EventStore - [home page](https://geteventstore.com/)
* [**Subscribers:**](http://docs.geteventflow.net/Subscribers.html)
Listeners that act on specific domain events. Useful if an specific action
needs to be triggered after a domain event has been committed.
Expand Down Expand Up @@ -179,12 +179,12 @@ public async Task Example()

```csharp
// The aggregate root
public class ExampleAggrenate : AggregateRoot<ExampleAggrenate, ExampleId>,
public class ExampleAggregate : AggregateRoot<ExampleAggregate, ExampleId>,
IEmit<ExampleEvent>
{
private int? _magicNumber;

public ExampleAggrenate(ExampleId id) : base(id) { }
public ExampleAggregate(ExampleId id) : base(id) { }

// Method invoked by our command
public void SetMagicNumer(int magicNumber)
Expand Down Expand Up @@ -215,7 +215,7 @@ public class ExampleId : Identity<ExampleId>

```csharp
// A basic event containing some information
public class ExampleEvent : AggregateEvent<ExampleAggrenate, ExampleId>
public class ExampleEvent : AggregateEvent<ExampleAggregate, ExampleId>
{
public ExampleEvent(int magicNumber)
{
Expand All @@ -228,7 +228,7 @@ public class ExampleEvent : AggregateEvent<ExampleAggrenate, ExampleId>

```csharp
// Command for update magic number
public class ExampleCommand : Command<ExampleAggrenate, ExampleId>
public class ExampleCommand : Command<ExampleAggregate, ExampleId>
{
public ExampleCommand(
ExampleId aggregateId,
Expand All @@ -245,10 +245,10 @@ public class ExampleCommand : Command<ExampleAggrenate, ExampleId>
```csharp
// Command handler for our command
public class ExampleCommandHandler
: CommandHandler<ExampleAggrenate, ExampleId, ExampleCommand>
: CommandHandler<ExampleAggregate, ExampleId, ExampleCommand>
{
public override Task ExecuteAsync(
ExampleAggrenate aggregate,
ExampleAggregate aggregate,
ExampleCommand command,
CancellationToken cancellationToken)
{
Expand All @@ -261,13 +261,13 @@ public class ExampleCommandHandler
```csharp
// Read model for our aggregate
public class ExampleReadModel : IReadModel,
IAmReadModelFor<ExampleAggrenate, ExampleId, ExampleEvent>
IAmReadModelFor<ExampleAggregate, ExampleId, ExampleEvent>
{
public int MagicNumber { get; private set; }

public void Apply(
IReadModelContext context,
IDomainEvent<ExampleAggrenate, ExampleId, ExampleEvent> domainEvent)
IDomainEvent<ExampleAggregate, ExampleId, ExampleEvent> domainEvent)
{
MagicNumber = domainEvent.AggregateEvent.MagicNumber;
}
Expand All @@ -294,26 +294,31 @@ section lists some of them. If you have a link with a relevant article, please
share it by creating an issue with the link.

* **Domain-Driven Design**
- [Domain-Driven Design Reference](https://domainlanguage.com/ddd/reference/) by Eric Evans
* [Domain-Driven Design Reference](https://domainlanguage.com/ddd/reference/)
by Eric Evans
* [DDD Decoded - Bounded Contexts Explained](http://blog.sapiensworks.com/post/2016/08/12/DDD-Bounded-Contexts-Explained)
* [Going "Events-First" for Microservices with Event Storming and DDD](http://www.russmiles.com/essais/going-events-first-for-microservices-with-event-storming-and-ddd)
* **General CQRS+ES**
- [CQRS Journey by Microsoft](https://msdn.microsoft.com/en-us/library/jj554200.aspx)
published by Microsoft
- [An In-Depth Look At CQRS](http://blog.sapiensworks.com/post/2015/09/01/In-Depth-CQRS/)
by Mike Mogosanu
- [CQRS, Task Based UIs, Event Sourcing agh!](http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/)
by Greg Young
- [Busting some CQRS myths](https://lostechies.com/jimmybogard/2012/08/22/busting-some-cqrs-myths/)
by Jimmy Bogard
- [CQRS applied](https://lostechies.com/gabrielschenker/2015/04/12/cqrs-applied/)
by Gabriel Schenker
* [CQRS Journey by Microsoft](https://msdn.microsoft.com/en-us/library/jj554200.aspx)
published by Microsoft
* [An In-Depth Look At CQRS](http://blog.sapiensworks.com/post/2015/09/01/In-Depth-CQRS/)
by Mike Mogosanu
* [CQRS, Task Based UIs, Event Sourcing agh!](http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/)
by Greg Young
* [Busting some CQRS myths](https://lostechies.com/jimmybogard/2012/08/22/busting-some-cqrs-myths/)
by Jimmy Bogard
* [CQRS applied](https://lostechies.com/gabrielschenker/2015/04/12/cqrs-applied/)
by Gabriel Schenker
* [DDD Decoded - Entities and Value Objects Explained](http://blog.sapiensworks.com/post/2016/07/29/DDD-Entities-Value-Objects-Explained)
* **Eventual consistency**
- [How To Ensure Idempotency In An Eventual Consistent DDD/CQRS Application](http://blog.sapiensworks.com/post/2015/08/26/How-To-Ensure-Idempotency)
* [How To Ensure Idempotency In An Eventual Consistent DDD/CQRS Application](http://blog.sapiensworks.com/post/2015/08/26/How-To-Ensure-Idempotency)
by Mike Mogosanu
* [DDD Decoded - Don't Fear Eventual Consistency](http://blog.sapiensworks.com/post/2016/07/23/DDD-Eventual-Consistency)
* **Why _not_ to implement "unit of work" in DDD**
- [Unit Of Work is the new Singleton](http://blog.sapiensworks.com/post/2014/06/04/Unit-Of-Work-is-the-new-Singleton.aspx)
by Mike Mogosanu
- [The Unit of Work and Transactions In Domain Driven Design](http://blog.sapiensworks.com/post/2015/09/02/DDD-and-UoW/)
by Mike Mogosanu
* [Unit Of Work is the new Singleton](http://blog.sapiensworks.com/post/2014/06/04/Unit-Of-Work-is-the-new-Singleton.aspx)
by Mike Mogosanu
* [The Unit of Work and Transactions In Domain Driven Design](http://blog.sapiensworks.com/post/2015/09/02/DDD-and-UoW/)
by Mike Mogosanu


### Integration tests
Expand Down
11 changes: 10 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
### New in 0.59 (not released yet)
### New in 0.60 (not released yet)

* New: Implemented optimistic concurrency checks for MSSQL, SQLite and
Elasticsearch read models
* New: Added .NET standard support for EventStore
* New: Delete read models by invoking `context.MarkForDeletion()` in an Apply method
* Minor: Removed unnecessary transaction in EventStore persistance
* Fixed: Read model SQL schema is no longer ignored for `Table` attribute

### New in 0.59.3396 (released 2018-05-23)

* Fix: Commands are now correctly published when no events are emitted from a saga
after handling a domain event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using EventFlow.Logs;
using EventFlow.TestHelpers;
using EventFlow.TestHelpers.Aggregates.Commands;
using FluentAssertions;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.TestHost;
using Newtonsoft.Json;
Expand Down Expand Up @@ -74,6 +76,15 @@ public async Task PublishCommand()
await PostAsync("commands/ThingyPing/1", pingCommand).ConfigureAwait(false);
}

[Test]
public void PublishCommand_WithNull_ThrowsException()
{
// Arrange + Act
Action action = () => Task.WaitAll(PostAsync("commands/ThingyPing/1", null));

action.ShouldThrow<HttpRequestException>("because of command is null.");
}

private async Task<string> GetAsync(string url)
{
// Act
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ private async Task PublishCommandAsync(string name, int version, HttpContext con
private async Task WriteAsync(object obj, HttpStatusCode statusCode, HttpContext context)
{
var json = _jsonSerializer.Serialize(obj);
await context.Response.WriteAsync(json).ConfigureAwait(false);
context.Response.StatusCode = (int) statusCode;
await context.Response.WriteAsync(json).ConfigureAwait(false);
}

private Task WriteErrorAsync(string errorMessage, HttpStatusCode statusCode, HttpContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ namespace EventFlow.Elasticsearch.Tests.IntegrationTests.ReadModels
[ElasticsearchType(IdProperty = "Id", Name = "thingy")]
public class ElasticsearchThingyReadModel : IReadModel,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyDomainErrorAfterFirstEvent>,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyPingEvent>
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyPingEvent>,
IAmReadModelFor<ThingyAggregate, ThingyId, ThingyDeletedEvent>
{
[Keyword(
Index = true)]
Expand Down Expand Up @@ -61,6 +62,11 @@ public void Apply(IReadModelContext context, IDomainEvent<ThingyAggregate, Thing
PingsReceived++;
}

public void Apply(IReadModelContext context, IDomainEvent<ThingyAggregate, ThingyId, ThingyDeletedEvent> domainEvent)
{
context.MarkForDeletion();
}

public Thingy ToThingy()
{
return new Thingy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using EventFlow.Aggregates;
using EventFlow.Core;
using EventFlow.Core.RetryStrategies;
using EventFlow.Elasticsearch.ValueObjects;
using EventFlow.Exceptions;
using EventFlow.Extensions;
using EventFlow.Logs;
using EventFlow.ReadStores;
Expand All @@ -43,15 +46,18 @@ public class ElasticsearchReadModelStore<TReadModel> :
private readonly ILog _log;
private readonly IElasticClient _elasticClient;
private readonly IReadModelDescriptionProvider _readModelDescriptionProvider;
private readonly ITransientFaultHandler<IOptimisticConcurrencyRetryStrategy> _transientFaultHandler;

public ElasticsearchReadModelStore(
ILog log,
IElasticClient elasticClient,
IReadModelDescriptionProvider readModelDescriptionProvider)
IReadModelDescriptionProvider readModelDescriptionProvider,
ITransientFaultHandler<IOptimisticConcurrencyRetryStrategy> transientFaultHandler)
{
_log = log;
_elasticClient = elasticClient;
_readModelDescriptionProvider = readModelDescriptionProvider;
_transientFaultHandler = transientFaultHandler;
}

public async Task<ReadModelEnvelope<TReadModel>> GetAsync(
Expand Down Expand Up @@ -111,52 +117,83 @@ public class ElasticsearchReadModelStore<TReadModel> :
.ConfigureAwait(false);
}

public async Task UpdateAsync(
IReadOnlyCollection<ReadModelUpdate> readModelUpdates,
IReadModelContext readModelContext,
Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken, Task<ReadModelEnvelope<TReadModel>>> updateReadModel,
public async Task UpdateAsync(IReadOnlyCollection<ReadModelUpdate> readModelUpdates,
Func<IReadModelContext> readModelContextFactory,
Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken,
Task<ReadModelEnvelope<TReadModel>>> updateReadModel,
CancellationToken cancellationToken)
{
var readModelDescription = _readModelDescriptionProvider.GetReadModelDescription<TReadModel>();

_log.Verbose(() =>
{
var readModelIds = readModelUpdates
.Select(u => u.ReadModelId)
.Distinct()
.OrderBy(i => i)
.ToList();
return $"Updating read models of type '{typeof(TReadModel).PrettyPrint()}' with IDs '{string.Join(", ", readModelIds)}' in index '{readModelDescription.IndexName}'";
});
var readModelContext = readModelContextFactory();

foreach (var readModelUpdate in readModelUpdates)
{
var response = await _elasticClient.GetAsync<TReadModel>(
readModelUpdate.ReadModelId,
d => d
.RequestConfiguration(c => c
.AllowedStatusCodes((int)HttpStatusCode.NotFound))
.Index(readModelDescription.IndexName.Value),
cancellationToken)
await _transientFaultHandler.TryAsync(
c => UpdateReadModelAsync(readModelDescription, readModelUpdate, readModelContext, updateReadModel, c),
Label.Named("elasticsearch-read-model-update"),
cancellationToken)
.ConfigureAwait(false);
}
}

var readModelEnvelope = response.Found
? ReadModelEnvelope<TReadModel>.With(readModelUpdate.ReadModelId, response.Source, response.Version)
: ReadModelEnvelope<TReadModel>.Empty(readModelUpdate.ReadModelId);
private async Task UpdateReadModelAsync(
ReadModelDescription readModelDescription,
ReadModelUpdate readModelUpdate,
IReadModelContext readModelContext,
Func<IReadModelContext, IReadOnlyCollection<IDomainEvent>, ReadModelEnvelope<TReadModel>, CancellationToken, Task<ReadModelEnvelope<TReadModel>>> updateReadModel,
CancellationToken cancellationToken)
{
var response = await _elasticClient.GetAsync<TReadModel>(
readModelUpdate.ReadModelId,
d => d
.RequestConfiguration(c => c
.AllowedStatusCodes((int)HttpStatusCode.NotFound))
.Index(readModelDescription.IndexName.Value),
cancellationToken)
.ConfigureAwait(false);

var readModelEnvelope = response.Found
? ReadModelEnvelope<TReadModel>.With(readModelUpdate.ReadModelId, response.Source, response.Version)
: ReadModelEnvelope<TReadModel>.Empty(readModelUpdate.ReadModelId);

readModelEnvelope = await updateReadModel(
readModelContext,
readModelUpdate.DomainEvents,
readModelEnvelope,
cancellationToken)
.ConfigureAwait(false);

readModelEnvelope = await updateReadModel(readModelContext, readModelUpdate.DomainEvents, readModelEnvelope, cancellationToken).ConfigureAwait(false);
if (readModelContext.IsMarkedForDeletion)
{
await DeleteAsync(readModelUpdate.ReadModelId, cancellationToken);
return;
}

try
{
await _elasticClient.IndexAsync(
readModelEnvelope.ReadModel,
d => d
.RequestConfiguration(c => c)
.Id(readModelUpdate.ReadModelId)
.Index(readModelDescription.IndexName.Value)
.Version(readModelEnvelope.Version.GetValueOrDefault())
.VersionType(VersionType.ExternalGte),
cancellationToken)
d =>
{
d = d
.RequestConfiguration(c => c)
.Id(readModelUpdate.ReadModelId)
.Index(readModelDescription.IndexName.Value);
d = response.Found
? d.VersionType(VersionType.ExternalGte).Version(readModelEnvelope.Version.GetValueOrDefault())
: d.OpType(OpType.Create);
return d;
},
cancellationToken)
.ConfigureAwait(false);
}
catch (ElasticsearchClientException e)
when (e.Response?.HttpStatusCode == (int)HttpStatusCode.Conflict)
{
throw new OptimisticConcurrencyException(
$"Read model '{readModelUpdate.ReadModelId}' updated by another",
e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../Common.props" />
<PropertyGroup>
<TargetFramework>net462</TargetFramework>
<TargetFrameworks>net462;netstandard2.0</TargetFrameworks>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<GenerateAssemblyInfo>True</GenerateAssemblyInfo>
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
Expand All @@ -23,9 +23,12 @@
<ItemGroup>
<Folder Include="Properties\" />
</ItemGroup>
<ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net462'">
<PackageReference Include="EventStore.Client" Version="4.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="EventStore.ClientAPI.NetCore" Version="4.1.0.23" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\EventFlow\EventFlow.csproj" />
</ItemGroup>
Expand Down
Loading

0 comments on commit 4f2c3ee

Please sign in to comment.