Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Infrastructure;
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Raw;
using NServiceBus.Transport;

Expand All @@ -30,10 +31,13 @@ public async Task EnsureStarted(CancellationToken cancellationToken)
{
try
{
logger.Debug("Ensure started. Start/stop semaphore acquiring");
await startStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
logger.Debug("Ensure started. Start/stop semaphore acquired");

if (ingestionEndpoint != null)
{
logger.Debug("Ensure started. Already started, skipping start up");
return; //Already started
}

Expand All @@ -43,16 +47,20 @@ public async Task EnsureStarted(CancellationToken cancellationToken)

rawConfiguration.CustomErrorHandlingPolicy(errorHandlingPolicy);

logger.Info("Ensure started. Infrastructure starting");
var startableRaw = await RawEndpoint.Create(rawConfiguration).ConfigureAwait(false);

await initialize(startableRaw).ConfigureAwait(false);

ingestionEndpoint = await startableRaw.Start()
.ConfigureAwait(false);
logger.Info("Ensure started. Infrastructure started");
}
finally
{
logger.Debug("Ensure started. Start/stop semaphore releasing");
startStopSemaphore.Release();
logger.Debug("Ensure started. Start/stop semaphore released");
}
}

Expand All @@ -62,19 +70,26 @@ public async Task EnsureStopped(CancellationToken cancellationToken)
{
try
{
logger.Info("Shutting down. Start/stop semaphore acquiring");
await startStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
logger.Info("Shutting down. Start/stop semaphore acquired");

if (ingestionEndpoint == null)
{
logger.Info("Shutting down. Already stopped, skipping shut down");
return; //Already stopped
}
var stoppable = ingestionEndpoint;
ingestionEndpoint = null;
logger.Info("Shutting down. Infrastructure shut down commencing");
await stoppable.Stop().ConfigureAwait(false);
logger.Info("Shutting down. Infrastructure shut down completed");
}
finally
{
logger.Info("Shutting down. Start/stop semaphore releasing");
startStopSemaphore.Release();
logger.Info("Shutting down. Start/stop semaphore released");
}
}

Expand All @@ -86,5 +101,7 @@ public async Task EnsureStopped(CancellationToken cancellationToken)
IErrorHandlingPolicy errorHandlingPolicy;
Func<string, Exception, Task> onCriticalError;
IReceivingRawEndpoint ingestionEndpoint;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
}
}
3 changes: 2 additions & 1 deletion src/ServiceControl.Audit/Auditing/AuditIngestionComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ AuditIngestionCustomCheck.State ingestionState
failedImporter = new ImportFailedAudits(documentStore, ingestor, rawEndpointFactory);

watchdog = new Watchdog(ingestion.EnsureStarted, ingestion.EnsureStopped, ingestionState.ReportError,
ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, log, "failed message ingestion");
ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, log, "audit message ingestion");

channel = Channel.CreateBounded<MessageContext>(new BoundedChannelOptions(settings.MaximumConcurrencyLevel)
{
Expand All @@ -78,6 +78,7 @@ FailedAuditImport FailedMessageFactory(FailedTransportMessage msg)

Task OnCriticalError(string failure, Exception arg2)
{
log.Warn($"OnCriticalError. '{failure}'", arg2);
return watchdog.OnFailure(failure);
}

Expand Down
17 changes: 17 additions & 0 deletions src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@ public AuditIngestor(AuditPersister auditPersister, Settings settings)

public async Task Ingest(List<MessageContext> contexts)
{
if (log.IsDebugEnabled)
{
log.Debug($"Ingesting {contexts.Count} message contexts");
}
var stored = await auditPersister.Persist(contexts).ConfigureAwait(false);

try
{
if (settings.ForwardAuditMessages)
{
if (log.IsDebugEnabled)
{
log.Debug($"Forwarding {contexts.Count} messages");
}
await Forward(stored, settings.AuditLogQueue)
.ConfigureAwait(false);
if (log.IsDebugEnabled)
{
log.Debug($"Forwarded messages");
}
}

foreach (var context in stored)
Expand All @@ -36,6 +48,11 @@ await Forward(stored, settings.AuditLogQueue)
}
catch (Exception e)
{
if (log.IsDebugEnabled)
{
log.Debug($"Forwarding messages failed");
}

// in case forwarding throws
foreach (var context in stored)
{
Expand Down
26 changes: 25 additions & 1 deletion src/ServiceControl.Audit/Auditing/AuditPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,37 @@ public async Task<IReadOnlyList<MessageContext>> Persist(List<MessageContext> co
RecordKnownEndpoints(receivingEndpoint, knownEndpoints, processedMessage);
}

if (Logger.IsDebugEnabled)
{
Logger.Debug($"Adding audit message for bulk storage");
}
await bulkInsert.StoreAsync(processedMessage).ConfigureAwait(false);
storedContexts.Add(context);
}
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
{
if (Logger.IsDebugEnabled)
{
Logger.Debug($"Adding SagaSnapshot message for bulk storage");
}
await bulkInsert.StoreAsync(sagaSnapshot).ConfigureAwait(false);
storedContexts.Add(context);
}
}

foreach (var endpoint in knownEndpoints.Values)
{
if (Logger.IsDebugEnabled)
{
Logger.Debug($"Adding known endpoint for bulk storage");
}
await bulkInsert.StoreAsync(endpoint).ConfigureAwait(false);
}

if (Logger.IsDebugEnabled)
{
Logger.Debug($"Performing bulk storage write");
}
await bulkInsert.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
Expand Down Expand Up @@ -207,11 +223,19 @@ await bodyStorageEnricher.StoreAuditMessageBody(context.Body, context.Headers, m
Id = $"ProcessedMessages/{context.Headers.ProcessingId()}"
};

if (Logger.IsDebugEnabled)
{
Logger.Debug($"Emitting {commandsToEmit.Count} commands");
}
foreach (var commandToEmit in commandsToEmit)
{
await messageSession.Send(commandToEmit)
.ConfigureAwait(false);
}
if (Logger.IsDebugEnabled)
{
Logger.Debug($"{commandsToEmit.Count} commands emitted.");
}

context.Extensions.Set(auditMessage);
if (metadata.TryGetValue("SendingEndpoint", out var sendingEndpoint))
Expand Down Expand Up @@ -244,4 +268,4 @@ await messageSession.Send(commandToEmit)
IMessageSession messageSession;
static ILog Logger = LogManager.GetLogger<AuditPersister>();
}
}
}