Skip to content

Commit

Permalink
Primary ctors, simplify new() usage, implicit operators for serializa…
Browse files Browse the repository at this point in the history
…tion options, and flow LINQ serialization options in read calls.
  • Loading branch information
IEvangelist committed Sep 29, 2023
1 parent b5ac4eb commit 0788386
Show file tree
Hide file tree
Showing 66 changed files with 422 additions and 658 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ file_header_template = Copyright (c) David Pine. All rights reserved.\nLicensed
indent_size = 4
tab_width = 4
trim_trailing_whitespace = true
end_of_line = lf

###############################
# .NET Coding Conventions #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,20 @@

namespace ChangedFeedSamples.Shared.Processors;

public class BookChangeFeedProcessor : IItemChangeFeedProcessor<Book>
public class BookChangeFeedProcessor(ILogger<BookChangeFeedProcessor> logger,
IRepository<BookByIdReference> bookByIdReferenceRepository) : IItemChangeFeedProcessor<Book>
{
private readonly ILogger<BookChangeFeedProcessor> _logger;
private readonly IRepository<BookByIdReference> _bookByIdReferenceRepository;

public BookChangeFeedProcessor(ILogger<BookChangeFeedProcessor> logger,
IRepository<BookByIdReference> bookByIdReferenceRepository)
{
_logger = logger;
_bookByIdReferenceRepository = bookByIdReferenceRepository;
}

public async ValueTask HandleAsync(Book rating, CancellationToken cancellationToken)
{
_logger.LogInformation("Change detected for book with ID: {BookId}", rating.Id);
logger.LogInformation("Change detected for book with ID: {BookId}", rating.Id);

if (!rating.HasBeenUpdated)
{
await _bookByIdReferenceRepository
await bookByIdReferenceRepository
.CreateAsync(new BookByIdReference(rating.Id, rating.Category),
cancellationToken);
}

_logger.LogInformation("Processed change for book with ID: {BookId}", rating.Id);
logger.LogInformation("Processed change for book with ID: {BookId}", rating.Id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,8 @@

namespace Microsoft.Azure.CosmosEventSourcing.Builders;

internal class DefaultCosmosEventSourcingBuilder : ICosmosEventSourcingBuilder
internal class DefaultCosmosEventSourcingBuilder(IServiceCollection services) : ICosmosEventSourcingBuilder
{
private readonly IServiceCollection _services;

public DefaultCosmosEventSourcingBuilder(IServiceCollection services) =>
_services = services;

public IEventItemProjectionBuilder<TEventItem, TProjectionKey> AddEventItemProjection<TEventItem, TProjectionKey, TProjection>(
Action<EventSourcingProcessorOptions<TEventItem, TProjectionKey>>? optionsAction = null)
where TEventItem : EventItem
Expand All @@ -30,12 +25,12 @@ internal class DefaultCosmosEventSourcingBuilder : ICosmosEventSourcingBuilder
EventSourcingProcessorOptions<TEventItem, TProjectionKey> options = new();
optionsAction?.Invoke(options);

_services.AddSingleton(options);
_services.AddScoped<IEventItemProjection<TEventItem, TProjectionKey>, TProjection>();
_services.AddSingleton<IEventSourcingProcessor, DefaultEventSourcingProcessor<TEventItem, TProjectionKey>>();
services.AddSingleton(options);
services.AddScoped<IEventItemProjection<TEventItem, TProjectionKey>, TProjection>();
services.AddSingleton<IEventSourcingProcessor, DefaultEventSourcingProcessor<TEventItem, TProjectionKey>>();

return new EventItemProjectionBuilder<TEventItem, TProjectionKey>(
_services,
services,
this);
}

Expand All @@ -47,10 +42,10 @@ internal class DefaultCosmosEventSourcingBuilder : ICosmosEventSourcingBuilder
EventSourcingProcessorOptions<TEventItem, TProjectionKey> options = new();
optionsAction?.Invoke(options);

_services.AddSingleton(options);
_services
services.AddSingleton(options);
services
.AddScoped<IEventItemProjection<TEventItem, TProjectionKey>, DefaultDomainEventProjection<TEventItem, TProjectionKey>>();
_services.AddSingleton<IEventSourcingProcessor, DefaultEventSourcingProcessor<TEventItem, TProjectionKey>>();
services.AddSingleton<IEventSourcingProcessor, DefaultEventSourcingProcessor<TEventItem, TProjectionKey>>();
return this;
}

Expand Down Expand Up @@ -80,7 +75,7 @@ internal class DefaultCosmosEventSourcingBuilder : ICosmosEventSourcingBuilder
assemblies = AppDomain.CurrentDomain.GetAssemblies();
}

_services.Scan(x => x.FromAssemblies(assemblies)
services.Scan(x => x.FromAssemblies(assemblies)
.AddClasses(classes => classes.AssignableTo(typeof(IDomainEventProjection<,,>)))
.AsImplementedInterfaces()
.WithScopedLifetime());
Expand All @@ -92,7 +87,7 @@ internal class DefaultCosmosEventSourcingBuilder : ICosmosEventSourcingBuilder
Action<RepositoryOptions>? setupAction = default,
Action<CosmosClientOptions>? additionSetupAction = default)
{
_services.AddCosmosRepository(options =>
services.AddCosmosRepository(options =>
{
options.ContainerPerItemType = true;
setupAction?.Invoke(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,23 @@

namespace Microsoft.Azure.CosmosEventSourcing.Builders;

internal class EventItemProjectionBuilder<TEventItem, TProjectionKey> :
internal class EventItemProjectionBuilder<TEventItem, TProjectionKey>(
IServiceCollection services,
ICosmosEventSourcingBuilder eventSourcingBuilder) :
IEventItemProjectionBuilder<TEventItem, TProjectionKey>
where TEventItem : EventItem
where TProjectionKey : IProjectionKey
{
private readonly IServiceCollection _services;

public EventItemProjectionBuilder(
IServiceCollection services,
ICosmosEventSourcingBuilder eventSourcingBuilder)
{
_services = services;
EventSourcingBuilder = eventSourcingBuilder;
}

public ICosmosEventSourcingBuilder EventSourcingBuilder { get; }
public ICosmosEventSourcingBuilder EventSourcingBuilder { get; } = eventSourcingBuilder;
public IEventItemProjectionBuilder<TEventItem, TProjectionKey> WithDeadLetterDecorator(
Action<DeadLetterOptions<TEventItem, TProjectionKey>>? optionsAction = null)
{
_services.Decorate<IEventItemProjection<TEventItem, TProjectionKey>,
services.Decorate<IEventItemProjection<TEventItem, TProjectionKey>,
DeadLetterProjectionDecorator<TEventItem, TProjectionKey>>();

DeadLetterOptions<TEventItem, TProjectionKey> options = new();
optionsAction?.Invoke(options);
_services.AddSingleton(options);
services.AddSingleton(options);

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,74 +13,60 @@

namespace Microsoft.Azure.CosmosEventSourcing.ChangeFeed;

internal class DefaultEventSourcingProcessor<TSourcedEvent, TProjectionKey> : IEventSourcingProcessor
internal class DefaultEventSourcingProcessor<TSourcedEvent, TProjectionKey>(
EventSourcingProcessorOptions<TSourcedEvent, TProjectionKey> options,
ICosmosContainerService containerService,
ILeaseContainerProvider leaseContainerProvider,
ILogger<DefaultEventSourcingProcessor<TSourcedEvent, TProjectionKey>> logger,
IServiceProvider serviceProvider) : IEventSourcingProcessor
where TSourcedEvent : EventItem
where TProjectionKey : IProjectionKey
{
private readonly EventSourcingProcessorOptions<TSourcedEvent, TProjectionKey> _options;
private readonly ICosmosContainerService _containerService;
private readonly ILeaseContainerProvider _leaseContainerProvider;
private readonly ILogger<DefaultEventSourcingProcessor<TSourcedEvent, TProjectionKey>> _logger;
private readonly IServiceProvider _serviceProvider;
private ChangeFeedProcessor? _processor;

public DefaultEventSourcingProcessor(
EventSourcingProcessorOptions<TSourcedEvent, TProjectionKey> options,
ICosmosContainerService containerService,
ILeaseContainerProvider leaseContainerProvider,
ILogger<DefaultEventSourcingProcessor<TSourcedEvent, TProjectionKey>> logger,
IServiceProvider serviceProvider)
{
_options = options;
_containerService = containerService;
_leaseContainerProvider = leaseContainerProvider;
_logger = logger;
_serviceProvider = serviceProvider;
}

public async Task StartAsync()
{
Container itemContainer = await _containerService.GetContainerAsync<TSourcedEvent>();
Container leaseContainer = await _leaseContainerProvider.GetLeaseContainerAsync();
Container itemContainer = await containerService.GetContainerAsync<TSourcedEvent>();
Container leaseContainer = await leaseContainerProvider.GetLeaseContainerAsync();

ChangeFeedProcessorBuilder builder = itemContainer
.GetChangeFeedProcessorBuilder<TSourcedEvent>(_options.ProcessorName, (changes, token) =>
.GetChangeFeedProcessorBuilder<TSourcedEvent>(options.ProcessorName, (changes, token) =>
OnChangesAsync(changes, itemContainer.Id, token))
.WithLeaseContainer(leaseContainer)
.WithInstanceName(_options.InstanceName)
.WithInstanceName(options.InstanceName)
.WithErrorNotification((_, exception) => OnErrorAsync(exception, itemContainer.Id));

if (_options.PollInterval.HasValue)
if (options.PollInterval.HasValue)
{
builder.WithPollInterval(_options.PollInterval.Value);
builder.WithPollInterval(options.PollInterval.Value);
}

_processor = builder.Build();

_logger.LogInformation("Starting change feed processor for container {ContainerName} with key {ProjectionKey} and processor name {ProcessorName}",
logger.LogInformation("Starting change feed processor for container {ContainerName} with key {ProjectionKey} and processor name {ProcessorName}",
itemContainer.Id,
typeof(TProjectionKey).Name,
_options.ProcessorName);
options.ProcessorName);

await _processor.StartAsync();

_logger.LogInformation("Successfully started change feed processor for container {ContainerName} with key {ProjectionKey} and processor name {ProcessorName}",
logger.LogInformation("Successfully started change feed processor for container {ContainerName} with key {ProjectionKey} and processor name {ProcessorName}",
itemContainer.Id,
typeof(TProjectionKey).Name,
_options.ProcessorName);
options.ProcessorName);
}

private async Task OnChangesAsync(
IReadOnlyCollection<TSourcedEvent> changes,
string containerName,
CancellationToken cancellationToken)
{
_logger.LogDebug("Detected changes for container {ContainerName} total ({ChangesCount})",
logger.LogDebug("Detected changes for container {ContainerName} total ({ChangesCount})",
containerName, changes.Count);

foreach (TSourcedEvent change in changes)
{
using IServiceScope scope = _serviceProvider.CreateScope();
using IServiceScope scope = serviceProvider.CreateScope();
IEventItemProjection<TSourcedEvent, TProjectionKey> projection = scope.ServiceProvider
.GetRequiredService<IEventItemProjection<TSourcedEvent, TProjectionKey>>();

Expand All @@ -93,7 +79,7 @@ public async Task StartAsync()
}
catch (Exception e)
{
_logger.LogError(e,
logger.LogError(e,
"Failed handling projection for container {ContainerName} source event ID {SourcedEventId}",
containerName, change.Id);
}
Expand All @@ -102,7 +88,7 @@ public async Task StartAsync()

private Task OnErrorAsync(Exception exception, string containerName)
{
_logger.LogError(exception, "Failed handling when handling changes detected from container {ContainerName}",
logger.LogError(exception, "Failed handling when handling changes detected from container {ContainerName}",
containerName);
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,8 @@

namespace Microsoft.Azure.CosmosEventSourcing.ChangeFeed;

internal class DefaultEventSourcingProvider : IChangeFeedContainerProcessorProvider
internal class DefaultEventSourcingProvider(IEnumerable<IEventSourcingProcessor> processors) : IChangeFeedContainerProcessorProvider
{
private readonly IEnumerable<IEventSourcingProcessor> _processors;

public DefaultEventSourcingProvider(IEnumerable<IEventSourcingProcessor> processors) =>
_processors = processors;

public IEnumerable<IContainerChangeFeedProcessor> GetProcessors() =>
_processors;
processors;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@ namespace Microsoft.Azure.CosmosEventSourcing.Exceptions;
/// An exception that is thrown when not events are provided to an <see cref="IAggregateRoot"/>.
/// </summary>
/// <remarks>An <see cref="IAggregateRoot"/> must be provided at least one <see cref="DomainEvent"/> when replaying events</remarks>
public class DomainEventsRequiredException : Exception
/// <remarks>
/// Creates a <see cref="DomainEventsRequiredException"/>.
/// </remarks>
public class DomainEventsRequiredException(Type aggregateRootType) : Exception($"At least 1 {nameof(AtomicEvent)} must be provided for {aggregateRootType.Namespace} when replaying events")
{
/// <summary>
/// Creates a <see cref="DomainEventsRequiredException"/>.
/// </summary>
public DomainEventsRequiredException(Type aggregateRootType) :
base($"At least 1 {nameof(AtomicEvent)} must be provided for {aggregateRootType.Namespace} when replaying events")
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ namespace Microsoft.Azure.CosmosEventSourcing.Exceptions;
/// <summary>
/// Exception which is thrown the PersistAsync is used with the aggregate overload and the aggregate doesn't provide the attribute
/// </summary>
public class EventItemPartitionKeyAttributeRequiredException : Exception
/// <remarks>
/// Ceates a new <see cref="EventItemPartitionKeyAttributeRequiredException" />
/// </remarks>
/// <param name="aggregateType">Type of the aggregate which had the invalid configuration. Used to build up the message</param>
public class EventItemPartitionKeyAttributeRequiredException(Type aggregateType) : Exception($"A {nameof(EventItemPartitionKeyAttribute)} must be present on a property in {aggregateType.Name} or you must specify the partition key explicitly")
{
/// <summary>
/// Ceates a new <see cref="EventItemPartitionKeyAttributeRequiredException" />
/// </summary>
/// <param name="aggregateType">Type of the aggregate which had the invalid configuration. Used to build up the message</param>
public EventItemPartitionKeyAttributeRequiredException(Type aggregateType) :
base($"A {nameof(EventItemPartitionKeyAttribute)} must be present on a property in {aggregateType.Name} or you must specify the partition key explicitly")
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@ namespace Microsoft.Azure.CosmosEventSourcing.Exceptions;
/// <summary>
/// Exception which is thrown the PersistAsync is used with the aggregate overload and the aggregate provides multiple InvalidEventItemPartitionKeyAttributeCombinationException
/// </summary>
public class InvalidEventItemPartitionKeyAttributeCombinationException : Exception
/// <remarks>
/// Ceates a new <see cref="InvalidEventItemPartitionKeyAttributeCombinationException" />
/// </remarks>
/// <param name="aggregateType">Type of the aggregate which had the invalid configuration. Used to build up the message</param>
public class InvalidEventItemPartitionKeyAttributeCombinationException(Type aggregateType) : Exception($"{nameof(EventItemPartitionKeyAttribute)} can not be present on multiple properties in {aggregateType.Name}")
{
/// <summary>
/// Ceates a new <see cref="InvalidEventItemPartitionKeyAttributeCombinationException" />
/// </summary>
/// <param name="aggregateType">Type of the aggregate which had the invalid configuration. Used to build up the message</param>
public InvalidEventItemPartitionKeyAttributeCombinationException(Type aggregateType) :
base($"{nameof(EventItemPartitionKeyAttribute)} can not be present on multiple properties in {aggregateType.Name}")
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@ namespace Microsoft.Azure.CosmosEventSourcing.Exceptions;
/// <summary>
/// Thrown when the value of the partition key is null
/// </summary>
public class InvalidPartitionKeyValueException : Exception
/// <remarks>
/// Ceates a new <see cref="InvalidPartitionKeyValueException" />
/// </remarks>
/// <param name="propertyName">The name of the property that was null</param>
/// <param name="aggregateType">Type of the aggregate which had the invalid configuration. Used to build up the message</param>
public class InvalidPartitionKeyValueException(string propertyName, Type aggregateType) : Exception(
$"{propertyName} in {aggregateType.Name} was null. This is not a valid partition key value")
{
/// <summary>
/// Ceates a new <see cref="InvalidPartitionKeyValueException" />
/// </summary>
/// <param name="propertyName">The name of the property that was null</param>
/// <param name="aggregateType">Type of the aggregate which had the invalid configuration. Used to build up the message</param>
public InvalidPartitionKeyValueException(string propertyName, Type aggregateType) :
base(
$"{propertyName} in {aggregateType.Name} was null. This is not a valid partition key value")
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,9 @@ namespace Microsoft.Azure.CosmosEventSourcing.Exceptions;
/// <summary>
/// Thrown when using the _eventStore.ReadAggregateAsync{TAggregateRoot} method and there is not static method named Replay defined.
/// </summary>
public class ReplayMethodNotDefinedException : Exception
/// <remarks>
/// Creates an <see cref="ReplayMethodNotDefinedException"/>
/// </remarks>
public class ReplayMethodNotDefinedException(MemberInfo aggregateType) : Exception($"The {nameof(IAggregateRoot)} of type {aggregateType.Name} does not have a public static TAggregateRoot Replay(List<DomainEvent> events) method defined")
{
/// <summary>
/// Creates an <see cref="ReplayMethodNotDefinedException"/>
/// </summary>
public ReplayMethodNotDefinedException(MemberInfo aggregateType) :
base($"The {nameof(IAggregateRoot)} of type {aggregateType.Name} does not have a public static TAggregateRoot Replay(List<DomainEvent> events) method defined")
{

}
}

0 comments on commit 0788386

Please sign in to comment.