Skip to content

Commit

Permalink
fix: opentelemetry broken traces
Browse files Browse the repository at this point in the history
Activity.Current is a static variable that uses AsyncLocal internally,
which means that it flows into children async calls, but not back to the
caller (message consumer and producer).
With the previous implementation, the Activity.Current is null after the
produce and consume of messages, which means that all spans created
after it will generate new trace information because the context is not
being propagated.

This change fixes the problem firing sync events and making the
OpenTelemetry handlers subscribe them.
  • Loading branch information
simaoribeiro committed Nov 22, 2023
1 parent 06fcddf commit c48df05
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 37 deletions.
4 changes: 4 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public interface IEvent
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);

IEventSubscription Subscribe(Action handler);

Check warning on line 18 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IEvent.Subscribe(Action)'
}

/// <summary>
Expand All @@ -28,5 +30,7 @@ public interface IEvent<out TArg>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);

IEventSubscription Subscribe(Action<TArg> handler);

Check warning on line 34 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Missing XML comment for publicly visible type or member 'IEvent<TArg>.Subscribe(Action<TArg>)'
}
}

Check warning on line 36 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

12 changes: 12 additions & 0 deletions src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"KafkaFlow.Admin.Dashboard": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:63908;http://localhost:63909"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.OpenTelemetry;

internal class GzipMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
MessageStorage.Add((byte[]) context.Message.Value);
var source = new ActivitySource(KafkaFlowInstrumentation.ActivitySourceName);
using var activity = source.StartActivity("integration-test", ActivityKind.Internal);

MessageStorage.Add((byte[])context.Message.Value);
await next(context);
}
}
Expand Down
40 changes: 35 additions & 5 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,43 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
// Arrange
var provider = await this.GetServiceProvider();
MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddInMemoryExporter(this.exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = this.fixture.Create<byte[]>();

// Act
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer()
{
Expand Down Expand Up @@ -98,7 +127,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Expand Down Expand Up @@ -182,9 +211,9 @@ await Policy
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}

private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync()
{
Activity producerSpan = null, consumerSpan = null;
Activity producerSpan = null, consumerSpan = null, internalSpan = null;

await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
Expand All @@ -193,11 +222,12 @@ await Policy
{
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);
internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal);
return Task.FromResult(producerSpan != null && consumerSpan != null);
});

return (producerSpan, consumerSpan);
return (producerSpan, consumerSpan, internalSpan);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler
private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

public static Task OnConsumeStarted(IMessageContext context)
public static void OnConsumeStarted(IMessageContext context)
{
try
{
Expand Down Expand Up @@ -50,21 +50,17 @@ public static Task OnConsumeStarted(IMessageContext context)
{
// If there is any failure, do not propagate the context.
}

return Task.CompletedTask;
}

public static Task OnConsumeCompleted(IMessageContext context)
public static void OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
}

return Task.CompletedTask;
}

public static Task OnConsumeError(IMessageContext context, Exception ex)
public static void OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
Expand All @@ -73,8 +69,6 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)

activity?.Dispose();
}

return Task.CompletedTask;
}

private static IEnumerable<string> ExtractTraceContextIntoBasicProperties(IMessageContext context, string key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

public static Task OnProducerStarted(IMessageContext context)
public static void OnProducerStarted(IMessageContext context)
{
try
{
Expand Down Expand Up @@ -60,21 +60,17 @@ public static Task OnProducerStarted(IMessageContext context)
{
// If there is any failure, do not propagate the context.
}

return Task.CompletedTask;
}

public static Task OnProducerCompleted(IMessageContext context)
public static void OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
}

return Task.CompletedTask;
}

public static Task OnProducerError(IMessageContext context, Exception ex)
public static void OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
Expand All @@ -83,8 +79,6 @@ public static Task OnProducerError(IMessageContext context, Exception ex)

activity?.Dispose();
}

return Task.CompletedTask;
}

private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value)
Expand Down
61 changes: 48 additions & 13 deletions src/KafkaFlow/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

internal class Event<TArg> : IEvent<TArg>
{
private readonly ILogHandler logHandler;

private readonly List<Func<TArg, Task>> handlers = new();
private readonly List<Func<TArg, Task>> asyncHandlers = new();
private readonly List<Action<TArg>> syncHandlers = new();

public Event(ILogHandler logHandler)
{
Expand All @@ -17,33 +19,64 @@ public Event(ILogHandler logHandler)

public IEventSubscription Subscribe(Func<TArg, Task> handler)
{
if (!this.handlers.Contains(handler))
return this.Subscribe(this.asyncHandlers, handler);
}

public IEventSubscription Subscribe(Action<TArg> handler)
{
return this.Subscribe(this.syncHandlers, handler);
}

internal Task FireAsync(TArg arg)
{
this.InvokeSyncHandlers(arg);
return this.InvokeAsyncHandlers(arg);
}

private IEventSubscription Subscribe<T>(List<T> handlersList, T handler)
{
if (!handlersList.Contains(handler))
{
this.handlers.Add(handler);
handlersList.Add(handler);
}

return new EventSubscription(() => this.handlers.Remove(handler));
return new EventSubscription(() => handlersList.Remove(handler));
}

internal async Task FireAsync(TArg arg)
private async Task InvokeAsyncHandlers(TArg arg)
{
foreach (var handler in this.handlers)
foreach (var handler in this.asyncHandlers.Where(h => h is not null))
{
try
{
if (handler is null)
{
continue;
}

await handler.Invoke(arg);
}
catch (Exception e)
catch (Exception ex)
{
this.LogHandlerOnError(ex);
}
}
}

private void InvokeSyncHandlers(TArg arg)
{
foreach (var handler in this.syncHandlers.Where(h => h is not null))
{
try
{
this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name });
handler.Invoke(arg);
}
catch (Exception ex)
{
this.LogHandlerOnError(ex);
}
}
}

private void LogHandlerOnError(Exception ex)
{
this.logHandler.Error("Error firing event", ex, new { Event = this.GetType().Name });
}
}

internal class Event : IEvent
Expand All @@ -57,6 +90,8 @@ public Event(ILogHandler logHandler)

public IEventSubscription Subscribe(Func<Task> handler) => this.evt.Subscribe(_ => handler.Invoke());

public IEventSubscription Subscribe(Action handler) => this.evt.Subscribe(_ => handler.Invoke());

internal Task FireAsync() => this.evt.FireAsync(null);
}
}

0 comments on commit c48df05

Please sign in to comment.