From a53ce8f394365db638b2ff7387e79148254153b9 Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Tue, 3 Nov 2020 12:46:06 +0800 Subject: [PATCH 1/6] Log ingestion lifecycle --- .../Auditing/AuditIngestion.cs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6851b17677..a027aaaf39 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Infrastructure; using NServiceBus; + using NServiceBus.Logging; using NServiceBus.Raw; using NServiceBus.Transport; @@ -30,10 +31,13 @@ public async Task EnsureStarted(CancellationToken cancellationToken) { try { + logger.Debug("Ensure started. Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + logger.Debug("Ensure started. Start/stop semaphore acquired"); if (ingestionEndpoint != null) { + logger.Debug("Ensure started. Already started, skipping start up"); return; //Already started } @@ -43,16 +47,20 @@ public async Task EnsureStarted(CancellationToken cancellationToken) rawConfiguration.CustomErrorHandlingPolicy(errorHandlingPolicy); + logger.Info("Ensure started. Infrastructure starting"); var startableRaw = await RawEndpoint.Create(rawConfiguration).ConfigureAwait(false); await initialize(startableRaw).ConfigureAwait(false); ingestionEndpoint = await startableRaw.Start() .ConfigureAwait(false); + logger.Info("Ensure started. Infrastructure started"); } finally { + logger.Debug("Ensure started. Start/stop semaphore releasing"); startStopSemaphore.Release(); + logger.Debug("Ensure started. Start/stop semaphore released"); } } @@ -62,19 +70,26 @@ public async Task EnsureStopped(CancellationToken cancellationToken) { try { + logger.Info("Shutting down. Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + logger.Info("Shutting down. Start/stop semaphore acquired"); if (ingestionEndpoint == null) { + logger.Info("Shutting down. Already stopped, skipping shut down"); return; //Already stopped } var stoppable = ingestionEndpoint; ingestionEndpoint = null; + logger.Info("Shutting down. Infrastructure shut down commencing"); await stoppable.Stop().ConfigureAwait(false); + logger.Info("Shutting down. Infrastructure shut down completed"); } finally { + logger.Info("Shutting down. Start/stop semaphore releasing"); startStopSemaphore.Release(); + logger.Info("Shutting down. Start/stop semaphore released"); } } @@ -86,5 +101,7 @@ public async Task EnsureStopped(CancellationToken cancellationToken) IErrorHandlingPolicy errorHandlingPolicy; Func onCriticalError; IReceivingRawEndpoint ingestionEndpoint; + + static readonly ILog logger = LogManager.GetLogger(); } } \ No newline at end of file From 7d65aad359cef1f960fec106db99674d9f686161 Mon Sep 17 00:00:00 2001 From: Mike Minutillo Date: Tue, 3 Nov 2020 12:46:28 +0800 Subject: [PATCH 2/6] Incorrect name used in audit ingestion watchdog --- src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs index 4e79f5f2b0..422ed0ccd9 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs @@ -55,7 +55,7 @@ AuditIngestionCustomCheck.State ingestionState failedImporter = new ImportFailedAudits(documentStore, ingestor, rawEndpointFactory); watchdog = new Watchdog(ingestion.EnsureStarted, ingestion.EnsureStopped, ingestionState.ReportError, - ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, log, "failed message ingestion"); + ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, log, "audit message ingestion"); channel = Channel.CreateBounded(new BoundedChannelOptions(settings.MaximumConcurrencyLevel) { From 9549df33e0ea6ae42a297b94eb2ab3e19ef4aa42 Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Tue, 3 Nov 2020 09:52:17 +0200 Subject: [PATCH 3/6] Log in OnCriticalError --- src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs index 422ed0ccd9..f7fb0747c8 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs @@ -78,6 +78,7 @@ FailedAuditImport FailedMessageFactory(FailedTransportMessage msg) Task OnCriticalError(string failure, Exception arg2) { + log.Debug($"OnCriticalError. '{failure}'", arg2); return watchdog.OnFailure(failure); } From 3e4b11849c7bba59bbf15422abdb247cad60dc20 Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Tue, 3 Nov 2020 16:01:33 +0200 Subject: [PATCH 4/6] Add more log statements --- .../Auditing/AuditIngestor.cs | 17 +++++++++++++ .../Auditing/AuditPersister.cs | 24 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 2c75ef476e..c9f6b3f1d7 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -19,14 +19,26 @@ public AuditIngestor(AuditPersister auditPersister, Settings settings) public async Task Ingest(List contexts) { + if (log.IsDebugEnabled) + { + log.Debug($"Ingesting {contexts.Count} message contexts"); + } var stored = await auditPersister.Persist(contexts).ConfigureAwait(false); try { if (settings.ForwardAuditMessages) { + if (log.IsDebugEnabled) + { + log.Debug($"Forwarding {contexts.Count} messages"); + } await Forward(stored, settings.AuditLogQueue) .ConfigureAwait(false); + if (log.IsDebugEnabled) + { + log.Debug($"Forwarded messages"); + } } foreach (var context in stored) @@ -36,6 +48,11 @@ await Forward(stored, settings.AuditLogQueue) } catch (Exception e) { + if (log.IsDebugEnabled) + { + log.Debug($"Forwarding messages failed"); + } + // in case forwarding throws foreach (var context in stored) { diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 9fa69b4dde..5ef0462dd7 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -74,11 +74,19 @@ public async Task> Persist(List co RecordKnownEndpoints(receivingEndpoint, knownEndpoints, processedMessage); } + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Adding message for bulk storage"); + } await bulkInsert.StoreAsync(processedMessage).ConfigureAwait(false); storedContexts.Add(context); } else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot)) { + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Adding message for bulk storage"); + } await bulkInsert.StoreAsync(sagaSnapshot).ConfigureAwait(false); storedContexts.Add(context); } @@ -86,9 +94,17 @@ public async Task> Persist(List co foreach (var endpoint in knownEndpoints.Values) { + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Adding known endpoint for bulk storage"); + } await bulkInsert.StoreAsync(endpoint).ConfigureAwait(false); } + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Performing bulk storage write"); + } await bulkInsert.DisposeAsync().ConfigureAwait(false); } catch (Exception e) @@ -207,11 +223,19 @@ await bodyStorageEnricher.StoreAuditMessageBody(context.Body, context.Headers, m Id = $"ProcessedMessages/{context.Headers.ProcessingId()}" }; + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Emitting {commandsToEmit.Count} commands"); + } foreach (var commandToEmit in commandsToEmit) { await messageSession.Send(commandToEmit) .ConfigureAwait(false); } + if (Logger.IsDebugEnabled) + { + Logger.Debug($"{commandsToEmit.Count} commands emitted."); + } context.Extensions.Set(auditMessage); if (metadata.TryGetValue("SendingEndpoint", out var sendingEndpoint)) From f4ffb6b6c4b7c6c1bbaddb8f86e8dcad24cbffa4 Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Tue, 3 Nov 2020 16:03:26 +0200 Subject: [PATCH 5/6] Change debug to warn in criticalerror --- src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs index f7fb0747c8..0a05b705b9 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs @@ -78,7 +78,7 @@ FailedAuditImport FailedMessageFactory(FailedTransportMessage msg) Task OnCriticalError(string failure, Exception arg2) { - log.Debug($"OnCriticalError. '{failure}'", arg2); + log.Warn($"OnCriticalError. '{failure}'", arg2); return watchdog.OnFailure(failure); } From d74d78ef2ee3a6238073c7d475db344a05f22f7c Mon Sep 17 00:00:00 2001 From: WilliamBZA Date: Thu, 5 Nov 2020 08:19:54 +0200 Subject: [PATCH 6/6] Apply suggestions from code review --- src/ServiceControl.Audit/Auditing/AuditPersister.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 5ef0462dd7..29cc07d76e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -76,7 +76,7 @@ public async Task> Persist(List co if (Logger.IsDebugEnabled) { - Logger.Debug($"Adding message for bulk storage"); + Logger.Debug($"Adding audit message for bulk storage"); } await bulkInsert.StoreAsync(processedMessage).ConfigureAwait(false); storedContexts.Add(context); @@ -85,7 +85,7 @@ public async Task> Persist(List co { if (Logger.IsDebugEnabled) { - Logger.Debug($"Adding message for bulk storage"); + Logger.Debug($"Adding SagaSnapshot message for bulk storage"); } await bulkInsert.StoreAsync(sagaSnapshot).ConfigureAwait(false); storedContexts.Add(context); @@ -268,4 +268,4 @@ await messageSession.Send(commandToEmit) IMessageSession messageSession; static ILog Logger = LogManager.GetLogger(); } -} \ No newline at end of file +}