Skip to content

Commit

Permalink
Integrating Publish and Consuming telemetry.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 14, 2024
1 parent d317dbc commit d0021f8
Show file tree
Hide file tree
Showing 19 changed files with 255 additions and 173 deletions.
2 changes: 1 addition & 1 deletion RabbitMQ.Dataflows.sln
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "rabbitmq", "rabbitmq", "{5C
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "OpenTelemetry.Console.Tests", "tests\OpenTelemetry.Console.Tests\OpenTelemetry.Console.Tests.csproj", "{077E07C3-9A35-42A0-8228-E9778F02DFCE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.ConsumerDataflows.Tests", "tests\RabbitMQ.ConsumerDataflows.Tests\RabbitMQ.ConsumerDataflows.Tests.csproj", "{F6C0B657-E70B-4DE9-96AE-E7612C89AF5F}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.ConsumerDataflowService", "tests\RabbitMQ.ConsumerDataflows.Tests\RabbitMQ.ConsumerDataflowService.csproj", "{F6C0B657-E70B-4DE9-96AE-E7612C89AF5F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
6 changes: 5 additions & 1 deletion src/HouseofCat.Dataflows/BaseDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ protected void SetCurrentSourceBlock(IDataflowBlock block)
_currentBlock = (ISourceBlock<TState>)block;
}

protected ExecutionDataflowBlockOptions GetExecuteStepOptions(int? maxDoP, bool? ensureOrdered, int? boundedCapacity, TaskScheduler taskScheduler = null)
protected ExecutionDataflowBlockOptions GetExecuteStepOptions(
int? maxDoP,
bool? ensureOrdered,
int? boundedCapacity,
TaskScheduler taskScheduler = null)
{
if (maxDoP.HasValue || ensureOrdered.HasValue || boundedCapacity.HasValue)
{
Expand Down
22 changes: 11 additions & 11 deletions src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ public static class WorkStateExtensions
{
public static void SetOpenTelemetryError(this IWorkState state, string message = null)
{
if (state is null) return;
state.SetCurrentActivityAsError(message);

if (state.WorkflowSpan is null) return;
state.SetCurrentSpanAsError(message);
if (state.WorkflowSpan is null || !state.WorkflowSpan.IsRecording) return;
state.SetSpanAsError(state.WorkflowSpan, message);
}

public static void SetCurrentActivityAsError(this IWorkState state, string message = null)
Expand Down Expand Up @@ -91,12 +88,14 @@ public static void SetSpanAsError(this IWorkState state, TelemetrySpan span, str
parentSpanContext.Value,
attributes: attributes);
}

state.WorkflowSpan = OpenTelemetryHelpers
.StartRootSpan(
spanName,
spanKind,
attributes: attributes);
else
{
state.WorkflowSpan = OpenTelemetryHelpers
.StartRootSpan(
spanName,
spanKind,
attributes: attributes);
}
}

/// <summary>
Expand Down Expand Up @@ -173,6 +172,7 @@ public static void SetSpanAsError(this IWorkState state, TelemetrySpan span, str
{
state.SetOpenTelemetryError();
}
state.WorkflowSpan?.End();
state.WorkflowSpan?.Dispose();
}
}
47 changes: 45 additions & 2 deletions src/HouseofCat.RabbitMQ/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ protected virtual async Task ReceiveHandlerAsync(object _, BasicDeliverEventArgs
await HandleMessageAsync(bdea).ConfigureAwait(false);
}

private static readonly string _consumerSpanNameFormat = "{0}.{1}";

protected virtual async ValueTask<bool> HandleMessageAsync(BasicDeliverEventArgs bdea)
{
if (!await _consumerChannel.Writer.WaitToWriteAsync().ConfigureAwait(false)) return false;
Expand All @@ -316,11 +318,13 @@ protected virtual async ValueTask<bool> HandleMessageAsync(BasicDeliverEventArgs
{
var receivedMessage = new ReceivedMessage(_chanHost.Channel, bdea, !ConsumerOptions.AutoAck);
using var span = OpenTelemetryHelpers.StartActiveSpan(
nameof(HandleMessageAsync),
string.Format(_consumerSpanNameFormat, ConsumerOptions.ConsumerName, nameof(HandleMessageAsync)),
SpanKind.Consumer,
receivedMessage.ParentSpanContext ?? default);

receivedMessage.ParentSpanContext = span.Context;
EnrichSpanWithTags(span, receivedMessage);

receivedMessage.ParentSpanContext = span?.Context;

AutoDeserialize(receivedMessage);

Expand All @@ -342,6 +346,45 @@ await _consumerChannel
}
}

protected void EnrichSpanWithTags(TelemetrySpan span, IReceivedMessage receivedMessage)
{
if (span == null || !span.IsRecording) return;

span.SetAttribute(Constants.MessagingSystemKey, Constants.MessagingSystemValue);

if (!string.IsNullOrEmpty(receivedMessage?.Message?.MessageId))
{
span.SetAttribute(Constants.MessagingMessageMessageIdKey, receivedMessage?.Message.MessageId);
}
if (!string.IsNullOrEmpty(ConsumerOptions.ConsumerName))
{
span.SetAttribute(Constants.MessagingConsumerNameKey, ConsumerOptions.ConsumerName);
}
if (!string.IsNullOrEmpty(ConsumerOptions.QueueName))
{
span.SetAttribute(Constants.MessagingMessageRoutingKeyKey, ConsumerOptions.QueueName);
}

if (!string.IsNullOrEmpty(receivedMessage?.Message?.Metadata?.PayloadId))
{
span.SetAttribute(Constants.MessagingMessagePayloadIdKey, receivedMessage?.Message?.Metadata?.PayloadId);
}

var encrypted = receivedMessage?.Message?.Metadata?.Encrypted();
if (encrypted.HasValue && encrypted.Value)
{
span.SetAttribute(Constants.MessagingMessageEncryptedKey, "true");
span.SetAttribute(Constants.MessagingMessageEncryptedDateKey, receivedMessage?.Message?.Metadata?.EncryptedDate());
span.SetAttribute(Constants.MessagingMessageEncryptionKey, receivedMessage?.Message?.Metadata?.EncryptionType());
}
var compressed = receivedMessage?.Message?.Metadata?.Compressed();
if (compressed.HasValue && compressed.Value)
{
span.SetAttribute(Constants.MessagingMessageCompressedKey, "true");
span.SetAttribute(Constants.MessagingMessageCompressionKey, receivedMessage?.Message?.Metadata?.CompressionType());
}
}

protected JsonSerializerOptions _defaultOptions;

protected virtual void AutoDeserialize(ReceivedMessage receivedMessage)
Expand Down
9 changes: 4 additions & 5 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerBlock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,14 @@ public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOu
protected virtual async Task PushToBufferBlockAsync(CancellationToken token = default)
{
try
{
while (await Consumer.GetConsumerBuffer().WaitToReadAsync(token).ConfigureAwait(false))
{
var consumerBuffer = Consumer.GetConsumerBuffer();
while (await consumerBuffer.WaitToReadAsync(token).ConfigureAwait(false))
{
while (Consumer.GetConsumerBuffer().TryRead(out var message))
while (consumerBuffer.TryRead(out var message))
{
await _bufferBlock.SendAsync(message, token).ConfigureAwait(false);
}

if (token.IsCancellationRequested) return;
}
}
catch (OperationCanceledException)
Expand Down
35 changes: 24 additions & 11 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,22 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
return this;
}

protected static readonly string _defaultSpanNameFormat = "{0}.{1}";
protected static readonly string _defaultStepSpanNameFormat = "{0}.{1}.{2}";

protected string GetSpanName(string stepName)
{
return string.Format(_defaultSpanNameFormat, WorkflowName, stepName);
}

protected string GetStepSpanName(string stepName)
{
return string.Format(_defaultStepSpanNameFormat, WorkflowName, _suppliedTransforms.Count, stepName);
}

public ConsumerDataflow<TState> AddStep(
Func<TState, TState> suppliedStep,
string spanName,
string stepName,
int? maxDoP = null,
bool? ensureOrdered = null,
int? boundedCapacity = null,
Expand All @@ -251,13 +264,13 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
Guard.AgainstNull(suppliedStep, nameof(suppliedStep));

var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_suppliedTransforms.Add(GetWrappedTransformBlock(suppliedStep, executionOptions, spanName));
_suppliedTransforms.Add(GetWrappedTransformBlock(suppliedStep, executionOptions, GetStepSpanName(stepName)));
return this;
}

public ConsumerDataflow<TState> AddStep(
Func<TState, Task<TState>> suppliedStep,
string spanName,
string stepName,
int? maxDoP = null,
bool? ensureOrdered = null,
int? boundedCapacity = null,
Expand All @@ -266,7 +279,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
Guard.AgainstNull(suppliedStep, nameof(suppliedStep));

var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_suppliedTransforms.Add(GetWrappedTransformBlock(suppliedStep, executionOptions, spanName));
_suppliedTransforms.Add(GetWrappedTransformBlock(suppliedStep, executionOptions, GetStepSpanName(stepName)));
return this;
}

Expand All @@ -288,7 +301,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
if (_finalization == null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_finalization = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.Finalization");
_finalization = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("finalization"));
}
return this;
}
Expand All @@ -304,7 +317,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
if (_finalization == null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_finalization = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.Finalization");
_finalization = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("finalization"));
}
return this;
}
Expand Down Expand Up @@ -339,7 +352,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
executionOptions,
false,
x => x.ReceivedMessage.Encrypted,
$"{WorkflowName}_Decrypt");
GetSpanName("decrypt"));
}
return this;
}
Expand All @@ -360,7 +373,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
executionOptions,
false,
x => x.ReceivedMessage.Compressed,
$"{WorkflowName}_Decompress");
GetSpanName("decompress"));
}

return this;
Expand All @@ -376,7 +389,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
if (_createSendMessage == null)
{
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, $"{WorkflowName}_CreateSendMessage");
_createSendMessage = GetWrappedTransformBlock(createMessage, executionOptions, GetSpanName("create_send_message"));
}
return this;
}
Expand All @@ -397,7 +410,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
executionOptions,
true,
x => !x.ReceivedMessage.Compressed,
$"{WorkflowName}_Compress");
GetSpanName("compress"));
}
return this;
}
Expand All @@ -418,7 +431,7 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
executionOptions,
true,
x => !x.ReceivedMessage.Encrypted,
$"{WorkflowName}_Encrypt");
GetSpanName("encrypt"));
}
return this;
}
Expand Down
5 changes: 5 additions & 0 deletions src/HouseofCat.RabbitMQ/Messages/Message.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using HouseofCat.RabbitMQ.Pools;
using HouseofCat.Utilities.Helpers;
using OpenTelemetry.Trace;
using RabbitMQ.Client;
using System;
using System.ComponentModel.DataAnnotations;
Expand Down Expand Up @@ -36,6 +37,8 @@ public interface IMessage
IPublishReceipt GetPublishReceipt(bool error);

IBasicProperties BuildProperties(IChannelHost channelHost, bool withOptionalHeaders, string contentType);

public SpanContext? ParentSpanContext { get; set; }
}

public sealed class Message : IMessage
Expand Down Expand Up @@ -67,6 +70,8 @@ public sealed class Message : IMessage
[JsonIgnore]
public string ContentType { get; set; } = Constants.HeaderValueForContentTypeJson;

public SpanContext? ParentSpanContext { get; set; }

public Message()
{
MessageId ??= Guid.NewGuid().ToString();
Expand Down
7 changes: 3 additions & 4 deletions src/HouseofCat.RabbitMQ/Options/RabbitOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Globalization;

namespace HouseofCat.RabbitMQ;

Expand All @@ -23,10 +22,10 @@ public class RabbitOptions

public ConsumerOptions GetConsumerOptions(string consumerName)
{
if (!ConsumerOptions.TryGetValue(consumerName, out ConsumerOptions value))
if (ConsumerOptions.TryGetValue(consumerName, out ConsumerOptions value))
{
throw new ArgumentException(string.Format(ExceptionMessages.NoConsumerOptionsMessage, consumerName));
return value;
}
return value;
throw new ArgumentException(string.Format(ExceptionMessages.NoConsumerOptionsMessage, consumerName));
}
}
18 changes: 16 additions & 2 deletions src/HouseofCat.RabbitMQ/Publisher/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ await _channelPool
return error;
}

private static readonly string _defaultSpanName = "messaging.rabbitmq.publisher";

/// <summary>
/// Acquires a channel from the channel pool, then publishes message based on the message parameters.
/// <para>Only throws exception when failing to acquire channel or when creating a receipt after the ReceiptBuffer is closed.</para>
Expand All @@ -573,7 +575,11 @@ await _channelPool
/// <param name="withOptionalHeaders"></param>
public async Task PublishAsync(IMessage message, bool createReceipt, bool withOptionalHeaders = true)
{
using var span = OpenTelemetryHelpers.StartActiveSpan(nameof(PublishAsync), SpanKind.Producer);
using var span = OpenTelemetryHelpers.StartActiveSpan(
_defaultSpanName,
SpanKind.Producer,
message.ParentSpanContext ?? default);

message.EnrichSpanWithTags(span);

var error = false;
Expand Down Expand Up @@ -614,6 +620,8 @@ await CreateReceiptAsync(message, error)

await _channelPool
.ReturnChannelAsync(chanHost, error);

span?.End();
}
}

Expand All @@ -627,7 +635,11 @@ await _channelPool
/// <param name="withOptionalHeaders"></param>
public async Task PublishWithConfirmationAsync(IMessage message, bool createReceipt, bool withOptionalHeaders = true)
{
using var span = OpenTelemetryHelpers.StartActiveSpan(nameof(PublishWithConfirmationAsync), SpanKind.Producer);
using var span = OpenTelemetryHelpers.StartActiveSpan(
_defaultSpanName,
SpanKind.Producer,
message.ParentSpanContext ?? default);

message.EnrichSpanWithTags(span);

var error = false;
Expand Down Expand Up @@ -671,6 +683,8 @@ await CreateReceiptAsync(message, error)

await _channelPool
.ReturnChannelAsync(chanHost, error);

span?.End();
}
}

Expand Down

0 comments on commit d0021f8

Please sign in to comment.