Description
The .NET Aspire Shop Demo is a sample application showcasing a microservices architecture with components like a storefront, product catalog, and basket service, all wired together using .NET Aspire's orchestration and observability features. By default, it leverages OpenTelemetry (OTel) for distributed tracing, metrics, and logging across HTTP-based service interactions. However, many real-world applications rely on asynchronous messaging systems (e.g., RabbitMQ, Kafka, or Azure Service Bus) for decoupling services. This write-up outlines how to extend the Aspire Shop Demo with a messaging example that demonstrates OTel context propagation for end-to-end observability.
Objective
Add a messaging-based workflow to the Aspire Shop Demo where:
- A user action (e.g., placing an order) triggers a message to a queue.
- A background service processes the message (e.g., updating inventory).
- OpenTelemetry propagates the trace context across the producer and consumer, ensuring the entire workflow is observable in a single distributed trace.
Step 1: Define the Messaging Scenario
Let's extend the demo to include an "Order Processing" feature:
- When a user adds items to their basket and checks out, the BasketService publishes an OrderPlaced event to a message queue.
- A new InventoryService consumes the event and updates the product stock levels.
- The OTel trace context is propagated from the BasketService (producer) to the InventoryService (consumer) via the message.
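Before wiring in RabbitMQ, the propagation idea itself can be sketched with nothing but System.Diagnostics. The following is a minimal console sketch, not the demo's code: the source name "Demo.Messaging" and the in-memory headers dictionary are assumptions standing in for the real services and the real message. It writes a W3C traceparent value into the headers on the producer side and rebuilds the parent context from it on the consumer side.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

Activity.DefaultIdFormat = ActivityIdFormat.W3C;

// Hypothetical source name; without a listener, StartActivity returns null.
var source = new ActivitySource("Demo.Messaging");
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Messaging",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded
};
ActivitySource.AddActivityListener(listener);

// Producer side: write the current trace context into the message headers.
var headers = new Dictionary<string, object>();
string producerTraceId;
using (var publish = source.StartActivity("publish order-placed", ActivityKind.Producer))
{
    // For W3C-format activities, Activity.Id is the traceparent value.
    headers["traceparent"] = Encoding.UTF8.GetBytes(publish!.Id!);
    producerTraceId = publish.TraceId.ToString();
}

// Consumer side: rebuild the parent context from the headers.
var traceparent = Encoding.UTF8.GetString((byte[])headers["traceparent"]);
if (ActivityContext.TryParse(traceparent, null, out var parent))
{
    using var process = source.StartActivity("ProcessOrderPlaced", ActivityKind.Consumer, parent);
    Console.WriteLine($"producer trace: {producerTraceId}");
    Console.WriteLine($"consumer trace: {process!.TraceId}");
}
```

Both lines print the same trace ID, which is what lets a tracing backend show the two spans as one trace. In the steps below, the OpenTelemetry propagator does the header reading and writing instead of this hand-rolled version.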
Step 2: Add Messaging Infrastructure
For this example, we'll use RabbitMQ as the messaging system, as it's lightweight and integrates well with .NET Aspire via its component model. However, the approach is adaptable to other messaging systems like Azure Service Bus or Kafka.
- Add RabbitMQ to the Aspire App Host: In the AppHost project, update Program.cs to include RabbitMQ as a resource:

    var builder = DistributedApplication.CreateBuilder(args);

    var rabbitmq = builder.AddRabbitMQ("messaging")
        .WithManagementPlugin(); // Optional: for RabbitMQ management UI

    // Existing services (e.g., catalog, basket, storefront)
    var catalogService = builder.AddProject<Projects.CatalogService>("catalogservice");

    var basketService = builder.AddProject<Projects.BasketService>("basketservice")
        .WithReference(rabbitmq); // Add RabbitMQ reference

    var inventoryService = builder.AddProject<Projects.InventoryService>("inventoryservice")
        .WithReference(rabbitmq); // Add RabbitMQ reference

    var storefront = builder.AddProject<Projects.Storefront>("storefront");

    builder.Build().Run();

This sets up RabbitMQ as a containerized resource and makes its connection details available to dependent services.
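- Install Required NuGet Packages: In both BasketService and InventoryService, add the following packages:
  - RabbitMQ.Client (for messaging)
  - Aspire.RabbitMQ.Client (registers IConnection in dependency injection via builder.AddRabbitMQClient("messaging"); the checkout endpoint in Step 3 relies on this registration)
  - OpenTelemetry.Instrumentation.RabbitMQ (optional, if available; otherwise use the manual instrumentation shown in Steps 3 and 4)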
Step 3: Implement the Producer (BasketService)
Modify the BasketService to publish an OrderPlaced message when a checkout occurs.
- Define the Message Model: Create a shared model (e.g., in a Contracts project):

    public record OrderPlacedEvent(string OrderId, List<string> ProductIds);

- Publish the Message with OTel Context: Update the checkout endpoint in BasketService:

    // Targets the RabbitMQ.Client 6.x API (CreateModel, BasicPublish).
    // Assumes IConnection is registered in DI, e.g. via builder.AddRabbitMQClient("messaging").
    using System.Diagnostics;
    using System.Text;
    using System.Text.Json;
    using OpenTelemetry;
    using OpenTelemetry.Context.Propagation;
    using RabbitMQ.Client;

    // Register this source name with tracing (AddSource), or StartActivity returns null.
    var activitySource = new ActivitySource("BasketService");

    app.MapPost("/checkout", (IConnection rabbitConnection) =>
    {
        var orderId = Guid.NewGuid().ToString();
        var productIds = new List<string> { "product1", "product2" }; // Simplified example
        var message = new OrderPlacedEvent(orderId, productIds);

        // Producer span for the publish; fall back to the ambient HTTP activity if it is not sampled
        using var activity = activitySource.StartActivity("publish order-placed", ActivityKind.Producer);
        var contextToInject = (activity ?? Activity.Current)?.Context ?? default;

        using var channel = rabbitConnection.CreateModel();
        channel.QueueDeclare("order-placed", durable: true, exclusive: false, autoDelete: false);

        var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        // Inject the OTel trace context into the message headers
        var headers = new Dictionary<string, object>();
        Propagators.DefaultTextMapPropagator.Inject(
            new PropagationContext(contextToInject, Baggage.Current),
            headers,
            (carrier, key, value) => carrier[key] = value);
        properties.Headers = headers;

        channel.BasicPublish(exchange: "", routingKey: "order-placed", basicProperties: properties, body: body);
        return Results.Ok(new { OrderId = orderId });
    });

Here, the OTel Propagators.DefaultTextMapPropagator injects the trace context (e.g., traceparent and tracestate) into the RabbitMQ message headers.
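To make the injected header concrete, here is a small sketch that parses a traceparent value with ActivityContext.TryParse from System.Diagnostics. The value is the sample used in the W3C Trace Context specification, not output captured from the demo.

```csharp
using System;
using System.Diagnostics;

// Format: version-traceid-parentspanid-flags
const string traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";

if (ActivityContext.TryParse(traceparent, null, out var context))
{
    Console.WriteLine($"trace id: {context.TraceId}"); // 0af7651916cd43dd8448eb211c80319c
    Console.WriteLine($"span id:  {context.SpanId}");  // b7ad6b7169203331
    Console.WriteLine($"sampled:  {context.TraceFlags.HasFlag(ActivityTraceFlags.Recorded)}"); // True
}
```

The trace ID is what stays constant across the producer and the consumer; the span ID identifies the publishing span that the consumer's span will use as its parent.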
Step 4: Implement the Consumer (InventoryService)
Create a new InventoryService project to consume and process the OrderPlaced event.
- Set Up the Consumer: In Program.cs of InventoryService:

    // Targets the RabbitMQ.Client 6.x API.
    using System.Diagnostics;
    using System.Text;
    using System.Text.Json;
    using OpenTelemetry.Context.Propagation;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    var builder = WebApplication.CreateBuilder(args);

    // Registers IConnection from the "messaging" connection string that the AppHost
    // supplies via WithReference (requires the Aspire.RabbitMQ.Client package).
    builder.AddRabbitMQClient("messaging");

    var app = builder.Build();
    app.MapGet("/", () => "Inventory Service");

    // A local variable: top-level statements cannot declare static fields.
    var activitySource = new ActivitySource("InventoryService");

    // Background message consumer
    var connection = app.Services.GetRequiredService<IConnection>();
    using var channel = connection.CreateModel();
    channel.QueueDeclare("order-placed", durable: true, exclusive: false, autoDelete: false);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = JsonSerializer.Deserialize<OrderPlacedEvent>(Encoding.UTF8.GetString(body));

        // Extract the OTel trace context from the headers. RabbitMQ delivers string
        // header values as byte[], so decode them instead of calling ToString().
        var parentContext = Propagators.DefaultTextMapPropagator.Extract(
            default,
            ea.BasicProperties.Headers,
            (carrier, key) =>
                carrier is not null && carrier.TryGetValue(key, out var value) && value is byte[] bytes
                    ? new[] { Encoding.UTF8.GetString(bytes) }
                    : Array.Empty<string>());

        // Start a consumer activity whose parent is the extracted context
        using var activity = activitySource.StartActivity("ProcessOrderPlaced", ActivityKind.Consumer, parentContext.ActivityContext);
        activity?.SetTag("order.id", message?.OrderId);
        activity?.SetTag("messaging.system", "rabbitmq");
        activity?.SetTag("messaging.operation", "consume");

        // Simulate inventory update
        Console.WriteLine($"Inventory updated for Order: {message?.OrderId}");
    };

    channel.BasicConsume(queue: "order-placed", autoAck: true, consumer: consumer);
    app.Run();

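The consumer extracts the OTel trace context from the message headers and starts a new Activity whose parent is the producer's context, so the trace continues across the queue. The ActivitySource name ("InventoryService") must be registered with the service's tracing setup (e.g., AddSource("InventoryService")); otherwise StartActivity returns null and no consumer span is recorded.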
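One detail that is easy to miss with a custom ActivitySource: StartActivity only creates a span when something is listening to that source. The sketch below shows this with a plain ActivityListener from System.Diagnostics. In an Aspire service the OpenTelemetry SDK plays the listener's role once the source name is passed to AddSource, so the listener here is only a stand-in for illustration.

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("InventoryService");

// No listener yet: the activity is not created at all.
var before = source.StartActivity("ProcessOrderPlaced", ActivityKind.Consumer);
Console.WriteLine(before is null ? "before listener: null" : "before listener: created");

// Stand-in for what the OpenTelemetry SDK does after AddSource("InventoryService").
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "InventoryService",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded
};
ActivitySource.AddActivityListener(listener);

using var after = source.StartActivity("ProcessOrderPlaced", ActivityKind.Consumer);
Console.WriteLine(after is null ? "after listener: null" : "after listener: created");
```

This is why the consumer code uses activity?.SetTag rather than activity.SetTag: the null-conditional calls keep the handler working even when tracing is not configured for the source.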
Step 5: Verify End-to-End Observability
- Run the Application: Start the AppHost project (dotnet run) to launch all services, RabbitMQ, and the Aspire dashboard.
- Trigger a Checkout: Use the storefront UI or a tool like curl to hit the /checkout endpoint on BasketService.
- Observe the Trace: Open the Traces view in the Aspire dashboard, which receives telemetry from the services over OTLP. You should see a single trace spanning:
  - The HTTP request to BasketService.
  - The message publication to RabbitMQ.
  - The message consumption and processing by InventoryService.
The trace will include spans like:
- POST /checkout (BasketService)
- publish order-placed (BasketService)
- ProcessOrderPlaced (InventoryService)
Benefits
- End-to-End Visibility: The OTel context propagation ties the asynchronous messaging workflow into the same trace as the initial HTTP request, making it easier to debug latency or failures.
- Aspire Integration: Leveraging Aspire's resource model simplifies setting up and connecting to RabbitMQ.
- Extensibility: This pattern can be applied to other messaging systems or workflows (e.g., payment processing, order confirmation emails).
Potential Enhancements
- Add error handling and retries with OTel instrumentation to observe failure scenarios.
- Use a dedicated background service (e.g., BackgroundService) for the consumer instead of inline code.
- Integrate metrics (e.g., queue depth, processing time) alongside traces.
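As one possible shape for the first enhancement, the sketch below wraps a handler in a retry loop and records each attempt as its own span with an Ok or Error status. It uses only System.Diagnostics. The UpdateInventory handler (which fails twice on purpose), the retry.attempt tag name, and the attempt limit are all hypothetical choices for illustration, and a real consumer would also pass the extracted parent context to StartActivity.

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("InventoryService");
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "InventoryService",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded
};
ActivitySource.AddActivityListener(listener);

var attempts = 0;

// Hypothetical handler: fails on the first two calls, then succeeds.
void UpdateInventory(string orderId)
{
    attempts++;
    if (attempts < 3)
        throw new InvalidOperationException($"stock store unavailable (attempt {attempts})");
}

bool ProcessWithRetry(string orderId, int maxAttempts)
{
    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        // One span per attempt, so failed tries are visible in the trace.
        using var activity = source.StartActivity("ProcessOrderPlaced", ActivityKind.Consumer);
        activity?.SetTag("order.id", orderId);
        activity?.SetTag("retry.attempt", attempt);
        try
        {
            UpdateInventory(orderId);
            activity?.SetStatus(ActivityStatusCode.Ok);
            return true;
        }
        catch (Exception ex)
        {
            // Mark the span as failed and keep the reason for debugging.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        }
    }
    return false;
}

var succeeded = ProcessWithRetry("order-1", maxAttempts: 3);
Console.WriteLine($"succeeded={succeeded}, attempts={attempts}"); // succeeded=True, attempts=3
```

With RabbitMQ, a loop like this would typically be paired with autoAck: false, so that a message is acknowledged only after processing succeeds.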