Skip to content

Commit

Permalink
Merge pull request #1065 from aksio-insurtech:fix/storage-api
Browse files Browse the repository at this point in the history
Fix/storage-api
  • Loading branch information
einari committed Jan 14, 2024
2 parents 8528aaa + f47fc57 commit 5f417a4
Show file tree
Hide file tree
Showing 573 changed files with 3,666 additions and 2,886 deletions.
19 changes: 8 additions & 11 deletions Benchmarks/BenchmarkJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
using Aksio.Cratis;
using Aksio.Cratis.Events;
using Aksio.Cratis.Kernel.Configuration;
using Aksio.Cratis.Kernel.Schemas;
using Aksio.Cratis.Kernel.Storage;
using Aksio.Cratis.Kernel.Storage.EventTypes;
using Aksio.Cratis.Schemas;
using Aksio.Execution;
using Aksio.MongoDB;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Driver;
Expand All @@ -19,9 +19,9 @@ namespace Benchmarks;
public abstract class BenchmarkJob
{
protected IGrainFactory GrainFactory { get; private set; } = null!;
protected IExecutionContextManager? ExecutionContextManager { get; private set; }
protected IEventSerializer? EventSerializer { get; private set; }
protected ISchemaStore? SchemaStore { get; private set; }
protected IStorage? Storage { get; private set; }
protected IEventTypesStorage? EventTypesStorage { get; private set; }
protected IJsonSchemaGenerator? SchemaGenerator { get; private set; }
protected virtual IEnumerable<Type> EventTypes => Enumerable.Empty<Type>();

Expand All @@ -31,12 +31,11 @@ public abstract class BenchmarkJob
[GlobalSetup]
public void GlobalSetup()
{
SetExecutionContext();

GrainFactory = GlobalVariables.ServiceProvider.GetRequiredService<IGrainFactory>();
ExecutionContextManager = GlobalVariables.ServiceProvider.GetRequiredService<IExecutionContextManager>();
EventSerializer = GlobalVariables.ServiceProvider.GetRequiredService<IEventSerializer>();
SchemaStore = GlobalVariables.ServiceProvider.GetRequiredService<ISchemaStore>();
Storage = GlobalVariables.ServiceProvider.GetRequiredService<IStorage>();

EventTypesStorage = Storage.GetEventStore((string)GlobalVariables.MicroserviceId).EventTypes;
SchemaGenerator = GlobalVariables.ServiceProvider.GetRequiredService<IJsonSchemaGenerator>();

var configuration = GlobalVariables.ServiceProvider.GetRequiredService<Storage>();
Expand All @@ -54,7 +53,7 @@ public void GlobalSetup()
foreach (var eventType in EventTypes)
{
var eventTypeAttribute = eventType.GetCustomAttribute<EventTypeAttribute>()!;
SchemaStore.Register(eventTypeAttribute.Type, eventType.Name, SchemaGenerator.Generate(eventType));
EventTypesStorage.Register(eventTypeAttribute.Type, eventType.Name, SchemaGenerator.Generate(eventType));
}

Setup();
Expand All @@ -73,7 +72,5 @@ protected virtual void Setup()
{
}

protected void SetExecutionContext() => ExecutionContextManager?.Establish(GlobalVariables.TenantId, CorrelationId.New(), GlobalVariables.MicroserviceId);

protected JsonObject SerializeEvent(object @event) => EventSerializer!.Serialize(@event).GetAwaiter().GetResult();
}
9 changes: 2 additions & 7 deletions Benchmarks/EventSequences/EventLogJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Aksio.Cratis.EventSequences;
using Aksio.Cratis.Kernel.Grains.EventSequences;
using Aksio.Cratis.Kernel.MongoDB;
using Aksio.Cratis.Kernel.Storage.EventSequences;
using Aksio.Cratis.Kernel.Storage.MongoDB;
using Microsoft.Extensions.DependencyInjection;
using IEventSequence = Aksio.Cratis.Kernel.Grains.EventSequences.IEventSequence;

Expand All @@ -16,8 +16,6 @@ public abstract class EventLogJob : BenchmarkJob
[IterationSetup]
public void CleanEventStore()
{
SetExecutionContext();

Database?.DropCollection(WellKnownCollectionNames.EventLog);
Database?.DropCollection(WellKnownCollectionNames.EventSequences);
}
Expand All @@ -28,13 +26,10 @@ protected override void Setup()

var grainFactory = GlobalVariables.ServiceProvider.GetRequiredService<IGrainFactory>();
EventSequence = grainFactory.GetGrain<IEventSequence>(EventSequenceId.Log, keyExtension: new EventSequenceKey(GlobalVariables.MicroserviceId, GlobalVariables.TenantId));

SetExecutionContext();
}

protected async Task Perform(Func<IEventSequence, Task> action)
{
SetExecutionContext();
await action(EventSequence!);
}
}
5 changes: 1 addition & 4 deletions Benchmarks/Observation/ClientObserverJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Aksio.Cratis.Kernel.Grains.Observation;
using Aksio.Cratis.Kernel.MongoDB;
using Aksio.Cratis.Kernel.Storage.MongoDB;
using Aksio.Cratis.Observation;
using Benchmark.Model;

Expand All @@ -15,14 +15,11 @@ public class ClientObserverJob : BenchmarkJob
[IterationSetup]
public void CleanEventStore()
{
SetExecutionContext();

Database?.DropCollection(WellKnownCollectionNames.Observers);
}

protected override void Setup()
{
SetExecutionContext();
base.Setup();

var key = new ObserverKey(GlobalVariables.MicroserviceId, GlobalVariables.TenantId, GlobalVariables.ObserverEventSequence);
Expand Down
5 changes: 1 addition & 4 deletions Benchmarks/Projections/ProjectionJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using Aksio.Cratis.EventSequences;
using Aksio.Cratis.Kernel.Grains.Observation;
using Aksio.Cratis.Kernel.MongoDB;
using Aksio.Cratis.Kernel.Storage.MongoDB;
using Aksio.Cratis.Observation;
using Benchmark.Model;

Expand All @@ -16,14 +16,11 @@ public class ProjectionJob : BenchmarkJob
[IterationSetup]
public void CleanEventStore()
{
SetExecutionContext();

Database?.DropCollection(WellKnownCollectionNames.Observers);
}

protected override void Setup()
{
SetExecutionContext();
base.Setup();

var key = new ObserverKey(GlobalVariables.MicroserviceId, GlobalVariables.TenantId, EventSequenceId.Log);
Expand Down
5 changes: 1 addition & 4 deletions Benchmarks/Reducers/ClientReducerJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Aksio.Cratis.Kernel.Grains.Observation;
using Aksio.Cratis.Kernel.MongoDB;
using Aksio.Cratis.Kernel.Storage.MongoDB;
using Aksio.Cratis.Observation;
using Benchmark.Model;

Expand All @@ -15,14 +15,11 @@ public class ClientReducerJob : BenchmarkJob
[IterationSetup]
public void CleanEventStore()
{
SetExecutionContext();

Database?.DropCollection(WellKnownCollectionNames.Observers);
}

protected override void Setup()
{
SetExecutionContext();
base.Setup();

var key = new ObserverKey(GlobalVariables.MicroserviceId, GlobalVariables.TenantId, GlobalVariables.ObserverEventSequence);
Expand Down
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
<ItemGroup>
<PackageVersion Include="BenchmarkDotNet" Version="0.13.7" />
<PackageVersion Include="docfx.console" Version="2.59.3" />
<PackageVersion Include="Aksio.Fundamentals" Version="1.6.0" />
<PackageVersion Include="Aksio.MongoDB" Version="1.4.1" />
<PackageVersion Include="Aksio.Fundamentals" Version="1.6.1" />
<PackageVersion Include="Aksio.MongoDB" Version="1.4.6" />
<PackageVersion Include="Aksio.Applications" Version="$(ApplicationModel)" />
<PackageVersion Include="Aksio.Applications.CQRS" Version="$(ApplicationModel)" />
<PackageVersion Include="Aksio.Applications.CQRS.MongoDB" Version="$(ApplicationModel)" />
Expand Down
1 change: 0 additions & 1 deletion Samples/Banking/Bank/Reactions/Accounts/MoneyLaundering.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public MoneyLaundering(IImmediateProjections immediateProjections, IEventLog eve

public async Task AccountOpened(DebitAccountOpened @event, EventContext context)
{
throw new NotImplementedException();
var count = await _immediateProjections.GetInstanceById<AccountsCounter>(context.EventSourceId);
if (count.Model.Count > 42)
{
Expand Down
2 changes: 1 addition & 1 deletion Source/Clients/DotNET/ClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void Build()
.AddTransient<IClientReducers, ClientReducers>()
.AddSingleton<IComplianceMetadataResolver, ComplianceMetadataResolver>()
.AddSingleton<IJsonSchemaGenerator, JsonSchemaGenerator>()
.AddSingleton<IEventTypes, EventTypes>()
.AddSingleton<IEventTypes, Events.EventTypes>()
.AddSingleton<IEventSerializer, EventSerializer>()
.AddSingleton<IExecutionContextManager, ExecutionContextManager>()
.AddSingleton<ITypes>(Types.Types.Instance)
Expand Down
1 change: 1 addition & 0 deletions Source/Clients/DotNET/Projections/ProjectionBuilderFor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Text.Json;
using Aksio.Cratis.Events;
using Aksio.Cratis.EventTypes;
using Aksio.Cratis.Models;
using Aksio.Cratis.Projections.Definitions;
using Aksio.Cratis.Schemas;
Expand Down
4 changes: 3 additions & 1 deletion Source/Clients/DotNET/Rules/Rules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public void ProjectTo(IRule rule, object? modelIdentifier = default)
rule.Identifier.Value,
modelIdentifier is null ? ModelKey.Unspecified : modelIdentifier.ToString()!).GetAwaiter().GetResult();

foreach (var property in rule.GetType().GetProperties(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.SetProperty))
var properties = rule.GetType().GetProperties(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.SetProperty);
properties = properties.Where(_ => _.CanWrite).ToArray();
foreach (var property in properties)
{
var name = property.Name.ToCamelCase();
var node = result.Model[name];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Aksio.Cratis.Events;
using Aksio.Cratis.EventSequences;
using Aksio.Cratis.Kernel.Storage.EventSequences;

namespace Aksio.Cratis.Specifications;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@
using System.Dynamic;
using Aksio.Cratis.Auditing;
using Aksio.Cratis.Events;
using Aksio.Cratis.EventSequences;
using Aksio.Cratis.Identities;
using Aksio.Cratis.Kernel.Storage.EventSequences;

namespace Aksio.Cratis.Specifications;

/// <summary>
/// Represents an in-memory implementation of <see cref="IEventSequenceStorage"/>.
/// </summary>
public class EventSequenceStorageProviderForSpecifications : IEventSequenceStorage
public class EventSequenceStorageForSpecifications : IEventSequenceStorage
{
readonly EventLogForSpecifications _eventLog;

/// <summary>
/// Initializes a new instance of the <see cref="EventSequenceStorageProviderForSpecifications"/> class.
/// Initializes a new instance of the <see cref="EventSequenceStorageForSpecifications"/> class.
/// </summary>
/// <param name="eventLog">The <see creF="EventLogForSpecifications"/>.</param>
public EventSequenceStorageProviderForSpecifications(EventLogForSpecifications eventLog)
public EventSequenceStorageForSpecifications(EventLogForSpecifications eventLog)
{
_eventLog = eventLog;
}

/// <inheritdoc/>
public Task<IEventCursor> GetFromSequenceNumber(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId? eventSourceId = null, IEnumerable<EventType>? eventTypes = null, CancellationToken cancellationToken = default)
public Task<IEventCursor> GetFromSequenceNumber(EventSequenceNumber sequenceNumber, EventSourceId? eventSourceId = null, IEnumerable<EventType>? eventTypes = null, CancellationToken cancellationToken = default)
{
var query = _eventLog.AppendedEvents.Where(_ => _.Metadata.SequenceNumber >= sequenceNumber);
if (eventSourceId is not null)
Expand All @@ -44,7 +44,7 @@ public Task<IEventCursor> GetFromSequenceNumber(EventSequenceId eventSequenceId,
}

/// <inheritdoc/>
public Task<IEventCursor> GetRange(EventSequenceId eventSequenceId, EventSequenceNumber start, EventSequenceNumber end, EventSourceId? eventSourceId = null, IEnumerable<EventType>? eventTypes = null, CancellationToken cancellationToken = default)
public Task<IEventCursor> GetRange(EventSequenceNumber start, EventSequenceNumber end, EventSourceId? eventSourceId = null, IEnumerable<EventType>? eventTypes = null, CancellationToken cancellationToken = default)
{
var query = _eventLog.AppendedEvents.Where(_ => _.Metadata.SequenceNumber >= start && _.Metadata.SequenceNumber <= end);
if (eventSourceId is not null)
Expand All @@ -61,16 +61,16 @@ public Task<IEventCursor> GetRange(EventSequenceId eventSequenceId, EventSequenc
}

/// <inheritdoc/>
public Task<EventSequenceNumber> GetHeadSequenceNumber(EventSequenceId eventSequenceId, IEnumerable<EventType>? eventTypes = null, EventSourceId? eventSourceId = null) => Task.FromResult(_eventLog.AppendedEvents.First().Metadata.SequenceNumber);
public Task<EventSequenceNumber> GetHeadSequenceNumber(IEnumerable<EventType>? eventTypes = null, EventSourceId? eventSourceId = null) => Task.FromResult(_eventLog.AppendedEvents.First().Metadata.SequenceNumber);

/// <inheritdoc/>
public Task<EventSequenceNumber> GetTailSequenceNumber(EventSequenceId eventSequenceId, IEnumerable<EventType>? eventTypes = null, EventSourceId? eventSourceId = null) => Task.FromResult(_eventLog.AppendedEvents.Last().Metadata.SequenceNumber);
public Task<EventSequenceNumber> GetTailSequenceNumber(IEnumerable<EventType>? eventTypes = null, EventSourceId? eventSourceId = null) => Task.FromResult(_eventLog.AppendedEvents.Last().Metadata.SequenceNumber);

/// <inheritdoc/>
public Task<IImmutableDictionary<EventType, EventSequenceNumber>> GetTailSequenceNumbersForEventTypes(EventSequenceId eventSequenceId, IEnumerable<EventType> eventTypes) => Task.FromResult<IImmutableDictionary<EventType, EventSequenceNumber>>(ImmutableDictionary<EventType, EventSequenceNumber>.Empty);
public Task<IImmutableDictionary<EventType, EventSequenceNumber>> GetTailSequenceNumbersForEventTypes(IEnumerable<EventType> eventTypes) => Task.FromResult<IImmutableDictionary<EventType, EventSequenceNumber>>(ImmutableDictionary<EventType, EventSequenceNumber>.Empty);

/// <inheritdoc/>
public Task<EventSequenceNumber> GetNextSequenceNumberGreaterOrEqualThan(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, IEnumerable<EventType>? eventTypes = null, EventSourceId? eventSourceId = null)
public Task<EventSequenceNumber> GetNextSequenceNumberGreaterOrEqualThan(EventSequenceNumber sequenceNumber, IEnumerable<EventType>? eventTypes = null, EventSourceId? eventSourceId = null)
{
var query = _eventLog.AppendedEvents.Where(_ => _.Metadata.SequenceNumber >= sequenceNumber);
if (eventSourceId is not null)
Expand All @@ -87,44 +87,50 @@ public Task<EventSequenceNumber> GetNextSequenceNumberGreaterOrEqualThan(EventSe
}

/// <inheritdoc/>
public Task<AppendedEvent> GetLastInstanceFor(EventSequenceId eventSequenceId, EventTypeId eventTypeId, EventSourceId eventSourceId)
public Task<AppendedEvent> GetLastInstanceFor(EventTypeId eventTypeId, EventSourceId eventSourceId)
{
var lastInstance = _eventLog.AppendedEvents.Where(_ => _.Metadata.Type.Id == eventTypeId && _.Context.EventSourceId == eventSourceId).OrderByDescending(_ => _.Metadata.SequenceNumber).First();
return Task.FromResult(new AppendedEvent(lastInstance.Metadata, lastInstance.Context, lastInstance.Content));
}

/// <inheritdoc/>
public Task<AppendedEvent> GetLastInstanceOfAny(EventSequenceId eventSequenceId, EventSourceId eventSourceId, IEnumerable<EventTypeId> eventTypes)
public Task<AppendedEvent> GetLastInstanceOfAny(EventSourceId eventSourceId, IEnumerable<EventTypeId> eventTypes)
{
var lastInstance = _eventLog.AppendedEvents.Where(_ => eventTypes.Any(et => et == _.Metadata.Type.Id) && _.Context.EventSourceId == eventSourceId).OrderByDescending(_ => _.Metadata.SequenceNumber).First();
return Task.FromResult(new AppendedEvent(lastInstance.Metadata, lastInstance.Context, lastInstance.Content));
}

/// <inheritdoc/>
public Task<bool> HasInstanceFor(EventSequenceId eventSequenceId, EventTypeId eventTypeId, EventSourceId eventSourceId)
public Task<bool> HasInstanceFor(EventTypeId eventTypeId, EventSourceId eventSourceId)
{
var count = _eventLog.AppendedEvents.Count(_ => _.Metadata.Type.Id == eventTypeId && _.Context.EventSourceId == eventSourceId);
return Task.FromResult(count > 0);
}

/// <inheritdoc/>
public Task Append(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException();
public Task Append(EventSequenceNumber sequenceNumber, EventSourceId eventSourceId, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException();

/// <inheritdoc/>
public Task Compensate(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException();
public Task Compensate(EventSequenceNumber sequenceNumber, EventType eventType, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred, DateTimeOffset validFrom, ExpandoObject content) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<AppendedEvent> Redact(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred) => throw new NotImplementedException();
public Task<AppendedEvent> Redact(EventSequenceNumber sequenceNumber, RedactionReason reason, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<IEnumerable<EventType>> Redact(EventSequenceId eventSequenceId, EventSourceId eventSourceId, RedactionReason reason, IEnumerable<EventType>? eventTypes, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred) => throw new NotImplementedException();
public Task<IEnumerable<EventType>> Redact(EventSourceId eventSourceId, RedactionReason reason, IEnumerable<EventType>? eventTypes, IEnumerable<Causation> causation, IEnumerable<IdentityId> causedByChain, DateTimeOffset occurred) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<AppendedEvent> GetEventAt(EventSequenceId eventSequenceId, EventSequenceNumber sequenceNumber) => throw new NotImplementedException();
public Task<AppendedEvent> GetEventAt(EventSequenceNumber sequenceNumber) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<EventCount> GetCount(EventSequenceId eventSequenceId, EventSequenceNumber? lastEventSequenceNumber = null, IEnumerable<EventType>? eventTypes = null) => throw new NotImplementedException();
public Task<EventCount> GetCount(EventSequenceNumber? lastEventSequenceNumber = null, IEnumerable<EventType>? eventTypes = null) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<TailEventSequenceNumbers> GetTailSequenceNumbers(EventSequenceId eventSequenceId, IEnumerable<EventType> eventTypes) => throw new NotImplementedException();
public Task<TailEventSequenceNumbers> GetTailSequenceNumbers(IEnumerable<EventType> eventTypes) => throw new NotImplementedException();

/// <inheritdoc/>
public Task<EventSequenceState> GetState() => throw new NotImplementedException();

/// <inheritdoc/>
public Task SaveState(EventSequenceState state) => throw new NotImplementedException();
}

0 comments on commit 5f417a4

Please sign in to comment.