Skip to content

Commit

Permalink
Refactoring SQL support (#298)
Browse files Browse the repository at this point in the history
* Add base SQL classes
* Added polling query incremental delays to subscriptions, fixes #224
* Allow subscriptions to restart properly
* Added TaskRunner
* [BREAKING] Adding stream name going back and forth but at least it will be consistent
* [BREAKING] Changed the SQL Server config to use the connection string
* Added SQL Server DI registrations similar to Postgres
  • Loading branch information
alexeyzimarev committed Nov 23, 2023
1 parent 1f3152f commit bd59c90
Show file tree
Hide file tree
Showing 82 changed files with 1,375 additions and 1,315 deletions.
27 changes: 27 additions & 0 deletions Eventuous.sln
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Testing", "src\Te
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E86AACC6-7131-4BE5-B77F-2CD57A1837B8}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Relational", "Relational", "{4E45624D-DE07-4991-862D-915E332D858C}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Base", "Base", "{1C69B015-B73B-4C6C-ACE6-B73614AA3425}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Sql.Base", "src\Relational\src\Eventuous.Sql.Base\Eventuous.Sql.Base.csproj", "{C7EE70E2-F808-417E-B929-02B37FE9D106}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{C0C0D751-0EAD-47BB-8947-0332D64D3E54}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{8C660C82-BC45-4154-B82E-FFDA7D365ACC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.Persistence.Base", "src\Core\test\Eventuous.Tests.Persistence.Base\Eventuous.Tests.Persistence.Base.csproj", "{8E238CA1-1EFF-403F-A15C-DCD03DE165DD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -389,6 +401,14 @@ Global
{2FF9E7A1-DB6B-4552-98E3-4E2C524A366C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2FF9E7A1-DB6B-4552-98E3-4E2C524A366C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2FF9E7A1-DB6B-4552-98E3-4E2C524A366C}.Release|Any CPU.Build.0 = Release|Any CPU
{C7EE70E2-F808-417E-B929-02B37FE9D106}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C7EE70E2-F808-417E-B929-02B37FE9D106}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C7EE70E2-F808-417E-B929-02B37FE9D106}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C7EE70E2-F808-417E-B929-02B37FE9D106}.Release|Any CPU.Build.0 = Release|Any CPU
{8E238CA1-1EFF-403F-A15C-DCD03DE165DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8E238CA1-1EFF-403F-A15C-DCD03DE165DD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8E238CA1-1EFF-403F-A15C-DCD03DE165DD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8E238CA1-1EFF-403F-A15C-DCD03DE165DD}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -469,6 +489,13 @@ Global
{1FCCE816-36FB-4B19-AB6B-72CA46AC2FBE} = {A6762149-8450-468B-A28F-1D3F6E586247}
{E86AACC6-7131-4BE5-B77F-2CD57A1837B8} = {CF1A32AC-2BAF-4EC9-BDF2-F85A6FD36E05}
{2FF9E7A1-DB6B-4552-98E3-4E2C524A366C} = {E86AACC6-7131-4BE5-B77F-2CD57A1837B8}
{D9CA9EF0-F274-458E-9C63-5C741FD39701} = {4E45624D-DE07-4991-862D-915E332D858C}
{2D26B064-EDCE-4528-9A7B-28BEEAD7ACB8} = {4E45624D-DE07-4991-862D-915E332D858C}
{1C69B015-B73B-4C6C-ACE6-B73614AA3425} = {4E45624D-DE07-4991-862D-915E332D858C}
{C0C0D751-0EAD-47BB-8947-0332D64D3E54} = {1C69B015-B73B-4C6C-ACE6-B73614AA3425}
{C7EE70E2-F808-417E-B929-02B37FE9D106} = {C0C0D751-0EAD-47BB-8947-0332D64D3E54}
{8C660C82-BC45-4154-B82E-FFDA7D365ACC} = {1C69B015-B73B-4C6C-ACE6-B73614AA3425}
{8E238CA1-1EFF-403F-A15C-DCD03DE165DD} = {0ED6785B-60EF-46B4-B938-EF04189FC8BC}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0691467B-C257-46DB-BC4F-88EB7CD615B8}
Expand Down
2 changes: 1 addition & 1 deletion src/Core/src/Eventuous.Shared/Tools/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static ConfiguredCancelableAsyncEnumerable<T> NoContext<T>(this IAsyncEnu
}

public static ConfiguredTaskAwaitable NoThrow(this Task task) {
#if NET8_0
#if NET8_0_OR_GREATER
return task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
#else
return Try(task.ConfigureAwait(false)).ConfigureAwait(false);
Expand Down
53 changes: 53 additions & 0 deletions src/Core/src/Eventuous.Shared/Tools/TaskRunner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous.Tools;

public sealed class TaskRunner(Func<CancellationToken, Task> taskFactory) : IDisposable {
readonly CancellationTokenSource _stopSource = new();

Task? _runner;

public TaskRunner Start() {
_runner = Task.Run(Run);

return this;

async Task Run() {
try {
await taskFactory(_stopSource.Token);
} catch (OperationCanceledException) {
// ignore
}
}
}

public async ValueTask Stop(CancellationToken cancellationToken) {
if (_runner == null) return;

try {
#if NET8_0_OR_GREATER
await _stopSource.CancelAsync();
#else
_stopSource.Cancel();
#endif
// if (_runner != null) await _runner.NoContext();
} finally {
var state = new TaskCompletionSource<object>();
var registration = cancellationToken.Register((s => (((TaskCompletionSource<object>)s!)!).SetCanceled(cancellationToken)), state);

try {
await Task.WhenAny(_runner, state.Task).NoContext();
} finally {
await registration.DisposeAsync();
}

registration = new CancellationTokenRegistration();
}
}

public void Dispose() {
_stopSource.Dispose();
_runner?.Dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ValueTask Write(T element, CancellationToken cancellationToken)
public async ValueTask DisposeAsync() {
_stopping = true;
await _channel.Stop(_cts, _readerTasks, OnDispose).NoContext();
#if NET8_0
#if NET8_0_OR_GREATER
await _cts.CancelAsync().NoContext();
#else
_cts.Cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public sealed class CheckpointCommitHandler : IAsyncDisposable {
readonly ILoggerFactory? _loggerFactory;
readonly string _subscriptionId;
readonly CommitCheckpoint _commitCheckpoint;
readonly CommitPositionSequence _positions = new();
readonly CommitPositionSequence _positions = [];
readonly BatchedChannelWorker<CommitPosition> _worker;

CommitPosition _lastCommit = CommitPosition.None;
Expand Down
24 changes: 19 additions & 5 deletions src/Core/src/Eventuous.Subscriptions/EventSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Eventuous.Subscriptions;
using Filters;
using Logging;

public abstract class EventSubscription<T> : IMessageSubscription where T : SubscriptionOptions {
public abstract class EventSubscription<T> : IMessageSubscription, IAsyncDisposable where T : SubscriptionOptions {
[PublicAPI]
public bool IsRunning { get; set; }

Expand All @@ -29,6 +29,8 @@ public abstract class EventSubscription<T> : IMessageSubscription where T : Subs
protected LogContext Log { get; }
protected CancellationTokenSource Stopping { get; } = new();

protected ulong Sequence;

protected EventSubscription(T options, ConsumePipe consumePipe, ILoggerFactory? loggerFactory) {
Ensure.NotEmptyString(options.SubscriptionId);

Expand Down Expand Up @@ -61,18 +63,18 @@ public abstract class EventSubscription<T> : IMessageSubscription where T : Subs
await Unsubscribe(cancellationToken).NoContext();
Log.InfoLog?.Log("Unsubscribed");
onUnsubscribed(Options.SubscriptionId);
await Pipe.DisposeAsync().NoContext();
await Finalize(cancellationToken);
Sequence = 0;
}

protected virtual ValueTask Finalize(CancellationToken cancellationToken) => default;

// ReSharper disable once CognitiveComplexity
protected async ValueTask Handler(IMessageConsumeContext context) {
var scope = new Dictionary<string, object> {
{"SubscriptionId", SubscriptionId},
{"Stream", context.Stream},
{"MessageType", context.MessageType},
{ "SubscriptionId", SubscriptionId },
{ "Stream", context.Stream },
{ "MessageType", context.MessageType },
};

// ReSharper disable once NullCoalescingConditionIsAlwaysNotNullAccordingToAPIContract
Expand Down Expand Up @@ -210,6 +212,18 @@ public abstract class EventSubscription<T> : IMessageSubscription where T : Subs
}
);
}

bool _disposed;

public async ValueTask DisposeAsync() {
if (_disposed) return;

await Pipe.DisposeAsync();

// Stopping.Dispose();
_disposed = true;
GC.SuppressFinalize(this);
}
}

public record struct EventPosition(ulong? Position, DateTime Created) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ namespace Eventuous.Subscriptions;
using Filters;
using Logging;

public enum SubscriptionKind {
Stream,
All
}

public abstract class EventSubscriptionWithCheckpoint<T>(
T options,
ICheckpointStore checkpointStore,
ConsumePipe consumePipe,
int concurrencyLimit,
SubscriptionKind kind,
ILoggerFactory? loggerFactory
)
: EventSubscription<T>(Ensure.NotNull(options), ConfigurePipe(consumePipe, concurrencyLimit), loggerFactory) where T : SubscriptionWithCheckpointOptions {
Expand All @@ -26,17 +32,19 @@ public abstract class EventSubscriptionWithCheckpoint<T>(
static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)
=> PipelineIsAsync(pipe) ? pipe : pipe.AddFilterFirst(new AsyncHandlingFilter((uint)concurrencyLimit));

EventPosition? LastProcessed { get; set; }
CheckpointCommitHandler CheckpointCommitHandler { get; } = new(
options.SubscriptionId,
checkpointStore,
TimeSpan.FromMilliseconds(options.CheckpointCommitDelayMs),
options.CheckpointCommitBatchSize,
loggerFactory
);
ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore);
EventPosition? LastProcessed { get; set; }
CheckpointCommitHandler? CheckpointCommitHandler { get; set; }
ICheckpointStore CheckpointStore { get; } = Ensure.NotNull(checkpointStore);

protected abstract EventPosition GetPositionFromContext(IMessageConsumeContext context);
protected SubscriptionKind Kind { get; } = kind;

EventPosition GetPositionFromContext(IMessageConsumeContext context)
#pragma warning disable CS8524
=> Kind switch {
#pragma warning restore CS8524
SubscriptionKind.All => EventPosition.FromAllContext(context),
SubscriptionKind.Stream => EventPosition.FromContext(context)
};

protected async ValueTask HandleInternal(IMessageConsumeContext context) {
try {
Expand All @@ -58,21 +66,31 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)
var eventPosition = GetPositionFromContext(context);
LastProcessed = eventPosition;

context.LogContext.TraceLog?.Log("Message {Type} acknowledged at {Position} {P}", context.MessageType, context.GlobalPosition, eventPosition.Position!.Value);
context.LogContext.MessageAcked(context.MessageType, context.GlobalPosition);

return CheckpointCommitHandler.Commit(
return CheckpointCommitHandler!.Commit(
new CommitPosition(eventPosition.Position!.Value, context.Sequence, eventPosition.Created) { LogContext = context.LogContext },
context.CancellationToken
);
}

ValueTask Nack(IMessageConsumeContext context, Exception exception) {
context.LogContext.WarnLog?.Log(exception, "Message {Type} not acknowledged at {Position}", context.MessageType, context.GlobalPosition);
context.LogContext.MessageNacked(context.MessageType, context.GlobalPosition, exception);

return Options.ThrowOnError ? throw exception : Ack(context);
}

protected async Task<Checkpoint> GetCheckpoint(CancellationToken cancellationToken) {
if (CheckpointCommitHandler == null) {
CheckpointCommitHandler = new CheckpointCommitHandler(
options.SubscriptionId,
checkpointStore,
TimeSpan.FromMilliseconds(options.CheckpointCommitDelayMs),
options.CheckpointCommitBatchSize,
LoggerFactory
);
}

if (IsRunning && LastProcessed != null) { return new Checkpoint(Options.SubscriptionId, LastProcessed?.Position); }

Logger.Current = Log;
Expand All @@ -84,5 +102,10 @@ static ConsumePipe ConfigurePipe(ConsumePipe pipe, int concurrencyLimit)
return checkpoint;
}

protected override ValueTask Finalize(CancellationToken cancellationToken) => CheckpointCommitHandler.DisposeAsync();
protected override async ValueTask Finalize(CancellationToken cancellationToken) {
if (CheckpointCommitHandler == null) return;

await CheckpointCommitHandler.DisposeAsync();
CheckpointCommitHandler = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,9 @@ public static void ThrowOnErrorIncompatible(this LogContext log)

public static void FailedToHandleMessageWithRetry(this LogContext log, string handlerType, string messageType, int retryCount, Exception exception)
=> log.ErrorLog?.Log(exception, "Failed to handle message {MessageType} with {HandlerType} after {RetryCount} retries", messageType, handlerType, retryCount);

public static void MessageAcked(this LogContext log, string messageType, ulong position) => log.TraceLog?.Log("Message {Type} acknowledged at {Position}", messageType, position);

public static void MessageNacked(this LogContext log, string messageType, ulong position, Exception exception)
=> log.WarnLog?.Log(exception, "Message {Type} not acknowledged at {Position}", messageType, position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ namespace Microsoft.Extensions.DependencyInjection;
[PublicAPI]
public static class SubscriptionRegistrationExtensions {
public static IServiceCollection AddSubscription<T, TOptions>(
this IServiceCollection services,
string subscriptionId,
Action<SubscriptionBuilder<T, TOptions>> configureSubscription
) where T : EventSubscription<TOptions> where TOptions : SubscriptionOptions {
this IServiceCollection services,
string subscriptionId,
Action<SubscriptionBuilder<T, TOptions>> configureSubscription
) where T : EventSubscription<TOptions> where TOptions : SubscriptionOptions {
Ensure.NotNull(configureSubscription);
var builder = new SubscriptionBuilder<T, TOptions>(Ensure.NotNull(services), Ensure.NotEmptyString(subscriptionId));
configureSubscription(builder);
Expand All @@ -32,7 +32,7 @@ public static class SubscriptionRegistrationExtensions {
return services
.AddSubscriptionBuilder(builder)
.AddSingleton(sp => GetBuilder(sp).ResolveSubscription(sp))
.AddSingleton<IHostedService>(
.AddSingleton<IHostedService, SubscriptionHostedService>(
sp =>
new SubscriptionHostedService(
GetBuilder(sp).ResolveSubscription(sp),
Expand All @@ -41,11 +41,11 @@ public static class SubscriptionRegistrationExtensions {
)
);

SubscriptionBuilder<T, TOptions> GetBuilder(IServiceProvider sp)
=> sp.GetSubscriptionBuilder<T, TOptions>(subscriptionId);
SubscriptionBuilder<T, TOptions> GetBuilder(IServiceProvider sp) => sp.GetSubscriptionBuilder<T, TOptions>(subscriptionId);

GetSubscriptionEndOfStream GetEndOfStream(IServiceProvider sp) {
var subscription = GetBuilder(sp).ResolveSubscription(sp) as IMeasuredSubscription;

return subscription!.GetMeasure();
}
}
Expand All @@ -59,16 +59,13 @@ public static class SubscriptionRegistrationExtensions {
/// <param name="tags">Health check tags list</param>
/// <returns></returns>
public static IHealthChecksBuilder AddSubscriptionsHealthCheck(
this IHealthChecksBuilder builder,
string checkName,
HealthStatus? failureStatus,
string[] tags
) {
this IHealthChecksBuilder builder,
string checkName,
HealthStatus? failureStatus,
string[] tags
) {
builder.Services.TryAddSingleton<SubscriptionHealthCheck>();

builder.Services.TryAddSingleton<ISubscriptionHealth>(
sp => sp.GetRequiredService<SubscriptionHealthCheck>()
);
builder.Services.TryAddSingleton<ISubscriptionHealth>(sp => sp.GetRequiredService<SubscriptionHealthCheck>());

return builder.AddCheck<SubscriptionHealthCheck>(checkName, failureStatus, tags);
}
Expand All @@ -78,9 +75,7 @@ public static IServiceCollection AddCheckpointStore<T>(this IServiceCollection s
services.AddSingleton<T>();

return EventuousDiagnostics.Enabled
? services.AddSingleton<ICheckpointStore>(
sp => new MeasuredCheckpointStore(sp.GetRequiredService<T>())
)
? services.AddSingleton<ICheckpointStore>(sp => new MeasuredCheckpointStore(sp.GetRequiredService<T>()))
: services.AddSingleton<ICheckpointStore>(sp => sp.GetRequiredService<T>());
}

Expand All @@ -89,9 +84,7 @@ public static IServiceCollection AddCheckpointStore<T>(this IServiceCollection s
services.AddSingleton(getStore);

return EventuousDiagnostics.Enabled
? services.AddSingleton<ICheckpointStore>(
sp => new MeasuredCheckpointStore(sp.GetRequiredService<T>())
)
? services.AddSingleton<ICheckpointStore>(sp => new MeasuredCheckpointStore(sp.GetRequiredService<T>()))
: services.AddSingleton<ICheckpointStore>(sp => sp.GetRequiredService<T>());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<IncludeSutSubs>true</IncludeSutSubs>
<IncludeSutApp>true</IncludeSutApp>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NodaTime.Serialization.SystemTextJson" />
<PackageReference Include="MicroElements.AutoFixture.NodaTime" />
<PackageReference Include="Testcontainers" />
</ItemGroup>
</Project>
Loading

0 comments on commit bd59c90

Please sign in to comment.