Skip to content

Commit

Permalink
Fixes #224
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed Nov 22, 2023
1 parent 5a110ff commit 87dfc16
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0.

using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Formatters;
using static Microsoft.AspNetCore.Http.StatusCodes;

namespace Eventuous.AspNetCore.Web;
Expand All @@ -11,70 +10,53 @@ public static class ResultExtensions {
public static IResult AsResult(this Result result) {
return result is ErrorResult error
? error.Exception switch {
OptimisticConcurrencyException => AsProblemDetails(Status409Conflict),
AggregateNotFoundException => AsProblemDetails(Status404NotFound),
DomainException => AsValidationProblemDetails(Status400BadRequest),
_ => AsProblemDetails(Status500InternalServerError)
OptimisticConcurrencyException => AsProblem(Status409Conflict),
AggregateNotFoundException => AsProblem(Status404NotFound),
DomainException => AsValidationProblem(Status400BadRequest),
_ => AsProblem(Status500InternalServerError)
}
: Results.Ok(result);

IResult AsProblemDetails(int statusCode)
=> Results.Problem(
new ProblemDetails {
Status = statusCode,
Title = error.ErrorMessage,
Detail = error.Exception?.ToString(),
Type = error.Exception?.GetType().Name
}
);
IResult AsProblem(int statusCode) => Results.Problem(PopulateDetails(new ProblemDetails(), error, statusCode));

IResult AsValidationProblemDetails(int statusCode)
=> Results.ValidationProblem(
errors: error.AsErrors(),
statusCode: statusCode,
title: error.ErrorMessage,
detail: error.Exception?.ToString(),
type: error.Exception?.GetType().Name
);
IResult AsValidationProblem(int statusCode) => Results.Problem(PopulateDetails(new ValidationProblemDetails(error.AsErrors()), error, statusCode));
}

public static ActionResult AsActionResult(this Result result) {
return result is ErrorResult error
? error.Exception switch {
OptimisticConcurrencyException => AsProblemResult(Status409Conflict),
AggregateNotFoundException => AsProblemResult(Status404NotFound),
DomainException => AsValidationProblemResult(Status400BadRequest),
_ => AsProblemResult(Status500InternalServerError)
OptimisticConcurrencyException => AsProblem(Status409Conflict),
AggregateNotFoundException => AsProblem(Status404NotFound),
DomainException => AsValidationProblem(Status400BadRequest),
_ => AsProblem(Status500InternalServerError)
}
: new OkObjectResult(result);

ActionResult AsProblemResult(int statusCode)
=> new ObjectResult(
new ProblemDetails {
Status = statusCode,
Title = error.ErrorMessage,
Detail = error.Exception?.ToString(),
Type = error.Exception?.GetType().Name
}
) {
StatusCode = Status400BadRequest,
ContentTypes = new MediaTypeCollection { ContentTypes.ProblemDetails },
};
ActionResult AsProblem(int statusCode) => new ObjectResult(CreateResult(new ProblemDetails(), statusCode));

ActionResult AsValidationProblem(int statusCode) => CreateResult(new ValidationProblemDetails(error.AsErrors()), statusCode);

ActionResult CreateResult<T>(T details, int statusCode) where T : ProblemDetails {
details.Status = statusCode;
details.Title = error.ErrorMessage;
details.Detail = error.Exception?.ToString();
details.Type = error.Exception?.GetType().Name;

ActionResult AsValidationProblemResult(int statusCode)
=> new ObjectResult(
new ValidationProblemDetails(error.AsErrors()) {
Status = statusCode,
Title = error.ErrorMessage,
Detail = error.Exception?.ToString(),
Type = error.Exception?.GetType().Name
}
) {
return new ObjectResult(details) {
StatusCode = Status400BadRequest,
ContentTypes = new MediaTypeCollection { ContentTypes.ProblemDetails },
ContentTypes = [ContentTypes.ProblemDetails],
};
}
}

static T PopulateDetails<T>(T details, ErrorResult error, int statusCode) where T : ProblemDetails {
details.Status = statusCode;
details.Title = error.ErrorMessage;
details.Detail = error.Exception?.ToString();
details.Type = error.Exception?.GetType().Name;

return details;
}

public static IDictionary<string, string[]> AsErrors(this ErrorResult error)
=> new Dictionary<string, string[]> { ["Domain"] = new[] { error.ErrorMessage } };
static IDictionary<string, string[]> AsErrors(this ErrorResult error) => new Dictionary<string, string[]> { ["Domain"] = [error.ErrorMessage] };
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,43 +32,74 @@ public abstract class SqlSubscriptionBase<TOptions, TConnection>(

protected virtual bool IsStopping(Exception exception) => exception is OperationCanceledException;

// ReSharper disable once CognitiveComplexity
async Task PollingQuery(ulong? position, CancellationToken cancellationToken) {
var start = position.HasValue ? (long)position : -1;

var retryDelay = 10;
var retryCount = 0;
var currentDelay = Options.Polling.MinIntervalMs;

while (!cancellationToken.IsCancellationRequested) {
var result = await Poll().NoContext();

if (!result.Continue) break;

if (result.Retry) {
await Task.Delay(Options.Retry.InitialDelayMs * retryCount++, cancellationToken).NoContext();

continue;
}

retryCount = 0;

// Poll again immediately if we received events
if (result.ReceivedEvents > 0) {
currentDelay = Options.Polling.MinIntervalMs;

continue;
}

// Otherwise, wait a bit
// Exponentially increase delay but do not exceed maxDelay
currentDelay = Math.Min((int)(currentDelay * Options.Polling.GrowFactor), Options.Polling.MaxIntervalMs);
await Task.Delay(currentDelay, cancellationToken).NoContext();
}

Log.InfoLog?.Log("Polling query stopped");

return;

async Task<PollingResult> Poll() {
try {
await using var connection = await OpenConnection(cancellationToken).NoContext();
await using var cmd = PrepareCommand(connection, start);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext();

var result = reader.ReadEvents(cancellationToken);

var received = 0;

await foreach (var persistedEvent in result.NoContext(cancellationToken)) {
await HandleInternal(ToConsumeContext(persistedEvent, cancellationToken)).NoContext();
start = MoveStart(persistedEvent);
received++;
}

retryDelay = 10;
return new PollingResult(true, false, received);
} catch (Exception e) {
if (IsStopping(e)) {
IsDropped = true;
Log.InfoLog?.Log("Polling query stopped");

return;
return new PollingResult(false, false, 0);
}

if (IsTransient(e)) {
await Task.Delay(retryDelay, cancellationToken);
retryDelay *= 2;
return new PollingResult(true, true, 0);
}
else {
Log.InfoLog?.Log("Polling query stopped");
Dropped(DropReason.ServerError, e);

break;
}
Dropped(DropReason.ServerError, e);

return new PollingResult(false, false, 0);
}
}
}
Expand All @@ -92,12 +123,11 @@ public abstract class SqlSubscriptionBase<TOptions, TConnection>(
}
}

protected virtual Task BeforeSubscribe(CancellationToken cancellationToken)
=> Task.CompletedTask;
protected virtual Task BeforeSubscribe(CancellationToken cancellationToken) => Task.CompletedTask;

protected abstract long MoveStart(PersistedEvent evt);

protected IMessageConsumeContext ToConsumeContext(PersistedEvent evt, CancellationToken cancellationToken) {
IMessageConsumeContext ToConsumeContext(PersistedEvent evt, CancellationToken cancellationToken) {
Logger.Current = Log;

var data = DeserializeData(ContentType, evt.MessageType, Encoding.UTF8.GetBytes(evt.JsonData), evt.StreamName!, (ulong)evt.StreamPosition);
Expand All @@ -114,4 +144,6 @@ protected virtual Task BeforeSubscribe(CancellationToken cancellationToken)
Task? _runner;

protected const string ContentType = "application/json";

record struct PollingResult(bool Continue, bool Retry, int ReceivedEvents);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,22 @@

using Eventuous.Subscriptions;

namespace Eventuous.Sql.Base.Subscriptions;
namespace Eventuous.Sql.Base.Subscriptions;

public abstract record class SqlSubscriptionOptionsBase : SubscriptionWithCheckpointOptions {
public string Schema { get; set; } = "eventuous";
public int ConcurrencyLimit { get; set; } = 1;
public int MaxPageSize { get; set; } = 1024;
public abstract record SqlSubscriptionOptionsBase : SubscriptionWithCheckpointOptions {
public string Schema { get; set; } = "eventuous";
public int ConcurrencyLimit { get; set; } = 1;
public int MaxPageSize { get; set; } = 1024;
public PollingOptions Polling { get; set; } = new();
public RetryOptions Retry { get; set; } = new();

public record PollingOptions {
public int MinIntervalMs { get; set; } = 5;
public int MaxIntervalMs { get; set; } = 1000;
public double GrowFactor { get; set; } = 1.5;
}

public record RetryOptions {
public int InitialDelayMs { get; set; } = 50;
}
}

0 comments on commit 87dfc16

Please sign in to comment.