Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added AsyncDaemonHealthCheck #2679

Merged
merged 9 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ export default withMermaid({
{ text: 'Custom Projections', link: '/events/projections/custom' },
{ text: 'Inline Projections', link: '/events/projections/inline' },
{ text: 'Asynchronous Projections', link: '/events/projections/async-daemon' },
{ text: 'Rebuilding Projections', link: '/events/projections/rebuilding' },]
{ text: 'Rebuilding Projections', link: '/events/projections/rebuilding' },
{ text: 'Async Daemon HealthChecks', link: '/events/projections/healthchecks' },]
},

{
Expand Down
4 changes: 3 additions & 1 deletion docs/cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
"upcasting",
"upcaster",
"Upcasters",
"helpdesk"
"helpdesk",
"HealthCheck",
"HealthChecks"
],
"ignoreWords": [
"JSONB",
Expand Down
19 changes: 19 additions & 0 deletions docs/events/projections/healthchecks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Async Daemon HealthChecks
daveHylde marked this conversation as resolved.
Show resolved Hide resolved

Marten supports a customizable [HealthChecks](https://learn.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks?view=aspnetcore-7.0). This can be useful when running the async daemon in a containerized environment such as Kubernetes. The check will verify that no projection's progression lags more than `maxEventLag` behind the `HighWaterMark`. The default `maxEventLag` is 100. Read more about events progression tracking and `HighWaterMark` in [Async Daemon documentation](/events/projections/async-daemon).

The `maxEventLag` setting controls how far behind the `HighWaterMark` any async projection is allowed to lag before it's considered unhealthy. E.g. if the `HighWaterMark` is 1000 and an a system with 3 async projections `ProjA`, `ProjB` and `ProjC` are processed respectively to sequence number 899, 901 and 901 then the system will be considered unhealthy with a `maxEventLag` of 100 (1000 - 899 = 101), BUT healthy with a `mavEventLag` of 101 or higher.

::: tip INFO
The healthcheck will only be checked against `Async` projections
:::

## Example configuration:

```cs
// Add HealthCheck
Services.AddHealthChecks().AddMartenAsyncDaemonHealthCheck(maxEventLag: 500);

// Map HealthCheck Endpoint
app.MapHealthChecks("/health");
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
using System;
using System.Threading.Tasks;
using JasperFx.Core;
using Marten.AsyncDaemon.Testing.TestingSupport;
using Marten.Events.Daemon;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Npgsql;
using Shouldly;
using Xunit;
using Xunit.Abstractions;
using static Marten.Events.Daemon.AsyncDaemonHealthCheckExtensions;

namespace Marten.AsyncDaemon.Testing;


public class AsyncDaemonHealthCheckExtensionsTests: DaemonContext
{
private FakeHealthCheckBuilderStub builder = new();

public AsyncDaemonHealthCheckExtensionsTests(ITestOutputHelper output) : base(output)
{
_output = output;
}

[Fact]
public void should_add_settings_to_services()
{
builder = new();
builder.Services.ShouldNotContain(x => x.ServiceType == typeof(AsyncDaemonHealthCheckSettings));

builder.AddMartenAsyncDaemonHealthCheck(200);

builder.Services.ShouldContain(x => x.ServiceType == typeof(AsyncDaemonHealthCheckSettings));
}

[Fact]
public void should_add_healthcheck_to_services()
{
builder = new();

builder.AddMartenAsyncDaemonHealthCheck();

var services = builder.Services.BuildServiceProvider();
var healthCheckRegistrations = services.GetServices<HealthCheckRegistration>();
healthCheckRegistrations.ShouldContain(reg => reg.Name == nameof(AsyncDaemonHealthCheck));
}

[Fact]
public async Task should_be_healty_with_one_projection_no_relevant_events()
{
StoreOptions(x =>
{
x.Projections.Add(new FakeSingleStream1Projection(), ProjectionLifecycle.Async);
});
var agent = await StartDaemon();
using var session = theStore.LightweightSession();
session.Events.Append(Guid.NewGuid(), new FakeIrrellevantEvent());
await session.SaveChangesAsync();
await agent.Tracker.WaitForHighWaterMark(1);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(100));

var result = await healthCheck.CheckHealthAsync(new());

result.Status.ShouldBe(HealthStatus.Healthy);
}

[Fact]
public async Task should_be_unhealty_with_no_projection_lag_allowed()
{
StoreOptions(x =>
{
x.Projections.Add(new FakeSingleStream2Projection(), ProjectionLifecycle.Async);
});
var agent = await StartDaemon();
using var session = theStore.LightweightSession();
var stream = Guid.NewGuid();
var eventCount = 100;
for (var i = 0; i < eventCount; i++)
session.Events.Append(stream, new FakeEvent());
await session.SaveChangesAsync();
await agent.Tracker.WaitForHighWaterMark(eventCount);
await agent.Tracker.WaitForShardState(new ShardState("FakeStream2:All", eventCount), 15.Seconds());
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(0));

var result = await healthCheck.CheckHealthAsync(new());

result.Status.ShouldBe(HealthStatus.Unhealthy);
}


[Fact]
public async Task should_be_healty_with_all_projections_caught_up()
{

StoreOptions(x =>
{
x.Projections.Add(new FakeSingleStream3Projection(), ProjectionLifecycle.Async);
x.Projections.Add(new FakeSingleStream4Projection(), ProjectionLifecycle.Async);
});
var agent = await StartDaemon();
using var session = theStore.LightweightSession();
var stream1 = Guid.NewGuid();
var stream2 = Guid.NewGuid();
var eventCount = 100;
for (var i = 0; i < eventCount; i++)
{
session.Events.Append(stream1, new FakeEvent());
session.Events.Append(stream2, new FakeEvent());
}
await session.SaveChangesAsync();
await agent.Tracker.WaitForShardState(new ShardState("FakeStream3:All", eventCount), 15.Seconds());
await agent.Tracker.WaitForShardState(new ShardState("FakeStream4:All", eventCount), 15.Seconds());
await agent.Tracker.WaitForHighWaterMark(eventCount);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1));

var result = await healthCheck.CheckHealthAsync(new());

result.Status.ShouldBe(HealthStatus.Healthy);
}

[Fact]
public async Task should_be_unhealty_with_one_projection_lagging()
{
StoreOptions(x =>
{
x.Projections.Add(new FakeSingleStream5Projection(), ProjectionLifecycle.Async);
x.Projections.Add(new FakeSingleStream6Projection(), ProjectionLifecycle.Async);
});
var agent = await StartDaemon();
using var session = theStore.LightweightSession();
var stream1 = Guid.NewGuid();
var stream2 = Guid.NewGuid();
var eventCount = 500;
for (var i = 0; i < eventCount; i++)
{
session.Events.Append(stream1, new FakeEvent());
session.Events.Append(stream2, new FakeEvent());
}
await session.SaveChangesAsync();
await agent.Tracker.WaitForShardState(new ShardState("FakeStream5:All", eventCount), 15.Seconds());
await agent.Tracker.WaitForShardState(new ShardState("FakeStream6:All", eventCount), 15.Seconds());
await agent.Tracker.WaitForHighWaterMark(eventCount);
using var treeCommand = new NpgsqlCommand($"update {theStore.Events.DatabaseSchemaName}.mt_event_progression set last_seq_id = 0 where name = 'FakeStream6:All'", theSession.Connection);
await treeCommand.ExecuteScalarAsync();

var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1));

var result = await healthCheck.CheckHealthAsync(new());

result.Status.ShouldBe(HealthStatus.Unhealthy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using Marten.Events.Aggregation;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Marten.AsyncDaemon.Testing;

internal class FakeHealthCheckBuilderStub : IHealthChecksBuilder
{
public IServiceCollection Services { get; set; } = new ServiceCollection();

public IHealthChecksBuilder Add(HealthCheckRegistration registration)
{
Services.AddSingleton(registration);
return this;
}
}

public record FakeIrrellevantEvent();
public record FakeEvent();
public class FakeStream1 { public Guid Id { get; set; } }

public class FakeSingleStream1Projection : SingleStreamProjection<FakeStream1>
{
public void Apply(FakeEvent @event, FakeStream1 projection) { }
}

public class FakeStream2 { public Guid Id { get; set; } }
public class FakeSingleStream2Projection : SingleStreamProjection<FakeStream2>
{
public void Apply(FakeEvent @event, FakeStream2 projection) { }
}

public class FakeStream3 { public Guid Id { get; set; } }
public class FakeSingleStream3Projection : SingleStreamProjection<FakeStream3>
{
public void Apply(FakeEvent @event, FakeStream3 projection) { }
}

public class FakeStream4 { public Guid Id { get; set; } }
public class FakeSingleStream4Projection : SingleStreamProjection<FakeStream4>
{
public void Apply(FakeEvent @event, FakeStream4 projection) { }
}

public class FakeStream5 { public Guid Id { get; set; } }
public class FakeSingleStream5Projection : SingleStreamProjection<FakeStream5>
{
public void Apply(FakeEvent @event, FakeStream5 projection) { }
}

public class FakeStream6 { public Guid Id { get; set; } }
public class FakeSingleStream6Projection : SingleStreamProjection<FakeStream6>
{
public void Apply(FakeEvent @event, FakeStream6 projection) { }
}
95 changes: 95 additions & 0 deletions src/Marten/Events/Daemon/AsyncDaemonHealthCheckExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#nullable enable
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using Marten.Events.Projections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Marten.Events.Daemon;

public static class AsyncDaemonHealthCheckExtensions
{
/// <summary>
daveHylde marked this conversation as resolved.
Show resolved Hide resolved
/// Adds a health check for Martens Async Daemon.
/// The health check will verify that no async projection progression is lagging behind more than the <paramref name="maxEventLag"/>
/// The check will return <see cref="HealthCheckResult.Unhealthy"/> if any progression is more than <paramref name="maxEventLag"/> behind the highWaterMark OR if any exception is thrown while doing the check.
/// <example>
/// <code>
/// Customized Injection Example: services.AddHealthChecks().AddAsyncDaemonHealthCheck(150);
/// </code>
/// </example>
/// Also - remember to add <c>app.MapHealthChecks("/your-health-path")</c> to the middleware pipeline
/// </summary>
/// <param name="builder"><see cref="IHealthChecksBuilder"/></param>
/// <param name="maxEventLag">(optional) Acceptable lag of an eventprojection before it's considered unhealthy - defaults to 100</param>
/// <returns>If healthy: <see cref="HealthCheckResult.Healthy"/> - else <see cref="HealthCheckResult.Unhealthy"/></returns>
public static IHealthChecksBuilder AddMartenAsyncDaemonHealthCheck(this IHealthChecksBuilder builder, int maxEventLag = 100)
{
builder.Services.AddSingleton(new AsyncDaemonHealthCheckSettings(maxEventLag));
return builder.AddCheck<AsyncDaemonHealthCheck>(nameof(AsyncDaemonHealthCheck), tags: new[] { "Marten", "AsyncDaemon" });
}

/// <summary>
/// Internal class used to DI settings to async daemon health check
/// </summary>
/// <param name="MaxEventLag"></param>
/// <returns></returns>
internal record AsyncDaemonHealthCheckSettings(int MaxEventLag);

/// <summary>
/// Health check implementation
/// </summary>
internal class AsyncDaemonHealthCheck: IHealthCheck
{
/// <summary>
/// The <see cref="DocumentStore"/> to check health for.
/// </summary>
private IDocumentStore _store;

/// <summary>
/// The allowed event projection processing lag compared to the HighWaterMark.
/// </summary>
private int _maxEventLag;

internal AsyncDaemonHealthCheck(IDocumentStore store, AsyncDaemonHealthCheckSettings settings)
{
_store = store;
_maxEventLag = settings.MaxEventLag;
}
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
var projectionsToCheck = _store.Options
.Events
.Projections()
.Where(x => x.Lifecycle == ProjectionLifecycle.Async) // Only check async projections to avoid issus where inline progression counter is set.
.Select(x => $"{x.ProjectionName}:All")
.ToHashSet();

var allProgress = await _store.Advanced.AllProjectionProgress(token: cancellationToken).ConfigureAwait(true);

var highWaterMark = allProgress.First(x => string.Equals("HighWaterMark", x.ShardName));
var projectionMarks = allProgress.Where(x => !string.Equals("HighWaterMark", x.ShardName));

var unhealthy = projectionMarks
.Where(x => projectionsToCheck.Contains(x.ShardName))
.Where(x => x.Sequence <= highWaterMark.Sequence - _maxEventLag)
.Select(x => x.ShardName)
.ToArray();

return unhealthy.Any()
? HealthCheckResult.Unhealthy($"Unhealthy: Async projection sequence is more than {_maxEventLag} events behind for projection(s): {unhealthy.Join(", ")}")
: HealthCheckResult.Healthy("Healthy");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy($"Unhealthy: {ex.Message}", ex);
}
}
}
}
1 change: 1 addition & 0 deletions src/Marten/Marten.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="7.0.0"/>
<PackageReference Include="Weasel.Postgresql" Version="6.1.0"/>
<PackageReference Include="System.Text.Json" Version="7.0.3"/>
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="7.0.0" />
</ItemGroup>

<!--SourceLink specific settings-->
Expand Down
Loading