From fdd69006afc59140225c86f091965ed7825f3c06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:34:26 +0200 Subject: [PATCH 01/11] Refine test to show minimal metadata needed for errors to be imported successfully --- ...ng_message_with_missing_metadata_failed.cs | 38 ++++++++++--------- .../FailureDetails.cs | 7 +--- ...ectNewEndpointsFromErrorImportsEnricher.cs | 9 +---- .../Operations/ErrorIngestor.cs | 12 ++---- .../Operations/ErrorProcessor.cs | 19 +++------- 5 files changed, 31 insertions(+), 54 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index 6fad5fd287..522b595255 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -42,13 +42,13 @@ public async Task TimeSent_should_not_be_casted() var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - await Define(ctx => { ctx.TimeSent = sentTime; }) + await Define(ctx => ctx.TimeSent = sentTime) .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); failure = result; - return c.UniqueMessageId != null & result; + return (c.UniqueMessageId != null) & result; }) .Run(); @@ -61,22 +61,31 @@ public async Task Should_be_able_to_get_the_message_by_id() { FailedMessageView failure = null; - await Define() + var testStartTime = DateTime.UtcNow; + + var context = await Define() .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); failure = result; - return c.UniqueMessageId != null & result; + return (c.UniqueMessageId != null) & result; }) .Run(); Assert.That(failure, Is.Not.Null); + + //No failure time will result in utc now being used + Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); + + // ServicePulse assumes that the receiving endpoint name is set making sure that its present + Assert.That(failure.ReceivingEndpoint, Is.Not.Null); + Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); } - public class Failing : EndpointConfigurationBuilder + class Failing : EndpointConfigurationBuilder { - public Failing() => EndpointSetup(c => { c.Recoverability().Delayed(x => x.NumberOfRetries(0)); }); + public Failing() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); class SendFailedMessage : DispatchRawMessages { @@ -89,30 +98,23 @@ protected override TransportOperations CreateMessage(MyContext context) var headers = new Dictionary { [Headers.MessageId] = context.MessageId, - [Headers.ProcessingEndpoint] = context.EndpointNameOfReceivingEndpoint, - ["NServiceBus.ExceptionInfo.ExceptionType"] = "2014-11-11 02:26:57:767462 Z", - ["NServiceBus.ExceptionInfo.Message"] = "An error occurred while attempting to extract logical messages from transport message NServiceBus.TransportMessage", - ["NServiceBus.ExceptionInfo.InnerExceptionType"] = "System.Exception", - ["NServiceBus.ExceptionInfo.Source"] = "NServiceBus.Core", - ["NServiceBus.ExceptionInfo.StackTrace"] = string.Empty, ["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)), - ["NServiceBus.TimeOfFailure"] = "2014-11-11 02:26:58:000462 Z" + [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work, endpoint name is detected from the FailedQ header }; + if (context.TimeSent.HasValue) { headers["NServiceBus.TimeSent"] = DateTimeOffsetHelper.ToWireFormattedString(context.TimeSent.Value); } - var outgoingMessage = new OutgoingMessage(context.MessageId, headers, new byte[0]); + var outgoingMessage = new OutgoingMessage(context.MessageId, headers, Array.Empty()); - return new TransportOperations( - new TransportOperation(outgoingMessage, new UnicastAddressTag("error")) - ); + return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); } } } - public class MyContext : ScenarioContext + class MyContext : ScenarioContext { public string MessageId { get; set; } diff --git a/src/ServiceControl.Persistence/FailureDetails.cs b/src/ServiceControl.Persistence/FailureDetails.cs index 81d7be97e7..180dd830a1 100644 --- a/src/ServiceControl.Persistence/FailureDetails.cs +++ b/src/ServiceControl.Persistence/FailureDetails.cs @@ -4,14 +4,9 @@ namespace ServiceControl.Contracts.Operations public class FailureDetails { - public FailureDetails() - { - TimeOfFailure = DateTime.UtcNow; - } - public string AddressOfFailingEndpoint { get; set; } - public DateTime TimeOfFailure { get; set; } + public DateTime TimeOfFailure { get; set; } = DateTime.UtcNow; public ExceptionDetails Exception { get; set; } } diff --git a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs index 1e01e73b97..c067fa1746 100644 --- a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs +++ b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs @@ -5,13 +5,8 @@ using ServiceControl.Contracts.Operations; using ServiceControl.Persistence; - class DetectNewEndpointsFromErrorImportsEnricher : IEnrichImportedErrorMessages + class DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) : IEnrichImportedErrorMessages { - public DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) - { - this.monitoring = monitoring; - } - public void Enrich(ErrorEnricherContext context) { var sendingEndpoint = EndpointDetailsParser.SendingEndpoint(context.Headers); @@ -47,7 +42,5 @@ void TryAddEndpoint(EndpointDetails endpointDetails, ErrorEnricherContext contex context.Add(endpointDetails); } } - - IEndpointInstanceMonitoring monitoring; } } \ No newline at end of file diff --git a/src/ServiceControl/Operations/ErrorIngestor.cs b/src/ServiceControl/Operations/ErrorIngestor.cs index 660dcafbb1..88b3513738 100644 --- a/src/ServiceControl/Operations/ErrorIngestor.cs +++ b/src/ServiceControl/Operations/ErrorIngestor.cs @@ -37,13 +37,7 @@ public ErrorIngestor(Metrics metrics, bulkInsertDurationMeter = metrics.GetMeter("Error ingestion - bulk insert duration", FrequencyInMilliseconds); var ingestedMeter = metrics.GetCounter("Error ingestion - ingested"); - var enrichers = new IEnrichImportedErrorMessages[] - { - new MessageTypeEnricher(), - new EnrichWithTrackingIds(), - new ProcessingStatisticsEnricher() - - }.Concat(errorEnrichers).ToArray(); + var enrichers = new IEnrichImportedErrorMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher() }.Concat(errorEnrichers).ToArray(); errorProcessor = new ErrorProcessor(enrichers, failedMessageEnrichers.ToArray(), domainEvents, ingestedMeter); retryConfirmationProcessor = new RetryConfirmationProcessor(domainEvents); @@ -67,7 +61,6 @@ public async Task Ingest(List contexts, CancellationToken cancel } } - var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages, cancellationToken); try @@ -77,6 +70,7 @@ public async Task Ingest(List contexts, CancellationToken cancel { announcerTasks.Add(errorProcessor.Announce(context)); } + foreach (var context in retriedMessages) { announcerTasks.Add(retryConfirmationProcessor.Announce(context)); @@ -90,6 +84,7 @@ public async Task Ingest(List contexts, CancellationToken cancel { Logger.Debug($"Forwarding {storedFailed.Count} messages"); } + await Forward(storedFailed, cancellationToken); if (Logger.IsDebugEnabled) { @@ -133,6 +128,7 @@ async Task> PersistFailedMessages(List> Process(IReadOnlyList contexts, IIngestionUnitOfWork unitOfWork) { var storedContexts = new List(contexts.Count); @@ -169,10 +163,7 @@ static void RecordKnownEndpoints(EndpointDetails observedEndpoint, Dictionary(); } } \ No newline at end of file From b61f8cf0f6c9f6cf3040a98173aa1620da5dd340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:39:14 +0200 Subject: [PATCH 02/11] Wording --- .../When_processing_message_with_missing_metadata_failed.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index 522b595255..e93d552ccc 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -78,7 +78,7 @@ public async Task Should_be_able_to_get_the_message_by_id() //No failure time will result in utc now being used Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // ServicePulse assumes that the receiving endpoint name is set making sure that its present + // ServicePulse assumes that the receiving endpoint name Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); } From 3a44196c997963f176cafcc34861be959b061194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:39:49 +0200 Subject: [PATCH 03/11] Better comment --- .../When_processing_message_with_missing_metadata_failed.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index e93d552ccc..bf05777fc4 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -99,7 +99,7 @@ protected override TransportOperations CreateMessage(MyContext context) { [Headers.MessageId] = context.MessageId, ["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)), - [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work, endpoint name is detected from the FailedQ header + [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work since "host" is required, endpoint name is detected from the FailedQ header }; if (context.TimeSent.HasValue) From 3f8d5b71930eb48a585cfa14f1a2776311fafcf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Tue, 22 Apr 2025 12:43:08 +0200 Subject: [PATCH 04/11] More comments --- .../When_processing_message_with_missing_metadata_failed.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs index bf05777fc4..7bc6536156 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs @@ -78,7 +78,7 @@ public async Task Should_be_able_to_get_the_message_by_id() //No failure time will result in utc now being used Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // ServicePulse assumes that the receiving endpoint name + // ServicePulse assumes that the receiving endpoint name is present Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); } From d786a34274a2c643a29ca29f61d4210d74100a1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Wed, 23 Apr 2025 09:46:39 +0200 Subject: [PATCH 05/11] Isolate changes --- src/ServiceControl.Persistence/FailureDetails.cs | 7 ++++++- .../DetectNewEndpointsFromErrorImportsEnricher.cs | 9 ++++++++- src/ServiceControl/Operations/ErrorIngestor.cs | 12 ++++++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/ServiceControl.Persistence/FailureDetails.cs b/src/ServiceControl.Persistence/FailureDetails.cs index 180dd830a1..81d7be97e7 100644 --- a/src/ServiceControl.Persistence/FailureDetails.cs +++ b/src/ServiceControl.Persistence/FailureDetails.cs @@ -4,9 +4,14 @@ namespace ServiceControl.Contracts.Operations public class FailureDetails { + public FailureDetails() + { + TimeOfFailure = DateTime.UtcNow; + } + public string AddressOfFailingEndpoint { get; set; } - public DateTime TimeOfFailure { get; set; } = DateTime.UtcNow; + public DateTime TimeOfFailure { get; set; } public ExceptionDetails Exception { get; set; } } diff --git a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs index c067fa1746..1e01e73b97 100644 --- a/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs +++ b/src/ServiceControl/Monitoring/DetectNewEndpointsFromErrorImportsEnricher.cs @@ -5,8 +5,13 @@ using ServiceControl.Contracts.Operations; using ServiceControl.Persistence; - class DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) : IEnrichImportedErrorMessages + class DetectNewEndpointsFromErrorImportsEnricher : IEnrichImportedErrorMessages { + public DetectNewEndpointsFromErrorImportsEnricher(IEndpointInstanceMonitoring monitoring) + { + this.monitoring = monitoring; + } + public void Enrich(ErrorEnricherContext context) { var sendingEndpoint = EndpointDetailsParser.SendingEndpoint(context.Headers); @@ -42,5 +47,7 @@ void TryAddEndpoint(EndpointDetails endpointDetails, ErrorEnricherContext contex context.Add(endpointDetails); } } + + IEndpointInstanceMonitoring monitoring; } } \ No newline at end of file diff --git a/src/ServiceControl/Operations/ErrorIngestor.cs b/src/ServiceControl/Operations/ErrorIngestor.cs index 88b3513738..660dcafbb1 100644 --- a/src/ServiceControl/Operations/ErrorIngestor.cs +++ b/src/ServiceControl/Operations/ErrorIngestor.cs @@ -37,7 +37,13 @@ public ErrorIngestor(Metrics metrics, bulkInsertDurationMeter = metrics.GetMeter("Error ingestion - bulk insert duration", FrequencyInMilliseconds); var ingestedMeter = metrics.GetCounter("Error ingestion - ingested"); - var enrichers = new IEnrichImportedErrorMessages[] { new MessageTypeEnricher(), new EnrichWithTrackingIds(), new ProcessingStatisticsEnricher() }.Concat(errorEnrichers).ToArray(); + var enrichers = new IEnrichImportedErrorMessages[] + { + new MessageTypeEnricher(), + new EnrichWithTrackingIds(), + new ProcessingStatisticsEnricher() + + }.Concat(errorEnrichers).ToArray(); errorProcessor = new ErrorProcessor(enrichers, failedMessageEnrichers.ToArray(), domainEvents, ingestedMeter); retryConfirmationProcessor = new RetryConfirmationProcessor(domainEvents); @@ -61,6 +67,7 @@ public async Task Ingest(List contexts, CancellationToken cancel } } + var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages, cancellationToken); try @@ -70,7 +77,6 @@ public async Task Ingest(List contexts, CancellationToken cancel { announcerTasks.Add(errorProcessor.Announce(context)); } - foreach (var context in retriedMessages) { announcerTasks.Add(retryConfirmationProcessor.Announce(context)); @@ -84,7 +90,6 @@ public async Task Ingest(List contexts, CancellationToken cancel { Logger.Debug($"Forwarding {storedFailed.Count} messages"); } - await Forward(storedFailed, cancellationToken); if (Logger.IsDebugEnabled) { @@ -128,7 +133,6 @@ async Task> PersistFailedMessages(List Date: Wed, 23 Apr 2025 09:47:43 +0200 Subject: [PATCH 06/11] More rollbacks --- .../Operations/ErrorProcessor.cs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/ServiceControl/Operations/ErrorProcessor.cs b/src/ServiceControl/Operations/ErrorProcessor.cs index 45689ce6b4..6ac04c8edd 100644 --- a/src/ServiceControl/Operations/ErrorProcessor.cs +++ b/src/ServiceControl/Operations/ErrorProcessor.cs @@ -15,11 +15,17 @@ using ServiceControl.Persistence; using ServiceControl.Persistence.UnitOfWork; - class ErrorProcessor(IEnrichImportedErrorMessages[] enrichers, - IFailedMessageEnricher[] failedMessageEnrichers, - IDomainEvents domainEvents, - Counter ingestedCounter) + class ErrorProcessor { + public ErrorProcessor(IEnrichImportedErrorMessages[] enrichers, IFailedMessageEnricher[] failedMessageEnrichers, IDomainEvents domainEvents, + Counter ingestedCounter) + { + this.enrichers = enrichers; + this.domainEvents = domainEvents; + this.ingestedCounter = ingestedCounter; + failedMessageFactory = new FailedMessageFactory(failedMessageEnrichers); + } + public async Task> Process(IReadOnlyList contexts, IIngestionUnitOfWork unitOfWork) { var storedContexts = new List(contexts.Count); @@ -163,7 +169,10 @@ static void RecordKnownEndpoints(EndpointDetails observedEndpoint, Dictionary(); } } \ No newline at end of file From 6c9622f7ea81d7e9061d0d8829be172664708d3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 10:33:03 +0200 Subject: [PATCH 07/11] Separate tests for minimal required for ingestion vs minimal required for UX experience --- ...ng_failed_message_with_missing_headers.cs} | 103 ++++++++++-------- 1 file changed, 57 insertions(+), 46 deletions(-) rename src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/{When_processing_message_with_missing_metadata_failed.cs => When_ingesting_failed_message_with_missing_headers.cs} (50%) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs similarity index 50% rename from src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs rename to src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 7bc6536156..67a5764c46 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_processing_message_with_missing_metadata_failed.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -12,38 +12,44 @@ using NServiceBus.Transport; using NUnit.Framework; using ServiceControl.MessageFailures.Api; - using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions; - class When_processing_message_with_missing_metadata_failed : AcceptanceTest + class When_ingesting_failed_message_with_missing_headers : AcceptanceTest { [Test] - public async Task Null_TimeSent_should_not_be_cast_to_DateTimeMin() + public async Task TimeSent_should_not_be_casted() { FailedMessageView failure = null; - await Define() - .WithEndpoint() + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); + + await Define(c => + { + c.AddMinimalRequiredHeaders(); + c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + }) + .WithEndpoint() .Done(async c => { - var result = await this.TryGetSingle("/api/errors/", m => m.Id == c.UniqueMessageId); + var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); failure = result; - return result; + return (c.UniqueMessageId != null) & result; }) .Run(); Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.Null); + Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); } [Test] - public async Task TimeSent_should_not_be_casted() + public async Task Should_be_ingested_when_minimal_required_headers_is_present() { FailedMessageView failure = null; - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - - await Define(ctx => ctx.TimeSent = sentTime) - .WithEndpoint() + await Define(c => + { + c.AddMinimalRequiredHeaders(); + }) + .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); @@ -53,18 +59,28 @@ await Define(ctx => ctx.TimeSent = sentTime) .Run(); Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); + Assert.That(failure.TimeSent, Is.Null); } [Test] - public async Task Should_be_able_to_get_the_message_by_id() + public async Task Should_include_headers_required_by_ServicePulse() { FailedMessageView failure = null; var testStartTime = DateTime.UtcNow; - var context = await Define() - .WithEndpoint() + var context = await Define(c => + { + c.AddMinimalRequiredHeaders(); + + // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header + // Missing endpoint or host will case a null ref in ServicePulse + c.Headers[Headers.ProcessingMachine] = "MyMachine"; + + c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; + c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; + }) + .WithEndpoint() .Done(async c => { var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); @@ -81,48 +97,43 @@ public async Task Should_be_able_to_get_the_message_by_id() // ServicePulse assumes that the receiving endpoint name is present Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); + Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine")); + + // ServicePulse needs both an exception type and description to render the UI in a resonable way + Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType")); + Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); } - class Failing : EndpointConfigurationBuilder + class MyContext : ScenarioContext { - public Failing() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); - - class SendFailedMessage : DispatchRawMessages - { - protected override TransportOperations CreateMessage(MyContext context) - { - context.EndpointNameOfReceivingEndpoint = Conventions.EndpointNamingConvention(typeof(Failing)); - context.MessageId = Guid.NewGuid().ToString(); - context.UniqueMessageId = DeterministicGuid.MakeId(context.MessageId, context.EndpointNameOfReceivingEndpoint).ToString(); + public string MessageId { get; } = Guid.NewGuid().ToString(); - var headers = new Dictionary - { - [Headers.MessageId] = context.MessageId, - ["NServiceBus.FailedQ"] = Conventions.EndpointNamingConvention(typeof(Failing)), - [Headers.ProcessingMachine] = "unknown", // This is needed for endpoint detection to work since "host" is required, endpoint name is detected from the FailedQ header - }; + public string EndpointNameOfReceivingEndpoint => "MyEndpoint"; - if (context.TimeSent.HasValue) - { - headers["NServiceBus.TimeSent"] = DateTimeOffsetHelper.ToWireFormattedString(context.TimeSent.Value); - } + public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString(); - var outgoingMessage = new OutgoingMessage(context.MessageId, headers, Array.Empty()); + public Dictionary Headers { get; } = []; - return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); - } + public void AddMinimalRequiredHeaders() + { + Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; + Headers[NServiceBus.Headers.MessageId] = MessageId; } } - class MyContext : ScenarioContext + class FailingEndpoint : EndpointConfigurationBuilder { - public string MessageId { get; set; } - - public string EndpointNameOfReceivingEndpoint { get; set; } + public FailingEndpoint() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); - public string UniqueMessageId { get; set; } + class SendFailedMessage : DispatchRawMessages + { + protected override TransportOperations CreateMessage(MyContext context) + { + var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty()); - public DateTime? TimeSent { get; set; } + return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); + } + } } } } \ No newline at end of file From 59050c333369d21bf8f80f2517630563a1342c62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 10:40:49 +0200 Subject: [PATCH 08/11] Improve assertions --- ...ingesting_failed_message_with_missing_headers.cs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 67a5764c46..683d972af0 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -45,6 +45,8 @@ public async Task Should_be_ingested_when_minimal_required_headers_is_present() { FailedMessageView failure = null; + var testStartTime = DateTime.UtcNow; + await Define(c => { c.AddMinimalRequiredHeaders(); @@ -60,6 +62,12 @@ await Define(c => Assert.That(failure, Is.Not.Null); Assert.That(failure.TimeSent, Is.Null); + + //No failure time will result in utc now being used + Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); + + // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header5 + Assert.That(failure.ReceivingEndpoint, Is.Null); } [Test] @@ -67,8 +75,6 @@ public async Task Should_include_headers_required_by_ServicePulse() { FailedMessageView failure = null; - var testStartTime = DateTime.UtcNow; - var context = await Define(c => { c.AddMinimalRequiredHeaders(); @@ -91,9 +97,6 @@ public async Task Should_include_headers_required_by_ServicePulse() Assert.That(failure, Is.Not.Null); - //No failure time will result in utc now being used - Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // ServicePulse assumes that the receiving endpoint name is present Assert.That(failure.ReceivingEndpoint, Is.Not.Null); Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); From d1a7326750930078e049ba1dd41f4e311f4ab093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 12:19:53 +0200 Subject: [PATCH 09/11] Cleanup --- ...ing_failed_message_with_missing_headers.cs | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 683d972af0..d2213a3f54 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -15,31 +15,6 @@ class When_ingesting_failed_message_with_missing_headers : AcceptanceTest { - [Test] - public async Task TimeSent_should_not_be_casted() - { - FailedMessageView failure = null; - - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - - await Define(c => - { - c.AddMinimalRequiredHeaders(); - c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); - } - [Test] public async Task Should_be_ingested_when_minimal_required_headers_is_present() { @@ -66,7 +41,7 @@ await Define(c => //No failure time will result in utc now being used Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header5 + // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header Assert.That(failure.ReceivingEndpoint, Is.Null); } @@ -80,7 +55,7 @@ public async Task Should_include_headers_required_by_ServicePulse() c.AddMinimalRequiredHeaders(); // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header - // Missing endpoint or host will case a null ref in ServicePulse + // Missing endpoint or host will cause a null ref in ServicePulse c.Headers[Headers.ProcessingMachine] = "MyMachine"; c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; @@ -107,6 +82,31 @@ public async Task Should_include_headers_required_by_ServicePulse() Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); } + [Test] + public async Task TimeSent_should_not_be_casted() + { + FailedMessageView failure = null; + + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); + + await Define(c => + { + c.AddMinimalRequiredHeaders(); + c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + }) + .WithEndpoint() + .Done(async c => + { + var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); + failure = result; + return (c.UniqueMessageId != null) & result; + }) + .Run(); + + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); + } + class MyContext : ScenarioContext { public string MessageId { get; } = Guid.NewGuid().ToString(); From 821f4d94a2ad6b1745f3956d74ef6c9e49ac5246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Thu, 24 Apr 2025 12:43:59 +0200 Subject: [PATCH 10/11] Consolidate done criteria --- ...ing_failed_message_with_missing_headers.cs | 235 +++++++++--------- 1 file changed, 112 insertions(+), 123 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index d2213a3f54..745d629e3c 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -1,141 +1,130 @@ -namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures +namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures; + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using AcceptanceTesting; +using AcceptanceTesting.EndpointTemplates; +using Infrastructure; +using NServiceBus; +using NServiceBus.AcceptanceTesting; +using NServiceBus.Routing; +using NServiceBus.Transport; +using NUnit.Framework; +using ServiceControl.MessageFailures.Api; + +class When_ingesting_failed_message_with_missing_headers : AcceptanceTest { - using System; - using System.Collections.Generic; - using System.Threading.Tasks; - using AcceptanceTesting; - using AcceptanceTesting.EndpointTemplates; - using Infrastructure; - using NServiceBus; - using NServiceBus.AcceptanceTesting; - using NServiceBus.Routing; - using NServiceBus.Transport; - using NUnit.Framework; - using ServiceControl.MessageFailures.Api; - - class When_ingesting_failed_message_with_missing_headers : AcceptanceTest + [Test] + public async Task Should_be_ingested_when_minimal_required_headers_is_present() { - [Test] - public async Task Should_be_ingested_when_minimal_required_headers_is_present() - { - FailedMessageView failure = null; - - var testStartTime = DateTime.UtcNow; - - await Define(c => - { - c.AddMinimalRequiredHeaders(); - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.Null); - - //No failure time will result in utc now being used - Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - - // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header - Assert.That(failure.ReceivingEndpoint, Is.Null); - } + var testStartTime = DateTime.UtcNow; - [Test] - public async Task Should_include_headers_required_by_ServicePulse() - { - FailedMessageView failure = null; - - var context = await Define(c => - { - c.AddMinimalRequiredHeaders(); - - // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header - // Missing endpoint or host will cause a null ref in ServicePulse - c.Headers[Headers.ProcessingMachine] = "MyMachine"; - - c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; - c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - - // ServicePulse assumes that the receiving endpoint name is present - Assert.That(failure.ReceivingEndpoint, Is.Not.Null); - Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); - Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine")); - - // ServicePulse needs both an exception type and description to render the UI in a resonable way - Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType")); - Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); - } + var context = await Define(c => c.AddMinimalRequiredHeaders()) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); - [Test] - public async Task TimeSent_should_not_be_casted() - { - FailedMessageView failure = null; - - var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - - await Define(c => - { - c.AddMinimalRequiredHeaders(); - c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); - }) - .WithEndpoint() - .Done(async c => - { - var result = await this.TryGet($"/api/errors/last/{c.UniqueMessageId}"); - failure = result; - return (c.UniqueMessageId != null) & result; - }) - .Run(); - - Assert.That(failure, Is.Not.Null); - Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); - } + var failure = context.Failure; - class MyContext : ScenarioContext - { - public string MessageId { get; } = Guid.NewGuid().ToString(); + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.Null); - public string EndpointNameOfReceivingEndpoint => "MyEndpoint"; + //No failure time will result in utc now being used + Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime)); - public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString(); + // Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header + Assert.That(failure.ReceivingEndpoint, Is.Null); + } - public Dictionary Headers { get; } = []; + [Test] + public async Task Should_include_headers_required_by_ServicePulse() + { + var context = await Define(c => + { + c.AddMinimalRequiredHeaders(); + + // This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header + // Missing endpoint or host will cause a null ref in ServicePulse + c.Headers[Headers.ProcessingMachine] = "MyMachine"; + + c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; + c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; + }) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); + + var failure = context.Failure; + + Assert.That(failure, Is.Not.Null); + + // ServicePulse assumes that the receiving endpoint name is present + Assert.That(failure.ReceivingEndpoint, Is.Not.Null); + Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint)); + Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine")); + + // ServicePulse needs both an exception type and description to render the UI in a resonable way + Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType")); + Assert.That(failure.Exception.Message, Is.EqualTo("Some message")); + } + + [Test] + public async Task TimeSent_should_not_be_casted() + { + var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z"); - public void AddMinimalRequiredHeaders() + var context = await Define(c => { - Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; - Headers[NServiceBus.Headers.MessageId] = MessageId; - } - } + c.AddMinimalRequiredHeaders(); + c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + }) + .WithEndpoint() + .Done(async c => await TryGetFailureFromApi(c)) + .Run(); + + var failure = context.Failure; + + Assert.That(failure, Is.Not.Null); + Assert.That(failure.TimeSent, Is.EqualTo(sentTime)); + } + + async Task TryGetFailureFromApi(TestContext context) + { + context.Failure = await this.TryGet($"/api/errors/last/{context.UniqueMessageId}"); + return context.Failure != null; + } + + class TestContext : ScenarioContext + { + public string MessageId { get; } = Guid.NewGuid().ToString(); - class FailingEndpoint : EndpointConfigurationBuilder + public string EndpointNameOfReceivingEndpoint => "MyEndpoint"; + + public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString(); + + public Dictionary Headers { get; } = []; + + public FailedMessageView Failure { get; set; } + + public void AddMinimalRequiredHeaders() { - public FailingEndpoint() => EndpointSetup(c => c.Recoverability().Delayed(x => x.NumberOfRetries(0))); + Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; + Headers[NServiceBus.Headers.MessageId] = MessageId; + } + } + + class FailingEndpoint : EndpointConfigurationBuilder + { + public FailingEndpoint() => EndpointSetup(); - class SendFailedMessage : DispatchRawMessages + class SendFailedMessage : DispatchRawMessages + { + protected override TransportOperations CreateMessage(TestContext context) { - protected override TransportOperations CreateMessage(MyContext context) - { - var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty()); + var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty()); - return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); - } + return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error"))); } } } From 9c8a219ec4b09bb297dbd3681aa0a0f548c6b98b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96hlund?= Date: Fri, 25 Apr 2025 10:26:50 +0200 Subject: [PATCH 11/11] Use header key constants --- ...When_ingesting_failed_message_with_missing_headers.cs | 9 +++++---- .../Recoverability/DetectSuccessfulRetriesEnricher.cs | 3 ++- src/ServiceControl/Operations/EndpointDetailsParser.cs | 7 ++++--- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs index 745d629e3c..d79b88333c 100644 --- a/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs +++ b/src/ServiceControl.AcceptanceTests/Recoverability/MessageFailures/When_ingesting_failed_message_with_missing_headers.cs @@ -8,6 +8,7 @@ using Infrastructure; using NServiceBus; using NServiceBus.AcceptanceTesting; +using NServiceBus.Faults; using NServiceBus.Routing; using NServiceBus.Transport; using NUnit.Framework; @@ -48,8 +49,8 @@ public async Task Should_include_headers_required_by_ServicePulse() // Missing endpoint or host will cause a null ref in ServicePulse c.Headers[Headers.ProcessingMachine] = "MyMachine"; - c.Headers["NServiceBus.ExceptionInfo.ExceptionType"] = "SomeExceptionType"; - c.Headers["NServiceBus.ExceptionInfo.Message"] = "Some message"; + c.Headers[FaultsHeaderKeys.ExceptionType] = "SomeExceptionType"; + c.Headers[FaultsHeaderKeys.Message] = "Some message"; }) .WithEndpoint() .Done(async c => await TryGetFailureFromApi(c)) @@ -77,7 +78,7 @@ public async Task TimeSent_should_not_be_casted() var context = await Define(c => { c.AddMinimalRequiredHeaders(); - c.Headers.Add("NServiceBus.TimeSent", DateTimeOffsetHelper.ToWireFormattedString(sentTime)); + c.Headers.Add(Headers.TimeSent, DateTimeOffsetHelper.ToWireFormattedString(sentTime)); }) .WithEndpoint() .Done(async c => await TryGetFailureFromApi(c)) @@ -109,7 +110,7 @@ class TestContext : ScenarioContext public void AddMinimalRequiredHeaders() { - Headers["NServiceBus.FailedQ"] = EndpointNameOfReceivingEndpoint; + Headers[FaultsHeaderKeys.FailedQ] = EndpointNameOfReceivingEndpoint; Headers[NServiceBus.Headers.MessageId] = MessageId; } } diff --git a/src/ServiceControl.Audit/Recoverability/DetectSuccessfulRetriesEnricher.cs b/src/ServiceControl.Audit/Recoverability/DetectSuccessfulRetriesEnricher.cs index d3f17194c5..618c36c765 100644 --- a/src/ServiceControl.Audit/Recoverability/DetectSuccessfulRetriesEnricher.cs +++ b/src/ServiceControl.Audit/Recoverability/DetectSuccessfulRetriesEnricher.cs @@ -7,6 +7,7 @@ using Contracts.MessageFailures; using Infrastructure; using NServiceBus; + using NServiceBus.Faults; using NServiceBus.Routing; using NServiceBus.Transport; using ServiceControl.Audit.Persistence.Infrastructure; @@ -64,7 +65,7 @@ IEnumerable GetAlternativeUniqueMessageId(IReadOnlyDictionary endpointDetails.Name = s); - DictionaryExtensions.CheckIfKeyExists("NServiceBus.OriginatingMachine", headers, s => endpointDetails.Host = s); + DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingMachine, headers, s => endpointDetails.Host = s); DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingHostId, headers, s => endpointDetails.HostId = Guid.Parse(s)); if (!string.IsNullOrEmpty(endpointDetails.Name) && !string.IsNullOrEmpty(endpointDetails.Host)) @@ -50,7 +51,7 @@ public static EndpointDetails ReceivingEndpoint(IReadOnlyDictionary endpoint.Host = s); + DictionaryExtensions.CheckIfKeyExists(Headers.ProcessingMachine, headers, s => endpoint.Host = s); } DictionaryExtensions.CheckIfKeyExists(Headers.ProcessingEndpoint, headers, s => endpoint.Name = s); @@ -62,7 +63,7 @@ public static EndpointDetails ReceivingEndpoint(IReadOnlyDictionary address = s); + DictionaryExtensions.CheckIfKeyExists(FaultsHeaderKeys.FailedQ, headers, s => address = s); // If we have a failed queue, then construct an endpoint from the failed queue information if (address != null)