Skip to content

Commit

Permalink
Add message related properties to transactions and spans (#1525)
Browse files Browse the repository at this point in the history
Closes #1512
  • Loading branch information
russcam committed Oct 19, 2021
1 parent cbd4871 commit 64c27a8
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 17 deletions.
Expand Up @@ -112,6 +112,9 @@ private void OnProcessStart(KeyValuePair<string, object> kv, string action)
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };

if (queueName != null)
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };

// transaction creation will create an activity, so use this as the key.
var activityId = Activity.Current.Id;

Expand Down Expand Up @@ -158,11 +161,15 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action)
{
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
if (queueName != null)
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };
segment = transaction;
}
else
{
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
if (queueName != null)
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };
segment = span;
}

Expand Down Expand Up @@ -249,6 +256,9 @@ private void OnSendStart(KeyValuePair<string, object> kv, string action)
}
};

if (queueName != null)
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };

if (!_processingSegments.TryAdd(activity.Id, span))
{
Logger.Trace()?.Log(
Expand Down
Expand Up @@ -116,6 +116,9 @@ private void OnProcessStart(KeyValuePair<string, object> kv, string action, Prop
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };

if (queueName != null)
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };

// transaction creation will create an activity, so use this as the key.
var activityId = Activity.Current.Id;

Expand Down Expand Up @@ -150,11 +153,15 @@ private void OnReceiveStart(KeyValuePair<string, object> kv, string action, Prop
{
var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging);
transaction.Context.Service = new Service(null, null) { Framework = _framework };
if (queueName != null)
transaction.Context.Message = new Message { Queue = new Queue { Name = queueName } };
segment = transaction;
}
else
{
var span = ApmAgent.GetCurrentExecutionSegment().StartSpan(transactionName, ApiConstants.TypeMessaging, ServiceBus.SubType, action);
if (queueName != null)
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };
segment = span;
}

Expand Down Expand Up @@ -229,6 +236,9 @@ private void OnSendStart(KeyValuePair<string, object> kv, string action, Propert
}
};

if (queueName != null)
span.Context.Message = new Message { Queue = new Queue { Name = queueName } };

if (!_processingSegments.TryAdd(activity.Id, span))
{
Logger.Trace()?.Log(
Expand Down
5 changes: 5 additions & 0 deletions src/Elastic.Apm/Api/Context.cs
Expand Up @@ -27,6 +27,11 @@ public class Context
[JsonProperty("tags")]
public Dictionary<string, string> Labels => InternalLabels.Value;

/// <summary>
/// Holds details related to message receiving and publishing if the captured event integrates with a messaging system
/// </summary>
public Message Message { get; set; }

/// <summary>
/// If a log record was generated as a result of a http request, the http interface can be used to collect this
/// information.
Expand Down
62 changes: 62 additions & 0 deletions src/Elastic.Apm/Api/Message.cs
@@ -0,0 +1,62 @@
// Licensed to Elasticsearch B.V under
// one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Collections.Generic;
using Elastic.Apm.Api.Constraints;

namespace Elastic.Apm.Api
{
/// <summary>
/// Holds details related to message receiving and publishing if the captured event integrates with a messaging system
/// </summary>
public class Message
{
/// <summary>
/// Body of the received message
/// </summary>
public string Body { get; set; }

/// <summary>
/// Headers received with the message
/// </summary>
public Dictionary<string, string> Headers { get; set; }

/// <inheritdoc cref="Api.Age"/>
public Age Age { get; set; }

/// <see cref="Api.Queue"/>
public Queue Queue { get; set; }

/// <summary>
/// optional routing key of the received message as set on the queuing system, such as in RabbitMQ.
/// </summary>
public string RoutingKey { get; set; }
}

/// <summary>
/// Age of the message. If the monitored messaging framework provides a timestamp for the message, agents may use it.
/// Otherwise, the sending agent can add a timestamp in milliseconds since the Unix epoch to the message's metadata to be retrieved by the
/// receiving agent. If a timestamp is not available, agents should omit this field.
/// </summary>
public class Age
{
/// <summary>
/// Age of the message in milliseconds.
/// </summary>
public long Ms { get; set; }
}

/// <summary>
/// Information about the message queue where the message is received.
/// </summary>
public class Queue
{
/// <summary>
/// Name of the message queue where the message is received
/// </summary>
[MaxLength]
public string Name { get; set; }
}
}
1 change: 0 additions & 1 deletion src/Elastic.Apm/Api/Request.cs
Expand Up @@ -7,7 +7,6 @@
using System.Linq;
using Elastic.Apm.Helpers;
using Elastic.Apm.Libraries.Newtonsoft.Json;
using Elastic.Apm.Libraries.Newtonsoft.Json.Serialization;

namespace Elastic.Apm.Api
{
Expand Down
1 change: 1 addition & 0 deletions src/Elastic.Apm/Api/SpanContext.cs
Expand Up @@ -17,6 +17,7 @@ public class SpanContext
public Database Db { get; set; }
public Destination Destination { get; set; }
public Http Http { get; set; }
public Message Message { get; set; }

/// <summary>
/// <seealso cref="ShouldSerializeLabels" />
Expand Down
20 changes: 16 additions & 4 deletions src/Elastic.Apm/Filters/ErrorContextSanitizerFilter.cs
Expand Up @@ -18,12 +18,24 @@ internal class ErrorContextSanitizerFilter
{
public IError Filter(IError error)
{
if (error is Error realError && realError.Context?.Request?.Headers != null && realError.Configuration != null)
if (error is Error realError && realError.Configuration != null)
{
foreach (var key in realError.Context.Request.Headers.Keys.ToList())
if (realError.Context?.Request?.Headers != null)
{
if (WildcardMatcher.IsAnyMatch(realError.Configuration.SanitizeFieldNames, key))
realError.Context.Request.Headers[key] = Consts.Redacted;
foreach (var key in realError.Context.Request.Headers.Keys.ToList())
{
if (WildcardMatcher.IsAnyMatch(realError.Configuration.SanitizeFieldNames, key))
realError.Context.Request.Headers[key] = Consts.Redacted;
}
}

if (realError.Context?.Message?.Headers != null)
{
foreach (var key in realError.Context.Message.Headers.Keys.ToList())
{
if (WildcardMatcher.IsAnyMatch(realError.Configuration.SanitizeFieldNames, key))
realError.Context.Message.Headers[key] = Consts.Redacted;
}
}
}

Expand Down
20 changes: 16 additions & 4 deletions src/Elastic.Apm/Filters/HeaderDictionarySanitizerFilter.cs
Expand Up @@ -20,12 +20,24 @@ public ITransaction Filter(ITransaction transaction)
{
if (transaction is Transaction realTransaction)
{
if (realTransaction.IsContextCreated && realTransaction.Context.Request?.Headers != null)
if (realTransaction.IsContextCreated)
{
foreach (var key in realTransaction.Context?.Request?.Headers?.Keys.ToList())
if (realTransaction.Context.Request?.Headers != null)
{
if (WildcardMatcher.IsAnyMatch(realTransaction.Configuration.SanitizeFieldNames, key))
realTransaction.Context.Request.Headers[key] = Consts.Redacted;
foreach (var key in realTransaction.Context.Request.Headers.Keys.ToList())
{
if (WildcardMatcher.IsAnyMatch(realTransaction.Configuration.SanitizeFieldNames, key))
realTransaction.Context.Request.Headers[key] = Consts.Redacted;
}
}

if (realTransaction.Context.Message?.Headers != null)
{
foreach (var key in realTransaction.Context.Message.Headers.Keys.ToList())
{
if (WildcardMatcher.IsAnyMatch(realTransaction.Configuration.SanitizeFieldNames, key))
realTransaction.Context.Message.Headers[key] = Consts.Redacted;
}
}
}
}
Expand Down
Expand Up @@ -64,6 +64,10 @@ public async Task Capture_Span_When_Send_To_Queue()
destination.Service.Name.Should().Be(ServiceBus.SubType);
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}");
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);

span.Context.Message.Should().NotBeNull();
span.Context.Message.Queue.Should().NotBeNull();
span.Context.Message.Queue.Name.Should().Be(scope.QueueName);
}

[AzureCredentialsFact]
Expand Down Expand Up @@ -93,6 +97,10 @@ public async Task Capture_Span_When_Send_To_Topic()
destination.Service.Name.Should().Be(ServiceBus.SubType);
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}");
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);

span.Context.Message.Should().NotBeNull();
span.Context.Message.Queue.Should().NotBeNull();
span.Context.Message.Queue.Name.Should().Be(scope.TopicName);
}

[AzureCredentialsFact]
Expand Down Expand Up @@ -124,6 +132,10 @@ public async Task Capture_Span_When_Schedule_To_Queue()
destination.Service.Name.Should().Be(ServiceBus.SubType);
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}");
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);

span.Context.Message.Should().NotBeNull();
span.Context.Message.Queue.Should().NotBeNull();
span.Context.Message.Queue.Name.Should().Be(scope.QueueName);
}

[AzureCredentialsFact]
Expand Down Expand Up @@ -155,6 +167,10 @@ public async Task Capture_Span_When_Schedule_To_Topic()
destination.Service.Name.Should().Be(ServiceBus.SubType);
destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}");
destination.Service.Type.Should().Be(ApiConstants.TypeMessaging);

span.Context.Message.Should().NotBeNull();
span.Context.Message.Queue.Should().NotBeNull();
span.Context.Message.Queue.Name.Should().Be(scope.TopicName);
}

[AzureCredentialsFact]
Expand Down Expand Up @@ -182,6 +198,10 @@ public async Task Capture_Span_When_Receive_From_Queue_Inside_Transaction()
span.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
span.Type.Should().Be(ApiConstants.TypeMessaging);
span.Subtype.Should().Be(ServiceBus.SubType);

span.Context.Message.Should().NotBeNull();
span.Context.Message.Queue.Should().NotBeNull();
span.Context.Message.Queue.Name.Should().Be(scope.QueueName);
}

[AzureCredentialsFact]
Expand All @@ -204,6 +224,10 @@ public async Task Capture_Transaction_When_Receive_From_Queue()

transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
transaction.Type.Should().Be(ApiConstants.TypeMessaging);

transaction.Context.Message.Should().NotBeNull();
transaction.Context.Message.Queue.Should().NotBeNull();
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);
}

[AzureCredentialsFact]
Expand All @@ -224,9 +248,13 @@ public async Task Capture_Transaction_When_Receive_From_Topic_Subscription()

_sender.Transactions.Should().HaveCount(1);
var transaction = _sender.FirstTransaction;

transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}");
var subscription = $"{scope.TopicName}/Subscriptions/{scope.SubscriptionName}";
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {subscription}");
transaction.Type.Should().Be(ApiConstants.TypeMessaging);

transaction.Context.Message.Should().NotBeNull();
transaction.Context.Message.Queue.Should().NotBeNull();
transaction.Context.Message.Queue.Name.Should().Be(subscription);
}

[AzureCredentialsFact]
Expand All @@ -242,8 +270,6 @@ public async Task Capture_Transaction_When_ReceiveDeferred_From_Queue()

var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
await receiver.DeferMessageAsync(message).ConfigureAwait(false);


await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber).ConfigureAwait(false);

if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2))
Expand All @@ -255,6 +281,10 @@ public async Task Capture_Transaction_When_ReceiveDeferred_From_Queue()
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}");
transaction.Type.Should().Be(ApiConstants.TypeMessaging);

transaction.Context.Message.Should().NotBeNull();
transaction.Context.Message.Queue.Should().NotBeNull();
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);

var secondTransaction = _sender.Transactions[1];
secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.QueueName}");
secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging);
Expand Down Expand Up @@ -282,9 +312,14 @@ public async Task Capture_Transaction_When_ReceiveDeferred_From_Topic_Subscripti
_sender.Transactions.Should().HaveCount(2);

var transaction = _sender.FirstTransaction;
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}");
var subscription = $"{scope.TopicName}/Subscriptions/{scope.SubscriptionName}";
transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {subscription}");
transaction.Type.Should().Be(ApiConstants.TypeMessaging);

transaction.Context.Message.Should().NotBeNull();
transaction.Context.Message.Queue.Should().NotBeNull();
transaction.Context.Message.Queue.Name.Should().Be(subscription);

var secondTransaction = _sender.Transactions[1];
secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}");
secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging);
Expand Down Expand Up @@ -353,6 +388,10 @@ public async Task Capture_Transaction_When_ProcessMessage_From_Queue()

foreach (var transaction in processTransactions)
{
transaction.Context.Message.Should().NotBeNull();
transaction.Context.Message.Queue.Should().NotBeNull();
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);

var spans = _sender.Spans.Where(s => s.TransactionId == transaction.Id).ToList();
spans.Should().HaveCount(1);
}
Expand Down Expand Up @@ -409,6 +448,10 @@ public async Task Capture_Transaction_When_ProcessSessionMessage_From_Queue()

foreach (var transaction in processTransactions)
{
transaction.Context.Message.Should().NotBeNull();
transaction.Context.Message.Queue.Should().NotBeNull();
transaction.Context.Message.Queue.Name.Should().Be(scope.QueueName);

var spans = _sender.Spans.Where(s => s.TransactionId == transaction.Id).ToList();
spans.Should().HaveCount(1);
}
Expand Down

0 comments on commit 64c27a8

Please sign in to comment.