diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs index 6851b17677..a027aaaf39 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestion.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestion.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Infrastructure; using NServiceBus; + using NServiceBus.Logging; using NServiceBus.Raw; using NServiceBus.Transport; @@ -30,10 +31,13 @@ public async Task EnsureStarted(CancellationToken cancellationToken) { try { + logger.Debug("Ensure started. Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + logger.Debug("Ensure started. Start/stop semaphore acquired"); if (ingestionEndpoint != null) { + logger.Debug("Ensure started. Already started, skipping start up"); return; //Already started } @@ -43,16 +47,20 @@ public async Task EnsureStarted(CancellationToken cancellationToken) rawConfiguration.CustomErrorHandlingPolicy(errorHandlingPolicy); + logger.Info("Ensure started. Infrastructure starting"); var startableRaw = await RawEndpoint.Create(rawConfiguration).ConfigureAwait(false); await initialize(startableRaw).ConfigureAwait(false); ingestionEndpoint = await startableRaw.Start() .ConfigureAwait(false); + logger.Info("Ensure started. Infrastructure started"); } finally { + logger.Debug("Ensure started. Start/stop semaphore releasing"); startStopSemaphore.Release(); + logger.Debug("Ensure started. Start/stop semaphore released"); } } @@ -62,19 +70,26 @@ public async Task EnsureStopped(CancellationToken cancellationToken) { try { + logger.Info("Shutting down. Start/stop semaphore acquiring"); await startStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + logger.Info("Shutting down. Start/stop semaphore acquired"); if (ingestionEndpoint == null) { + logger.Info("Shutting down. Already stopped, skipping shut down"); return; //Already stopped } var stoppable = ingestionEndpoint; ingestionEndpoint = null; + logger.Info("Shutting down. Infrastructure shut down commencing"); await stoppable.Stop().ConfigureAwait(false); + logger.Info("Shutting down. Infrastructure shut down completed"); } finally { + logger.Info("Shutting down. Start/stop semaphore releasing"); startStopSemaphore.Release(); + logger.Info("Shutting down. Start/stop semaphore released"); } } @@ -86,5 +101,7 @@ public async Task EnsureStopped(CancellationToken cancellationToken) IErrorHandlingPolicy errorHandlingPolicy; Func onCriticalError; IReceivingRawEndpoint ingestionEndpoint; + + static readonly ILog logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs index 4e79f5f2b0..0a05b705b9 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs @@ -55,7 +55,7 @@ AuditIngestionCustomCheck.State ingestionState failedImporter = new ImportFailedAudits(documentStore, ingestor, rawEndpointFactory); watchdog = new Watchdog(ingestion.EnsureStarted, ingestion.EnsureStopped, ingestionState.ReportError, - ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, log, "failed message ingestion"); + ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, log, "audit message ingestion"); channel = Channel.CreateBounded(new BoundedChannelOptions(settings.MaximumConcurrencyLevel) { @@ -78,6 +78,7 @@ FailedAuditImport FailedMessageFactory(FailedTransportMessage msg) Task OnCriticalError(string failure, Exception arg2) { + log.Warn($"OnCriticalError. '{failure}'", arg2); return watchdog.OnFailure(failure); } diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 2c75ef476e..c9f6b3f1d7 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -19,14 +19,26 @@ public AuditIngestor(AuditPersister auditPersister, Settings settings) public async Task Ingest(List contexts) { + if (log.IsDebugEnabled) + { + log.Debug($"Ingesting {contexts.Count} message contexts"); + } var stored = await auditPersister.Persist(contexts).ConfigureAwait(false); try { if (settings.ForwardAuditMessages) { + if (log.IsDebugEnabled) + { + log.Debug($"Forwarding {contexts.Count} messages"); + } await Forward(stored, settings.AuditLogQueue) .ConfigureAwait(false); + if (log.IsDebugEnabled) + { + log.Debug($"Forwarded messages"); + } } foreach (var context in stored) @@ -36,6 +48,11 @@ await Forward(stored, settings.AuditLogQueue) } catch (Exception e) { + if (log.IsDebugEnabled) + { + log.Debug($"Forwarding messages failed"); + } + // in case forwarding throws foreach (var context in stored) { diff --git a/src/ServiceControl.Audit/Auditing/AuditPersister.cs b/src/ServiceControl.Audit/Auditing/AuditPersister.cs index 9fa69b4dde..29cc07d76e 100644 --- a/src/ServiceControl.Audit/Auditing/AuditPersister.cs +++ b/src/ServiceControl.Audit/Auditing/AuditPersister.cs @@ -74,11 +74,19 @@ public async Task> Persist(List co RecordKnownEndpoints(receivingEndpoint, knownEndpoints, processedMessage); } + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Adding audit message for bulk storage"); + } await bulkInsert.StoreAsync(processedMessage).ConfigureAwait(false); storedContexts.Add(context); } else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot)) { + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Adding SagaSnapshot message for bulk storage"); + } await bulkInsert.StoreAsync(sagaSnapshot).ConfigureAwait(false); storedContexts.Add(context); } @@ -86,9 +94,17 @@ public async Task> Persist(List co foreach (var endpoint in knownEndpoints.Values) { + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Adding known endpoint for bulk storage"); + } await bulkInsert.StoreAsync(endpoint).ConfigureAwait(false); } + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Performing bulk storage write"); + } await bulkInsert.DisposeAsync().ConfigureAwait(false); } catch (Exception e) @@ -207,11 +223,19 @@ await bodyStorageEnricher.StoreAuditMessageBody(context.Body, context.Headers, m Id = $"ProcessedMessages/{context.Headers.ProcessingId()}" }; + if (Logger.IsDebugEnabled) + { + Logger.Debug($"Emitting {commandsToEmit.Count} commands"); + } foreach (var commandToEmit in commandsToEmit) { await messageSession.Send(commandToEmit) .ConfigureAwait(false); } + if (Logger.IsDebugEnabled) + { + Logger.Debug($"{commandsToEmit.Count} commands emitted."); + } context.Extensions.Set(auditMessage); if (metadata.TryGetValue("SendingEndpoint", out var sendingEndpoint)) @@ -244,4 +268,4 @@ await messageSession.Send(commandToEmit) IMessageSession messageSession; static ILog Logger = LogManager.GetLogger(); } -} \ No newline at end of file +}