Skip to content

Commit

Permalink
Merge pull request #12 from dbones-labs/1
Browse files Browse the repository at this point in the history
OpenTelemetry support + fix for headers
  • Loading branch information
dbones committed Oct 6, 2021
2 parents c8dc42c + 2355007 commit 3085cc1
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/Eventual.RabbitMq/Eventual.RabbitMq.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>0.0.5</Version>
<Version>0.0.9</Version>
</PropertyGroup>

<PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Configuration;
using Fox.Middleware;
using Infrastructure.Serialization;
using Tracing;

public class PrepareMessageContextForPublish<T> : IPublishAction<T>
{
Expand All @@ -28,14 +29,21 @@ public Task Execute(MessagePublishContext<T> context, Next<MessagePublishContext
rbc.Body = encoded;

var properties = rbc.Properties;
properties.Type = typeof(T).FullName;
properties.AppId = _busConfiguration.ServiceName;
properties.DeliveryMode = 2; //topic
properties.CorrelationId = context.Message.CorrelationId;
properties.MessageId = context.Message.Id;
//properties.ContentEncoding = ""

//set headers
foreach (var entry in context.Message.Metadata)
{
properties.Headers.Add(entry.Key, entry.Value);
}

//retry headers
properties.Headers.Add("count", 0);
properties.Headers.Add("retry.in", 0);

for (var i = 0; i < _busConfiguration.RetryBackOff.Count; i++)
{
properties.Headers.Add($"retry.{i + 1}.after", _busConfiguration.RetryBackOff[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ public async Task Execute(MessageReceivedContext<T> context, Next<MessageReceive
{
var rbc = (RabbitMqMessageReceivedContext<T>)context;
var properties = rbc.Payload.BasicProperties;
var headers = properties?.Headers?.ToDictionary(key => key.Key, pair => pair.Value.ToString())
var headers = properties?.Headers?.ToDictionary(key => key.Key, pair =>
{
var val = pair.Value as byte[];
return val == null
? pair.Value.ToString()
: Encoding.UTF8.GetString(val);
})
?? new Dictionary<string, string>();

var content = Encoding.UTF8.GetString(rbc.Payload.Body.ToArray());
Expand Down
10 changes: 9 additions & 1 deletion src/Eventual/Configuration/Factory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Middleware;
using Middleware.Publishing;
using Middleware.Subscribing;
using Tracing;

public abstract class Factory
{
Expand Down Expand Up @@ -53,27 +54,34 @@ public abstract class Factory
setup.Subscribe(registeredController.ImplementationType);
}

//telemetry
services.AddSingleton<Telemetry>();
services.AddScoped<TelemetryContext>();

//middleware
//publishing
services.AddSingleton(typeof(MessagePublishContextMiddleware<>));
services.AddSingleton(typeof(PublishedMessageMiddleware<>));
services.AddSingleton(svc => svc.GetService<Setup>().PublishContextActions);
services.AddTransient(typeof(InvokePublish<>));
services.AddTransient(typeof(OpenTelemetryPublishAction<>));

var pa = setup.PublishContextActions;
pa.InvokePublisherAction ??= typeof(InvokePublish<>);
pa.ApmAction ??= typeof(OpenTelemetryPublishAction<>);

//subscriptions
services.AddSingleton(typeof(ReceivedMessageMiddleware<>));
services.AddSingleton(svc => svc.GetService<Setup>().ReceivedContextActions);
services.AddTransient(typeof(InvokeConsumer<>));
services.AddTransient(typeof(LogReceivedMessage<>));
services.AddTransient(typeof(DefaultMessageAck<>));
services.AddTransient(typeof(OpenTelemetryConsumeAction<>));

var ra = setup.ReceivedContextActions;
ra.DeadLetterAction ??= typeof(DefaultMessageAck<>);
ra.LoggingAction ??= typeof(LogReceivedMessage<>);
ra.InvokeConsumerAction ??= typeof(InvokeConsumer<>);
ra.ApmAction ??= typeof(OpenTelemetryConsumeAction<>);

}
}
Expand Down
3 changes: 2 additions & 1 deletion src/Eventual/Eventual.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>0.0.5</Version>
<Version>0.0.9</Version>
</PropertyGroup>

<PropertyGroup>
Expand All @@ -32,6 +32,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
<PackageReference Include="System.Text.Json" Version="5.0.2" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.0-rc.1.21451.13" />
</ItemGroup>

</Project>
40 changes: 39 additions & 1 deletion src/Eventual/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Generic;
using Tracing;

public class Message<T>
{
Expand All @@ -14,16 +15,53 @@ public Message()
{
DateTime = DateTime.UtcNow;
Id = Guid.NewGuid().ToString("D");
CorrelationId = Id;
Metadata = new Dictionary<string, string>();
}

public DateTime DateTime { get; set; }

/// <summary>
/// a unique Id for this message instance
/// </summary>
public string Id { get; set; }

/// <summary>
/// a user defined correlation id. please try to embrace the <see cref="OpenTelemetryTraceId"/>
/// </summary>
public string CorrelationId { get; set; }

/// <summary>
/// the open telemetry trace id, W3C format (this is normally generated from a parent context)
/// </summary>
public string OpenTelemetryTraceId
{
get
{
Metadata.TryGetValue(Telemetry.Header, out var value);
return value;
}
set
{
if (Metadata.ContainsKey(Telemetry.Header))
{
Metadata[Telemetry.Header] = value;
}
else
{
Metadata.Add(Telemetry.Header, value);
}
}
}

/// <summary>
/// any meta data (headers) that should accompany this message for down stream subscribers to process.
/// </summary>
public IDictionary<string, string> Metadata { get; set; }


/// <summary>
/// main payload to be published
/// </summary>
public T Body { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/Eventual/Middleware/IDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Task ProcessMessage<T>(MessageReceivedContext<T> receivedContext)

public Task ProcessMessage<T>(MessagePublishContext<T> publishContext)
{
var middleware = _serviceProvider.GetService<MessagePublishContextMiddleware<T>>();
var middleware = _serviceProvider.GetService<PublishedMessageMiddleware<T>>();
return middleware.Execute(_serviceProvider, publishContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
using System.Threading.Tasks;
using Fox.Middleware;

public class MessagePublishContextMiddleware<T> : IMiddleware<MessagePublishContext<T>>
public class PublishedMessageMiddleware<T> : IMiddleware<MessagePublishContext<T>>
{
private readonly Middleware<MessagePublishContext<T>> _internalMiddleware;

public MessagePublishContextMiddleware(PublishContextActions actions)
public PublishedMessageMiddleware(PublishContextActions actions)
{
_internalMiddleware = new Middleware<MessagePublishContext<T>>();

Expand Down
13 changes: 9 additions & 4 deletions src/Eventual/Middleware/Subscribing/LogReceivedMessage.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
namespace Eventual.Middleware.Subscribing
{
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Fox.Middleware;
using Microsoft.Extensions.Logging;
using Tracing;

public class LogReceivedMessage<T> : IConsumeAction<T>
{
private readonly TelemetryContext _telemetryContext;
private readonly ILogger<T> _logger;

public LogReceivedMessage(ILogger<T> logger)
public LogReceivedMessage(TelemetryContext telemetryContext, ILogger<T> logger)
{
_telemetryContext = telemetryContext;
_logger = logger;
}

public async Task Execute(MessageReceivedContext<T> context, Next<MessageReceivedContext<T>> next)
{
var id = _telemetryContext.OpenTelemetryTraceId ?? context.Message.Id;
var messageType = typeof(T).FullName;
using (var scope = _logger.BeginScope(context.Message.Id))
using (var scope = _logger.BeginScope(id))
{
_logger.LogInformation($"Receiving message {messageType}");
_logger.LogDebug($"Receiving message {messageType}");
try
{
await next(context);
_logger.LogInformation($"Received message {messageType}");
_logger.LogDebug($"Received message {messageType}");
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public ReceivedMessageMiddleware(ReceivedContextActions actions)
_internalMiddleware = new Middleware<MessageReceivedContext<T>>();

_internalMiddleware.Add(MakeGeneric<T>(actions.ReadMessageFromQueueIntoContextAction));
if (actions.LoggingAction != null) _internalMiddleware.Add(MakeGeneric<T>(actions.LoggingAction));
if (actions.ApmAction != null) _internalMiddleware.Add(MakeGeneric<T>(actions.ApmAction));
if (actions.LoggingAction != null) _internalMiddleware.Add(MakeGeneric<T>(actions.LoggingAction));
if (actions.DeadLetterAction != null) _internalMiddleware.Add(MakeGeneric<T>(actions.DeadLetterAction));

foreach (var customAction in actions.CustomActions)
Expand Down
72 changes: 72 additions & 0 deletions src/Eventual/Tracing/OpenTelemetryConsumeAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
namespace Eventual.Tracing
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Fox.Middleware;
using Middleware;
using Middleware.Subscribing;

public class OpenTelemetryConsumeAction<T> : IConsumeAction<T>
{
private readonly Telemetry _telemetry;
private readonly TelemetryContext _context;

public OpenTelemetryConsumeAction(Telemetry telemetry, TelemetryContext context)
{
_telemetry = telemetry;
_context = context;
}

public async Task Execute(MessageReceivedContext<T> context, Next<MessageReceivedContext<T>> next)
{
var traceId = context.Message.OpenTelemetryTraceId;
var activity = traceId != null
? _telemetry.ActivitySource.StartActivity(typeof(T).FullName, ActivityKind.Consumer, traceId)
: _telemetry.ActivitySource.StartActivity(typeof(T).FullName, ActivityKind.Consumer);

_context.CorrelationId = context.Message.CorrelationId;

if (activity == null)
{
await next(context);
return;
}

//activity.SetIdFormat(ActivityIdFormat.W3C);
activity.AddTag("adapter", "eventual");
activity.AddTag("message.id", context.Message.Id);
activity.AddTag("message.correlation.id", context.Message.CorrelationId);

Activity.Current = activity;
_context.OpenTelemetryTraceId = activity?.Id;

using (activity)
{
//activity.Start();
try
{
await next(context);
activity.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception e)
{
activity.SetStatus(ActivityStatusCode.Error, e.Message);
activity.SetTag("otel.status_code", "ERROR");
activity.SetTag("otel.status_description", e.Message);

var tags = new List<KeyValuePair<string, object>>
{
new("message", e.Message),
new("stack", e.StackTrace)
};

ActivityEvent @event = new ActivityEvent("error", tags: new ActivityTagsCollection(tags));
activity.AddEvent(@event);
throw;
}
}
}
}
}
61 changes: 61 additions & 0 deletions src/Eventual/Tracing/OpenTelemetryPublishAction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
namespace Eventual.Tracing
{
using System.Diagnostics;
using System.Threading.Tasks;
using Fox.Middleware;
using Middleware;
using Middleware.Publishing;

public class OpenTelemetryPublishAction<T> : IPublishAction<T>
{
private readonly Telemetry _telemetry;
private readonly TelemetryContext _context;

public OpenTelemetryPublishAction(Telemetry telemetry, TelemetryContext context)
{
_telemetry = telemetry;
_context = context;
}

public async Task Execute(MessagePublishContext<T> context, Next<MessagePublishContext<T>> next)
{
string parentId;
var parent = Activity.Current;

if (parent != null && !string.IsNullOrEmpty(parent.Id) && parent.IdFormat == ActivityIdFormat.W3C)
{
parentId = parent.Id;
}
else
{
parentId = _context?.OpenTelemetryTraceId;
}

var activity = parentId != null
? _telemetry.ActivitySource.StartActivity(typeof(T).FullName, ActivityKind.Producer, parentId)
: _telemetry.ActivitySource.StartActivity(typeof(T).FullName, ActivityKind.Producer);

//user can define -> we try the parent message -> finally we re-use the current id
context.Message.CorrelationId ??= _context?.CorrelationId ?? context.Message.Id;

if (activity == null)
{
await next(context);
return;
}

//activity.SetIdFormat(ActivityIdFormat.W3C);
activity.AddTag("adapter", "eventual");
activity.AddTag("message.id", context.Message.Id);
activity.AddTag("message.correlation.id", context.Message.CorrelationId);

using (activity)
{
//activity.Start();
context.Message.OpenTelemetryTraceId = activity.Id;
//if (!context.Message.Metadata.ContainsKey(Telemetry.Header)) context.Message.Metadata.Add(Telemetry.Header, activity.Id);
await next(context);
}
}
}
}

0 comments on commit 3085cc1

Please sign in to comment.