Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
namespace ServiceControl.AcceptanceTests.Recoverability.MessageFailures;

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.EndpointTemplates;
using Infrastructure;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Faults;
using NServiceBus.Routing;
using NServiceBus.Transport;
using NUnit.Framework;
using ServiceControl.MessageFailures.Api;

class When_ingesting_failed_message_with_missing_headers : AcceptanceTest
{
[Test]
public async Task Should_be_ingested_when_minimal_required_headers_is_present()
{
var testStartTime = DateTime.UtcNow;

var context = await Define<TestContext>(c => c.AddMinimalRequiredHeaders())
.WithEndpoint<FailingEndpoint>()
.Done(async c => await TryGetFailureFromApi(c))
.Run();

var failure = context.Failure;

Assert.That(failure, Is.Not.Null);
Assert.That(failure.TimeSent, Is.Null);

//No failure time will result in utc now being used
Assert.That(failure.TimeOfFailure, Is.GreaterThan(testStartTime));

// Both host and endpoint name is currently needed so this will be null since no host can be detected from the failed q header
Assert.That(failure.ReceivingEndpoint, Is.Null);
}

[Test]
public async Task Should_include_headers_required_by_ServicePulse()
{
var context = await Define<TestContext>(c =>
{
c.AddMinimalRequiredHeaders();

// This is needed for ServiceControl to be able to detect both endpoint (via failed q header) and host via the processing machine header
// Missing endpoint or host will cause a null ref in ServicePulse
c.Headers[Headers.ProcessingMachine] = "MyMachine";

c.Headers[FaultsHeaderKeys.ExceptionType] = "SomeExceptionType";
c.Headers[FaultsHeaderKeys.Message] = "Some message";
})
.WithEndpoint<FailingEndpoint>()
.Done(async c => await TryGetFailureFromApi(c))
.Run();

var failure = context.Failure;

Assert.That(failure, Is.Not.Null);

// ServicePulse assumes that the receiving endpoint name is present
Assert.That(failure.ReceivingEndpoint, Is.Not.Null);
Assert.That(failure.ReceivingEndpoint.Name, Is.EqualTo(context.EndpointNameOfReceivingEndpoint));
Assert.That(failure.ReceivingEndpoint.Host, Is.EqualTo("MyMachine"));

// ServicePulse needs both an exception type and description to render the UI in a resonable way
Assert.That(failure.Exception.ExceptionType, Is.EqualTo("SomeExceptionType"));
Assert.That(failure.Exception.Message, Is.EqualTo("Some message"));
}

[Test]
public async Task TimeSent_should_not_be_casted()
{
var sentTime = DateTime.Parse("2014-11-11T02:26:58.000462Z");

var context = await Define<TestContext>(c =>
{
c.AddMinimalRequiredHeaders();
c.Headers.Add(Headers.TimeSent, DateTimeOffsetHelper.ToWireFormattedString(sentTime));
})
.WithEndpoint<FailingEndpoint>()
.Done(async c => await TryGetFailureFromApi(c))
.Run();

var failure = context.Failure;

Assert.That(failure, Is.Not.Null);
Assert.That(failure.TimeSent, Is.EqualTo(sentTime));
}

async Task<bool> TryGetFailureFromApi(TestContext context)
{
context.Failure = await this.TryGet<FailedMessageView>($"/api/errors/last/{context.UniqueMessageId}");
return context.Failure != null;
}

class TestContext : ScenarioContext
{
public string MessageId { get; } = Guid.NewGuid().ToString();

public string EndpointNameOfReceivingEndpoint => "MyEndpoint";

public string UniqueMessageId => DeterministicGuid.MakeId(MessageId, EndpointNameOfReceivingEndpoint).ToString();

public Dictionary<string, string> Headers { get; } = [];

public FailedMessageView Failure { get; set; }

public void AddMinimalRequiredHeaders()
{
Headers[FaultsHeaderKeys.FailedQ] = EndpointNameOfReceivingEndpoint;
Headers[NServiceBus.Headers.MessageId] = MessageId;
}
}

class FailingEndpoint : EndpointConfigurationBuilder
{
public FailingEndpoint() => EndpointSetup<DefaultServerWithoutAudit>();

class SendFailedMessage : DispatchRawMessages<TestContext>
{
protected override TransportOperations CreateMessage(TestContext context)
{
var outgoingMessage = new OutgoingMessage(context.MessageId, context.Headers, Array.Empty<byte>());

return new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag("error")));
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Contracts.MessageFailures;
using Infrastructure;
using NServiceBus;
using NServiceBus.Faults;
using NServiceBus.Routing;
using NServiceBus.Transport;
using ServiceControl.Audit.Persistence.Infrastructure;
Expand Down Expand Up @@ -64,7 +65,7 @@ IEnumerable<string> GetAlternativeUniqueMessageId(IReadOnlyDictionary<string, st
yield return DeterministicGuid.MakeId(messageId, processingEndpoint).ToString();
}

if (headers.TryGetValue("NServiceBus.FailedQ", out var failedQ))
if (headers.TryGetValue(FaultsHeaderKeys.FailedQ, out var failedQ))
{
yield return DeterministicGuid.MakeId(messageId, ExtractQueueNameForLegacyReasons(failedQ)).ToString();
}
Expand Down
7 changes: 4 additions & 3 deletions src/ServiceControl/Operations/EndpointDetailsParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace ServiceControl.Contracts.Operations
using System.Collections.Generic;
using Infrastructure;
using NServiceBus;
using NServiceBus.Faults;
using ServiceControl.Operations;

class EndpointDetailsParser
Expand All @@ -13,7 +14,7 @@ public static EndpointDetails SendingEndpoint(IReadOnlyDictionary<string, string
var endpointDetails = new EndpointDetails();

DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingEndpoint, headers, s => endpointDetails.Name = s);
DictionaryExtensions.CheckIfKeyExists("NServiceBus.OriginatingMachine", headers, s => endpointDetails.Host = s);
DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingMachine, headers, s => endpointDetails.Host = s);
DictionaryExtensions.CheckIfKeyExists(Headers.OriginatingHostId, headers, s => endpointDetails.HostId = Guid.Parse(s));

if (!string.IsNullOrEmpty(endpointDetails.Name) && !string.IsNullOrEmpty(endpointDetails.Host))
Expand Down Expand Up @@ -50,7 +51,7 @@ public static EndpointDetails ReceivingEndpoint(IReadOnlyDictionary<string, stri
}
else
{
DictionaryExtensions.CheckIfKeyExists("NServiceBus.ProcessingMachine", headers, s => endpoint.Host = s);
DictionaryExtensions.CheckIfKeyExists(Headers.ProcessingMachine, headers, s => endpoint.Host = s);
}

DictionaryExtensions.CheckIfKeyExists(Headers.ProcessingEndpoint, headers, s => endpoint.Name = s);
Expand All @@ -62,7 +63,7 @@ public static EndpointDetails ReceivingEndpoint(IReadOnlyDictionary<string, stri

string address = null;
//use the failed q to determine the receiving endpoint
DictionaryExtensions.CheckIfKeyExists("NServiceBus.FailedQ", headers, s => address = s);
DictionaryExtensions.CheckIfKeyExists(FaultsHeaderKeys.FailedQ, headers, s => address = s);

// If we have a failed queue, then construct an endpoint from the failed queue information
if (address != null)
Expand Down