diff --git a/.editorconfig b/.editorconfig index 5dd2ec191..3d2e76338 100644 --- a/.editorconfig +++ b/.editorconfig @@ -19,7 +19,7 @@ indent_size = 2 # we're putting the :severity options on everything where it can be put as it's bugged on net5 # check https://github.com/dotnet/roslyn/issues/50785#issuecomment-768606882 # default to all on warning -dotnet_analyzer_diagnostic.severity = warning +# dotnet_analyzer_diagnostic.severity = warning file_header_template = Copyright (c) Dolittle. All rights reserved.\nLicensed under the MIT license. See LICENSE file in the project root for full license information. #### .NET Conventions #### diff --git a/Integration/Shared/EventStoreExtensions.cs b/Integration/Shared/EventStoreExtensions.cs index c02c1ebb3..893a9b459 100644 --- a/Integration/Shared/EventStoreExtensions.cs +++ b/Integration/Shared/EventStoreExtensions.cs @@ -54,7 +54,9 @@ public static FetchForAggregateResponse Combine(this FetchForAggregateResponse[] AggregateRootId = responses.First().Events.AggregateRootId, EventSourceId = responses.First().Events.EventSourceId, CurrentAggregateRootVersion = responses.First().Events.CurrentAggregateRootVersion, +#pragma warning disable CS0612 // Type or member is obsolete AggregateRootVersion = responses.First().Events.AggregateRootVersion +#pragma warning restore CS0612 // Type or member is obsolete } }; diff --git a/Integration/Shared/Portable.cs b/Integration/Shared/Portable.cs new file mode 100644 index 000000000..36d0b1220 --- /dev/null +++ b/Integration/Shared/Portable.cs @@ -0,0 +1,36 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Net; +using System.Net.Sockets; + +namespace Integration.Shared; + +public static class Portable +{ + public static (int,int,int) FindFreePorts() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + var listener2 = new TcpListener(IPAddress.Loopback, 0); + var listener3 = new TcpListener(IPAddress.Loopback, 0); + try + { + // Start the listener to obtain the port. + listener.Start(); + listener2.Start(); + listener3.Start(); + // Get the port number assigned by the system. + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + var port2 = ((IPEndPoint)listener2.LocalEndpoint).Port; + var port3 = ((IPEndPoint)listener3.LocalEndpoint).Port; + return (port, port2, port3); + } + finally + { + // Stop the listener. + listener.Stop(); + listener2.Stop(); + listener3.Stop(); + } + } +} diff --git a/Integration/Shared/Runtime.cs b/Integration/Shared/Runtime.cs index 6b07415cf..d939f6638 100644 --- a/Integration/Shared/Runtime.cs +++ b/Integration/Shared/Runtime.cs @@ -14,6 +14,7 @@ using Dolittle.Runtime.Domain.Platform; using Dolittle.Runtime.Domain.Tenancy; using Dolittle.Runtime.Execution; +using Dolittle.Runtime.Metrics.Configuration; using Dolittle.Runtime.Metrics.Hosting; using Dolittle.Runtime.Server.Web; using Dolittle.Runtime.Services; @@ -52,6 +53,8 @@ public static RunningRuntime CreateAndStart(int numberOfTenants) var configuration = new Dictionary(); var (databases, tenants) = CreateRuntimeConfiguration(configuration, numberOfTenants); + var (manPort, freePort, pubPort) = Portable.FindFreePorts(); + var runtimeHost = Host.CreateDefaultBuilder() .UseDolittleServices() .ConfigureOpenTelemetry(cfg) @@ -70,9 +73,13 @@ public static RunningRuntime CreateAndStart(int numberOfTenants) coll.AddLogging(builder => builder.ClearProviders()); coll.AddOptions().Configure(builder => { - builder.Management = new EndpointConfiguration { Port = 0 }; - // builder.Private = new EndpointConfiguration { Port = 0 }; - builder.Public = new EndpointConfiguration { Port = 0 }; + builder.Management = new EndpointConfiguration { Port = manPort }; + builder.Private = new EndpointConfiguration { Port = freePort }; + builder.Public = new EndpointConfiguration { Port = pubPort }; + }); + coll.AddOptions().Configure(builder => + { + builder.Port = 0; }); }) .AddActorSystem() diff --git a/Integration/Tests/Events.Processing/EventHandlers/given/single_tenant_and_event_handlers.cs b/Integration/Tests/Events.Processing/EventHandlers/given/single_tenant_and_event_handlers.cs index 7405db627..7838ca0a5 100644 --- a/Integration/Tests/Events.Processing/EventHandlers/given/single_tenant_and_event_handlers.cs +++ b/Integration/Tests/Events.Processing/EventHandlers/given/single_tenant_and_event_handlers.cs @@ -124,7 +124,10 @@ protected static IEnumerable get_partitioned_events_in_stream(IEven while (!cts.IsCancellationRequested) { var evt = Task.Run(async () => await reader.ReadAsync(CancellationToken.None), cts.Token).GetAwaiter().GetResult(); - events.Add(evt); + if (evt.IsEvent) + { + events.Add(evt.StreamEvent); + } } } catch (Exception) diff --git a/Source/Actors/Hosting/ActorSystemClusterHostedService.cs b/Source/Actors/Hosting/ActorSystemClusterHostedService.cs index 451bd31f0..43b6438cc 100644 --- a/Source/Actors/Hosting/ActorSystemClusterHostedService.cs +++ b/Source/Actors/Hosting/ActorSystemClusterHostedService.cs @@ -40,10 +40,11 @@ public ActorSystemClusterHostedService(ActorSystem actorSystem, IStreamProcessor } /// - public Task StartAsync(CancellationToken cancellationToken) + public async Task StartAsync(CancellationToken cancellationToken) { Log.SetLoggerFactory(_loggerFactory); - return _actorSystem.Cluster().StartMemberAsync(); + await _actorSystem.Cluster().StartMemberAsync(); + _logger.LogInformation("Actor system started"); } /// diff --git a/Source/CLI/Options/Parsers/Aggregates/AggregateIdOrAliasParser.cs b/Source/CLI/Options/Parsers/Aggregates/AggregateIdOrAliasParser.cs index 6f8a8da6a..6239314f1 100644 --- a/Source/CLI/Options/Parsers/Aggregates/AggregateIdOrAliasParser.cs +++ b/Source/CLI/Options/Parsers/Aggregates/AggregateIdOrAliasParser.cs @@ -22,4 +22,4 @@ public object Parse(string argName, string value, CultureInfo culture) => Guid.TryParse(value, out var aggregateRootId) ? new AggregateRootIdOrAlias(aggregateRootId) : new AggregateRootIdOrAlias(new AggregateRootAlias(value)); -} \ No newline at end of file +} diff --git a/Source/CLI/Options/Parsers/EventHandlers/EventHandlerIdOrAliasParser.cs b/Source/CLI/Options/Parsers/EventHandlers/EventHandlerIdOrAliasParser.cs index 520a393c3..36556c35c 100644 --- a/Source/CLI/Options/Parsers/EventHandlers/EventHandlerIdOrAliasParser.cs +++ b/Source/CLI/Options/Parsers/EventHandlers/EventHandlerIdOrAliasParser.cs @@ -20,10 +20,10 @@ public class EventHandlerIdOrAliasParser : IValueParser public Type TargetType => typeof(EventHandlerIdOrAlias); /// - public object Parse(string argName, string value, CultureInfo culture) + public object Parse(string? argName, string? value, CultureInfo culture) { var scope = ScopeId.Default; - var segments = value.Split(":"); + var segments = value?.Split(":") ?? Array.Empty(); ThrowIfInvalidFormat(value, segments); if (segments.Length > 1) { @@ -35,7 +35,7 @@ public object Parse(string argName, string value, CultureInfo culture) : new EventHandlerIdOrAlias(segments[0], scope); } - static void ThrowIfInvalidFormat(string value, string[] segments) + static void ThrowIfInvalidFormat(string? value, string[] segments) { switch (segments.Length) { @@ -45,4 +45,4 @@ static void ThrowIfInvalidFormat(string value, string[] segments) throw new InvalidEventHandlerIdOrAlias(value); } } -} \ No newline at end of file +} diff --git a/Source/CLI/Options/Parsers/Versioning/VersionParser.cs b/Source/CLI/Options/Parsers/Versioning/VersionParser.cs index 9b1f30efd..bf288049a 100644 --- a/Source/CLI/Options/Parsers/Versioning/VersionParser.cs +++ b/Source/CLI/Options/Parsers/Versioning/VersionParser.cs @@ -29,6 +29,13 @@ public VersionParser(IVersionConverter converter) public Type TargetType => typeof(Version); /// - public object Parse(string argName, string value, CultureInfo culture) - => _converter.FromString(value); + public object Parse(string? argName, string? value, CultureInfo culture) + { + if (value == null) + { + throw new InvalidVersionString(""); + } + + return _converter.FromString(value); + } } diff --git a/Source/CLI/Program.cs b/Source/CLI/Program.cs index 7ba42d408..397a86258 100644 --- a/Source/CLI/Program.cs +++ b/Source/CLI/Program.cs @@ -16,7 +16,7 @@ namespace Dolittle.Runtime.CLI; /// [Command("dolittle", Description = "The Dolittle CLI tool")] [Subcommand(typeof(Runtime.Command))] -class Program +sealed class Program { static int Main(string[] args) { diff --git a/Source/CLI/Runtime/EventHandlers/IManagementClient.cs b/Source/CLI/Runtime/EventHandlers/IManagementClient.cs index 1ba4f9dfa..7b3cc6be1 100644 --- a/Source/CLI/Runtime/EventHandlers/IManagementClient.cs +++ b/Source/CLI/Runtime/EventHandlers/IManagementClient.cs @@ -49,6 +49,6 @@ public interface IManagementClient /// The Event Handler identifier. /// The Tenant to get Stream Processor states for, or null to get all. /// A that, when resolved, returns the containing the - - Task> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId tenant = null); + Task> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId? tenant); } diff --git a/Source/CLI/Runtime/EventHandlers/ManagementClient.cs b/Source/CLI/Runtime/EventHandlers/ManagementClient.cs index 3ee8dbefe..a77bb8404 100644 --- a/Source/CLI/Runtime/EventHandlers/ManagementClient.cs +++ b/Source/CLI/Runtime/EventHandlers/ManagementClient.cs @@ -93,7 +93,7 @@ public async Task> GetAll(MicroserviceAddress ru } /// - public async Task> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId tenant = null) + public async Task> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId? tenant = null) { var client = _clients.CreateClientFor(runtime); var request = new GetOneRequest diff --git a/Source/CLI/Runtime/EventHandlers/Replay/FromCommand.cs b/Source/CLI/Runtime/EventHandlers/Replay/FromCommand.cs index 4273dc480..507dc9e87 100644 --- a/Source/CLI/Runtime/EventHandlers/Replay/FromCommand.cs +++ b/Source/CLI/Runtime/EventHandlers/Replay/FromCommand.cs @@ -36,7 +36,7 @@ public FromCommand(ICanLocateRuntimes runtimes, IManagementClient client, IResol StreamPosition Position { get; init; } [Option("--tenant", CommandOptionType.SingleValue, Description = "The tenant to replay events for. Defaults to the development tenant.")] - TenantId Tenant { get; init; } + TenantId? Tenant { get; init; } /// /// The entrypoint for the "dolittle runtime eventhandlers replay from" command. diff --git a/Source/Diagnostics/OpenTelemetry/Metrics/RuntimeMetrics.cs b/Source/Diagnostics/OpenTelemetry/Metrics/RuntimeMetrics.cs new file mode 100644 index 000000000..6e9ad28c6 --- /dev/null +++ b/Source/Diagnostics/OpenTelemetry/Metrics/RuntimeMetrics.cs @@ -0,0 +1,12 @@ +// Copyright (c) Dolittle. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Diagnostics.Metrics; + +namespace Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; + +public class RuntimeMetrics +{ + public const string SourceName = "Dolittle.Runtime"; + public static readonly Meter Meter = new(SourceName); +} diff --git a/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfiguration.cs b/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfiguration.cs index 0252f0e87..827ba9b55 100644 --- a/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfiguration.cs +++ b/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfiguration.cs @@ -17,4 +17,5 @@ public class OpenTelemetryConfiguration public bool Logging { get; set; } = true; public bool Tracing { get; set; } = true; + public bool Metrics { get; set; } = true; } diff --git a/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs b/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs index d3f8d343f..08566fa0b 100644 --- a/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs +++ b/Source/Diagnostics/OpenTelemetry/OpenTelemetryConfigurationExtensions.cs @@ -2,11 +2,13 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using OpenTelemetry.Logs; +using OpenTelemetry.Metrics; using OpenTelemetry.Resources; using OpenTelemetry.Trace; using Proto.OpenTelemetry; @@ -26,7 +28,8 @@ public static IHostBuilder ConfigureOpenTelemetry(this IHostBuilder builder, ICo if (!Uri.TryCreate(configuration.Endpoint, UriKind.RelativeOrAbsolute, out var otlpEndpoint)) { - var logger = LoggerFactory.Create(opt => opt.AddConfiguration(cfg)).CreateLogger(typeof(OpenTelemetryConfigurationExtensions)); + var logger = LoggerFactory.Create(opt => opt.AddConfiguration(cfg)) + .CreateLogger(typeof(OpenTelemetryConfigurationExtensions)); #pragma warning disable CA1848 logger.LogWarning("Unable to parse otlp endpoint {Input}", configuration.Endpoint); #pragma warning restore CA1848 @@ -50,6 +53,11 @@ public static IHostBuilder ConfigureOpenTelemetry(this IHostBuilder builder, ICo builder.AddOpenTelemetryTracing(resourceBuilder, otlpEndpoint); } + if (configuration.Metrics) + { + builder.AddOpenTelemetryMetrics(resourceBuilder, otlpEndpoint); + } + return builder; } @@ -74,9 +82,9 @@ static void AddOpenTelemetryTracing(this IHostBuilder builder, ResourceBuilder r { builder.ConfigureServices(services => services.AddOpenTelemetry() - .WithTracing(_ => + .WithTracing(providerBuilder => { - _.SetResourceBuilder(resourceBuilder) + providerBuilder.SetResourceBuilder(resourceBuilder) .AddSource(RuntimeActivity.SourceName) .AddHttpClientInstrumentation() .AddAspNetCoreInstrumentation() @@ -86,4 +94,22 @@ static void AddOpenTelemetryTracing(this IHostBuilder builder, ResourceBuilder r .AddOtlpExporter(options => { options.Endpoint = otlpEndpoint; }); })); } + + static void AddOpenTelemetryMetrics(this IHostBuilder builder, ResourceBuilder resourceBuilder, Uri otlpEndpoint) + { + builder.ConfigureServices(services => + services.AddOpenTelemetry() + .WithMetrics(providerBuilder => + { + providerBuilder.SetResourceBuilder(resourceBuilder) + .AddMeter(RuntimeMetrics.SourceName) + .AddAspNetCoreInstrumentation() + .AddProtoActorInstrumentation() + .AddOtlpExporter((exporterOptions, readerOptions) => + { + exporterOptions.Endpoint = otlpEndpoint; + readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 5000; + }); + })); + } } diff --git a/Source/EventHorizon/Consumer/Connections/MetricsCollector.cs b/Source/EventHorizon/Consumer/Connections/MetricsCollector.cs index 8bbd3d3a0..9026157d1 100644 --- a/Source/EventHorizon/Consumer/Connections/MetricsCollector.cs +++ b/Source/EventHorizon/Consumer/Connections/MetricsCollector.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -24,6 +26,16 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalSubscriptionsWithMissingArguments; readonly Counter _totalSubscriptionsWithMissingConsent; + readonly Counter _totalConnectionAttemptsOtel; + readonly Counter _totalConnectionsFailedOtel; + readonly Counter _totalSuccessfulResponsesOtel; + readonly Counter _totalFailureResponsesOtel; + readonly Counter _totalEventHorizonEventsHandledOtel; + readonly Counter _totalEventHorizonEventsFailedHandlingOtel; + readonly Counter _totalTimeSpentConnectingOtel; + readonly Counter _totalSubscriptionsWithMissingArgumentsOtel; + readonly Counter _totalSubscriptionsWithMissingConsentOtel; + public MetricsCollector(IMetricFactory metricFactory) { _totalConnectionAttempts = metricFactory.CreateCounter( @@ -61,41 +73,116 @@ public MetricsCollector(IMetricFactory metricFactory) _totalSubscriptionsWithMissingConsent = metricFactory.CreateCounter( "dolittle_shared_runtime_event_horizon_consumer_subscriptions_with_missing_consent_total", "EventHorizonConnection total number of subscriptions failed due to missing consent"); + + + // OpenTelemetry + _totalConnectionAttemptsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_connection_attempts_total", + "connection_attempts", + "EventHorizonConnection total number of connection attempts"); + + _totalConnectionsFailedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_failed_connections_total", + "errors", + "EventHorizonConnection total number of failed connections"); + + _totalSuccessfulResponsesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_connections_successful_responses_total", + "responses", + "EventHorizonConnection total number of successful connection responses"); + + _totalFailureResponsesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_connections_failure_responses_total", + "errors", + "EventHorizonConnection total number of failure connection responses"); + + _totalEventHorizonEventsHandledOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_handled_events_total", + "events", + "EventHorizonConnection total number of event horizon events handled"); + + _totalEventHorizonEventsFailedHandlingOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_failed_event_handling_total", + "errors", + "EventHorizonConnection total number of event horizon events failed handling"); + + _totalTimeSpentConnectingOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_time_spent_connecting_to_event_horizon_total", + "seconds", + "EventHorizonConnection total time spent successfully connecting to an event horizon"); + + _totalSubscriptionsWithMissingArgumentsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscriptions_with_missing_arguments_total", + "errors", + "EventHorizonConnection total number of subscriptions failed due to missing request arguments"); + + _totalSubscriptionsWithMissingConsentOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscriptions_with_missing_consent_total", + "errors", + "EventHorizonConnection total number of subscriptions failed due to missing consent"); + } /// public void IncrementTotalConnectionAttempts() - => _totalConnectionAttempts.Inc(); + { + _totalConnectionAttempts.Inc(); + _totalConnectionAttemptsOtel.Add(1); + } /// public void IncrementTotalConnectionsFailed() - => _totalConnectionsFailed.Inc(); + { + _totalConnectionsFailed.Inc(); + _totalConnectionsFailedOtel.Add(1); + } /// public void IncrementTotalSuccessfulResponses() - => _totalSuccessfulResponses.Inc(); + { + _totalSuccessfulResponses.Inc(); + _totalSuccessfulResponsesOtel.Add(1); + } /// public void IncrementTotalFailureResponses() - => _totalFailureResponses.Inc(); + { + _totalFailureResponses.Inc(); + _totalFailureResponsesOtel.Add(1); + } /// public void IncrementTotalEventHorizonEventsHandled() - => _totalEventHorizonEventsHandled.Inc(); + { + _totalEventHorizonEventsHandled.Inc(); + _totalEventHorizonEventsHandledOtel.Add(1); + } /// public void IncrementTotalEventHorizonEventsFailedHandling() - => _totalEventHorizonEventsFailedHandling.Inc(); + { + _totalEventHorizonEventsFailedHandling.Inc(); + _totalEventHorizonEventsFailedHandlingOtel.Add(1); + } /// public void AddTotalTimeSpentConnecting(TimeSpan elapsed) - => _totalTimeSpentConnecting.Inc(elapsed.TotalSeconds); + { + _totalTimeSpentConnecting.Inc(elapsed.TotalSeconds); + _totalTimeSpentConnectingOtel.Add(elapsed.TotalSeconds); + } /// public void IncrementTotalSubcriptionsWithMissingArguments() - => _totalSubscriptionsWithMissingArguments.Inc(); + { + _totalSubscriptionsWithMissingArguments.Inc(); + _totalSubscriptionsWithMissingArgumentsOtel.Add(1); + } /// public void IncrementTotalSubscriptionsWithMissingConsent() - => _totalSubscriptionsWithMissingConsent.Inc(); + { + _totalSubscriptionsWithMissingConsent.Inc(); + _totalSubscriptionsWithMissingConsentOtel.Add(1); + } } diff --git a/Source/EventHorizon/Consumer/MetricsCollector.cs b/Source/EventHorizon/Consumer/MetricsCollector.cs index ad7edec89..582576ec0 100644 --- a/Source/EventHorizon/Consumer/MetricsCollector.cs +++ b/Source/EventHorizon/Consumer/MetricsCollector.cs @@ -1,7 +1,9 @@ // Copyright (c) Dolittle. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -22,6 +24,15 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalSubscriptionLoops; readonly Gauge _currentConnectedSubscriptions; + readonly Counter _totalSubscriptionRequestsOtel; + readonly Counter _totalRegisteredSubscriptionsOtel; + readonly Counter _totalSubscriptionRequestsWhereAlreadyStartedOtel; + readonly Counter _totalSubscriptionsWithMissingProducerMicroserviceAddressOtel; + readonly Counter _totalSubscriptionsFailedDueToExceptionOtel; + readonly Counter _totalSubscriptionsFailedDueToReceivingOrWritingEventsCompletedOtel; + readonly Counter _totalSubscriptionLoopsOtel; + readonly UpDownCounter _currentConnectedSubscriptionsOtel; + public MetricsCollector(IMetricFactory metricFactory) { _totalSubscriptionRequests = metricFactory.CreateCounter( @@ -35,7 +46,7 @@ public MetricsCollector(IMetricFactory metricFactory) _currentConnectedSubscriptions = metricFactory.CreateGauge( "dolittle_shared_runtime_event_horizon_consumer_connected_subscriptions_current", "Subscription total number of connected subscriptions"); - + _totalSubscriptionRequestsWhereAlreadyStarted = metricFactory.CreateCounter( "dolittle_shared_runtime_event_horizon_consumer_subscription_already_started_total", "Subscriptions total number of subscription requests made where subscription was already started"); @@ -55,41 +66,108 @@ public MetricsCollector(IMetricFactory metricFactory) _totalSubscriptionLoops = metricFactory.CreateCounter( "dolittle_shared_runtime_event_horizon_consumer_subscription_loops_total", "Subscriptions total number of subscriptions loops"); + + _totalSubscriptionRequestsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscription_requests_total", + "requests", + "SubscriptionsService total number of subscription requests received from Head"); + + _totalRegisteredSubscriptionsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_registered_subscriptions_total", + "subscriptions", + "Subscriptions total number of registered subscriptions"); + + _currentConnectedSubscriptionsOtel = RuntimeMetrics.Meter.CreateUpDownCounter( + "dolittle_shared_runtime_event_horizon_consumer_connected_subscriptions_current", + "subscriptions", + "Subscription total number of connected subscriptions"); + + _totalSubscriptionRequestsWhereAlreadyStartedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscription_already_started_total", + "subscriptions", + "Subscriptions total number of subscription requests made where subscription was already started"); + + _totalSubscriptionsWithMissingProducerMicroserviceAddressOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscription_with_missing_producer_microservice_address_total", + "errors", + "Subscriptions total number of subscriptions where producer microservice address configuration was missing"); + + _totalSubscriptionsFailedDueToExceptionOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscription_failed_due_to_exception_total", + "errors", + "Subscriptions total number of subscriptions failed due to an exception"); + + _totalSubscriptionsFailedDueToReceivingOrWritingEventsCompletedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscription_failed_due_to_receiving_or_writing_events_completed_total", + "errors", + "Subscriptions total number of subscriptions failed due to receiving or writing events completed"); + + _totalSubscriptionLoopsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_subscription_loops_total", + "loops", + "Subscriptions total number of subscriptions loops"); } /// public void IncrementTotalSubscriptionsInitiatedFromHead() - => _totalSubscriptionRequests.Inc(); + { + _totalSubscriptionRequests.Inc(); + _totalSubscriptionRequestsOtel.Add(1); + } /// public void IncrementTotalRegisteredSubscriptions() - => _totalRegisteredSubscriptions.Inc(); + { + _totalRegisteredSubscriptions.Inc(); + _totalRegisteredSubscriptionsOtel.Add(1); + } /// public void IncrementCurrentConnectedSubscriptions() - => _currentConnectedSubscriptions.Inc(); + { + _currentConnectedSubscriptions.Inc(); + _currentConnectedSubscriptionsOtel.Add(1); + } /// public void DecrementCurrentConnectedSubscriptions() - => _currentConnectedSubscriptions.Dec(); + { + _currentConnectedSubscriptions.Dec(); + _currentConnectedSubscriptionsOtel.Add(-1); + } /// public void IncrementSubscriptionsAlreadyStarted() - => _totalSubscriptionRequestsWhereAlreadyStarted.Inc(); + { + _totalSubscriptionRequestsWhereAlreadyStarted.Inc(); + _totalSubscriptionRequestsWhereAlreadyStartedOtel.Add(1); + } /// public void IncrementSubscriptionsMissingProducerMicroserviceAddress() - => _totalSubscriptionsWithMissingProducerMicroserviceAddress.Inc(); + { + _totalSubscriptionsWithMissingProducerMicroserviceAddress.Inc(); + _totalSubscriptionsWithMissingProducerMicroserviceAddressOtel.Add(1); + } /// public void IncrementSubscriptionsFailedDueToException() - => _totalSubscriptionsFailedDueToException.Inc(); + { + _totalSubscriptionsFailedDueToException.Inc(); + _totalSubscriptionsFailedDueToExceptionOtel.Add(1); + } /// public void IncrementSubscriptionsFailedDueToReceivingOrWritingEventsCompleted() - => _totalSubscriptionsFailedDueToReceivingOrWritingEventsCompleted.Inc(); + { + _totalSubscriptionsFailedDueToReceivingOrWritingEventsCompleted.Inc(); + _totalSubscriptionsFailedDueToReceivingOrWritingEventsCompletedOtel.Add(1); + } /// public void IncrementSubscriptionLoops() - => _totalSubscriptionLoops.Inc(); + { + _totalSubscriptionLoops.Inc(); + _totalSubscriptionLoopsOtel.Add(1); + } } diff --git a/Source/EventHorizon/Consumer/Processing/MetricsCollector.cs b/Source/EventHorizon/Consumer/Processing/MetricsCollector.cs index 76dfdde2f..689ac5ccf 100644 --- a/Source/EventHorizon/Consumer/Processing/MetricsCollector.cs +++ b/Source/EventHorizon/Consumer/Processing/MetricsCollector.cs @@ -1,7 +1,9 @@ // Copyright (c) Dolittle. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -18,6 +20,12 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalEventsFetched; readonly Counter _totalStreamProcessorsStarted; readonly Counter _totalStreamProcessorStartAttempts; + + readonly Counter _totalEventHorizonEventsProcessedOtel; + readonly Counter _totalEventHorizonEventWritesFailedOtel; + readonly Counter _totalEventsFetchedOtel; + readonly Counter _totalStreamProcessorsStartedOtel; + readonly Counter _totalStreamProcessorStartAttemptsOtel; public MetricsCollector(IMetricFactory metricFactory) { @@ -40,25 +48,66 @@ public MetricsCollector(IMetricFactory metricFactory) _totalStreamProcessorStartAttempts = metricFactory.CreateCounter( "dolittle_system_runtime_event_horizon_consumer_stream_processor_start_attempts_total", "StreamProcessor total number of stream processors attempted started"); + + // OpenTelemetry + _totalEventHorizonEventsProcessedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_events_processed_total", + "events", + "EventProcessor total number of event horizon events processed"); + + _totalEventHorizonEventWritesFailedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_consumer_event_writes_failed_total", + "errors", + "EventProcessor total number of event horizon event writes failed"); + + _totalEventsFetchedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_event_horizon_consumer_events_fetched_total", + "events", + "EventsFromEventHorizonFetcher total number of event horizon events that has been fetched from stream processors"); + + _totalStreamProcessorsStartedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_event_horizon_consumer_stream_processors_started_total", + "count", + "StreamProcessor total number of stream processors started"); + + _totalStreamProcessorStartAttemptsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_event_horizon_consumer_stream_processor_start_attempts_total", + "count", + "StreamProcessor total number of stream processors attempted started"); } /// public void IncrementTotalEventHorizonEventsProcessed() - => _totalEventHorizonEventsProcessed.Inc(); + { + _totalEventHorizonEventsProcessed.Inc(); + _totalEventHorizonEventsProcessedOtel.Add(1); + } /// public void IncrementTotalEventHorizonEventWritesFailed() - => _totalEventHorizonEventWritesFailed.Inc(); + { + _totalEventHorizonEventWritesFailed.Inc(); + _totalEventHorizonEventWritesFailedOtel.Add(1); + } /// public void IncrementTotalEventsFetched() - => _totalEventsFetched.Inc(); + { + _totalEventsFetched.Inc(); + _totalEventsFetchedOtel.Add(1); + } /// public void IncrementTotalStreamProcessorStarted() - => _totalStreamProcessorsStarted.Inc(); + { + _totalStreamProcessorsStarted.Inc(); + _totalStreamProcessorsStartedOtel.Add(1); + } /// public void IncrementTotalStreamProcessorStartAttempts() - => _totalStreamProcessorStartAttempts.Inc(); + { + _totalStreamProcessorStartAttempts.Inc(); + _totalStreamProcessorStartAttemptsOtel.Add(1); + } } diff --git a/Source/EventHorizon/Producer/EventHorizon.cs b/Source/EventHorizon/Producer/EventHorizon.cs index db2a7fe1a..ff509353c 100644 --- a/Source/EventHorizon/Producer/EventHorizon.cs +++ b/Source/EventHorizon/Producer/EventHorizon.cs @@ -19,7 +19,7 @@ namespace Dolittle.Runtime.EventHorizon.Producer; /// -/// Represents an an event horizon in the system. +/// Represents an event horizon in the system. /// public class EventHorizon : IDisposable { @@ -101,7 +101,7 @@ public async Task Start() Id.ConsumerTenant, Id.ProducerTenant, Id.Partition, - Id.PublicStream);; + Id.PublicStream); tasks.OnAllTasksCompleted += () => Log.EventHorizonStopped( _logger, Id.ConsumerMicroservice, diff --git a/Source/EventHorizon/Producer/MetricsCollector.cs b/Source/EventHorizon/Producer/MetricsCollector.cs index aa8a43711..b9c392881 100644 --- a/Source/EventHorizon/Producer/MetricsCollector.cs +++ b/Source/EventHorizon/Producer/MetricsCollector.cs @@ -1,7 +1,9 @@ // Copyright (c) Dolittle. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -17,6 +19,11 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalRejectedSubscriptions; readonly Counter _totalAcceptedSubscriptions; readonly Counter _totalEventsWrittenToEventHorizon; + + readonly Counter _totalIncomingSubscriptionsOtel; + readonly Counter _totalRejectedSubscriptionsOtel; + readonly Counter _totalAcceptedSubscriptionsOtel; + readonly Counter _totalEventsWrittenToEventHorizonOtel; public MetricsCollector(IMetricFactory metricFactory) { @@ -34,21 +41,53 @@ public MetricsCollector(IMetricFactory metricFactory) _totalEventsWrittenToEventHorizon = metricFactory.CreateCounter( "dolittle_shared_runtime_event_horizon_producer_events_written_total", "ConsumerService total number of events written to event horizon"); + + _totalIncomingSubscriptionsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_producer_incoming_subscriptions_total", + "subscriptions", + "ConsumerService total number of subscription received from other Runtimes"); + + _totalRejectedSubscriptionsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_producer_rejected_subscriptions_total", + "subscriptions", + "ConsumerService total number of rejected subscriptions"); + + _totalAcceptedSubscriptionsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_producer_accepted_subscriptions_total", + "subscriptions", + "ConsumerService total number of accepted subscriptions"); + + _totalEventsWrittenToEventHorizonOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_shared_runtime_event_horizon_producer_events_written_total", + "events", + "ConsumerService total number of events written to event horizon"); } /// public void IncrementTotalIncomingSubscriptions() - => _totalIncomingSubscriptions.Inc(); + { + _totalIncomingSubscriptions.Inc(); + _totalIncomingSubscriptionsOtel.Add(1); + } /// public void IncrementTotalRejectedSubscriptions() - => _totalRejectedSubscriptions.Inc(); + { + _totalRejectedSubscriptions.Inc(); + _totalRejectedSubscriptionsOtel.Add(1); + } /// public void IncrementTotalAcceptedSubscriptions() - => _totalAcceptedSubscriptions.Inc(); + { + _totalAcceptedSubscriptions.Inc(); + _totalAcceptedSubscriptionsOtel.Add(1); + } /// public void IncrementTotalEventsWrittenToEventHorizon() - => _totalEventsWrittenToEventHorizon.Inc(); + { + _totalEventsWrittenToEventHorizon.Inc(); + _totalEventsWrittenToEventHorizonOtel.Add(1); + } } diff --git a/Source/Events/Events.csproj b/Source/Events/Events.csproj index 31dd8e229..b3f7d7f5a 100644 --- a/Source/Events/Events.csproj +++ b/Source/Events/Events.csproj @@ -8,6 +8,7 @@ + diff --git a/Source/Events/Processing/EventHandlers/Actors/ConcurrentPartitionedProcessor.cs b/Source/Events/Processing/EventHandlers/Actors/ConcurrentPartitionedProcessor.cs index 6427edc38..1ff9a2b2c 100644 --- a/Source/Events/Processing/EventHandlers/Actors/ConcurrentPartitionedProcessor.cs +++ b/Source/Events/Processing/EventHandlers/Actors/ConcurrentPartitionedProcessor.cs @@ -13,6 +13,7 @@ using Dolittle.Runtime.Domain.Tenancy; using Dolittle.Runtime.Events.Processing.Streams; using Dolittle.Runtime.Events.Processing.Streams.Partitioned; +using Dolittle.Runtime.Events.Store; using Dolittle.Runtime.Events.Store.Streams; using Microsoft.Extensions.Logging; using Proto; @@ -118,6 +119,14 @@ public bool TryGetTimeToRetry(ActiveRequests activeRequests, out TimeSpan timeTo /// public State WithSkippedEvent(StreamEvent streamEvent) => this with { ProcessorState = ProcessorState.WithResult(SkippedProcessing.Instance, streamEvent, DateTimeOffset.UtcNow) }; + + /// + /// When there are unhandled events in the event log, the processor state should be updated to reflect that the event was skipped. + /// + /// + /// + public State WithNextProcessingPosition(EventLogSequenceNumber nextProcessingPosition) => + this with { ProcessorState = ProcessorState.WithNextEventLogSequence(nextProcessingPosition) }; } readonly ImmutableHashSet _handledTypes; @@ -145,7 +154,7 @@ public ConcurrentPartitionedProcessor( _handledTypes = handledEventTypes.Select(_ => _.Value).ToImmutableHashSet(); } - public async Task Process(ChannelReader messages, IStreamProcessorState state, CancellationToken cancellationToken, + public async Task Process(ChannelReader messages, IStreamProcessorState state, CancellationToken cancellationToken, CancellationToken deadlineToken) { var currentState = new State(AsPartitioned(state), new ActiveRequests(_concurrency)); @@ -211,18 +220,30 @@ async Task WaitForCompletions(State currentState, CancellationToken deadlineToke } } - async Task ProcessNextEvent(ChannelReader messages, State currentState, CancellationToken stoppingToken, + async Task ProcessNextEvent(ChannelReader messages, State currentState, CancellationToken stoppingToken, CancellationToken deadlineToken) { - var evt = await messages.ReadAsync(stoppingToken); - if (currentState.ProcessorState.FailingPartitions.TryGetValue(evt.Partition, out _)) + var message = await messages.ReadAsync(stoppingToken); + if (message.IsEvent) { - await currentState.ActiveRequests.AddSkipped(Task.FromResult(AsSkippedEvent(evt))); - return currentState; + var evt = message.StreamEvent; + if (currentState.ProcessorState.FailingPartitions.TryGetValue(evt.Partition, out _)) + { + await currentState.ActiveRequests.AddSkipped(Task.FromResult(AsSkippedEvent(evt))); + return currentState; + } + + var newTask = ProcessEventAndReturnStateUpdateCallback(evt, deadlineToken); + await currentState.ActiveRequests.Add(evt.Partition, newTask).ConfigureAwait(false); } - var newTask = ProcessEventAndReturnStateUpdateCallback(evt, deadlineToken); - await currentState.ActiveRequests.Add(evt.Partition, newTask).ConfigureAwait(false); + else + { + // Eventlog has unhandled events, skip them + currentState = currentState.WithNextProcessingPosition(message.NextEventLogSequenceNumber); + await PersistNewState(currentState.ProcessorState, deadlineToken); + } + return currentState; } @@ -255,7 +276,7 @@ internal enum NextAction /// Process the next event in the event stream. /// ProcessNextEvent, - + /// /// Receive the result of the current event being processed /// @@ -269,7 +290,7 @@ internal enum NextAction /// /// Processing is complete. /// - Completed + Completed, } /// @@ -283,7 +304,7 @@ internal enum NextAction /// /// internal static async ValueTask<(NextAction, PartitionId?)> WaitForNextAction( - ChannelReader messages, + ChannelReader messages, State state, CancellationToken cancellationToken) { @@ -354,9 +375,9 @@ internal enum NextAction return (NextAction.ReceiveResult, default); } - if (messages.TryPeek(out var streamEvent) && state.ActiveRequests.IsProcessing(streamEvent.Partition)) + if (messages.TryPeek(out var message) && message.IsEvent && state.ActiveRequests.IsProcessing(message.StreamEvent.Partition)) { - return (NextAction.ReceiveResult, streamEvent.Partition); + return (NextAction.ReceiveResult, message.StreamEvent.Partition); } return (NextAction.ProcessNextEvent, default); diff --git a/Source/Events/Processing/EventHandlers/Actors/NonPartitionedProcessor.cs b/Source/Events/Processing/EventHandlers/Actors/NonPartitionedProcessor.cs index 34f889155..71cc71a54 100644 --- a/Source/Events/Processing/EventHandlers/Actors/NonPartitionedProcessor.cs +++ b/Source/Events/Processing/EventHandlers/Actors/NonPartitionedProcessor.cs @@ -33,7 +33,7 @@ public NonPartitionedProcessor( } - public async Task Process(ChannelReader messages, IStreamProcessorState state, CancellationToken cancellationToken, + public async Task Process(ChannelReader messages, IStreamProcessorState state, CancellationToken cancellationToken, CancellationToken deadlineToken) { var currentState = AsNonPartitioned(state); @@ -41,7 +41,18 @@ public async Task Process(ChannelReader messages, IStreamProcessorS { try { - var evt = await messages.ReadAsync(cancellationToken); + var message = await messages.ReadAsync(cancellationToken); + + if (!message.IsEvent) + { + var nextSequenceNumber = message.NextEventLogSequenceNumber; + // No event in batch, update offset and wait for next batch + currentState = currentState.WithNextEventLogSequence(nextSequenceNumber); + await PersistNewState(currentState, deadlineToken); + continue; + } + + var evt = message.StreamEvent; (currentState, var processingResult) = await ProcessEventAndHandleResult(evt, currentState, deadlineToken); await PersistNewState(currentState, deadlineToken); diff --git a/Source/Events/Processing/EventHandlers/Actors/PartitionedProcessor.cs b/Source/Events/Processing/EventHandlers/Actors/PartitionedProcessor.cs index eb789bd61..976ca9ab9 100644 --- a/Source/Events/Processing/EventHandlers/Actors/PartitionedProcessor.cs +++ b/Source/Events/Processing/EventHandlers/Actors/PartitionedProcessor.cs @@ -28,7 +28,8 @@ record State(StreamProcessorState ProcessorState) { public bool NoFailingEvents => ProcessorState.FailingPartitions.IsEmpty; - public bool TryGetTimeToRetry(out TimeSpan timeToRetry, [NotNullWhen(true)] out PartitionId? selectedPartitionId) + public bool TryGetTimeToRetry(out TimeSpan timeToRetry, + [NotNullWhen(true)] out PartitionId? selectedPartitionId) { timeToRetry = TimeSpan.MaxValue; selectedPartitionId = default; @@ -36,7 +37,8 @@ public bool TryGetTimeToRetry(out TimeSpan timeToRetry, [NotNullWhen(true)] out foreach (var (partitionId, failingPartitionState) in ProcessorState.FailingPartitions) { - if (failingPartitionState.TryGetTimespanToRetry(out var partitionTimeToRetry) && partitionTimeToRetry < timeToRetry) + if (failingPartitionState.TryGetTimespanToRetry(out var partitionTimeToRetry) && + partitionTimeToRetry < timeToRetry) { timeToRetry = partitionTimeToRetry; selectedPartitionId = partitionId; @@ -64,13 +66,16 @@ public PartitionedProcessor( ILogger logger) : base( - streamProcessorId, processor, streamProcessorStates, executionContext, onProcessed, onFailedToProcess, tenantId, logger) + streamProcessorId, processor, streamProcessorStates, executionContext, onProcessed, onFailedToProcess, + tenantId, logger) { _fetcher = fetcher; _handledTypes = handledEventTypes.Select(_ => _.Value).ToImmutableHashSet(); } - public async Task Process(ChannelReader messages, IStreamProcessorState state, CancellationToken cancellationToken, CancellationToken deadlineToken) + public async Task Process( + ChannelReader messages, + IStreamProcessorState state, CancellationToken cancellationToken, CancellationToken deadlineToken) { var currentState = new State(AsPartitioned(state)); @@ -91,7 +96,8 @@ public async Task Process(ChannelReader messages, IStreamProcessorS currentState = await HandleNewEvent(evt, currentState, deadlineToken); break; case NextAction.ProcessFailedEvents: - currentState = await CatchUpForPartition(currentState, partitionId!, cancellationToken,deadlineToken); + currentState = await CatchUpForPartition(currentState, partitionId!, cancellationToken, + deadlineToken); break; } } @@ -130,7 +136,9 @@ enum NextAction /// /// /// - async ValueTask<(NextAction, PartitionId?)> WaitForNextAction(ChannelReader messages, State state, CancellationToken cancellationToken) + async ValueTask<(NextAction, PartitionId?)> WaitForNextAction( + ChannelReader messages, State state, + CancellationToken cancellationToken) { if (!state.TryGetTimeToRetry(out var timeToRetry, out var partitionId)) { @@ -158,9 +166,9 @@ async Task WaitForNextEvent() var notClosed = await messages.WaitToReadAsync(cancellationToken); if (notClosed) { - if (_catchingUp && messages.TryPeek(out var evt)) + if (_catchingUp && messages.TryPeek(out var message)) { - if (evt.Event.EventLogSequenceNumber < state.ProcessorState.Position.EventLogPosition) + if (message.IsEvent && message.StreamEvent.Event.EventLogSequenceNumber < state.ProcessorState.Position.EventLogPosition) { return NextAction.ProcessCatchUpEvent; } @@ -177,25 +185,40 @@ async Task WaitForNextEvent() } } - async Task HandleNewEvent(StreamEvent evt, State state, CancellationToken deadlineToken) + async Task HandleNewEvent(StreamSubscriptionMessage subscriptionMessage, + State state, CancellationToken deadlineToken) { - if (state.ProcessorState.FailingPartitions.TryGetValue(evt.Partition, out _)) + if (subscriptionMessage.IsEvent) { - return state with + var evt = subscriptionMessage.StreamEvent; + if (state.ProcessorState.FailingPartitions.TryGetValue(evt.Partition, out _)) { - ProcessorState = state.ProcessorState.WithResult(SkippedProcessing.Instance, evt, DateTimeOffset.UtcNow) + return state with + { + ProcessorState = state.ProcessorState.WithResult(SkippedProcessing.Instance, evt, DateTimeOffset.UtcNow) + }; + } + + var (processorState, _) = await ProcessEventAndHandleResult(evt, state.ProcessorState, deadlineToken); + state = state with + { + ProcessorState = processorState }; } - - var (processorState, _) = await ProcessEventAndHandleResult(evt, state.ProcessorState, deadlineToken); - state = state with + else { - ProcessorState = processorState - }; - + // Unhandled events, skip forward + state = state with + { + ProcessorState = state.ProcessorState.WithNextEventLogSequence(subscriptionMessage.NextEventLogSequenceNumber) + }; + } + + return state; } + StreamProcessorState AsPartitioned(IStreamProcessorState state) { switch (state) @@ -206,8 +229,10 @@ StreamProcessorState AsPartitioned(IStreamProcessorState state) case Dolittle.Runtime.Events.Processing.Streams.StreamProcessorState nonPartitionedState: if (!nonPartitionedState.IsFailing) { - Logger.LogInformation("Converting non-partitioned state to partitioned for {StreamProcessorId}", Identifier); - return new StreamProcessorState(nonPartitionedState.Position, nonPartitionedState.LastSuccessfullyProcessed); + Logger.LogInformation("Converting non-partitioned state to partitioned for {StreamProcessorId}", + Identifier); + return new StreamProcessorState(nonPartitionedState.Position, + nonPartitionedState.LastSuccessfullyProcessed); } throw new ArgumentException("State is not convertible to partitioned"); @@ -224,19 +249,22 @@ async Task CatchUpForPartition( CancellationToken deadlineToken) { var failingPartitionState = state.ProcessorState.FailingPartitions[partition]; - if (!ShouldRetryProcessing(failingPartitionState)) return state; // Should not really happen, since we explicitly wait for each partition + if (!ShouldRetryProcessing(failingPartitionState)) + return state; // Should not really happen, since we explicitly wait for each partition var startPosition = new StreamPosition(failingPartitionState.Position.EventLogPosition.Value); var highWatermark = new StreamPosition(state.ProcessorState.Position.EventLogPosition.Value); - var (events, hasMoreEvents) = await _fetcher.FetchInPartition(partition, startPosition, highWatermark, _handledTypes, cancellationToken); + var (events, hasMoreEvents) = await _fetcher.FetchInPartition(partition, startPosition, highWatermark, + _handledTypes, cancellationToken); foreach (var streamEvent in events) { if (cancellationToken.IsCancellationRequested) { return state; } + var (newState, processingResult) = await RetryProcessingEventAndHandleResult( streamEvent, state.ProcessorState, @@ -268,5 +296,6 @@ async Task CatchUpForPartition( return state; } - static bool ShouldRetryProcessing(FailingPartitionState state) => DateTimeOffset.UtcNow.CompareTo(state.RetryTime) >= 0; + static bool ShouldRetryProcessing(FailingPartitionState state) => + DateTimeOffset.UtcNow.CompareTo(state.RetryTime) >= 0; } diff --git a/Source/Events/Processing/EventHandlers/Actors/TenantScopedStreamProcessorActor.cs b/Source/Events/Processing/EventHandlers/Actors/TenantScopedStreamProcessorActor.cs index f97a3b3e5..2daf15351 100644 --- a/Source/Events/Processing/EventHandlers/Actors/TenantScopedStreamProcessorActor.cs +++ b/Source/Events/Processing/EventHandlers/Actors/TenantScopedStreamProcessorActor.cs @@ -265,7 +265,7 @@ void LogInitialState(IStreamProcessorState initialState) return (shutdownTokenSource.Token, deadlineTokenSource.Token); } - async Task StartProcessing(IStreamProcessorState streamProcessorState, ChannelReader events, IContext context, CancellationToken stoppingToken, + async Task StartProcessing(IStreamProcessorState streamProcessorState, ChannelReader events, IContext context, CancellationToken stoppingToken, CancellationToken deadlineToken) { try @@ -342,14 +342,14 @@ async Task StartProcessing(IStreamProcessorState streamProcessorState, ChannelRe } } - ChannelReader SubscribeUntil(ProcessingPosition from, DateTimeOffset to, CancellationToken token) + ChannelReader SubscribeUntil(ProcessingPosition from, DateTimeOffset to, CancellationToken token) { var unixTimeSeconds = to.ToUnixTimeSeconds(); return StartSubscription(from, evt => evt.Occurred.Seconds >= unixTimeSeconds, token); } - ChannelReader StartSubscription(ProcessingPosition from, Predicate? until, CancellationToken token) + ChannelReader StartSubscription(ProcessingPosition from, Predicate? until, CancellationToken token) { return _eventSubscriber.Subscribe( Identifier.ScopeId, @@ -427,7 +427,7 @@ async ValueTask GetInitialProcessorState(CancellationToke /// /// Gets the . /// - protected ILogger Logger { get; } + ILogger Logger { get; } public void Dispose() { diff --git a/Source/Events/Processing/EventHandlers/MetricsCollector.cs b/Source/Events/Processing/EventHandlers/MetricsCollector.cs index 3002a213a..22624534d 100644 --- a/Source/Events/Processing/EventHandlers/MetricsCollector.cs +++ b/Source/Events/Processing/EventHandlers/MetricsCollector.cs @@ -2,7 +2,10 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Collections.Generic; +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Domain.Tenancy; using Dolittle.Runtime.Events.Store; using Dolittle.Runtime.Events.Store.Streams; @@ -25,7 +28,15 @@ public class MetricsCollector : IMetricsCollector readonly Counter _customerEventProcessingFailuresTotal; readonly Histogram _customerEventProcessingTime; - /// + // OpenTelemetry + readonly Counter _customerRegistrationsTotalOtel; + readonly Counter _systemRegistrationsTotalOtel; + readonly Counter _customerFailedRegistrationsTotalOtel; + readonly Counter _systemFailedRegistrationsTotalOtel; + readonly Counter _customerEventProcessingFailuresTotalOtel; + readonly Histogram _customerEventProcessingTimeOtel; + + /// /// Creates a new instance of the class. /// /// The metric factory to use to create metrics. @@ -61,43 +72,111 @@ public MetricsCollector(IMetricFactory metricFactory, IEventTypes eventTypes) "The processing time of events processed by event handlers", new[] { "scope", "eventHandlerId", "eventHandlerAlias", "tenantId", "eventTypeId", "eventTypeAlias" }, new[] { 0.001, 0.01, 0.1, 1, 10 }); + + // OpenTelemetry + _customerRegistrationsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_eventhandlers_registrations_total", + "count", + "Total number of event handler registrations", + new Dictionary + { + { "eventHandlerId", "Event handler id" }, + { "eventHandlerAlias", "Event handler alias" } + }); + + _systemRegistrationsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_eventhandlers_registrations_total", + "registrations", + "Total number of event handler registrations"); + + _customerFailedRegistrationsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_eventhandlers_failed_registrations_total", + "errors", + "Total number of failed event handler registrations"); + + _systemFailedRegistrationsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_eventhandlers_failed_registrations_total", + "errors", + "Total number of failed event handler registrations"); + + _customerEventProcessingFailuresTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_eventhandlers_event_processing_failures_total", + "errors", + "Total number of failed event processing attempts by an event handler"); + + _customerEventProcessingTimeOtel = RuntimeMetrics.Meter.CreateHistogram( + "dolittle_customer_runtime_eventhandlers_event_processing_time_seconds", + "seconds", + "The processing time of events processed by event handlers"); } - /// public void IncrementRegistrationsTotal(EventHandlerInfo info) { - _customerRegistrationsTotal.WithLabels(info.Id.EventHandler.ToString(), info.HasAlias ? info.Alias : string.Empty).Inc(); + string handlerAlias = info.HasAlias ? info.Alias : string.Empty; + var handlerId = info.Id.EventHandler.ToString(); + _customerRegistrationsTotal.WithLabels(handlerId, handlerAlias).Inc(); _systemRegistrationsTotal.Inc(); + _customerRegistrationsTotalOtel.Add(1, new KeyValuePair("eventHandlerId", handlerId), new KeyValuePair("eventHandlerAlias", handlerAlias)); + _systemRegistrationsTotalOtel.Add(1); } - /// - public void IncrementFailedRegistrationsTotal(EventHandlerInfo info) + public void IncrementFailedRegistrationsTotal(EventHandlerInfo info) { - _customerFailedRegistrationsTotal.WithLabels(info.Id.EventHandler.ToString(), info.HasAlias ? info.Alias : string.Empty).Inc(); + string handlerAlias = info.HasAlias ? info.Alias : string.Empty; + var handlerId = info.Id.EventHandler.ToString(); + _customerFailedRegistrationsTotal.WithLabels(handlerId, handlerAlias).Inc(); _systemFailedRegistrationsTotal.Inc(); + _customerFailedRegistrationsTotalOtel.Add(1, new KeyValuePair("eventHandlerId", handlerId), new KeyValuePair("eventHandlerAlias", handlerAlias)); + _systemFailedRegistrationsTotalOtel.Add(1); } - /// public void IncrementEventsProcessedTotal(EventHandlerInfo info, TenantId tenant, StreamEvent @event, TimeSpan processingTime) { + var scopeId = info.Id.Scope.ToString(); + var handlerId = info.Id.EventHandler.ToString(); + string handlerAlias = info.HasAlias ? info.Alias : string.Empty; + var tenantId = tenant.ToString(); + var eventTypeId = @event.Event.Type.Id.ToString(); + var eventTypeAlias = _eventTypes.GetEventTypeAliasOrEmptyString(@event.Event.Type); _customerEventProcessingTime.WithLabels( - info.Id.Scope.ToString(), - info.Id.EventHandler.ToString(), - info.HasAlias ? info.Alias : string.Empty, - tenant.ToString(), - @event.Event.Type.Id.ToString(), - _eventTypes.GetEventTypeAliasOrEmptyString(@event.Event.Type)) + scopeId, + handlerId, + handlerAlias, + tenantId, + eventTypeId, + eventTypeAlias) .Observe(processingTime.TotalSeconds); + _customerEventProcessingTimeOtel.Record(processingTime.TotalSeconds, + new KeyValuePair("scope", scopeId), + new KeyValuePair("eventHandlerId", handlerId), + new KeyValuePair("eventHandlerAlias", handlerAlias), + new KeyValuePair("tenantId", tenantId), + new KeyValuePair("eventTypeId", eventTypeId), + new KeyValuePair("eventTypeAlias", eventTypeAlias)); } - /// public void IncrementEventProcessingFailuresTotal(EventHandlerInfo info, TenantId tenant, StreamEvent @event) - => _customerEventProcessingFailuresTotal.WithLabels( - info.Id.Scope.ToString(), - info.Id.EventHandler.ToString(), - info.HasAlias ? info.Alias : string.Empty, - tenant.ToString(), - @event.Event.Type.Id.ToString(), - _eventTypes.GetEventTypeAliasOrEmptyString(@event.Event.Type)) + { + var scopeId = info.Id.Scope.ToString(); + var eventHandlerId = info.Id.EventHandler.ToString(); + string handlerAlias = info.HasAlias ? info.Alias : string.Empty; + var tenantId = tenant.ToString(); + var eventTypeId = @event.Event.Type.Id.ToString(); + var eventHandlerAlias = _eventTypes.GetEventTypeAliasOrEmptyString(@event.Event.Type); + _customerEventProcessingFailuresTotal.WithLabels( + scopeId, + eventHandlerId, + handlerAlias, + tenantId, + eventTypeId, + eventHandlerAlias) .Inc(); + _customerEventProcessingFailuresTotalOtel.Add(1, + new KeyValuePair("scope", scopeId), + new KeyValuePair("eventHandlerId", eventHandlerId), + new KeyValuePair("eventHandlerAlias", handlerAlias), + new KeyValuePair("tenantId", tenantId), + new KeyValuePair("eventTypeId", eventTypeId), + new KeyValuePair("eventTypeAlias", eventHandlerAlias)); + } } diff --git a/Source/Events/Processing/EventProcessorKind.cs b/Source/Events/Processing/EventProcessorKind.cs index 11ede31a0..04e8e3165 100644 --- a/Source/Events/Processing/EventProcessorKind.cs +++ b/Source/Events/Processing/EventProcessorKind.cs @@ -6,7 +6,7 @@ namespace Dolittle.Runtime.Events.Processing; /// -/// Represents the kind of an . +/// Represents the kind of . /// public record EventProcessorKind(string Value) : ConceptAs(Value) { diff --git a/Source/Events/Processing/Streams/MetricsCollector.cs b/Source/Events/Processing/Streams/MetricsCollector.cs index a7dd98027..58ad18728 100644 --- a/Source/Events/Processing/Streams/MetricsCollector.cs +++ b/Source/Events/Processing/Streams/MetricsCollector.cs @@ -2,7 +2,10 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Collections.Generic; +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -24,6 +27,16 @@ public class MetricsCollector : IMetricsCollector readonly Histogram _eventProcessingTime; readonly Counter _failedEventsProcessedTotal; + readonly Counter _registrationsTotalOtel; + readonly Counter _failedRegistrationsTotalOtel; + readonly Counter _initializationsTotalOtel; + readonly Counter _startsTotalOtel; + readonly Counter _failuresTotalOtel; + readonly Counter _positionSetTotalOtel; + readonly Counter _initialPositionSetForAllTenantsTotalOtel; + readonly Counter _eventProcessingTimeOtel; + readonly Counter _failedEventsProcessedTotalOtel; + /// /// Creates a new instance of the class. @@ -76,42 +89,116 @@ public MetricsCollector(IMetricFactory metricFactory) "dolittle_system_runtime_streamprocessors_failed_event_processings_total", "Total number of event processing attempts per event processor kind.", new[] { "eventProcessorKind" }); + // New OpenTelemetry metrics initialization + _registrationsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_registrations_total", + "registrations", + "Total number of stream processors registration attempts per event processor kind"); + + _failedRegistrationsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_failed_registrations_total", + "errors", + "Total number of failed stream processor registrations per event processor kind."); + + _initializationsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_initializations_total", + "count", + "Total number of stream processor initializations per event processor kind."); + + _startsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_starts_total", + "count", + "Total number of stream processor starts per event processor kind."); + + _failuresTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_failures_total", + "errors", + "Total number of stream processor faliures per event processor kind."); + + _positionSetTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_position_set_total", + "count", + "Total number of times a stream processor has been reset to a specific position for one tenant per event processor kind."); + + _initialPositionSetForAllTenantsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_initial_position_set_for_all_tenants_total", + "count", + "Total number of times a stream processor has been reset the the beginning for all tenants per event processor kind."); + + _eventProcessingTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_event_processing_time_seconds", + "seconds", + "The time spent processing events per event processor kind."); + + _failedEventsProcessedTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_streamprocessors_failed_event_processings_total", + "errors", + "Total number of event processing attempts per event processor kind."); } /// public void IncrementRegistrations(EventProcessorKind kind) - => _registrationsTotal.WithLabels(kind).Inc(); + { + _registrationsTotal.WithLabels(kind).Inc(); + _registrationsTotalOtel.Add(1, ToTag(kind)); + } /// public void IncrementFailedRegistrations(EventProcessorKind kind) - => _failedRegistrationsTotal.WithLabels(kind).Inc(); + { + _failedRegistrationsTotal.WithLabels(kind).Inc(); + _failedRegistrationsTotalOtel.Add(1, ToTag(kind)); + } /// public void IncrementInitializations(EventProcessorKind kind) - => _initializationsTotal.WithLabels(kind).Inc(); + { + _initializationsTotal.WithLabels(kind).Inc(); + _initializationsTotalOtel.Add(1, ToTag(kind)); + } /// public void IncrementStarts(EventProcessorKind kind) - => _startsTotal.WithLabels(kind).Inc(); + { + _startsTotal.WithLabels(kind).Inc(); + _startsTotalOtel.Add(1, ToTag(kind)); + } /// public void IncrementFailures(EventProcessorKind kind) - => _failuresTotal.WithLabels(kind).Inc(); + { + _failuresTotal.WithLabels(kind).Inc(); + _failuresTotalOtel.Add(1, ToTag(kind)); + } /// public void IncrementPositionSet(EventProcessorKind kind) - => _positionSetTotal.WithLabels(kind).Inc(); + { + _positionSetTotal.WithLabels(kind).Inc(); + _positionSetTotalOtel.Add(1, ToTag(kind)); + } /// public void IncrementInitialPositionSetForAllTenants(EventProcessorKind kind) - => _initialPositionSetForAllTenantsTotal.WithLabels(kind).Inc(); + { + _initialPositionSetForAllTenantsTotal.WithLabels(kind).Inc(); + _initialPositionSetForAllTenantsTotalOtel.Add(1, ToTag(kind)); + } /// public void IncrementEventsProcessed(EventProcessorKind kind, TimeSpan elapsed) - => _eventProcessingTime.WithLabels(kind).Observe(elapsed.TotalSeconds); + { + _eventProcessingTime.WithLabels(kind).Observe(elapsed.TotalSeconds); + _eventProcessingTimeOtel.Add(elapsed.TotalSeconds, ToTag(kind)); + } /// public void IncrementFailedEventsProcessed(EventProcessorKind kind) - => _failedEventsProcessedTotal.WithLabels(kind).Inc(); + { + _failedEventsProcessedTotal.WithLabels(kind).Inc(); + _failedEventsProcessedTotalOtel.Add(1, ToTag(kind)); + } + + static KeyValuePair ToTag(EventProcessorKind kind) => new("eventProcessorKind", kind.ToString()); } diff --git a/Source/Events/Processing/Streams/Partitioned/StreamProcessorState.cs b/Source/Events/Processing/Streams/Partitioned/StreamProcessorState.cs index 55ec536dc..1aba7a93c 100644 --- a/Source/Events/Processing/Streams/Partitioned/StreamProcessorState.cs +++ b/Source/Events/Processing/Streams/Partitioned/StreamProcessorState.cs @@ -240,6 +240,21 @@ public StreamProcessorState SkipEventsBefore(EventLogSequenceNumber eventLogSequ }; } + IStreamProcessorState IStreamProcessorState.WithNextEventLogSequence(EventLogSequenceNumber nextEventLogSequenceNumber) => WithNextEventLogSequence(nextEventLogSequenceNumber); + + public StreamProcessorState WithNextEventLogSequence(EventLogSequenceNumber nextEventLogSequenceNumber) + { + if(nextEventLogSequenceNumber < Position.EventLogPosition) + { + throw new ArgumentException("Cannot set the next event log sequence number to a lower value than the current event log sequence number", nameof(nextEventLogSequenceNumber)); + } + + return this with + { + Position = Position with { EventLogPosition = nextEventLogSequenceNumber } + }; + } + static StreamProcessorState AddFailingPartitionFor( StreamProcessorState oldState, ProcessingPosition failedPosition, diff --git a/Source/Events/Processing/Streams/StreamProcessorState.cs b/Source/Events/Processing/Streams/StreamProcessorState.cs index 7b21cca53..1bcba8714 100644 --- a/Source/Events/Processing/Streams/StreamProcessorState.cs +++ b/Source/Events/Processing/Streams/StreamProcessorState.cs @@ -182,6 +182,13 @@ public StreamProcessorState SkipEventsBefore(EventLogSequenceNumber position) return new StreamProcessorState(Position with { EventLogPosition = position }, LastSuccessfullyProcessed); } + IStreamProcessorState IStreamProcessorState.WithNextEventLogSequence(EventLogSequenceNumber nextEventLogSequenceNumber) => WithNextEventLogSequence(nextEventLogSequenceNumber); + + public StreamProcessorState WithNextEventLogSequence(EventLogSequenceNumber nextEventLogSequenceNumber) => this with + { + Position = Position with { EventLogPosition = nextEventLogSequenceNumber } + }; + bool RetryTimeIsInThePast(DateTimeOffset retryTime) => DateTimeOffset.UtcNow.CompareTo(retryTime) >= 0; diff --git a/Source/Events/Store/Actors/Bucket.cs b/Source/Events/Store/Actors/Bucket.cs index 93a5e0d01..9d806c6cf 100644 --- a/Source/Events/Store/Actors/Bucket.cs +++ b/Source/Events/Store/Actors/Bucket.cs @@ -34,7 +34,7 @@ IStreamProcessorState FromProtobufNonPartitioned() } } - ProcessingPosition Position() => new(CurrentOffset, CurrentEventLogOffset); + public ProcessingPosition Position() => new(CurrentOffset, CurrentEventLogOffset); IStreamProcessorState FromProtobufPartitioned() { diff --git a/Source/Events/Store/Actors/Committer.cs b/Source/Events/Store/Actors/Committer.cs index b5b86674a..144b1e141 100644 --- a/Source/Events/Store/Actors/Committer.cs +++ b/Source/Events/Store/Actors/Committer.cs @@ -42,6 +42,7 @@ public record EventLogSubscriptionManagerSpawned(PID Pid); bool _readyToSend = true; bool _shuttingDown = false; + ulong _lastCommittedOffset; /// /// Initializes a new instance of the class. @@ -95,6 +96,8 @@ async Task OnStarted(IContext context) }); var nextSequenceNumber = await _committedEvents.FetchNextSequenceNumber(ScopeId.Default, context.CancellationToken).ConfigureAwait(false); _pipeline = CommitPipeline.NewFromEventLogSequenceNumber(nextSequenceNumber); + _lastCommittedOffset = nextSequenceNumber - 1; + _metrics.RegisterEventLogOffset(_tenant, ScopeId.Default, () => _lastCommittedOffset); } Task OnStreamSubscriptionManagerSet(EventLogSubscriptionManagerSpawned msg) @@ -156,12 +159,11 @@ Task CommitForAggregate(IContext context, CommitAggregateEventsRequest request, } var aggregate = new Aggregate(request.Events.AggregateRootId.ToGuid(), request.Events.EventSourceId); - if (_aggregateCommitInFlight.Contains(aggregate)) + if (!_aggregateCommitInFlight.Add(aggregate)) { return RespondWithFailure(new EventsForAggregateAlreadyAddedToCommit(aggregate).ToFailure()); } - _aggregateCommitInFlight.Add(aggregate); if (_aggregateRootVersionCache.TryGetValue(aggregate, out var aggregateRootVersion)) { return CommitForAggregate(context, request, aggregate, aggregateRootVersion, respond); @@ -344,7 +346,7 @@ void TrySendBatch(IContext context) return Task.CompletedTask; } - _metrics.IncrementTotalBatchesSuccessfullyPersisted(batchToSend.Batch); + _metrics.IncrementTotalBatchesSuccessfullyPersisted(_tenant,batchToSend.Batch); batchToSend.Complete(); context.Send(_streamSubscriptionManagerPid!, batchToSend.Batch); _readyToSend = true; @@ -353,7 +355,9 @@ void TrySendBatch(IContext context) { _shutdownHook!.MarkCompleted(); } - + + Interlocked.Exchange(ref _lastCommittedOffset, batchToSend.Batch.LastSequenceNumber.Value); + return Task.CompletedTask; }, (ex, _) => diff --git a/Source/Events/Store/Actors/IMetricsCollector.cs b/Source/Events/Store/Actors/IMetricsCollector.cs index 8b1d826b0..7277b2b87 100644 --- a/Source/Events/Store/Actors/IMetricsCollector.cs +++ b/Source/Events/Store/Actors/IMetricsCollector.cs @@ -1,14 +1,16 @@ // Copyright (c) Dolittle. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System; using Dolittle.Runtime.Artifacts; using Dolittle.Runtime.Domain.Tenancy; using Dolittle.Runtime.Events.Store.Persistence; +using Dolittle.Runtime.Events.Store.Streams; namespace Dolittle.Runtime.Events.Store.Actors; /// -/// Defines a system for collecting metrics about the projection store. +/// Defines a system for collecting metrics about the projection store & processing offsets. /// public interface IMetricsCollector { @@ -30,7 +32,7 @@ public interface IMetricsCollector /// /// Increments the total number of commits that has been successfully written to the event store. /// - void IncrementTotalBatchesSuccessfullyPersisted(Commit commit); + void IncrementTotalBatchesSuccessfullyPersisted(TenantId tenant, Commit commit); /// /// Increments the total number of batches that has been sent to the event store persistence layer. @@ -61,4 +63,7 @@ public interface IMetricsCollector /// Increments the number of catch-up events (read from DB) per subscription. /// void IncrementCatchupSubscriptionEvents(string eventProcessorKind, int incBy); + + void RegisterStreamProcessorOffset(TenantId tenant, StreamProcessorId streamProcessorId, Func getNextProcessingPosition); + void RegisterEventLogOffset(TenantId tenant, ScopeId scopeId, Func eventLogPosition); } diff --git a/Source/Events/Store/Actors/MetricsCollector.cs b/Source/Events/Store/Actors/MetricsCollector.cs index 1be946091..539b5a005 100644 --- a/Source/Events/Store/Actors/MetricsCollector.cs +++ b/Source/Events/Store/Actors/MetricsCollector.cs @@ -1,12 +1,21 @@ // Copyright (c) Dolittle. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics.Metrics; +using System.Linq; using Dolittle.Runtime.Aggregates; using Dolittle.Runtime.Artifacts; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Domain.Tenancy; +using Dolittle.Runtime.Events.Processing.EventHandlers; using Dolittle.Runtime.Events.Store.Persistence; +using Dolittle.Runtime.Events.Store.Streams; using Dolittle.Runtime.Metrics; +using Dolittle.Runtime.Protobuf; using Prometheus; namespace Dolittle.Runtime.Events.Store.Actors; @@ -18,7 +27,10 @@ namespace Dolittle.Runtime.Events.Store.Actors; public class MetricsCollector : IMetricsCollector { readonly IEventTypes _eventTypes; + readonly IEventHandlers _eventHandlers; readonly IAggregateRoots _aggregateRoots; + + // Prometheus readonly Counter _totalCommitsReceived; readonly Counter _totalCommitsForAggregateReceived; readonly Counter _totalAggregateRootVersionCacheInconsistencies; @@ -33,34 +45,56 @@ public class MetricsCollector : IMetricsCollector readonly Counter _streamedSubscriptionEventsTotal; readonly Counter _catchupSubscriptionEventsTotal; - public MetricsCollector(IMetricFactory metricFactory, IEventTypes eventTypes, IAggregateRoots aggregateRoots) + readonly ConcurrentDictionary> _eventLogOffsets = new(); + + readonly HashSet<(TenantId, StreamProcessorId)> _streamProcessorEventLogOffsets = new(); + + // OpenTelemetry + readonly Counter _totalCommitsReceivedOtel; + readonly Counter _totalCommitsForAggregateReceivedOtel; + readonly Counter _totalAggregateRootVersionCacheInconsistenciesOtel; + readonly Counter _totalBatchesSuccessfullyPersistedOtel; + readonly Counter _totalBatchedEventsSuccessfullyPersistedOtel; + readonly Counter _totalBatchesSentOtel; + readonly Counter _totalAggregateRootVersionCacheInconsistenciesResolvedOtel; + readonly Counter _totalBatchesFailedPersistingOtel; + readonly Counter _totalCommittedEventsOtel; + readonly Counter _totalCommittedAggregateEventsOtel; + readonly Counter _totalAggregateConcurrencyConflictsOtel; + readonly Counter _streamedSubscriptionEventsTotalOtel; + readonly Counter _catchupSubscriptionEventsTotalOtel; + + + public MetricsCollector(IMetricFactory metricFactory, IEventTypes eventTypes, IEventHandlers eventHandlers, IAggregateRoots aggregateRoots) { _eventTypes = eventTypes; + _eventHandlers = eventHandlers; _aggregateRoots = aggregateRoots; + _totalCommitsReceived = metricFactory.CreateCounter( "dolittle_system_runtime_events_store_commits_received_total", "EventStore total number of non-aggregate commits received"); - + _totalCommitsForAggregateReceived = metricFactory.CreateCounter( "dolittle_system_runtime_events_store_commits_for_aggregate_received_total", "EventStore total number of commits for aggregate received"); - + _totalAggregateRootVersionCacheInconsistencies = metricFactory.CreateCounter( "dolittle_system_runtime_events_store_aggregate_root_version_cache_inconsistencies_total", "EventStore total number of aggregate root version cache inconsistencies occurred"); - + _totalBatchesSuccessfullyPersisted = metricFactory.CreateCounter( "dolittle_system_runtime_events_store_batches_successfully_persisted_total", "EventStore total number of batches that has been successfully persisted"); - + _totalBatchedEventsSuccessfullyPersisted = metricFactory.CreateCounter( "dolittle_system_runtime_events_store_batched_events_successfully_persisted_total", "EventStore total number of batched events that has been successfully persisted"); - + _totalBatchesSent = metricFactory.CreateCounter( "dolittle_system_runtime_events_store_batches_sent_total", "EventStore total number of batches that has been sent to the event store"); - + _totalAggregateRootVersionCacheInconsistenciesResolved = metricFactory.CreateCounter( "dolittle_system_runtime_events_store_aggregate_root_version_cache_inconsistencies_resolved_total", "EventStore total number of aggregate root version cache inconsistencies that has been resolved"); @@ -72,18 +106,18 @@ public MetricsCollector(IMetricFactory metricFactory, IEventTypes eventTypes, IA _totalCommittedEvents = metricFactory.CreateCounter( "dolittle_customer_runtime_events_store_committed_events_total", "EventStore total number of committed events by type", - new[] {"tenantId", "eventTypeId", "eventTypeAlias"}); - + new[] { "tenantId", "eventTypeId", "eventTypeAlias" }); + _totalCommittedAggregateEvents = metricFactory.CreateCounter( "dolittle_customer_runtime_events_store_committed_aggregate_events_total", "EventStore total number of committed events by type", - new[] {"tenantId", "eventTypeId", "eventTypeAlias", "aggregateRootId", "aggregateRootAlias"}); - + new[] { "tenantId", "eventTypeId", "eventTypeAlias", "aggregateRootId", "aggregateRootAlias" }); + _totalAggregateConcurrencyConflicts = metricFactory.CreateCounter( "dolittle_customer_runtime_events_store_aggregate_concurrency_conflicts_total", "EventStore total number of aggregate concurrency conflicts by aggregate root", - new[] {"tenantId", "aggregateRootId", "aggregateRootAlias"}); - + new[] { "tenantId", "aggregateRootId", "aggregateRootAlias" }); + _streamedSubscriptionEventsTotal = metricFactory.CreateCounter( "dolittle_customer_runtime_events_store_streamed_events_total", "Total number of directly streamed events", @@ -93,79 +127,207 @@ public MetricsCollector(IMetricFactory metricFactory, IEventTypes eventTypes, IA "dolittle_customer_runtime_events_store_catchup_events_total", "Total number of catchup-events (events that are not streamed directly, but read from DB)", new[] { "subscriptionName" }); + + + // OpenTelemetry + _totalCommitsReceivedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_commits_received_total", + "commits", + "EventStore total number of non-aggregate commits received"); + + _totalCommitsForAggregateReceivedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_commits_for_aggregate_received_total", + "commits", + "EventStore total number of commits for aggregate received"); + + _totalAggregateRootVersionCacheInconsistenciesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_aggregate_root_version_cache_inconsistencies_total", + "errors", + "EventStore total number of aggregate root version cache inconsistencies occurred"); + + _totalBatchesSuccessfullyPersistedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_batches_successfully_persisted_total", + "batches", + "EventStore total number of batches that has been successfully persisted"); + + _totalBatchedEventsSuccessfullyPersistedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_batched_events_successfully_persisted_total", + "events", + "EventStore total number of batched events that has been successfully persisted"); + + _totalBatchesSentOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_batches_sent_total", + "batches", + "EventStore total number of batches that has been sent to the event store"); + + _totalAggregateRootVersionCacheInconsistenciesResolvedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_aggregate_root_version_cache_inconsistencies_resolved_total", + "errors", + "EventStore total number of aggregate root version cache inconsistencies that has been resolved"); + + _totalBatchesFailedPersistingOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_events_store_batches_failed_persisting_total", + "errors", + "EventStore total number of batches that failed to be persisted"); + + _totalCommittedEventsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_events_store_committed_events_total", + "events", + "EventStore total number of committed events by type"); + + + _totalCommittedAggregateEventsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_events_store_committed_aggregate_events_total", + "events", + "EventStore total number of committed events by type"); + + _totalAggregateConcurrencyConflictsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_events_store_aggregate_concurrency_conflicts_total", + "errors", + "EventStore total number of aggregate concurrency conflicts by aggregate root"); + + _streamedSubscriptionEventsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_events_store_streamed_events_total", + "events", + "Total number of directly streamed events"); + + _catchupSubscriptionEventsTotalOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_customer_runtime_events_store_catchup_events_total", + "events", + "Total number of catchup-events (events that are not streamed directly, but read from DB)"); } /// public void IncrementTotalCommitsReceived() - => _totalCommitsReceived.Inc(); + { + _totalCommitsReceived.Inc(); + _totalCommitsReceivedOtel.Add(1); + } /// public void IncrementTotalCommitsForAggregateReceived() - => _totalCommitsForAggregateReceived.Inc(); + { + _totalCommitsForAggregateReceived.Inc(); + _totalCommitsForAggregateReceivedOtel.Add(1); + } /// public void IncrementTotalAggregateRootVersionCacheInconsistencies() - => _totalAggregateRootVersionCacheInconsistencies.Inc(); + { + _totalAggregateRootVersionCacheInconsistencies.Inc(); + _totalAggregateRootVersionCacheInconsistenciesOtel.Add(1); + } /// - public void IncrementTotalBatchesSuccessfullyPersisted(Commit commit) + public void IncrementTotalBatchesSuccessfullyPersisted(TenantId tenant, Commit commit) { _totalBatchesSuccessfullyPersisted.Inc(); + _totalBatchesSuccessfullyPersistedOtel.Add(1); + var tenantId = tenant.ToString(); + var tenantLabel = new KeyValuePair("tenantId", tenantId); + foreach (var committedEvents in commit.Events) { foreach (var committedEvent in committedEvents) { + var eventTypeAliasOrEmptyString = _eventTypes.GetEventTypeAliasOrEmptyString(committedEvent.Type); + var typeId = committedEvent.Type.Id.ToString(); + var labels = new[] + { + tenantLabel, + new KeyValuePair("eventTypeId", typeId), + new KeyValuePair("eventTypeAlias", eventTypeAliasOrEmptyString) + }; _totalCommittedEvents .WithLabels( - committedEvent.ExecutionContext.Tenant.ToString(), - committedEvent.Type.Id.ToString(), - _eventTypes.GetEventTypeAliasOrEmptyString(committedEvent.Type) + tenantId, + typeId, + eventTypeAliasOrEmptyString ).Inc(); + _totalCommittedEventsOtel.Add(1, labels); _totalBatchedEventsSuccessfullyPersisted.Inc(); + _totalBatchedEventsSuccessfullyPersistedOtel.Add(1); } } + foreach (var commitAggregateEvents in commit.AggregateEvents) { foreach (var committedEvent in commitAggregateEvents) { + var eventTypeId = committedEvent.Type.Id.ToString(); + var eventTypeIdLabel = new KeyValuePair("eventTypeId", eventTypeId); + var eventTypeAlias = _eventTypes.GetEventTypeAliasOrEmptyString(committedEvent.Type); + var eventTypeAliasLabel = new KeyValuePair("eventTypeAlias", eventTypeAlias); + + _totalCommittedEvents .WithLabels( - committedEvent.ExecutionContext.Tenant.ToString(), - committedEvent.Type.Id.ToString(), - _eventTypes.GetEventTypeAliasOrEmptyString(committedEvent.Type) + tenantId, + eventTypeId, + eventTypeAlias ).Inc(); + _totalCommittedEventsOtel.Add(1, tenantLabel, eventTypeIdLabel, eventTypeAliasLabel); + + var aggregateRootId = committedEvent.AggregateRoot.Id.ToString(); + var aggregateRootIdLabel = new KeyValuePair("aggregateRootId", aggregateRootId); + var aggregateRootAlias = GetAggregateRootAliasOrEmptyString(committedEvent.AggregateRoot.Id); + var aggregateRootAliasLabel = + new KeyValuePair("aggregateRootAlias", aggregateRootAlias); _totalCommittedAggregateEvents .WithLabels( - committedEvent.ExecutionContext.Tenant.ToString(), - committedEvent.Type.Id.ToString(), - _eventTypes.GetEventTypeAliasOrEmptyString(committedEvent.Type), - committedEvent.AggregateRoot.Id.ToString(), - GetAggregateRootAliasOrEmptyString(committedEvent.AggregateRoot.Id) + tenantId, + eventTypeId, + eventTypeAlias, + aggregateRootId, + aggregateRootAlias ).Inc(); + _totalCommittedAggregateEventsOtel.Add(1, + tenantLabel, eventTypeIdLabel, eventTypeAliasLabel, + aggregateRootIdLabel, aggregateRootAliasLabel); _totalBatchedEventsSuccessfullyPersisted.Inc(); + _totalBatchedEventsSuccessfullyPersistedOtel.Add(1); } } } /// public void IncrementTotalBatchesSent() - => _totalBatchesSent.Inc(); + { + _totalBatchesSent.Inc(); + _totalBatchesSentOtel.Add(1); + } /// public void IncrementTotalAggregateRootVersionCacheInconsistenciesResolved() - => _totalAggregateRootVersionCacheInconsistenciesResolved.Inc(); + { + _totalAggregateRootVersionCacheInconsistenciesResolved.Inc(); + _totalAggregateRootVersionCacheInconsistenciesResolvedOtel.Add(1); + } /// public void IncrementTotalBatchesFailedPersisting() - => _totalBatchesFailedPersisting.Inc(); + { + _totalBatchesFailedPersisting.Inc(); + _totalBatchesFailedPersistingOtel.Add(1); + } public void IncrementTotalAggregateRootConcurrencyConflicts(TenantId tenant, ArtifactId aggregateRoot) - => _totalAggregateConcurrencyConflicts + { + var tenantId = tenant.ToString(); + var aggregateId = aggregateRoot.ToString(); + var aggregateAlias = GetAggregateRootAliasOrEmptyString(aggregateRoot); + _totalAggregateConcurrencyConflicts .WithLabels( - tenant.ToString(), - aggregateRoot.ToString(), - GetAggregateRootAliasOrEmptyString(aggregateRoot) + tenantId, + aggregateId, + aggregateAlias ).Inc(); + _totalAggregateConcurrencyConflictsOtel.Add(1, + new KeyValuePair("tenantId", tenantId), + new KeyValuePair("aggregateRootId", aggregateId), + new KeyValuePair("aggregateRootAlias", aggregateAlias) + ); + } string GetAggregateRootAliasOrEmptyString(ArtifactId aggregateRoot) { @@ -178,11 +340,104 @@ string GetAggregateRootAliasOrEmptyString(ArtifactId aggregateRoot) ? string.Empty : aggregateRootInfo.Alias.Value; } - - + + public void IncrementStreamedSubscriptionEvents(string subscriptionName, int incBy) - => _streamedSubscriptionEventsTotal.WithLabels(subscriptionName).Inc(incBy); + { + _streamedSubscriptionEventsTotal.WithLabels(subscriptionName).Inc(incBy); + _streamedSubscriptionEventsTotalOtel.Add(incBy, + new KeyValuePair("subscriptionName", subscriptionName)); + } public void IncrementCatchupSubscriptionEvents(string subscriptionName, int incBy) - => _catchupSubscriptionEventsTotal.WithLabels(subscriptionName).Inc(incBy); + { + _catchupSubscriptionEventsTotal.WithLabels(subscriptionName).Inc(incBy); + _catchupSubscriptionEventsTotalOtel.Add(incBy, + new KeyValuePair("subscriptionName", subscriptionName)); + } + + public void RegisterStreamProcessorOffset(TenantId tenant, StreamProcessorId streamProcessorId, Func getNextProcessingPosition) + { + var key = (tenant, streamProcessorId); + if (_streamProcessorEventLogOffsets.Contains(key)) return; + + var processorId = streamProcessorId.EventProcessorId.ToGuid().ToString(); + var scopeId = streamProcessorId.ScopeId.ToGuid().ToString(); + var eventHandlerInfo = _eventHandlers.All.FirstOrDefault(it => it.Id.EventHandler.Value.ToString().Equals(processorId) && it.Id.Scope.Value.ToString().Equals(scopeId)); + var alias = eventHandlerInfo?.HasAlias == true ? eventHandlerInfo.Alias.Value : string.Empty; + + + var labels = new[] + { + new KeyValuePair("tenantId", tenant.ToString()), + new KeyValuePair("scopeId", scopeId), + new KeyValuePair("eventProcessorId", processorId), + new KeyValuePair("alias", alias), + }; + + RuntimeMetrics.Meter.CreateObservableCounter( + $"dolittle_customer_runtime_stream_processors_offset", + GetEventLogPositionMeasurement, + "offset", + "The current offset of the stream processor", + labels); + + RuntimeMetrics.Meter.CreateObservableCounter( + $"dolittle_customer_runtime_stream_processors_processed_total", + GetProcessedEventsMeasurement, + "count", + "The total number of events processed by the stream processor", + labels); + + if(_eventLogOffsets.TryGetValue(tenant, out var getEventLogPosition)) + { + RuntimeMetrics.Meter.CreateObservableCounter( + $"dolittle_customer_runtime_stream_processor_consumer_lag", + () => + { + var processorOffset = (long)getNextProcessingPosition().EventLogPosition.Value-1; + var eventLogPosition = getEventLogPosition(); + var lag = eventLogPosition - processorOffset; + if(lag < 0) lag = 0; // Negative lag is not possible + + return new Measurement(lag, labels); + }, + "offset", + "The current offset of the event log", + labels); + } + + _streamProcessorEventLogOffsets.Add(key); + + // The reported value is the next event log position to be processed, so we subtract 1 to get the current position + Measurement GetEventLogPositionMeasurement() => new((long)getNextProcessingPosition().EventLogPosition.Value-1, labels); + Measurement GetProcessedEventsMeasurement() => new((long)getNextProcessingPosition().StreamPosition.Value, labels); + } + + public void RegisterEventLogOffset(TenantId tenant, ScopeId scopeId, Func getEventLogPosition) + { + var key = tenant; + if (!_eventLogOffsets.ContainsKey(key)) + { + var getValue = () => (long)getEventLogPosition().Value; + + var labels = new[] + { + new KeyValuePair("tenantId", tenant.ToString()), + new KeyValuePair("scopeId", scopeId.ToString()), + }; + Measurement GetMeasurement() + { + return new Measurement(getValue(), labels); + } + + RuntimeMetrics.Meter.CreateObservableCounter( + $"dolittle_system_runtime_events_store_offset", + GetMeasurement, + "offset", + "The current offset of the event log", + labels); + _eventLogOffsets.TryAdd(key, getValue); + } + } } diff --git a/Source/Events/Store/Actors/StreamProcessorStateManager.cs b/Source/Events/Store/Actors/StreamProcessorStateManager.cs index b50f7d24a..78c870b59 100644 --- a/Source/Events/Store/Actors/StreamProcessorStateManager.cs +++ b/Source/Events/Store/Actors/StreamProcessorStateManager.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using Dolittle.Runtime.Actors; using Dolittle.Runtime.Actors.Hosting; +using Dolittle.Runtime.Domain.Tenancy; using Dolittle.Runtime.Events.Store.Streams; using Dolittle.Runtime.Protobuf; using Microsoft.Extensions.Logging; @@ -14,16 +15,21 @@ namespace Dolittle.Runtime.Events.Store.Actors; -[TenantGrain(typeof(StreamProcessorStateActor), typeof(StreamProcessorStateClient),"dolittle.runtime.events.actors.StreamProcessorState")] +[TenantGrain(typeof(StreamProcessorStateActor), typeof(StreamProcessorStateClient), + "dolittle.runtime.events.actors.StreamProcessorState")] public class StreamProcessorStateManager : StreamProcessorStateBase { readonly IStreamProcessorStateRepository _repository; + readonly TenantId _tenantId; + readonly IMetricsCollector _metricsCollector; readonly ILogger _logger; readonly IApplicationLifecycleHooks _lifecycleHooks; readonly Dictionary> _processorStates = new(); readonly Dictionary> _changedSubscriptionStates = new(); + readonly HashSet _metricsRegisteredProcessorIds = new(); + readonly Dictionary _loadingScopes = new(); readonly HashSet _activeRequests = new(); @@ -32,10 +38,17 @@ public class StreamProcessorStateManager : StreamProcessorStateBase bool _shuttingDown; - public StreamProcessorStateManager(IContext context, IStreamProcessorStateRepository repository, ILogger logger, + public StreamProcessorStateManager( + IContext context, + IStreamProcessorStateRepository repository, + TenantId tenantId, + IMetricsCollector metricsCollector, + ILogger logger, IApplicationLifecycleHooks lifecycleHooks) : base(context) { _repository = repository; + _tenantId = tenantId; + _metricsCollector = metricsCollector; _logger = logger; _lifecycleHooks = lifecycleHooks; } @@ -54,6 +67,7 @@ public override async Task OnStarted() defaultStates.Add(streamProcessorKey.StreamProcessorId, state.ToProtobuf()); } + _processorStates.Add(ScopeId.Default, defaultStates); _logger.LogInformation("Retrieved the state of {Count} stream processors", defaultStates.Count); @@ -70,7 +84,8 @@ public override async Task OnStarted() } - public override Task GetByProcessorId(StreamProcessorId processorId, Action respond, Action onError) + public override Task GetByProcessorId(StreamProcessorId processorId, Action respond, + Action onError) { if (_shuttingDown) { @@ -114,7 +129,7 @@ bool TrySendResponse(ScopeId scopeId, StreamProcessorId processorId, Action + { + if (_processorStates.TryGetValue(scope, out var processorStates) && + processorStates.TryGetValue(id, out var state)) + { + return state.Position(); + } + + return ProcessingPosition.Initial; + }); + } + } + void PersistCurrentProcessorState(ScopeId scopeId) { if (_activeRequests.Contains(scopeId)) @@ -251,31 +284,33 @@ void PersistCurrentProcessorState(ScopeId scopeId) Persist(changes, scopeId); } - void Persist(IReadOnlyDictionary changes, ScopeId scopeId) + void Persist(IReadOnlyDictionary changes, + ScopeId scopeId) { _logger.LogTrace("Persisting {Count} changes for scope {ScopeId}", changes.Count, scopeId); - Context.ReenterAfter(_repository.PersistForScope(scopeId, changes, Context.CancellationToken), _ => + Context.ReenterAfter(_repository.PersistForScope(scopeId, changes, Context.CancellationToken), it => { - if (_.IsCompletedSuccessfully) + if (it.IsCompletedSuccessfully) { _activeRequests.Remove(scopeId); PersistCurrentProcessorState(scopeId); if (_shuttingDown && _activeRequests.Count == 0) { - _shutdownHook.MarkCompleted(); + _shutdownHook?.MarkCompleted(); } } else { - _logger.FailedToPersistStreamSubscriptionState(_.Exception!, scopeId); + _logger.FailedToPersistStreamSubscriptionState(it.Exception!, scopeId); // Try again Persist(changes, scopeId); } }); } - IReadOnlyDictionary FromProtobuf( - IDictionary changes) + IReadOnlyDictionary + FromProtobuf( + IDictionary changes) { var dict = new Dictionary(); foreach (var change in changes) @@ -290,7 +325,8 @@ void Persist(IReadOnlyDictionary GetByProcessorId(StreamProcessorId processorId) => throw new NotImplementedException("unused"); + public override Task GetByProcessorId(StreamProcessorId processorId) => + throw new NotImplementedException("unused"); #endregion } diff --git a/Source/Events/Store/Streams/IStreamEventSubscriber.cs b/Source/Events/Store/Streams/IStreamEventSubscriber.cs index 43c8bc328..6a2194230 100644 --- a/Source/Events/Store/Streams/IStreamEventSubscriber.cs +++ b/Source/Events/Store/Streams/IStreamEventSubscriber.cs @@ -3,18 +3,47 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Channels; using Dolittle.Runtime.Artifacts; namespace Dolittle.Runtime.Events.Store.Streams; -public interface IStreamEventSubscriber +/// +/// The message that is sent from when a new event is available. +/// If the event is not of a subscribed type, it will send the next upcoming instead. +/// +public readonly struct StreamSubscriptionMessage { - // public ChannelReader SubscribePublic(ProcessingPosition position, CancellationToken cancellationToken); + public StreamEvent? StreamEvent { get; } + + public EventLogSequenceNumber? NextEventLogSequenceNumber { get; } + + public StreamSubscriptionMessage(StreamEvent @event) + { + StreamEvent = @event; + IsEvent = true; + } + public StreamSubscriptionMessage(EventLogSequenceNumber nextEventLogSequenceNumber) + { + NextEventLogSequenceNumber = nextEventLogSequenceNumber; + IsEvent = false; + } + + [MemberNotNullWhen(true, nameof(StreamEvent))] + [MemberNotNullWhen(false, nameof(NextEventLogSequenceNumber))] + public bool IsEvent { get; } +} + + +public interface IStreamEventSubscriber +{ /// /// Subscribe to a stream of events for a specific scope and a set of event types. + /// If there are new events where none of the types are subscribed to, + /// it will send the next upcoming EventLogSequenceNumber instead. /// /// The source scope /// The set of subscribed events @@ -23,8 +52,8 @@ public interface IStreamEventSubscriber /// Identifier for the given subscription, used for debugging only /// Stops the subscription if the predicate returns true /// - /// - ChannelReader Subscribe(ScopeId scopeId, + /// Either StreamEvent or the next EventLogSequenceNumber + ChannelReader Subscribe(ScopeId scopeId, IReadOnlyCollection artifactIds, ProcessingPosition from, bool partitioned, diff --git a/Source/Events/Store/Streams/IStreamProcessorState.cs b/Source/Events/Store/Streams/IStreamProcessorState.cs index 30054959d..f5f9a9408 100644 --- a/Source/Events/Store/Streams/IStreamProcessorState.cs +++ b/Source/Events/Store/Streams/IStreamProcessorState.cs @@ -39,6 +39,7 @@ public interface IStreamProcessorState public IStreamProcessorState WithFailure(IProcessingResult failedProcessing, StreamEvent processedEvent, DateTimeOffset retryAt, DateTimeOffset timestamp); public IStreamProcessorState WithSuccessfullyProcessed(StreamEvent processedEvent, DateTimeOffset timestamp); public IStreamProcessorState SkipEventsBefore(EventLogSequenceNumber eventLogSequence); + public IStreamProcessorState WithNextEventLogSequence(EventLogSequenceNumber nextEventLogSequenceNumber); } /// @@ -54,7 +55,7 @@ IStreamProcessorState IStreamProcessorState.WithFailure(IProcessingResult failed IStreamProcessorState IStreamProcessorState.WithSuccessfullyProcessed(StreamEvent processedEvent, DateTimeOffset timestamp) => WithSuccessfullyProcessed(processedEvent, timestamp); - + IStreamProcessorState IStreamProcessorState.SkipEventsBefore(EventLogSequenceNumber eventLogSequence) => SkipEventsBefore(eventLogSequence); public new T WithResult(IProcessingResult result, StreamEvent processedEvent, DateTimeOffset timestamp); @@ -68,4 +69,13 @@ IStreamProcessorState IStreamProcessorState.WithSuccessfullyProcessed(StreamEven /// /// new T SkipEventsBefore(EventLogSequenceNumber position); + + /// + /// Update the next event log position to start from. + /// Will not clear partitions. + /// + /// + /// + new T WithNextEventLogSequence(EventLogSequenceNumber nextEventLogSequenceNumber); + } diff --git a/Source/Events/Store/Streams/StreamEvent.cs b/Source/Events/Store/Streams/StreamEvent.cs index 1146b8ac5..3722b5325 100644 --- a/Source/Events/Store/Streams/StreamEvent.cs +++ b/Source/Events/Store/Streams/StreamEvent.cs @@ -6,9 +6,13 @@ namespace Dolittle.Runtime.Events.Store.Streams; /// /// Represents a that is a part of a stream. /// -public record StreamEvent(CommittedEvent Event, StreamPosition Position, StreamId Stream, PartitionId Partition, bool Partitioned) +public record StreamEvent(CommittedEvent Event, StreamPosition Position, StreamId Stream, PartitionId Partition, bool Partitioned, EventLogSequenceNumber NextSequenceInStream) { - public ProcessingPosition NextProcessingPosition => new(Position.Increment(), Event.EventLogSequenceNumber.Increment()); - public ProcessingPosition CurrentProcessingPosition => new(Position, Event.EventLogSequenceNumber); + public StreamEvent(CommittedEvent @event, StreamPosition position, StreamId stream, PartitionId partition, bool partitioned) + : this(@event, position, stream, partition, partitioned, @event.EventLogSequenceNumber.Increment()) + { + } + public ProcessingPosition NextProcessingPosition => new(Position.Increment(), NextSequenceInStream); + public ProcessingPosition CurrentProcessingPosition { get; } = new(Position, Event.EventLogSequenceNumber); } diff --git a/Source/Events/Store/Streams/StreamEventSubscriber.cs b/Source/Events/Store/Streams/StreamEventSubscriber.cs index b6fd914c1..39fc4c9a8 100644 --- a/Source/Events/Store/Streams/StreamEventSubscriber.cs +++ b/Source/Events/Store/Streams/StreamEventSubscriber.cs @@ -22,7 +22,7 @@ public class StreamEventSubscriber : IStreamEventSubscriber public StreamEventSubscriber(IEventLogStream eventLogStream) => _eventLogStream = eventLogStream; - public ChannelReader Subscribe(ScopeId scopeId, + public ChannelReader Subscribe(ScopeId scopeId, IReadOnlyCollection artifactIds, ProcessingPosition from, bool partitioned, @@ -32,7 +32,7 @@ public ChannelReader Subscribe(ScopeId scopeId, { var eventTypes = artifactIds.Select(_ => _.Value.ToProtobuf()).ToHashSet(); - var channel = Channel.CreateBounded(ChannelCapacity); + var channel = Channel.CreateBounded(ChannelCapacity); var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); ToStreamEvents( @@ -46,36 +46,49 @@ public ChannelReader Subscribe(ScopeId scopeId, return channel.Reader; } - static void ToStreamEvents(ChannelReader reader, ChannelWriter writer, ProcessingPosition startingPosition, + static void ToStreamEvents(ChannelReader reader, ChannelWriter writer, ProcessingPosition startingPosition, Func include, bool partitioned, Predicate? until, CancellationTokenSource linkedTokenSource) => _ = Task.Run(async () => { - var current = startingPosition.StreamPosition; + var currentStreamPosition = startingPosition.StreamPosition; + + // This method owns the linkedTokenSource and will dispose it when the reader is done, even if it got it as a parameter. using var cts = linkedTokenSource; try { - while (!linkedTokenSource.Token.IsCancellationRequested) + while (!cts.Token.IsCancellationRequested) { var eventLogBatch = await reader.ReadAsync(linkedTokenSource.Token); - foreach (var evt in eventLogBatch.MatchedEvents) + var includedEvents = eventLogBatch.MatchedEvents.Where(include).ToList(); + if (includedEvents.Count == 0) { + // No events included, send the next sequence number and continue + await writer.WriteAsync(new StreamSubscriptionMessage(eventLogBatch.To.Increment()),cts.Token); + continue; + } + for (var index = 0; index < includedEvents.Count; index++) + { + var evt = includedEvents[index]; if (until != null && !cts.IsCancellationRequested && until(evt)) { cts.Cancel(); return; } + var nextEvent = includedEvents.ElementAtOrDefault(index + 1); - if (include(evt)) - { - var streamEvent = new StreamEvent(evt.FromProtobuf(), current, StreamId.EventLog, evt.EventSourceId, partitioned); + // Skip any event that are not included in the artifact set. + // If the next event is null, we are at the end of the batch, so we use the next sequence number from the batch. + EventLogSequenceNumber nextEventLogSequenceNumber = nextEvent?.EventLogSequenceNumber ?? eventLogBatch.To.Increment(); + + var streamEvent = new StreamEvent(evt.FromProtobuf(), currentStreamPosition, StreamId.EventLog, + evt.EventSourceId, partitioned, nextEventLogSequenceNumber); - // ReSharper disable once MethodSupportsCancellation - await writer.WriteAsync(streamEvent); - current = current.Increment(); - } + // ReSharper disable once MethodSupportsCancellation + await writer.WriteAsync(new StreamSubscriptionMessage(streamEvent)); + currentStreamPosition = currentStreamPosition.Increment(); } } diff --git a/Source/Metrics/Configuration/MetricsServerConfiguration.cs b/Source/Metrics/Configuration/MetricsServerConfiguration.cs index 82fec250f..353e0ebf1 100644 --- a/Source/Metrics/Configuration/MetricsServerConfiguration.cs +++ b/Source/Metrics/Configuration/MetricsServerConfiguration.cs @@ -14,10 +14,10 @@ public record MetricsServerConfiguration /// /// Gets a value indicating whether or not the endpoint should be enabled. /// - public bool Enabled { get; init; } = true; // TODO: It would be cool if this made it so that the scoped server host didn't start + public bool Enabled { get; set; } = true; // TODO: It would be cool if this made it so that the scoped server host didn't start /// /// Gets the port to serve the endpoint on. /// - public int Port { get; init; } = 9700; + public int Port { get; set; } = 9700; } diff --git a/Source/Metrics/Metrics.csproj b/Source/Metrics/Metrics.csproj index b92725fba..2400beefd 100644 --- a/Source/Metrics/Metrics.csproj +++ b/Source/Metrics/Metrics.csproj @@ -9,6 +9,7 @@ + diff --git a/Source/Rudimentary/Pipelines/ICanGetNextReadyBatch.cs b/Source/Rudimentary/Pipelines/ICanGetNextReadyBatch.cs index 6ab81e699..7f91bc984 100644 --- a/Source/Rudimentary/Pipelines/ICanGetNextReadyBatch.cs +++ b/Source/Rudimentary/Pipelines/ICanGetNextReadyBatch.cs @@ -17,7 +17,7 @@ public interface ICanGetNextReadyBatch /// /// The next . /// True if there was a ready batch, false if not. - bool TryGetNextBatch([NotNullWhen(true)] out ReadyBatch readyBatch); + bool TryGetNextBatch([NotNullWhen(true)] out ReadyBatch? readyBatch); /// /// Fails all ready batches. diff --git a/Source/Rudimentary/Pipelines/PipelineReadyBatchAggregator.cs b/Source/Rudimentary/Pipelines/PipelineReadyBatchAggregator.cs index 73fbf9767..dc9a360eb 100644 --- a/Source/Rudimentary/Pipelines/PipelineReadyBatchAggregator.cs +++ b/Source/Rudimentary/Pipelines/PipelineReadyBatchAggregator.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Diagnostics.CodeAnalysis; using System.Threading.Channels; using System.Threading.Tasks; @@ -54,7 +55,7 @@ public bool TryAddToBatch(Func } /// - public bool TryGetNextBatch(out ReadyBatch readyBatch) + public bool TryGetNextBatch([NotNullWhen(true)] out ReadyBatch? readyBatch) { if (_preparedBatches.Reader.TryRead(out readyBatch)) { diff --git a/Source/Rudimentary/TryExtensions.cs b/Source/Rudimentary/TryExtensions.cs index d1153184c..58dc1a340 100644 --- a/Source/Rudimentary/TryExtensions.cs +++ b/Source/Rudimentary/TryExtensions.cs @@ -54,14 +54,14 @@ public static Try Then(this Try resu { try { - return Try.Succeeded(callback(_.Result)); + return Try.Succeeded(callback(_.Result!)); } catch (Exception exception) { return Try.Failed(exception); } }, - _ => Try.Failed(_.Exception)); + _ => Try.Failed(_.Exception!)); /// /// Adds a callback to a that will be called if the operation is successful. diff --git a/Source/Services.Clients/MetricsCollector.cs b/Source/Services.Clients/MetricsCollector.cs index 8f92faa82..0b4eb79e7 100644 --- a/Source/Services.Clients/MetricsCollector.cs +++ b/Source/Services.Clients/MetricsCollector.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -33,6 +35,28 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalFailedRequestCallbacks; readonly Counter _totalFailedResponseWrites; readonly Counter _totalRequestHandlingTime; + + + readonly Counter _totalStartedConnectionsOtel; + readonly UpDownCounter _pendingWritesOtel; + readonly Counter _totalWriteWaitTimeOtel; + readonly Counter _totalWriteTimeOtel; + readonly Counter _totalWritesOtel; + readonly Counter _totalWriteBytesOtel; + readonly Counter _totalWaitForConnectResponseTimeOtel; + readonly Counter _totalCancelledConnectionsOtel; + readonly Counter _totalReceivedMessagesOtel; + readonly Counter _totalReceivedBytesOtel; + readonly Counter _totalPingsReceivedOtel; + readonly Counter _totalPongsSentOtel; + readonly Counter _totalReceivedRequestsOtel; + readonly Counter _totalEmptyMessagesReceivedOtel; + readonly Counter _totalPingTimeoutsOtel; + readonly Counter _totalFailedRequestHandlersOtel; + readonly Counter _totalFailedRequestCallbacksOtel; + readonly Counter _totalFailedResponseWritesOtel; + readonly Counter _totalRequestHandlingTimeOtel; + public MetricsCollector(IMetricFactory metricFactory) { @@ -111,85 +135,241 @@ public MetricsCollector(IMetricFactory metricFactory) _totalRequestHandlingTime = metricFactory.CreateCounter( "dolittle_system_runtime_services_clients_reversecalls_request_handling_seconds_total", "ReverseCall total time spent handling requests"); + + // OpenTelemetry + _totalStartedConnectionsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_connections_started_total", + "connections", + "ReverseCall total number of connections that have been started"); + + _pendingWritesOtel = RuntimeMetrics.Meter.CreateUpDownCounter( + "dolittle_system_runtime_services_clients_reversecalls_pending_writes", + "count", + "ReverseCall current pending stream writes waiting"); + + _totalWriteWaitTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_stream_write_wait_seconds_total", + "seconds", + "ReverseCall total time spent waiting to write to streams"); + + _totalWriteTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_stream_write_seconds_total", + "seconds", + "ReverseCall total time spent writing to streams"); + + _totalWritesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_stream_writes_total", + "count", + "ReverseCall total number of writes to streams"); + + _totalWriteBytesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_stream_write_bytes_total", + "count", + "ReverseCall total number of bytes written to streams"); + + _totalWaitForConnectResponseTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_connect_response_wait_seconds_total", + "seconds", + "ReverseCall total time spent waiting for connect response"); + + _totalCancelledConnectionsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_connections_cancelled_total", + "count", + "ReverseCall total number of connections that have been cancelled"); + + _totalReceivedMessagesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_messages_received_total", + "count", + "ReverseCall total number of messages that have been received"); + + _totalReceivedBytesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_stream_read_bytes_total", + "count", + "ReverseCall total number of bytes read from streams"); + + _totalPingsReceivedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_pings_received_total", + "count", + "ReverseCall total number of pings received"); + + _totalPongsSentOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_pongs_sent_total", + "count", + "ReverseCall total number of pongs sent"); + + _totalReceivedRequestsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_requests_received_total", + "count", + "ReverseCall total number of requests that have been received"); + + _totalEmptyMessagesReceivedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_empty_messages_received_total", + "count", + "ReverseCall total number of empty messages that have been received"); + + _totalPingTimeoutsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_keepalive_timeouts_total", + "errors", + "ReverseCall total number of times ping keepalive has timed out"); + + _totalFailedRequestHandlersOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_requests_failed_handlers_total", + "errors", + "ReverseCall total number of failed request handlers"); + + _totalFailedRequestCallbacksOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_requests_failed_callbacks_total", + "errors", + "ReverseCall total number of failed request callbacks"); + + _totalFailedResponseWritesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_failed_response_writes_total", + "errors", + "ReverseCall total number of failed response writes"); + + _totalRequestHandlingTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_clients_reversecalls_request_handling_seconds_total", + "seconds", + "ReverseCall total time spent handling requests"); } /// public void IncrementTotalStartedConnections() - => _totalStartedConnections.Inc(); + { + _totalStartedConnections.Inc(); + _totalStartedConnectionsOtel.Add(1); + } /// public void IncrementPendingWrites() - => _pendingWrites.Dec(); + { + _pendingWrites.Dec(); + _pendingWritesOtel.Add(-1); + } /// public void DecrementPendingWrites() - => _pendingWrites.Inc(); + { + _pendingWrites.Inc(); + _pendingWritesOtel.Add(1); + } /// public void AddToTotalWriteWaitTime(TimeSpan waitTime) - => _totalWriteWaitTime.Inc(waitTime.TotalSeconds); + { + _totalWriteWaitTime.Inc(waitTime.TotalSeconds); + _totalWriteWaitTimeOtel.Add(waitTime.TotalSeconds); + } /// public void AddToTotalWriteTime(TimeSpan writeTime) - => _totalWriteTime.Inc(writeTime.TotalSeconds); + { + _totalWriteTime.Inc(writeTime.TotalSeconds); + _totalWriteTimeOtel.Add(writeTime.TotalSeconds); + } /// public void IncrementTotalWrites() - => _totalWrites.Inc(); + { + _totalWrites.Inc(); + _totalWritesOtel.Add(1); + } /// public void AddToTotalWriteBytes(int messageSize) - => _totalWriteBytes.Inc(messageSize); + { + _totalWriteBytes.Inc(messageSize); + _totalWriteBytesOtel.Add(messageSize); + } /// public void AddToTotalWaitForConnectResponseTime(TimeSpan waitTime) - => _totalWaitForConnectResponseTime.Inc(waitTime.TotalSeconds); + { + _totalWaitForConnectResponseTime.Inc(waitTime.TotalSeconds); + _totalWaitForConnectResponseTimeOtel.Add(waitTime.TotalSeconds); + } /// public void IncrementTotalCancelledConnections() - => _totalCancelledConnections.Inc(); + { + _totalCancelledConnections.Inc(); + _totalCancelledConnectionsOtel.Add(1); + } /// public void IncrementTotalReceivedMessages() - => _totalReceivedMessages.Inc(); + { + _totalReceivedMessages.Inc(); + _totalReceivedMessagesOtel.Add(1); + } /// public void AddToTotalReceivedBytes(int messageSize) - => _totalReceivedBytes.Inc(messageSize); + { + _totalReceivedBytes.Inc(messageSize); + _totalReceivedBytesOtel.Add(messageSize); + } /// public void IncrementTotalPingsReceived() - => _totalPingsReceived.Inc(); + { + _totalPingsReceived.Inc(); + _totalPingsReceivedOtel.Add(1); + } /// public void IncrementTotalPongsSent() - => _totalPongsSent.Inc(); + { + _totalPongsSent.Inc(); + _totalPongsSentOtel.Add(1); + } /// public void IncrementTotalRequestsReceived() - => _totalReceivedRequests.Inc(); + { + _totalReceivedRequests.Inc(); + _totalReceivedRequestsOtel.Add(1); + } /// public void IncrementTotalEmptyMessagesReceived() - => _totalEmptyMessagesReceived.Inc(); + { + _totalEmptyMessagesReceived.Inc(); + _totalEmptyMessagesReceivedOtel.Add(1); + } /// public void IncrementTotalPingTimeouts() - => _totalPingTimeouts.Inc(); + { + _totalPingTimeouts.Inc(); + _totalPingTimeoutsOtel.Add(1); + } /// public void IncrementTotalFailedRequestHandlers() - => _totalFailedRequestHandlers.Inc(); + { + _totalFailedRequestHandlers.Inc(); + _totalFailedRequestHandlersOtel.Add(1); + } /// public void IncrementTotalFailedRequestCallbacks() - => _totalFailedRequestCallbacks.Inc(); + { + _totalFailedRequestCallbacks.Inc(); + _totalFailedRequestCallbacksOtel.Add(1); + } /// public void IncrementTotalFailedResponseWrites() - => _totalFailedResponseWrites.Inc(); + { + _totalFailedResponseWrites.Inc(); + _totalFailedResponseWritesOtel.Add(1); + } /// public void AddToTotalRequestHandlingTime(TimeSpan handleTime) - => _totalRequestHandlingTime.Inc(handleTime.TotalSeconds); + { + _totalRequestHandlingTime.Inc(handleTime.TotalSeconds); + _totalRequestHandlingTimeOtel.Add(handleTime.TotalSeconds); + } } diff --git a/Source/Services/Callbacks/MetricsCollector.cs b/Source/Services/Callbacks/MetricsCollector.cs index 45d14b0c6..6f1acc5e7 100644 --- a/Source/Services/Callbacks/MetricsCollector.cs +++ b/Source/Services/Callbacks/MetricsCollector.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -22,12 +24,22 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalSchedulesMissed; readonly Counter _totalSchedulesMissedTime; readonly Counter _totalCallbackLoopsFailed; + + readonly Counter _totalCallbacksRegisteredOtel; + readonly Counter _totalCallbacksCalledOtel; + readonly Counter _totalCallbackTimeOtel; + readonly Counter _totalCallbacksFailedOtel; + readonly Counter _totalCallbacksUnregisteredOtel; + readonly Counter _totalSchedulesMissedOtel; + readonly Counter _totalSchedulesMissedTimeOtel; + readonly Counter _totalCallbackLoopsFailedOtel; public MetricsCollector(IMetricFactory metricFactory) { _totalCallbacksRegistered = metricFactory.CreateCounter( "dolittle_system_runtime_services_callbacks_registered_total", "Callbacks total number of registered callbacks"); + _totalCallbacksCalled = metricFactory.CreateCounter( "dolittle_system_runtime_services_callbacks_calls_total", @@ -56,37 +68,103 @@ public MetricsCollector(IMetricFactory metricFactory) _totalCallbackLoopsFailed = metricFactory.CreateCounter( "dolittle_system_runtime_services_callbacks_failed_call_loops_total", "Callbacks total number of failed callback loops"); + + + // OpenTelemetry + _totalCallbacksRegisteredOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_registered_total", + "count", + "Callbacks total number of registered callbacks"); + + _totalCallbacksCalledOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_calls_total", + "count", + "Callbacks total number of called callbacks"); + + _totalCallbackTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_time_seconds_total", + "seconds", + "Callbacks total time spent calling callbacks"); + + _totalCallbacksFailedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_failed_calls_total", + "errors", + "Callbacks total number of called callbacks that failed"); + + _totalCallbacksUnregisteredOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_unregistered_total", + "count", + "Callbacks total number of unregistered callbacks"); + + _totalSchedulesMissedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_schedules_missed_total", + "errors", + "Callbacks total number of missed callback schedules"); + + _totalSchedulesMissedTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_schedules_missed_time_seconds_total", + "seconds", + "Callbacks total time delays for missed callback schedules"); + + _totalCallbackLoopsFailedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_callbacks_failed_call_loops_total", + "errors", + "Callbacks total number of failed callback loops"); } /// public void IncrementTotalCallbacksRegistered() - => _totalCallbacksRegistered.Inc(); + { + _totalCallbacksRegistered.Inc(); + _totalCallbacksRegisteredOtel.Add(1); + } /// public void IncrementTotalCallbacksCalled() - => _totalCallbacksCalled.Inc(); + { + _totalCallbacksCalled.Inc(); + _totalCallbacksCalledOtel.Add(1); + } /// public void AddToTotalCallbackTime(TimeSpan elapsed) - => _totalCallbackTime.Inc(elapsed.TotalSeconds); + { + _totalCallbackTime.Inc(elapsed.TotalSeconds); + _totalCallbackTimeOtel.Add(elapsed.TotalSeconds); + } /// public void IncrementTotalCallbacksFailed() - => _totalCallbacksFailed.Inc(); + { + _totalCallbacksFailed.Inc(); + _totalCallbacksFailedOtel.Add(1); + } /// public void IncrementTotalCallbacksUnregistered() - => _totalCallbacksUnregistered.Inc(); + { + _totalCallbacksUnregistered.Inc(); + _totalCallbacksUnregisteredOtel.Add(1); + } /// public void IncrementTotalSchedulesMissed() - => _totalSchedulesMissed.Inc(); + { + _totalSchedulesMissed.Inc(); + _totalSchedulesMissedOtel.Add(1); + } /// public void AddToTotalSchedulesMissedTime(TimeSpan elapsed) - => _totalSchedulesMissedTime.Inc(elapsed.TotalSeconds); + { + _totalSchedulesMissedTime.Inc(elapsed.TotalSeconds); + _totalSchedulesMissedTimeOtel.Add(elapsed.TotalSeconds); + } /// public void IncrementTotalCallbackLoopsFailed() - => _totalCallbackLoopsFailed.Inc(); + { + _totalCallbackLoopsFailed.Inc(); + _totalCallbackLoopsFailedOtel.Add(1); + } } diff --git a/Source/Services/MetricsCollector.cs b/Source/Services/MetricsCollector.cs index 05b7f48a9..6e6cf6b41 100644 --- a/Source/Services/MetricsCollector.cs +++ b/Source/Services/MetricsCollector.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -18,12 +20,16 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalRequests; readonly Counter _totalFailedRequests; + readonly Counter _totalRequestTimeOtel; + readonly Counter _totalRequestsOtel; + readonly Counter _totalFailedRequestsOtel; + public MetricsCollector(IMetricFactory metricFactory) { _totalRequests = metricFactory.CreateCounter( "dolittle_system_runtime_services_reverse_call_requests_total", "ReverseCallDispatcher total requests"); - + _totalFailedRequests = metricFactory.CreateCounter( "dolittle_system_runtime_services_reverse_call_failed_requests_total", "ReverseCallDispatcher total failed requests"); @@ -31,17 +37,41 @@ public MetricsCollector(IMetricFactory metricFactory) _totalRequestTime = metricFactory.CreateCounter( "dolittle_system_runtime_services_reverse_call_requests_seconds_total", "ReverseCallDispatcher total time spent writing request and waiting for response"); + + _totalRequestsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reverse_call_requests_total", + "requests", + "ReverseCallDispatcher total requests"); + + _totalFailedRequestsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reverse_call_failed_requests_total", + "errors", + "ReverseCallDispatcher total failed requests"); + + _totalRequestTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reverse_call_requests_seconds_total", + "seconds", + "ReverseCallDispatcher total time spent writing request and waiting for response"); } /// public void AddToTotalRequestTime(TimeSpan requestTime) - => _totalRequestTime.Inc(requestTime.TotalSeconds); + { + _totalRequestTime.Inc(requestTime.TotalSeconds); + _totalRequestTimeOtel.Add(requestTime.TotalSeconds); + } /// public void AddRequest() - => _totalRequests.Inc(); + { + _totalRequests.Inc(); + _totalRequestsOtel.Add(1); + } /// public void AddFailedRequest() - => _totalFailedRequests.Inc(); + { + _totalFailedRequests.Inc(); + _totalFailedRequestsOtel.Add(1); + } } diff --git a/Source/Services/ReverseCalls/MetricsCollector.cs b/Source/Services/ReverseCalls/MetricsCollector.cs index 0e3734481..85e72e972 100644 --- a/Source/Services/ReverseCalls/MetricsCollector.cs +++ b/Source/Services/ReverseCalls/MetricsCollector.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Diagnostics.Metrics; using Dolittle.Runtime.DependencyInversion.Lifecycle; +using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics; using Dolittle.Runtime.Metrics; using Prometheus; @@ -27,6 +29,19 @@ public class MetricsCollector : IMetricsCollector readonly Counter _totalKeepaliveTimeouts; readonly Counter _totalFirstMessageWaitTime; + readonly UpDownCounter _currentPendingStreamWritesOtel; + readonly Counter _totalStreamWritesOtel; + readonly Counter _totalStreamWriteBytesOtel; + readonly Counter _totalStreamWriteWaitTimeOtel; + readonly Counter _totalStreamWriteTimeOtel; + readonly Counter _totalStreamReadsOtel; + readonly Counter _totalStreamReadBytesOtel; + readonly Counter _totalPingsSentOtel; + readonly Counter _totalPongsReceivedOtel; + readonly Counter _totalKeepaliveResetsOtel; + readonly Counter _totalKeepaliveTimeoutsOtel; + readonly Counter _totalFirstMessageWaitTimeOtel; + public MetricsCollector(IMetricFactory metricFactory) { _currentPendingStreamWrites = metricFactory.CreateGauge( @@ -76,57 +91,155 @@ public MetricsCollector(IMetricFactory metricFactory) _totalFirstMessageWaitTime = metricFactory.CreateCounter( "dolittle_system_runtime_services_reversecalls_first_message_wait_seconds_total", "ReverseCall total time spent waiting for first message"); + + _currentPendingStreamWritesOtel = RuntimeMetrics.Meter.CreateUpDownCounter( + "dolittle_system_runtime_services_reversecalls_pending_writes", + "ReverseCall current pending stream writes waiting"); + + _totalStreamWritesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_stream_writes_total", + "ReverseCall total number of writes to streams"); + + + // Opentelemetry + _totalStreamWriteBytesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_stream_write_bytes_total", + "bytes", + "ReverseCall total number of bytes written to streams"); + + _totalStreamWriteWaitTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_stream_write_wait_seconds_total", + "seconds", + "ReverseCall total time spent waiting to write to streams"); + + _totalStreamWriteTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_stream_write_seconds_total", + "seconds", + "ReverseCall total time spent writing to streams"); + + _totalStreamReadsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_stream_reads_total", + "count", + "ReverseCall total number of reads from streams"); + + _totalStreamReadBytesOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_stream_read_bytes_total", + "bytes", + "ReverseCall total number of bytes read from streams"); + + _totalPingsSentOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_pings_sent_total", + "ReverseCall total number of pings sent"); + + _totalPongsReceivedOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_pongs_received_total", + "count", + "ReverseCall total number of pongs received"); + + _totalKeepaliveResetsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_keepalive_resets_total", + "count", + "ReverseCall total number of times ping keepalive tokens have been reset"); + + _totalKeepaliveTimeoutsOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_keepalive_timeouts_total", + "count", + "ReverseCall total number of times ping keepalive tokens have timed out"); + + _totalFirstMessageWaitTimeOtel = RuntimeMetrics.Meter.CreateCounter( + "dolittle_system_runtime_services_reversecalls_first_message_wait_seconds_total", + "seconds", + "ReverseCall total time spent waiting for first message"); } /// public void IncrementPendingStreamWrites() - => _currentPendingStreamWrites.Inc(); + { + _currentPendingStreamWrites.Inc(); + _currentPendingStreamWritesOtel.Add(1); + } /// public void DecrementPendingStreamWrites() - => _currentPendingStreamWrites.Dec(); + { + _currentPendingStreamWrites.Dec(); + _currentPendingStreamWritesOtel.Add(-1); + } /// public void IncrementTotalStreamWrites() - => _totalStreamWrites.Inc(); + { + _totalStreamWrites.Inc(); + _totalStreamWritesOtel.Add(1); + } /// public void IncrementTotalStreamWriteBytes(int writtenBytes) - => _totalStreamWriteBytes.Inc(); + { + _totalStreamWriteBytes.Inc(); + _totalStreamWriteBytesOtel.Add(writtenBytes); + } /// public void AddToTotalStreamWriteWaitTime(TimeSpan waitTime) - => _totalStreamWriteWaitTime.Inc(waitTime.TotalSeconds); + { + _totalStreamWriteWaitTime.Inc(waitTime.TotalSeconds); + _totalStreamWriteWaitTimeOtel.Add(waitTime.TotalSeconds); + } /// public void AddToTotalStreamWriteTime(TimeSpan writeTime) - => _totalStreamWriteTime.Inc(writeTime.TotalSeconds); + { + _totalStreamWriteTime.Inc(writeTime.TotalSeconds); + _totalStreamWriteTimeOtel.Add(writeTime.TotalSeconds); + } /// public void IncrementTotalStreamReads() - => _totalStreamReads.Inc(); + { + _totalStreamReads.Inc(); + _totalStreamReadsOtel.Add(1); + } /// public void IncrementTotalStreamReadBytes(int writtenBytes) - => _totalStreamReadBytes.Inc(writtenBytes); + { + _totalStreamReadBytes.Inc(writtenBytes); + _totalStreamReadBytesOtel.Add(writtenBytes); + } /// public void IncrementTotalPingsSent() - => _totalPingsSent.Inc(); + { + _totalPingsSent.Inc(); + _totalPingsSentOtel.Add(1); + } /// public void IncrementTotalPongsReceived() - => _totalPongsReceived.Inc(); + { + _totalPongsReceived.Inc(); + _totalPongsReceivedOtel.Add(1); + } /// public void IncrementTotalKeepaliveTokenResets() - => _totalKeepaliveResets.Inc(); + { + _totalKeepaliveResets.Inc(); + _totalKeepaliveResetsOtel.Add(1); + } /// public void IncrementTotalKeepaliveTimeouts() - => _totalKeepaliveTimeouts.Inc(); + { + _totalKeepaliveTimeouts.Inc(); + _totalKeepaliveTimeoutsOtel.Add(1); + } /// public void AddToTotalWaitForFirstMessageTime(TimeSpan waitTime) - => _totalFirstMessageWaitTime.Inc(waitTime.TotalSeconds); + { + _totalFirstMessageWaitTime.Inc(waitTime.TotalSeconds); + _totalFirstMessageWaitTimeOtel.Add(waitTime.TotalSeconds); + } } diff --git a/Specifications/Events.Processing.Tests/concurrent/ConcurrentPartitionedProcessorTests.cs b/Specifications/Events.Processing.Tests/concurrent/ConcurrentPartitionedProcessorTests.cs index 5f170e7ce..e76a0cbdd 100644 --- a/Specifications/Events.Processing.Tests/concurrent/ConcurrentPartitionedProcessorTests.cs +++ b/Specifications/Events.Processing.Tests/concurrent/ConcurrentPartitionedProcessorTests.cs @@ -168,23 +168,23 @@ private static StreamProcessorState FailingProcessorStateWithRetryIn(TimeSpan ti }.ToImmutableDictionary(), DateTimeOffset.UtcNow); } - private static Channel ChannelWithEvent() + private static Channel ChannelWithEvent() { - var events = Channel.CreateBounded(100); - events.Writer.WriteAsync(FirstStreamEvent).GetAwaiter().GetResult(); + var events = Channel.CreateBounded(100); + events.Writer.WriteAsync(new StreamSubscriptionMessage(FirstStreamEvent)).GetAwaiter().GetResult(); return events; } - private static Channel ChannelWithEventAvailableAfter(TimeSpan timeSpan) + private static Channel ChannelWithEventAvailableAfter(TimeSpan timeSpan) { - var events = Channel.CreateBounded(100); + var events = Channel.CreateBounded(100); Task.Run(async () => { await Task.Delay(timeSpan); - events.Writer.WriteAsync(FirstStreamEvent).GetAwaiter().GetResult(); + events.Writer.WriteAsync(new StreamSubscriptionMessage(FirstStreamEvent)).GetAwaiter().GetResult(); }); return events; } - private static Channel ChannelWithoutEvents() => Channel.CreateBounded(100); + private static Channel ChannelWithoutEvents() => Channel.CreateBounded(100); } \ No newline at end of file diff --git a/versions.props b/versions.props index dd502117c..8d7e28ffb 100644 --- a/versions.props +++ b/versions.props @@ -7,14 +7,14 @@ 7.8.0 3.1.0 2.* - 2.62.0 + 2.63.0 2.46.6 - 2.63.0 + 2.64.0 2.10.2 1.0.0 17.9.0 8.0.0 - 2.22.0 + 2.25.0 1.0.0 4.18.* 6.12.0