
fix: resolve producer lookup failure when posting Reply messages #4070

Merged
DevJonny merged 16 commits into master from fix/reply-producer-lookup
Apr 27, 2026

@DevJonny
Contributor

@DevJonny DevJonny commented Apr 21, 2026

Summary

When a message mapper sets Message.Header.Topic to a value different from the registered publication topic (as Reply mappers do, using the reply address), the outbox dispatcher was looking up the producer by the mapper-set topic and failing to find it.

Fix: the wrap pipeline stashes the publication topic in Header.Bag[ProducerTopicHeaderName] when it differs from the mapper-set topic. The outbox dispatcher reads that bag entry for producer lookup (falling back to Header.Topic when absent). Stripping the bag entry on the wire is the transport's responsibility; the message itself keeps the entry so an InMemoryOutbox-by-reference retry still locates the producer.
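
A minimal sketch of the stash-and-fallback flow described above. SketchHeader and ProducerTopicSketch are hypothetical stand-ins, not Brighter's actual Message, MessageHeader, or OutboxProducerMediator types; only the bag key value comes from this PR.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a message header (not Brighter's MessageHeader).
public sealed class SketchHeader
{
    public string Topic { get; set; } = "";
    public Dictionary<string, object> Bag { get; } = new();
}

public static class ProducerTopicSketch
{
    // Bag key value as stated in the PR description.
    public const string ProducerTopicHeaderName = "paramore.brighter.ProducerTopic";

    // Wrap step: stash the publication topic only when it is non-null
    // and differs from the topic the mapper set on the header.
    public static void StashPublicationTopic(SketchHeader header, string? publicationTopic)
    {
        if (publicationTopic is not null && publicationTopic != header.Topic)
        {
            header.Bag[ProducerTopicHeaderName] = publicationTopic;
        }
    }

    // Dispatch step: prefer the stashed topic; fall back to Header.Topic
    // when the entry is absent or is not a string.
    public static string GetProducerLookupTopic(SketchHeader header)
    {
        return header.Bag.TryGetValue(ProducerTopicHeaderName, out var value) && value is string topic
            ? topic
            : header.Topic;
    }
}
```

With a header whose Topic is a reply address and a publication topic of "MyResponse", the lookup resolves to "MyResponse"; a header that was never stashed resolves to its own Topic, which is the pre-fix behaviour.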

Changes

Core

  • Message.ProducerTopicHeaderName constant (namespaced as paramore.brighter.ProducerTopic)
  • WrapPipeline.Wrap / WrapPipelineAsync.WrapAsync: write publication.Topic into bag when topics differ (guarded for null publication topic)
  • OutboxProducerMediator.GetProducerLookupTopic: new private helper used by Dispatch, DispatchAsync, BulkDispatchAsync. The mediator no longer mutates the bag — no strip-and-restore dance, no dispatched book-keeping that existed only to drive the restore
  • MessageHeader.IsLocalHeader(name) / MessageHeader.RegisterLocalHeader(name): public extension seam for marking bag keys as local (must not be serialised onto the wire). Pre-populated with ProducerTopicHeaderName. Backed by a copy-on-write HashSet<string> snapshot — lock-free reads on the per-bag-entry hot path, CAS-loop registrations expected at
    startup. No new package dependency.
  • MessageHeader.BagWithoutLocalHeaders(): returns a filtered copy of the bag for transports that JSON-serialise the whole bag in one go
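
One way the copy-on-write snapshot described in the list above could look, as a sketch only. LocalHeaderRegistrySketch is a hypothetical class; the method names mirror the ones listed in this PR, but the bodies are assumptions about the general technique, not the merged implementation.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;

public static class LocalHeaderRegistrySketch
{
    // Current snapshot. A published set is never mutated; registration swaps in a new one.
    private static HashSet<string> s_snapshot =
        new(StringComparer.Ordinal) { "paramore.brighter.ProducerTopic" };

    // Lock-free read: load the current snapshot, then do a set lookup.
    public static bool IsLocalHeader(string name) =>
        Volatile.Read(ref s_snapshot).Contains(name);

    // Copy-on-write registration: copy the snapshot, add the key, publish with a
    // compare-and-swap, and retry if another thread published first.
    public static void RegisterLocalHeader(string name)
    {
        while (true)
        {
            var current = Volatile.Read(ref s_snapshot);
            if (current.Contains(name))
            {
                return; // already registered, so registration is idempotent
            }

            var next = new HashSet<string>(current, current.Comparer) { name };
            if (ReferenceEquals(Interlocked.CompareExchange(ref s_snapshot, next, current), current))
            {
                return;
            }
        }
    }

    // Returns a filtered copy; the caller's bag is left untouched.
    public static Dictionary<string, object> BagWithoutLocalHeaders(IReadOnlyDictionary<string, object> bag)
    {
        var snapshot = Volatile.Read(ref s_snapshot);
        var filtered = new Dictionary<string, object>(bag.Count);
        foreach (var entry in bag)
        {
            if (!snapshot.Contains(entry.Key))
            {
                filtered[entry.Key] = entry.Value;
            }
        }
        return filtered;
    }
}
```

The trade-off in this shape is that each registration allocates a new set, which is acceptable if registrations happen a handful of times at startup and reads dominate afterwards.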

Transport sweep

Every transport that copies Header.Bag onto its wire format now skips local headers via MessageHeader.IsLocalHeader(...):

  • AzureServiceBus publisher + Scheduler.Azure
  • AWSSQS V3 + V4 (SNS publisher + SQS sender — use BagWithoutLocalHeaders() because they JSON-serialise the whole bag in one go)
  • RMQ.Async + RMQ.Sync publishers
  • Redis publisher (BagWithoutLocalHeaders())
  • Kafka header builder
  • GcpPubSub parser
  • RocketMQ producer

Followed iancooper's review preference: stripping happens at the wire-conversion step in each transport, not centrally in the mediator. This preserves the bag entry on the original message reference held by InMemoryOutbox, so retries still resolve the correct producer.

Deployment note — rolling upgrade

During a rolling upgrade, outbox rows persisted by an old instance won't have the ProducerTopic bag entry. When a new instance drains them, producer lookup falls back to Header.Topic (the reply address) and will fail for reply messages. Either drain the outbox before upgrading, or accept that stuck reply rows may need to be re-posted after the rollout completes.

Tests

  • When_Posting_A_Reply_Message_To_The_Command_Processor (+ async) — end-to-end post + asserts bag entry survives dispatch on the original message (transports do the wire-form stripping)
  • When_Wrapping_A_Reply_Message_Mapper (+ async) — verifies bag is populated with publication topic
  • When_Bulk_Dispatching_Reply_Messages_Async — pins BulkDispatchAsync path
  • When_Wrapping_With_Null_Publication_Topic (+ async) — pins the null-publication-topic guard
  • MessageHeaderLocalHeadersTests: covers BagWithoutLocalHeaders, IsLocalHeader, and RegisterLocalHeader idempotency; asserts the original header is untouched
  • AzureServiceBusMessagePublisherLocalHeaderTests — confirms ASB wire form drops ProducerTopic while the original message keeps it, and unrelated bag entries still travel

Test plan

  • New tests pass locally
  • Existing test suite builds clean
  • CI green

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>

@CLAassistant

CLAassistant commented Apr 21, 2026

CLA assistant check
All committers have signed the CLA.

@DevJonny DevJonny changed the title from "Fix/reply producer lookup" to "fix: resolve producer lookup failure when posting Reply messages" Apr 21, 2026
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Apr 21, 2026

Code Review: Fix/reply producer lookup

Overview

This PR fixes a real bug: when a reply message mapper overrides Header.Topic to the dynamic reply address, the OutboxProducerMediator can no longer find the producer (which is registered under the original publication topic). The fix stores the publication topic in Header.Bag under a new ProducerTopic key, then reads that key during producer lookup.

The approach is pragmatic and well-targeted. The test coverage is thorough. A few issues worth discussing before merge:


Issues / Risks

1. firstMessage assumption in BulkDispatchAsync is fragile

// OutboxProducerMediator.cs ~854
var firstMessage = topicBatch.First();
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage));

Messages are batched by Header.Topic (the reply address). The fix assumes every message in the batch carries the same ProducerTopic bag value. While logically consistent today, this assumption is implicit and unvalidated. If a batch ever contains messages with different bag values (e.g., due to a mapper bug or future code change), the wrong producer will be selected silently for all but the first.

Consider either:

  • Asserting / logging a warning when bag values differ across the batch, or
  • Adding a brief inline comment acknowledging the invariant being relied on.
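
To illustrate the first option, here is a sketch of a check that surfaces mixed producer-lookup topics within one batch. BatchInvariantSketch and SketchMessage are hypothetical; the real mediator's batch type and logging API are not shown in this conversation, so the caller is left to decide whether to log or throw.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchInvariantSketch
{
    public const string ProducerTopicHeaderName = "paramore.brighter.ProducerTopic";

    // Hypothetical message shape: the wire topic plus a loosely typed bag.
    public sealed record SketchMessage(string Topic, Dictionary<string, object> Bag);

    // Same fallback rule as the lookup helper described in the PR.
    public static string LookupTopic(SketchMessage message) =>
        message.Bag.TryGetValue(ProducerTopicHeaderName, out var value) && value is string topic
            ? topic
            : message.Topic;

    // Returns the distinct producer-lookup topics in a batch.
    // More than one entry means firstMessage is not representative.
    public static IReadOnlyList<string> DistinctLookupTopics(IEnumerable<SketchMessage> batch) =>
        batch.Select(LookupTopic).Distinct(StringComparer.Ordinal).ToList();
}
```

A caller could log a warning when the returned list has more than one element. This also catches the rolling-upgrade case where only some messages in a batch carry the bag entry.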

2. Task.Delay(500) in the bulk test is a timing anti-pattern

// When_Bulk_Dispatching_Reply_Messages_Async.cs line ~200
await Task.Delay(500);

This makes the test both slow and potentially flaky in congested CI environments. A TaskCompletionSource, polling loop with a short timeout, or WaitUntil helper would make this deterministic. If ClearOutstandingFromOutboxAsync schedules background work that is hard to await, document that constraint — but a raw Delay is a red flag in a test.
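
A sketch of the kind of WaitUntil helper suggested above. PollSketch.WaitUntilAsync is a hypothetical name, not an existing Brighter or xUnit API; it simply bounds the wait and reports whether the condition was met.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class PollSketch
{
    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout.
    public static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout, TimeSpan interval)
    {
        var stopwatch = Stopwatch.StartNew();
        while (!condition())
        {
            if (stopwatch.Elapsed >= timeout)
            {
                return false;
            }

            await Task.Delay(interval);
        }

        return true;
    }
}
```

A test would assert on the returned bool, so a timeout fails with a clear message and the fast path returns as soon as the condition holds instead of always sleeping for a fixed 500 ms.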

3. Outbox messages written before this fix will still fail

If the outbox already contains reply messages persisted before this fix (no ProducerTopic bag entry), GetProducerLookupTopic falls back to Header.Topic (the reply address) and producer lookup still fails. This is likely acceptable, but it is a migration cliff worth noting in the PR description or a CHANGELOG entry so operators know they may need to manually resolve stuck outbox entries after deploying.


Smaller Points

Style: missing braces on single-line if

// WrapPipeline.cs and WrapPipelineAsync.cs
if (publication.Topic is not null)
    message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value;

The surrounding if block already uses braces. Braces here would be consistent with the project style and reduce the risk of a future maintainer accidentally adding a second statement without braces.

ProducerTopicHeaderName is in Header.Bag, not a real header

The XML doc says "Tag name for the producer topic header". This value lives in the loosely-typed Bag dictionary, not as a typed header property. Consider clarifying the doc comment to say "bag key" to avoid confusion with MessageHeader properties.

Log line still uses message.Header.Topic

Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id);

This is intentional (logging where the message goes, not which producer dispatches it), but a brief comment would make that clear to future maintainers who might otherwise think the log is inconsistent with the lookup.


Strengths

  • The core fix is minimal and correct: one helper, three call-site updates.
  • The null-guard on publication.Topic before writing to the bag is correct and is explicitly tested.
  • Test coverage spans: wrap pipeline (sync + async), null publication topic (sync + async), single-message dispatch (sync + async), and bulk dispatch.
  • The MyResponseMessageMapper / MyResponseMessageMapperAsync test doubles faithfully replicate the real-world pattern (mapper sets topic to reply address).

Summary

The fix is sound. The Task.Delay in the bulk test and the unvalidated firstMessage assumption are the two items I'd address before merging. The others are minor polish. Overall this is a well-scoped, well-tested fix.

DevJonny pushed a commit that referenced this pull request Apr 21, 2026
- deterministic poll-until in bulk dispatch test (drop Task.Delay)
- document firstMessage invariant in BulkDispatchAsync
- clarify ProducerTopicHeaderName xml doc (bag key, not header)
- braces around single-line if in WrapPipeline + WrapPipelineAsync
- inline note on Log.DecoupledInvocationOfMessage divergence from lookup

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
@github-actions github-actions Bot added the Bug label Apr 21, 2026
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Apr 21, 2026

Code Review

The fix is well-motivated and the production code change is minimal. Here are my findings, grouped by severity.


⚠️ Potential Issue: ProducerTopic bag entry is sent on the wire

Files: WrapPipeline.cs, WrapPipelineAsync.cs

The ProducerTopicHeaderName entry is written into Header.Bag before the message is serialized and dispatched. If Brighter's transport adapters include the Bag in the wire format (which many do — it maps to AMQP headers, SNS/SQS message attributes, etc.), then consumers will receive this internal routing artefact as a visible message attribute.

This could:

  • Pollute consumer-side header inspection and event-store snapshots
  • Be confused for a user-defined header by consumers
  • Leak internal topology information (the registered producer topic) to external systems

Suggestion: Consider stripping the bag entry just before sending (in OutboxProducerMediator after calling GetProducerLookupTopic) or document explicitly that this header is internal and expected to be present on the wire.


⚠️ BulkDispatchAsync firstMessage assumption is sound but fragile

File: OutboxProducerMediator.cs:65

var firstMessage = topicBatch.First();
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage));

The grouping is by Header.Topic, so all messages in a batch share the same wire topic. Since WrapPipeline writes the same ProducerTopic bag entry for all messages routed through the same publication, firstMessage is indeed representative. However, there's a silent failure mode: if some messages in the batch were persisted to the outbox before this fix was deployed (during a rolling upgrade), they won't have the bag entry. GetProducerLookupTopic would fall back to Header.Topic (the reply address), and LookupBy would throw or return wrong results for those messages.

This is likely acceptable as a transient migration concern, but worth calling out in the PR description so operators know a restart/draining may be needed during deployment.


💡 Minor: Bag key collision risk

File: Message.cs:15

public const string ProducerTopicHeaderName = "ProducerTopic";

The key "ProducerTopic" is short and generic. If a user happens to set this key on their own message, the fix would silently override their value. Adopting a namespace convention (e.g. "_brighter.ProducerTopic" or "paramore.brighter.ProducerTopic") would make collisions virtually impossible and signal clearly that the header is infrastructure-owned.


💡 Minor: Async test method missing Async suffix

File: When_Wrapping_A_Reply_Message_MapperAsync.cs:79

public async Task When_Wrapping_A_Reply_Message_Mapper()  // should be ...Mapper_Async

The class is AsyncReplyMessageWrapRequestTests but the test method doesn't follow the _Async naming convention used elsewhere in the suite. Small but could cause confusion.


✅ What's working well

  • Fix is correct and minimal: 3 lines of production code, targeting the exact layer (WrapPipeline) where the topic divergence is first observed.
  • Good test coverage: 8 test files covering sync, async, bulk-dispatch, and the null-publication-topic guard. The When_Wrapping_* unit tests cleanly pin the bag population behaviour at the right layer.
  • GetProducerLookupTopic helper: Centralising the lookup logic into one static method makes the three call sites consistent and easy to audit. The is string pattern match on the TryGetValue result correctly handles any type mismatch in the bag.
  • Null guard on publication.Topic: Correctly avoids writing a null value into the bag, and the guard test pins it against regression.
  • Documentation: The XML doc on ProducerTopicHeaderName is clear about why it exists and where it lives, which is valuable given its unusual dual-role as a routing metadata carrier.

The wire-leakage concern is the one I'd want resolved before merge — everything else is advisory.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Apr 21, 2026

Code Review

This is a well-reasoned fix for a real bug. The design is sound — using Header.Bag as a sidecar to carry the original publication topic through the pipeline, then stripping it before wire dispatch, is a clean solution that avoids changing the public interface of MessageHeader. The test coverage is thorough across sync, async, and bulk paths. A few observations below.


Issues worth addressing before merge

1. BulkDispatchAsync firstMessage assumption is undocumented at the call site

The comment in the test correctly explains the assumption, but the production code comment in OutboxProducerMediator.BulkDispatchAsync says:

Messages in this batch share Header.Topic (the group key). They therefore share the same ProducerTopic bag entry (set by WrapPipelineAsync when the mapper overrode the topic), so firstMessage is representative of the batch.

This is only true if all messages in the batch share the same publication (i.e., same producer topic in the bag). If a future path ever groups messages from different publications under the same Header.Topic, the wrong producer would be looked up silently. Consider an explicit assertion or at least a Debug.Assert that all messages in topicBatch with a bag entry share the same ProducerTopicHeaderName value. Low risk today, but this is a silent correctness assumption.

2. Outbox retains the bag entry after dispatch

StripProducerLookupTopic is called on the in-memory Message object. If InMemoryOutbox stores message references (not copies), the outbox entry also gets the bag entry stripped after dispatch. If it stores copies, the outbox entry permanently holds the paramore.brighter.ProducerTopic bag key. The current tests don't assert on the outbox message's bag state post-dispatch, so the behaviour is unspecified. This is unlikely to cause real problems (it's an internal header), but a note in the ProducerTopicHeaderName doc comment about outbox retention would help future readers.

3. Polling loop in When_Bulk_Dispatching_Reply_Messages_Async could flake

while (_internalBus.Stream(_replyTopic).Count() < 2 && DateTimeOffset.UtcNow < deadline)
{
    await Task.Delay(25);
}

DateTimeOffset.UtcNow is real wall-clock time while the rest of the test uses FakeTimeProvider. In a slow CI environment, the 5-second window is probably fine, but this pattern is inherently time-sensitive. If ClearOutstandingFromOutboxAsync with useBulk: true is actually synchronous with respect to dispatch (i.e., messages are on the bus before the method returns), the poll is unnecessary. If it's truly fire-and-forget, consider whether FakeTimeProvider can be advanced to avoid real-time waiting entirely. Either way, adding a failure message to the final assertion would make CI failures easier to diagnose:

Assert.Equal(2, messages.Length); // timed out after 5s — bulk dispatch may not have fired

Minor observations

4. Comment style in production code

The inline comments added to Dispatch and DispatchAsync explain non-obvious behaviour (logging by wire topic but looking up by a different topic) — that's the right threshold. However, the StripProducerLookupTopic XML doc is quite verbose for a private static helper. CLAUDE.md's guidance is one short comment line for private helpers.

5. ProducerTopicHeaderName placement

The constant lives on Message rather than MessageHeader (where Bag lives). A minor nit — MessageHeader.ProducerTopicHeaderName would be the more discoverable location since it's a bag key on MessageHeader.Bag. Message is also fine given the existing RejectionReasonHeaderName precedent.

6. Null-topic fallback in GetProducerLookupTopic

When publication.Topic is null, the bag entry is correctly not written (guarded in WrapPipeline), and GetProducerLookupTopic falls back to message.Header.Topic. This means null-topic publications with a reply mapper still fail on producer lookup — but that's pre-existing behaviour. Worth a brief note in GetProducerLookupTopic's comment to avoid confusion later.


Summary

The core fix is correct and the test suite covers the important paths (sync post, async post, bulk dispatch, null-topic guard). The main thing I'd want resolved before merge is item #1 (making the firstMessage batch assumption explicit/verifiable in production code) and item #3 (confirm whether the poll is actually needed). Items #2, #4, #5, #6 are minor and could be addressed in a follow-up.

Marking as DRAFT makes sense until CI is green — let us know when you're ready for a final pass.

codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Apr 21, 2026

PR Review: fix: resolve producer lookup failure when posting Reply messages

The fix is conceptually sound and well-scoped. Using a transient Header.Bag entry as a side-channel to separate producer lookup key from wire topic avoids changing the public MessageHeader type or the IAmAMessageMapper interface, which is the right call. Test coverage is comprehensive — sync, async, and bulk paths are all pinned. A few things worth looking at before merging:


Bug: InMemoryOutbox stores references — StripProducerLookupTopic corrupts the stored record

InMemoryOutbox.Add stores the Message object by reference (OutboxEntry(message) holds message as-is). Get returns that same reference. This means when StripProducerLookupTopic removes the bag entry from the dispatched message, it also silently removes it from the outbox's own copy.

For a DB-backed outbox this is harmless — the serialised row in the DB is unaffected. But for InMemoryOutbox, a message whose dispatch fails after the strip but before successful send (e.g. transport exception) cannot be retried: the bag entry is gone from the outbox, producer lookup falls back to Header.Topic (the reply address), and the lookup fails. The message is stuck.

The issue is subtle and production impact is limited to cases where InMemoryOutbox is used in production (which is rare). Still worth documenting explicitly, or — better — making StripProducerLookupTopic operate on a copy of the message used only for dispatch rather than mutating the outbox-owned object.


Minor: BulkDispatchAsync firstMessage assumption is undocumented at the call site

var firstMessage = topicBatch.First();
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage));
StripProducerLookupTopic(topicBatch);

The assumption (all messages in a topic group share the same ProducerTopic bag entry) is correct by design, and the comment in the test file explains it well. The production code comment (// Messages in this batch share Header.Topic (the group key)...) is useful. Consider whether it is also worth a Debug.Assert or a log warning if two messages in the same batch have different bag values, to catch future mapper misconfigurations before they silently produce wrong producer lookups.


Minor: Inline comments explain what, not why (contra CLAUDE.md)

// Log the wire topic (Header.Topic) — where the message is going. Producer
// lookup uses GetProducerLookupTopic, which may differ from Header.Topic when
// a mapper overrode it (e.g. Reply messages routed to a dynamic reply address).
Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id);

The CLAUDE.md convention is to comment only when the why is non-obvious, not to explain what a line does. The log call itself is self-evident; the comment's real value is only in the second sentence ("may differ from Header.Topic"). The same applies in BulkDispatchAsync. Consider trimming to only the non-obvious part, or removing the comments entirely and letting the method names (GetProducerLookupTopic) speak.


Nit: Polling loop in When_Bulk_Dispatching_Reply_Messages_Async could be flaky in CI

var deadline = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(5);
while (_internalBus.Stream(_replyTopic).Count() < 2 && DateTimeOffset.UtcNow < deadline)
{
    await Task.Delay(25);
}

ClearOutstandingFromOutboxAsync returns a Task — if the underlying clear completes synchronously (which it does for InMemoryOutbox), awaiting the task is sufficient and no polling is needed. If it genuinely is fire-and-forget, consider using a SemaphoreSlim or TaskCompletionSource keyed off a mock InternalBus subscription instead of a time-bounded spin. The 5-second wall-clock bound is fine in practice but will cause flaky failures under CI load.


Info: Brighter.slnx references new sample projects whose .csproj files are not in this diff

<Folder Name="/samples/TaskQueue/ASBRequestReply/">
  <Project Path="samples/TaskQueue/ASBRequestReply/Greetings/Greetings.csproj" />
  <Project Path="samples/TaskQueue/ASBRequestReply/GreetingsClient/GreetingsClient.csproj" />
  <Project Path="samples/TaskQueue/ASBRequestReply/GreetingsServer/GreetingsServer.csproj" />
</Folder>

These .csproj files are not in the PR diff — are they already on the branch in a prior commit, or are they missing? If missing, the solution file will fail to load (unresolved project reference). Worth verifying before CI runs.


Summary

| Area | Finding |
| --- | --- |
| Core logic | ✅ Correct and minimal |
| Null-topic guard | ✅ Present and tested |
| Test coverage | ✅ Comprehensive (sync / async / bulk / null-guard) |
| InMemoryOutbox mutation | ⚠️ Strip corrupts the stored reference; retry after failure will fail |
| BulkDispatch firstMessage assumption | ℹ️ Correct but undocumented safety net |
| Comment style | ℹ️ Some what-comments contra CLAUDE.md |
| Test polling | ℹ️ Could be flaky under CI load |
| Missing sample .csproj files | ℹ️ Verify or exclude from slnx |

codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Apr 21, 2026

Review: fix/resolve-producer-lookup-failure-reply-messages

The fix is conceptually sound and well-tested. Previous reviews have progressively tightened it; the remaining issue worth resolving before merge is the InMemoryOutbox mutation hazard. Notes below, ordered by severity.


1. InMemoryOutbox stores by reference — StripProducerLookupTopic corrupts the outbox entry (Medium)

OutboxEntry holds a direct reference to the message, not a copy:

// src/Paramore.Brighter/InMemoryOutbox.cs
public class OutboxEntry(Message message)
{
    public Message Message { get; } = message;   // reference, not a copy
}

So when StripProducerLookupTopic(message) removes ProducerTopicHeaderName from Header.Bag, it also removes it from the object that the outbox holds. If producer.Send (or the bulk equivalent) throws after the strip, the outbox entry is permanently missing the bag key. A subsequent retry will call GetProducerLookupTopic, find no bag entry, fall back to Header.Topic (the dynamic reply address), fail to find a registered producer, and the message is stuck.

Persistent outboxes (SQL, Mongo, etc.) are immune because each drain deserialises a fresh object — the DB row is untouched. This only affects InMemoryOutbox, which is primarily a dev/test tool, so the blast radius is limited. But all the new tests use InMemoryOutbox, so the tests don't exercise the failure-then-retry path.

Suggested fix: Rather than mutating the original, dispatch a shallow-cloned message with the bag entry stripped:

private static Message ToWireMessage(Message message)
{
    var wire = new Message(
        new MessageHeader(message.Header) { Bag = new Dictionary<string, object>(message.Header.Bag) },
        message.Body);
    wire.Header.Bag.Remove(Message.ProducerTopicHeaderName);
    return wire;
}

Then dispatch ToWireMessage(message) instead of message directly. The outbox entry stays intact and retries are safe.

If MessageHeader doesn't expose a copy constructor today, an alternative is to strip only after MarkDispatchedAsync succeeds — though that would leave the bag entry visible to the transport, which is the original wire-leakage problem.


2. Debug.Assert is a no-op in Release builds (Low)

// OutboxProducerMediator.cs BulkDispatchAsync
Debug.Assert(
    topicBatch.All(m => GetProducerLookupTopic(m) == producerLookupTopic),
    "all messages in a topicBatch must share the same producer-lookup topic");

Debug.Assert is compiled out in Release configurations, so in production this invariant is never checked. If the assumption breaks (e.g. a future mapper or pipeline change produces mixed bag entries within a Header.Topic group), BulkDispatchAsync will silently dispatch all messages via the wrong producer with no diagnostic.

Consider replacing with a logged warning or, if the invariant must hold, an InvalidOperationException so the problem is surfaced in production:

if (!topicBatch.All(m => GetProducerLookupTopic(m) == producerLookupTopic))
    throw new InvalidOperationException(
        $"Expected all messages in topic batch '{topicBatch.Key}' to share producer-lookup topic '{producerLookupTopic}', but they do not.");

3. Comment style (Low — CLAUDE.md convention)

CLAUDE.md asks for comments only when the why is non-obvious, and discourages multi-line comment blocks. Several of the new comments describe what the code does rather than a hidden constraint:

// Log the wire topic (Header.Topic) — where the message is going. Producer
// lookup uses GetProducerLookupTopic, which may differ from Header.Topic when
// a mapper overrode it (e.g. Reply messages routed to a dynamic reply address).

The code itself (GetProducerLookupTopic(message) vs message.Header.Topic) already communicates the difference. A single-line comment noting the invariant would be sufficient, or no comment at all if the method name is self-descriptive.


What's working well

  • The core fix is correct: stashing the publication topic in the bag at wrap time and reading it back at dispatch cleanly decouples the mapper-controlled wire topic from the producer-registry key.
  • ProducerTopicHeaderName = "paramore.brighter.ProducerTopic" is properly namespaced to avoid collisions.
  • The null-topic guard in both WrapPipeline variants is correct.
  • Test coverage is comprehensive: both paths (sync/async), the bulk path, and the null-publication-topic edge case are all pinned.
  • The rolling upgrade caveat in the PR description is clear and actionable.
  • The ASBRequestReply sample projects referenced in Brighter.slnx appear to be referenced but not yet present in the diff — worth confirming whether they belong in a separate PR or need to be included here.

The mutation issue (#1) is the one thing I'd want to see addressed before merge, especially given the tests rely on InMemoryOutbox — fixing it there would also close the test coverage gap on the retry path.

Review by Claude (claude-sonnet-4-6)

codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Apr 21, 2026

Code Review — PR #4070: Resolve producer lookup failure for Reply messages

Overall: This is a clean, well-targeted bug fix with good test coverage. The approach of stashing the publication topic in Header.Bag is clever — it bridges the gap between what the mapper sets as the wire destination and what the producer registry needs for lookup, without changing any public mapper interfaces.


What the PR does

When a Reply mapper sets Header.Topic to the reply address (a dynamic, per-message value), OutboxProducerMediator was looking up the producer by that dynamic value and failing. The fix: WrapPipeline stashes the registered publication topic in Header.Bag["paramore.brighter.ProducerTopic"] whenever the mapper-set topic differs, and the mediator reads it for lookup then strips it before dispatch.


Issues and suggestions

1. RoutingKey equality in Debug.Assert — verify operator semantics

Debug.Assert(
    topicBatch.All(m => GetProducerLookupTopic(m) == producerLookupTopic), ...);

GetProducerLookupTopic returns new RoutingKey(topic) from the bag. Whether == is structural or reference equality depends on whether RoutingKey overrides == / is a record. If it compares references, two new RoutingKey("Reply") calls won't be equal and the assert will always fire in debug builds on reply batches. Worth a quick check: if RoutingKey doesn't override ==, use .Value string comparison instead:

topicBatch.All(m => GetProducerLookupTopic(m).Value == producerLookupTopic.Value)
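
For context on why the operator semantics matter, a small sketch of how == behaves on a plain class versus a record in C#. RefKey and ValueKey are hypothetical types for illustration only; this says nothing about how Brighter's RoutingKey is actually declared.

```csharp
// Plain class with no operator overloads: == compares references.
public sealed class RefKey
{
    public string Value { get; }
    public RefKey(string value) => Value = value;
}

// Record: the compiler generates value-based Equals and == over its members.
public sealed record ValueKey(string Value);

public static class EqualitySketch
{
    // Two separately constructed instances of a plain class are not == to each other.
    public static bool ClassInstancesEqual() => new RefKey("Reply") == new RefKey("Reply");

    // Two records with equal member values are == to each other.
    public static bool RecordInstancesEqual() => new ValueKey("Reply") == new ValueKey("Reply");

    // Comparing the underlying strings works for either shape.
    public static bool ValuesEqual() => new RefKey("Reply").Value == new RefKey("Reply").Value;
}
```

Here ClassInstancesEqual() returns false while RecordInstancesEqual() and ValuesEqual() return true, which is why the .Value comparison is the safe form when the operator semantics are unknown.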

2. StripProducerLookupTopic before dispatch — document that it is safe

In Dispatch and DispatchAsync:

var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), ...);
StripProducerLookupTopic(message);   // ← strip before send

If SendWithDelay internally uses a Polly retry policy, each retry attempt uses the same message object — which is now stripped. This is actually fine because Header.Topic (the wire destination) is not touched; only the internal lookup hint is removed. But a one-line comment would pre-empt confusion for future readers:

// Strip before send: the bag entry is only needed for producer lookup above;
// Header.Topic (the wire destination) is unchanged.
StripProducerLookupTopic(message);

3. Missing outbox assertion in When_Posting_A_Reply_Message_To_The_Command_Processor

The test verifies the dispatched wire message is stripped, and that the outbox record exists — but it doesn't assert that the outbox record was stored with the ProducerTopicHeaderName bag entry. That stored entry is the mechanism that makes draining work correctly. Adding it would more explicitly document the contract:

// bag entry present in outbox (enables future drain to find the producer)
Assert.True(outboxMessage.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName));

Note: this depends on whether InMemoryOutbox.Get returns the original or a copy of the stored message. If Post dispatches immediately and the in-memory object is already stripped by the time Get is called, you may need a separate unit test against WrapPipeline for this assertion (which When_Wrapping_A_Reply_Message_Mapper already covers).

4. ProducerTopicHeaderName visibility — consider internal

The constant is public, making it part of the library's API surface. That's fine if you want application mappers to reference it explicitly (e.g., to strip the entry themselves in custom dispatch scenarios). But if it's purely a framework-internal coordination mechanism, internal would prevent accidental coupling. Not a blocker — just worth a conscious choice.

5. Debug.Assert double-enumeration risk (low risk, but worth noting)

var firstMessage = topicBatch.First();
var producerLookupTopic = GetProducerLookupTopic(firstMessage);
Debug.Assert(topicBatch.All(m => GetProducerLookupTopic(m) == producerLookupTopic), ...);

If topicBatch is a deferred LINQ sequence from the grouping, .First() and .All(...) would enumerate it twice. In practice the messages are already materialized in memory before grouping, so this is fine — but it's worth confirming. If there's any risk, materializing with .ToList() or .ToArray() first would make it explicit.
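A minimal sketch of the concern, assuming a deferred sequence of strings in place of the real topicBatch. DoubleEnumerationSketch, DeferredBatch, CheckDeferred and CheckMaterialised are illustrative names, not Brighter APIs:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DoubleEnumerationSketch
{
    // Counts how many times the deferred source is started (illustration only).
    public static int Enumerations;

    // Hypothetical stand-in for a lazily produced batch of lookup topics.
    public static IEnumerable<string> DeferredBatch()
    {
        Enumerations++;
        yield return "Reply";
        yield return "Reply";
    }

    // Enumerates the source twice: once for First(), once for All().
    public static bool CheckDeferred(IEnumerable<string> batch)
    {
        var first = batch.First();
        return batch.All(t => t == first);
    }

    // Materialises once; both calls then read the in-memory list.
    public static bool CheckMaterialised(IEnumerable<string> batch)
    {
        var list = batch.ToList();
        var first = list.First();
        return list.All(t => t == first);
    }
}
```

If the batch is already an in-memory collection, the ToList() call only costs one extra copy, which is why this stays a suggestion rather than a required change.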


What's done well

  • The fix is minimal (4 lines in WrapPipeline, ~30 lines in the mediator) with no behavioral changes to the happy path.
  • Fallback to Header.Topic when the bag entry is absent correctly preserves pre-fix behavior for existing outbox rows — the rolling-upgrade note is well documented.
  • Sync/async/bulk paths are all covered with dedicated tests.
  • When_Wrapping_With_Null_Publication_Topic pins the null-guard and prevents future NREs.
  • Stripping before wire dispatch prevents bag leakage on AMQP/SNS transports.

Summary

The logic is sound and the tests cover the important paths. The main actionable item is #1 (verify RoutingKey == semantics) — if that operator isn't structural, the Debug.Assert will falsely fire on every reply batch in debug builds. Items 2–5 are suggestions for clarity and completeness, not blockers.

@DevJonny DevJonny marked this pull request as ready for review April 22, 2026 06:54
Copilot AI review requested due to automatic review settings April 22, 2026 06:54

Copilot AI left a comment

Pull request overview

Fixes outbox dispatch producer lookup for Reply messages when a mapper overrides Message.Header.Topic (e.g., to a dynamic reply address) by stashing the publication topic into Message.Header.Bag for lookup, then stripping it prior to transport dispatch to avoid leaking internal metadata.

Changes:

  • Add Message.ProducerTopicHeaderName bag-key constant for preserving the publication topic across outbox persistence/dispatch.
  • Update WrapPipeline.Wrap and WrapPipelineAsync.WrapAsync to write the publication topic into Header.Bag when mapper/publication topics differ (guarded for null publication topic).
  • Update OutboxProducerMediator dispatch paths (sync/async/bulk) to resolve producer lookup topic from the bag and strip the bag entry before dispatch; add targeted tests.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

File | Description
src/Paramore.Brighter/Message.cs | Adds ProducerTopicHeaderName constant for an internal bag entry used during producer lookup.
src/Paramore.Brighter/WrapPipeline.cs | Writes publication topic into Header.Bag when mapper overrides the message topic.
src/Paramore.Brighter/WrapPipelineAsync.cs | Async equivalent of the wrap change to stash publication topic into Header.Bag.
src/Paramore.Brighter/OutboxProducerMediator.cs | Uses the bag-stashed publication topic for producer lookup and strips it before dispatch (sync/async/bulk).
tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper*.cs | Verifies bag population when wrapping Reply mappers (sync/async).
tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic*.cs | Pins the null-publication-topic guard (sync/async).
tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor*.cs | End-to-end post verifying dispatch and that the bag entry is stripped before sending.
tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs | Covers bulk outbox clear path and asserts bag stripping across the batch.
tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs | Async mapper used by the new end-to-end/bulk tests to simulate Reply behavior.

Comment thread src/Paramore.Brighter/OutboxProducerMediator.cs Outdated
DevJonny pushed a commit that referenced this pull request Apr 22, 2026
Debug.Assert is stripped in Release, so it could not enforce the single-
producer invariant at runtime. Group by the composite key instead — this
guarantees every batch resolves to one producer without any assertion.

Addresses Copilot review feedback on PR #4070.

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>

@claude

claude Bot commented Apr 22, 2026

Code Review — PR #4070: Resolve producer lookup failure for Reply messages

Overall: The design is sound — using Header.Bag as a sidecar to carry the registered publication topic through the pipeline, then stripping it before wire dispatch, is a clean solution. The scope is minimal and the core fix is correct.

Previous reviews have already covered the InMemoryOutbox mutation hazard and Debug.Assert semantics. Two fresh findings below.


1. Stale comment in the bulk-dispatch test (Low — clarity)

When_Bulk_Dispatching_Reply_Messages_Async.cs (class-level comment):

// Gap: BulkDispatchAsync groups by Header.Topic (the mapper-set reply topic) but
// looks up the producer using the first message's bag … so a future refactor of
// GetProducerLookupTopic or the firstMessage assumption cannot silently regress …

Neither claim matches the current implementation. The GroupBy key is now (WireTopic, LookupTopic) — not Header.Topic alone — and the producer is looked up via topicBatch.Key.LookupTopic (from the composite group key), not the "first message's bag". This comment appears to describe an earlier draft of the fix. It should be updated to reflect what the code actually does, or removed if the code is self-descriptive.


2. producerSpans key collision when same wire topic maps to different producers (Low — edge case)

In BulkDispatchAsync:

producerSpans.TryAdd(topicBatch.Key.WireTopic, span);

The new GroupBy can produce two distinct batches that share the same WireTopic but differ by LookupTopic — for example, if two producers are each registered under a different topic but both happen to receive reply messages addressed to the same reply address. In that scenario TryAdd silently discards the span for the second batch and its end-span call is lost, leaving an uncompleted trace.

This is an extremely unlikely production scenario, but it is newly possible after the grouping change. If it matters, a ConcurrentDictionary with compound key (WireTopic, LookupTopic) would avoid the collision — though that would also require changing how spans are completed at the bottom of the method. A code comment calling out the assumption ("a given wire topic maps to at most one producer") would at least make the invariant explicit.


What's done well

  • Minimal code change (4 lines in WrapPipeline, ~30 in the mediator) with no impact on the happy path.
  • GetProducerLookupTopic falls back to Header.Topic when the bag entry is absent — correctly preserving behaviour for outbox rows persisted before the upgrade.
  • The null-publication-topic guard in both WrapPipeline variants is correct and pinned by dedicated tests.
  • Coverage is thorough: sync, async, bulk, and null-guard paths each have their own test.
  • ProducerTopicHeaderName is namespaced (paramore.brighter.ProducerTopic) to avoid bag-key collisions with application code.
  • Rolling-upgrade caveat is clear and actionable in the PR description.

Recommendation: Address the stale comment (#1) and optionally document the producerSpans invariant (#2) before merge. Neither is a blocker, but the inaccurate comment could mislead future readers.

Review by Claude (claude-sonnet-4-6)

@Jonny-Freemarket
Contributor

Thanks for the fresh review — both findings addressed in a7d761d:

  1. Stale test comment — rewritten to describe the current (WireTopic, LookupTopic) grouping and the LookupTopic-based producer resolution.

  2. producerSpans key collision — the key is now "{WireTopic}|{LookupTopic}" so two batches that share a WireTopic but differ by LookupTopic both end their spans. In practice this is an extremely unlikely scenario (different publications producing coinciding reply addresses), but the composite key closes it cheaply.

On the InMemoryOutbox mutation hazard from earlier reviews — added a doc comment on StripProducerLookupTopic explicitly calling out that persistent outboxes (SQL/Mongo/Dynamo) re-hydrate per drain and are unaffected, while InMemoryOutbox stores the reference so a post-strip failure + retry would miss the producer. I stopped short of cloning the message on every dispatch since InMemoryOutbox is primarily dev/test; happy to add a clone if you think it's worth the extra allocation for production-use InMemoryOutbox scenarios.


@claude

claude Bot commented Apr 22, 2026

Code Review

Overview

This PR fixes a real bug in the request-reply pattern: when a Reply mapper sets Header.Topic to the dynamic reply address (not the registered producer topic), the outbox dispatcher fails to locate the producer. The fix is a two-part side-channel: the wrap pipeline stashes the publication topic in Header.Bag[ProducerTopicHeaderName], and the outbox dispatcher reads it for lookup then strips it before dispatch.

The approach is minimal and targeted. The fallback to Header.Topic when the bag entry is absent preserves pre-fix behaviour, meaning normal (non-reply) message flows are unaffected.


Potential Issue: Span dictionary key collision

In BulkDispatchAsync:

producerSpans.TryAdd($"{topicBatch.Key.WireTopic}|{topicBatch.Key.LookupTopic}", span);

Using | as a separator means two topic pairs like ("a|b", "c") and ("a", "b|c") produce the same composite key "a|b|c". In practice, topic names typically don't contain |, so this is unlikely to be hit — but a tuple-keyed dictionary would be unambiguous:

// Declare the dict as:
var producerSpans = new ConcurrentDictionary<(RoutingKey WireTopic, RoutingKey LookupTopic), Activity?>();
producerSpans.TryAdd(topicBatch.Key, span);

This is a minor concern but worth addressing since span cleanup relies on this key matching the corresponding TryRemove call later.
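The collision can be reproduced in isolation. The sketch below uses plain strings as stand-ins for RoutingKey and Activity, and SpanKeySketch is a hypothetical name, so it only illustrates the keying behaviour, not the mediator's actual code:

```csharp
using System;
using System.Collections.Concurrent;

public static class SpanKeySketch
{
    // Builds the "{WireTopic}|{LookupTopic}" style key discussed above.
    public static string StringKey(string wireTopic, string lookupTopic)
        => $"{wireTopic}|{lookupTopic}";

    // Two different topic pairs that flatten to the same string "a|b|c".
    public static bool StringKeysCollide()
        => StringKey("a|b", "c") == StringKey("a", "b|c");

    // With a tuple key, the same two pairs stay distinct entries.
    public static int CountTupleEntries()
    {
        var spans = new ConcurrentDictionary<(string WireTopic, string LookupTopic), string>();
        spans.TryAdd(("a|b", "c"), "span-1");
        spans.TryAdd(("a", "b|c"), "span-2");
        return spans.Count;
    }
}
```

Value tuples compare element by element, so no separator character is needed and none can be confused with topic content.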


Design note: strip-before-retry hazard with InMemoryOutbox

The comment on StripProducerLookupTopic accurately documents this:

InMemoryOutbox stores the reference, so a dispatch that fails after strip and then retries via the outbox will fall back to Header.Topic and miss the producer — acceptable since InMemoryOutbox is primarily dev/test.

Agreed that this is acceptable for InMemoryOutbox. It's worth confirming the persistent outbox implementations (SQL, Mongo, DynamoDB) all deserialise into fresh objects rather than returning cached references — if any of them cache, this becomes a prod-level bug.


BulkDispatchAsync grouping is correct

The new grouping key (WireTopic, LookupTopic) correctly ensures that messages with the same wire destination but different registered producers land in separate batches. LINQ GroupBy is deferred, but it buffers the whole source the first time the result is enumerated, so GetProducerLookupTopic is called exactly once per message at grouping time (not twice), provided the grouped result itself is enumerated only once. The strip that iterates topicBatch a second time works because each group already holds its elements in memory. This is sound.
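As a rough illustration of the composite-key grouping, assuming messages are reduced to a (HeaderTopic, ProducerTopic) pair where a null ProducerTopic means "no bag entry". GroupingSketch and Batches are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupingSketch
{
    // Groups by (WireTopic, LookupTopic); the lookup topic falls back to the
    // header topic when no producer topic was stashed (null in this sketch).
    public static List<(string WireTopic, string LookupTopic, int Count)> Batches(
        IEnumerable<(string HeaderTopic, string ProducerTopic)> messages)
        => messages
            .GroupBy(m => (WireTopic: m.HeaderTopic,
                           LookupTopic: m.ProducerTopic ?? m.HeaderTopic))
            .Select(g => (WireTopic: g.Key.WireTopic,
                          LookupTopic: g.Key.LookupTopic,
                          Count: g.Count()))
            .ToList();
}
```

Two replies addressed to the same reply queue but owned by different producers end up in different batches, while ordinary messages group by their header topic as before.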


Strip ordering is correct

In both Dispatch and DispatchAsync:

var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), ...);
StripProducerLookupTopic(message);

If LookupBy throws, strip is skipped — the dispatch fails completely, so the unsent message retains its bag entry (correct for retry). If lookup succeeds and strip throws (it won't — it's a Dictionary.Remove), the entry leaks to the wire, which is acceptable given how unlikely that path is.


Test coverage

Tests are comprehensive:

  • Sync and async Post paths
  • Bulk dispatch path (BulkDispatchAsync)
  • Null publication topic guard (both sync and async)
  • Strip assertions on dispatched messages

One gap: there is no test confirming that, for persistent outboxes, the ProducerTopic bag entry is still present on the stored message after a dispatch has stripped it from the in-flight copy (which would confirm they deserialise fresh objects). Not a blocker, but would be a good integration test addition.


Rolling upgrade concern (documented, correct)

The PR description correctly identifies the rolling upgrade hazard: messages deposited by old instances won't carry the ProducerTopic bag entry, and new instances draining them will fall back to Header.Topic (reply address) and miss the producer. The mitigation (drain before upgrade) is reasonable. Consider adding a WARN-level log in GetProducerLookupTopic when it falls back to Header.Topic on a message whose topic doesn't match any registered producer — this would surface stuck rows more visibly during a rolling upgrade.


Summary

Area | Assessment
Bug fix correctness | ✅ Correct
Fallback behaviour | ✅ Preserved
Wire leakage prevention | ✅ Strip-after-lookup
Bulk dispatch | ✅ Correct grouping
Test coverage | ✅ Good, one gap noted
Rolling upgrade | ⚠️ Documented, consider a fallback log
Span key collisions | ⚠️ Minor, worth a tuple key

The core fix is solid. The two flagged items are minor — the span key collision is the only one worth changing before merge.

@Jonny-Freemarket
Contributor

CI build job has failed twice on the same pre-existing flaky test — unrelated to this PR:

Paramore.Brighter.Core.Tests.CommandProcessors.Pipeline.CommandProcessorBuildDefaultInboxPublishAsyncTests.WhenInsertingADefaultInboxIntoTheSendPipeline [FAIL]
Assert.True() Failure

The test uses ExistsAsync<...>(timeoutInMilliseconds: 100) as its polling budget. Passes locally in ~55ms per test; CI loses the 100ms race under load. Touches nothing this PR modifies (default-inbox pipeline, not reply/producer lookup).

Could a maintainer re-run the build job? Alternatively, bumping that 100ms budget to something more CI-friendly (say 1s) in a separate commit would stop this recurring.

@DevJonny DevJonny force-pushed the fix/reply-producer-lookup branch from 63bda5a to 53996d4 Compare April 25, 2026 19:32

@claude

claude Bot commented Apr 25, 2026

Code Review

Overview

This PR fixes a real and well-understood bug: reply message mappers set Header.Topic to the dynamic reply address, which caused the outbox dispatcher to fail its producer registry lookup. The design is sound — stash the original publication topic in the bag at wrap time, read it back at dispatch time, and let each transport strip it at the wire boundary. Clean separation of concerns.


Strengths

  • Thread-safe local-header registry — the copy-on-write HashSet<string> with Volatile.Read + CAS loop in RegisterLocalHeader is the right pattern for a write-rare/read-hot structure. Good implementation.
  • BagWithoutLocalHeaders() returns a copy — preserves the original bag entry so InMemoryOutbox-by-reference retries still find the producer. Intentional and correct.
  • GetProducerLookupTopic is clean — single-responsibility, isolated fallback logic. Good.
  • BulkDispatchAsync group key fix — grouping by (WireTopic, LookupTopic) is the right fix; using Uuid.NewAsString() for the span dictionary avoids the key collision problem cleanly.
  • Test coverage — sync + async paths, null publication topic guard, bulk dispatch path, and ASB wire-form stripping are all pinned. Well done.

Issues / Suggestions

1. [Trait("Fragile", "CI")] on the ASB unit test is incorrect

// tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs
[Trait("Category", "ASB")]
[Trait("Fragile", "CI")]
public class AzureServiceBusMessagePublisherLocalHeaderTests

This test does no I/O — it calls ConvertToServiceBusMessage on a locally-constructed message and checks the resulting ServiceBusMessage properties. It will never flake on CI infrastructure issues. The Fragile/CI trait is likely to cause it to be excluded from CI runs, which defeats the purpose of writing it. Please remove [Trait("Fragile", "CI")].

2. StripLocalHeaders() is mentioned in the PR summary but is absent from the diff

The PR description lists MessageHeader.StripLocalHeaders() as a new helper, but only BagWithoutLocalHeaders() appears in the code. If StripLocalHeaders() was deliberately dropped (since the design keeps the original bag intact), the summary should be updated to avoid confusion.

3. null publication topic still results in a silent dispatch failure

In WrapPipeline.Wrap:

if (message.Header.Topic != publication.Topic)
{
    Log.DifferentPublicationAndMessageTopic(...);
    if (publication.Topic is not null)        // guard: no bag entry written
    {
        message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value;
    }
}

When publication.Topic == null and the mapper sets its own topic, no bag entry is written. GetProducerLookupTopic then falls back to Header.Topic (the mapper-set reply address), and LookupBy fails — same as before the fix. The PR notes this in the deployment section, and the When_Wrapping_With_Null_Publication_Topic test pins the guard behaviour. This is acceptable; just confirming it is intentional and not an oversight.

4. Potential for ProducerTopicHeaderName spoofing from external senders

If a message arrives from an external system with paramore.brighter.ProducerTopic already in its bag (either accidentally or maliciously), and the consumer re-posts it via Brighter's outbox, GetProducerLookupTopic will use the bag value for producer lookup, potentially misrouting the message. The risk is low (consume → re-post flows are rare and the worst outcome is a lookup failure or wrong-producer dispatch, not data exfiltration), but worth noting as a trust-boundary consideration. One mitigation would be to only trust the bag entry when it was written by WrapPipeline in the same process — but that may be over-engineering for the current use case.

5. Missing explicit test for the rolling-upgrade fallback

The deployment note says "old outbox rows without the bag entry fall back to Header.Topic." This is exactly GetProducerLookupTopic's else-branch, but there's no test that directly asserts it. A simple unit test:

// Given a message with no ProducerTopic bag entry, GetProducerLookupTopic returns Header.Topic

...would make the fallback guarantee explicit and prevent a future refactor from silently breaking it. Low effort, high signal.
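A sketch of what such a test would pin. GetProducerLookupTopic is private in the mediator, so FallbackSketch.LookupTopic below is a hypothetical stand-in that assumes the helper is a bag lookup with a Header.Topic fallback, as the PR summary describes; strings replace RoutingKey:

```csharp
using System;
using System.Collections.Generic;

public static class FallbackSketch
{
    // Bag key value as given in the PR summary.
    public const string ProducerTopicKey = "paramore.brighter.ProducerTopic";

    // Assumed shape of the lookup: prefer the stashed publication topic,
    // otherwise fall back to the header topic (pre-fix behaviour).
    public static string LookupTopic(string headerTopic, IDictionary<string, object> bag)
        => bag.TryGetValue(ProducerTopicKey, out var value) && value is string topic
            ? topic
            : headerTopic;
}
```

A real test would have to go through a public dispatch path, or an internals-visible seam, rather than call the helper directly.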

6. Minor: allocation per-send in BagWithoutLocalHeaders()

public Dictionary<string, object> BagWithoutLocalHeaders()
{
    var locals = Volatile.Read(ref s_localHeaderNames);
    var copy = new Dictionary<string, object>(Bag.Count);
    ...

This allocates a new Dictionary<string, object> on every message send for SNS/SQS/Redis transports. In normal-message (non-reply) flows where Bag contains no local headers, the result is a full copy of Bag with no entries removed. For most deployments this is negligible, but for high-throughput scenarios it's worth knowing. A fast-path guard (if (!locals.Overlaps(Bag.Keys)) return Bag;) could avoid the allocation entirely when there are no local headers present.
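One possible shape for that fast path, written against a hypothetical LocalHeaderSketch class rather than MessageHeader itself. The local set here is a plain static field for brevity, and the return type is widened to a read-only view as an assumption of this sketch:

```csharp
using System;
using System.Collections.Generic;

public static class LocalHeaderSketch
{
    // Stand-in for the registered local header names.
    static readonly HashSet<string> Locals =
        new HashSet<string> { "paramore.brighter.ProducerTopic" };

    public static IReadOnlyDictionary<string, object> WithoutLocalHeaders(
        Dictionary<string, object> bag)
    {
        // Fast path: nothing to filter, so hand back the bag without copying.
        if (!Locals.Overlaps(bag.Keys)) return bag;

        // Slow path: copy every entry except the local ones.
        var copy = new Dictionary<string, object>(bag.Count);
        foreach (var entry in bag)
        {
            if (!Locals.Contains(entry.Key)) copy[entry.Key] = entry.Value;
        }
        return copy;
    }
}
```

The trade-off is that the fast path returns the original bag rather than a copy, so callers must treat the result as read-only; that is why the sketch returns IReadOnlyDictionary instead of Dictionary.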


Summary

The core fix is correct and well-implemented. The main actionable items before merge are:

  1. Remove [Trait("Fragile", "CI")] from AzureServiceBusMessagePublisherLocalHeaderTests — this will exclude a pure unit test from CI.
  2. Update the PR summary to remove the reference to StripLocalHeaders() if it was dropped.
  3. Consider adding a one-liner fallback test for the rolling-upgrade code path (item 5 above).

Everything else above is informational. The design decision to strip at the transport boundary rather than centrally is the right call.

🤖 Reviewed with Claude Code

@claude

claude Bot commented Apr 25, 2026

Code Review

Several prior reviews have covered the core design thoroughly — this pass focuses on what remains unresolved and one new observation.

Previous feedback addressed (good)

  • Task.Delay(500) timing anti-pattern removed; bulk test now awaits ClearOutstandingFromOutboxAsync directly
  • Grouping by (WireTopic, LookupTopic) tuple avoids the firstMessage assumption concern
  • Uuid.NewAsString() for span keys resolves the key-collision concern cleanly
  • "paramore.brighter.ProducerTopic" namespace prefix prevents bag key collisions
  • Braces added to the WrapPipeline inner guard

Still unresolved: [Trait("Fragile", "CI")] on the ASB unit test

Called out in the previous review and still in the diff:

// When_Converting_A_Message_With_Local_Headers.cs
[Trait("Category", "ASB")]
[Trait("Fragile", "CI")]
public class AzureServiceBusMessagePublisherLocalHeaderTests

The test does no I/O — it calls ConvertToServiceBusMessage on a locally-constructed Message and inspects the resulting ServiceBusMessage.ApplicationProperties. No service bus connection, no network call, nothing that can flake. The Fragile/CI trait will cause CI to skip the only transport-level test that pins local-header filtering, defeating the purpose of writing it. Please remove the trait.


New: is string topic check may silently fail for messages from persistent outboxes

// OutboxProducerMediator.cs - GetProducerLookupTopic
if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic)
    && producerTopic is string topic)
{
    return new RoutingKey(topic);
}

When a message is stored in a database outbox (SQL, DynamoDB, MongoDB, etc.), Header.Bag is serialised and later deserialised. With System.Text.Json, deserialising Dictionary<string, object> produces JsonElement values — not string values. The producerTopic is string topic check would then fail silently, the fallback to Header.Topic (the reply address) kicks in, and producer lookup fails — the same bug this PR is fixing.

All tests use InMemoryOutbox (holds messages by reference, no round-trip), so this would not be caught by the test suite.

Suggested fix: handle both runtime shapes:

private static RoutingKey GetProducerLookupTopic(Message message)
{
    if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic))
    {
        var topic = producerTopic switch
        {
            string s => s,
            System.Text.Json.JsonElement je
                when je.ValueKind == System.Text.Json.JsonValueKind.String => je.GetString(),
            _ => null
        };
        if (topic is not null)
            return new RoutingKey(topic);
    }
    return message.Header.Topic;
}

If Brighter's outbox serialisation already preserves the string runtime type via a custom ObjectJsonConverter, a comment documenting that guarantee would close the question.
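The default System.Text.Json behaviour behind this concern can be checked on its own. The sketch below uses JsonSerializer with default options only; it says nothing about Brighter's own serialiser options, which may preserve the string type. BagRoundTripSketch is an illustrative name:

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class BagRoundTripSketch
{
    // Serialises a one-entry bag and reads the value back using the
    // default serializer options (no custom converters registered).
    public static object RoundTrip(string value)
    {
        var bag = new Dictionary<string, object>
        {
            ["paramore.brighter.ProducerTopic"] = value
        };
        var json = JsonSerializer.Serialize(bag);
        var restored = JsonSerializer.Deserialize<Dictionary<string, object>>(json);
        return restored["paramore.brighter.ProducerTopic"];
    }
}
```

With default options the returned object is a JsonElement whose ValueKind is String, so an `is string` pattern would not match it. Whether the real outbox path hits this depends on which options the outbox uses.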


Still missing: fallback-path unit test

The path where ProducerTopicHeaderName is absent (normal messages, rolling-upgrade scenario) falls back to Header.Topic. This was flagged in the previous review. A direct test of this path would make the backward-compatibility guarantee explicit and prevent a future refactor from silently breaking it.


Summary

Remove [Trait("Fragile", "CI")] and verify/fix the is string topic deserialisation behaviour before merging. The latter is the only scenario where the core fix fails silently — and it would affect all users with persistent outboxes.

Reviewed with Claude Code


@claude

claude Bot commented Apr 25, 2026

Code Review — PR #4070: fix: resolve producer lookup failure when posting Reply messages

Overview

A well-targeted fix for a real production failure: reply message mappers override Header.Topic to the dynamic reply address, causing the outbox dispatcher to fail its producer registry lookup. The fix introduces a side-channel bag entry (paramore.brighter.ProducerTopic) that the dispatcher reads during lookup while transports strip it before putting the message on the wire.

The design decision to preserve the bag entry on the original message reference (rather than stripping it in the mediator) is the right call — it means InMemoryOutbox-by-reference retries still work without a second lookup mechanism.


Strengths

  • Correct threading model for s_localHeaderNames: copy-on-write HashSet<string> with Volatile.Read + Interlocked.CompareExchange CAS loop is the right pattern for a set that is written rarely (startup) and read on every message. The HashSet is never mutated in-place after publication, so reads are truly lock-free.
  • Consistent transport sweep: all 9 transports that copy bag entries to their wire format are updated with the same filter, either inline or via BagWithoutLocalHeaders().
  • Graceful fallback: GetProducerLookupTopic falls back to Header.Topic when the bag entry is absent — non-reply messages and old outbox rows from a rolling upgrade continue to work as before (with the documented caveat for stuck reply rows).
  • Good test regression surface: sync/async paths, bulk dispatch, null publication topic guard, matching topic (no-bag-entry) fallback, and JSON round-trip type contract are all pinned.
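For readers unfamiliar with the copy-on-write pattern named in the first bullet, a minimal sketch follows. LocalHeaderRegistrySketch and its members are hypothetical and only assume the behaviour described above (snapshot reads, CAS-loop writes); this is not the PR's code:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class LocalHeaderRegistrySketch
{
    // The current snapshot; it is replaced wholesale, never mutated in place.
    static HashSet<string> s_names = new HashSet<string>();

    // Lock-free read against whichever snapshot is current.
    public static bool IsLocal(string name)
        => Volatile.Read(ref s_names).Contains(name);

    public static void Register(string name)
    {
        while (true)
        {
            var current = Volatile.Read(ref s_names);
            if (current.Contains(name)) return;

            // Build a new set and try to swap it in; retry if another
            // thread published a different snapshot in the meantime.
            var next = new HashSet<string>(current) { name };
            if (ReferenceEquals(
                    Interlocked.CompareExchange(ref s_names, next, current),
                    current))
            {
                return;
            }
        }
    }
}
```

Each registration copies the whole set, which is acceptable only because writes are expected at startup while reads happen per bag entry.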

Issues

1. AzureServiceBusMessagePublisher.ConvertToServiceBusMessage accessibility not shown in diff

The new test calls AzureServiceBusMessagePublisher.ConvertToServiceBusMessage(message) as a static method from a separate test assembly, but the diff for AzureServiceBusMessagePublisher.cs only shows the body change inside AddBrighterHeaders. The visibility change (to internal static + [InternalsVisibleTo], or public static) is not visible in this diff. Please confirm:

  • Was the method already accessible from the test assembly before this PR?
  • If not, the diff appears incomplete.

2. Overly verbose comment in GetProducerLookupTopic

// The `is string` cast is safe across persistent outboxes (SQL family,
// Mongo, DynamoDB) because Brighter's bag round-trip uses
// JsonSerialisationOptions.Options, which registers DictionaryStringObjectJsonConverter
// + ObjectToInferredTypesConverter — together they preserve the string runtime
// type through serialise/deserialise rather than handing back JsonElement.
// See When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options for
// a regression pin on that contract.

Per project convention, comments should be one short line at most, and only when the WHY is non-obvious. The multi-line block and the reference to a specific test name (When_Bag_String_Values_Round_Trip...) belong in the PR description or commit message, not the source. A short comment naming DictionaryStringObjectJsonConverter would suffice, e.g.:

// `is string` not `as string`: DictionaryStringObjectJsonConverter preserves string
// type through persistent-outbox round-trips (avoids JsonElement).

3. Static global state can never shrink — test isolation caveat

RegisterLocalHeader permanently adds to the process-wide s_localHeaderNames set and there is no corresponding unregister. For the test suite this is fine (the test uses a class-unique key), but any consuming code that calls RegisterLocalHeader with a key derived from runtime configuration would permanently accumulate entries for the process lifetime. Consider documenting this in the RegisterLocalHeader XML doc (/// <remarks>Registrations are permanent for the process lifetime...</remarks>).

4. Span key type change in BulkDispatchAsync not obviously correct

Before: producerSpans.TryAdd(topicBatch.Key, span) where topicBatch.Key was a RoutingKey.
After: producerSpans.TryAdd(Uuid.NewAsString(), span) where the return is string.

If producerSpans is typed Dictionary<RoutingKey, ...>, this compiles only because RoutingKey has an implicit conversion from string. That implicit conversion makes the intent less clear than an explicit new RoutingKey(Uuid.NewAsString()). A minor nit but worth being explicit.


Minor observations

  • Braces added for single-statement if / sent blocks (e.g. if (sent) { ExecuteWithResiliencePipeline(... } ) — consistent with project style, good cleanup.
  • BulkDispatchAsync grouping on (WireTopic, LookupTopic) tuples is semantically correct: reply messages with distinct reply addresses land in separate batches and resolve separate producers. The only cost is smaller batches in high-reply-volume scenarios, which is unavoidable for correctness.
  • Transport coverage: only ASB has a wire-form strip test (AzureServiceBusMessagePublisherLocalHeaderTests). The same coverage for Kafka / RMQ / Redis / SNS / SQS would be valuable but likely requires broker infrastructure to test, so this is an acceptable gap for now.
  • Deployment note in PR description is clear and actionable. Consider adding it to CHANGELOG or migration notes if the project maintains one.

Summary

The fix is sound and the design is well thought-out. The main thing to verify before merging is the ConvertToServiceBusMessage accessibility point (item 1). Items 2–4 are polish-level concerns. Overall this is a clean, well-tested bug fix that correctly threads a tricky path between outbox-by-reference retries and wire-format cleanliness.

Jonny-Freemarket and others added 16 commits April 27, 2026 08:55
When posting a Reply, the message mapper sets the header topic to the
dynamic reply address (a GUID queue name). The outbox dispatcher then
fails to find the producer because it's registered under the static
publication topic (e.g. "Reply"), not the GUID.

Store the publication topic in the message header Bag during the wrap
pipeline when a topic mismatch is detected, then use it as the lookup
key in all three dispatch methods (sync, async, bulk).

Co-Authored-By: Claude (claude-opus-4-6) <noreply@anthropic.com>
… lookup

- bulk dispatch reply messages via outbox sweep (firstMessage bag lookup)
- wrap with null publication.Topic skips bag write (guard pin, sync + async)

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
- deterministic poll-until in bulk dispatch test (drop Task.Delay)
- document firstMessage invariant in BulkDispatchAsync
- clarify ProducerTopicHeaderName xml doc (bag key, not header)
- braces around single-line if in WrapPipeline + WrapPipelineAsync
- inline note on Log.DecoupledInvocationOfMessage divergence from lookup

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
- strip ProducerTopicHeaderName from Header.Bag after lookup in Dispatch,
  DispatchAsync, and BulkDispatchAsync so transports that serialise the
  bag (AMQP headers, SNS/SQS attributes) don't leak internal topology
- namespace bag key as "paramore.brighter.ProducerTopic" to eliminate
  collision risk with user-defined bag entries
- rename async wrap-reply test to end with _Async per suite convention
- extend reply post + bulk dispatch tests to pin the strip behaviour

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
…plexity flat

Moves the per-message strip loop into an IEnumerable<Message> overload of
StripProducerLookupTopic so the added foreach no longer counts against
BulkDispatchAsync's cyclomatic complexity (CodeScene delta warning).

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
…, test cleanup

- Debug.Assert in BulkDispatchAsync that every message in a topicBatch
  resolves to the same producer-lookup topic (makes the firstMessage
  assumption explicit and cheaply catchable)
- drop the poll loop in the bulk dispatch test; ClearOutstandingFromOutboxAsync
  awaits the dispatch so it completes before return
- shorten StripProducerLookupTopic xml doc to a one-line comment
- add a brief note in GetProducerLookupTopic explaining the null-topic fallback

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
The slnx was accidentally swept into an earlier commit with references
to samples/TaskQueue/ASBRequestReply/ projects that were never committed
to the branch, breaking CI build.

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
Debug.Assert is stripped in Release, so it could not enforce the single-
producer invariant at runtime. Group by the composite key instead — this
guarantees every batch resolves to one producer without any assertion.

Addresses Copilot review feedback on PR #4070.

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
…on, outbox note

- rewrite the stale class-level comment in When_Bulk_Dispatching_Reply_Messages_Async
  to match the current composite-key grouping
- disambiguate producerSpans key with the composite (WireTopic, LookupTopic) so
  two batches sharing a WireTopic but differing in LookupTopic can both end their
  spans cleanly
- document on StripProducerLookupTopic that the mutation is harmless for DB-backed
  outboxes but affects InMemoryOutbox retries (primarily dev/test scope)

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
…e span key

Addresses copilot review feedback on #4070:

- StripProducerLookupTopic now returns the prior bag value so each
  dispatch path restores it in a finally block when dispatch did not
  succeed. InMemoryOutbox stores the Message by reference, so without
  this a post-strip send failure would leave the outbox entry missing
  ProducerTopic and retry would fall back to Header.Topic.
- BulkDispatchAsync's producerSpans key is now Guid.NewGuid().ToString()
  rather than "{WireTopic}|{LookupTopic}" — routing keys may legally
  contain '|', which could collide and drop spans.

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
… paths

Per review feedback: the original comment focused on the fallback being
"pre-fix behaviour," which obscured *why* the bag entry exists. Rewrite
to call out the Reply path (mapper rewrites Header.Topic to a dynamic
reply address) and the normal-publication path (no bag entry, falls
back to Header.Topic).

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
Per review feedback (iancooper): the OutboxProducerMediator was stripping
the ProducerTopic bag entry around dispatch and restoring it on failure.
That mutated the message reference held by InMemoryOutbox, so on retry
the producer hint was gone.

Move stripping responsibility to the transport's wire-conversion step:

- MessageHeader.LocalHeaderNames: static set of bag keys that are internal
  to Brighter and must not be serialised onto the wire. Pre-populated
  with Message.ProducerTopicHeaderName; extensible from downstream code.
- MessageHeader.StripLocalHeaders(): instance method removing those keys
  from the bag (provided as a transport convenience).
- AzureServiceBusMessagePublisher: skip LocalHeaderNames when copying
  Header.Bag into ApplicationProperties.
- OutboxProducerMediator: drop StripProducerLookupTopic /
  RestoreProducerLookupTopic and the dispatched/batchDispatched book-
  keeping that existed only to drive the restore. The bag entry now
  survives a successful dispatch — InMemoryOutbox-by-reference keeps the
  producer hint for retries.

Tests updated: reply-message Post tests now assert the bag entry survives
dispatch (it's the transport's job to omit it on the wire). New unit
tests cover MessageHeader.StripLocalHeaders and the ASB publisher's
local-header skip.

Note: only the ASB transport has been migrated. Other transports that
serialise Header.Bag (RMQ, SNS/SQS, Kafka, …) will now leak the
paramore.brighter.ProducerTopic entry on the wire — benign for receivers
but a behaviour change. Follow-up work to apply the same pattern to those
transports is documented on LocalHeaderNames.

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
ASB stripped the ProducerTopic local header on the wire already (prior
commit). Apply the same pattern to every other transport that copies
Header.Bag onto its wire format so paramore.brighter.ProducerTopic
doesn't leak.

Inline-filter transports (existing skip-set extended with LocalHeaderNames):
- RMQ.Async / RMQ.Sync RmqMessagePublisher
- Kafka KafkaDefaultMessageHeaderBuilder
- GcpPubSub Parser
- RocketMQ RocketMqMessageProducer
- MessageScheduler.Azure AzureServiceBusScheduler

Whole-bag-as-JSON transports (use new MessageHeader.BagWithoutLocalHeaders):
- AWSSQS V3 + V4 SnsMessagePublisher
- AWSSQS V3 + V4 SqsMessageSender
- Redis RedisMessagePublisher

MessageHeader gains BagWithoutLocalHeaders() — returns a new dictionary
copy minus LocalHeaderNames — so transports that serialise the bag in
one shot (SNS / SQS / Redis emit it as a single JSON property) can hand
the filtered view to the serialiser without mutating the original
header (preserves InMemoryOutbox-by-reference retries).

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
…alHeader

The first cut exposed LocalHeaderNames as a public mutable HashSet so
transports could call .Contains directly. That let any caller remove the
framework-essential ProducerTopicHeaderName entry, and read/write races
were possible if a registration ran while a publisher was iterating.

Hide the storage and expose two operations:

- IsLocalHeader(name): lock-free read against a copy-on-write snapshot
  (Volatile.Read of the field). This is on the per-bag-entry hot path.
- RegisterLocalHeader(name): CAS loop that swaps in a new HashSet
  containing the additional key. Idempotent. Expected to be called once
  at startup from extension code.

The field itself becomes private; the snapshot is never mutated in place
after publication, so readers can iterate it freely without locking.
StripLocalHeaders and BagWithoutLocalHeaders snapshot once via
Volatile.Read at the start.

ImmutableHashSet was the natural fit but isn't in the netstandard2.0 BCL
and Brighter doesn't reference the package — emulating copy-on-write
with HashSet keeps the same semantics without adding a dependency.

Call sites in the 9 transports + the local-header tests switch to
MessageHeader.IsLocalHeader. New test covers RegisterLocalHeader
idempotency and that custom keys are honoured by BagWithoutLocalHeaders.

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
No transport calls it — they all use IsLocalHeader for inline filtering
or BagWithoutLocalHeaders for whole-bag JSON serialisation. Keeping the
method around invites callers to mutate the message's bag in place,
which would re-introduce the InMemoryOutbox-by-reference regression that
moving stripping out of the mediator just fixed.

Test class renamed MessageHeaderStripLocalHeadersTests →
MessageHeaderLocalHeadersTests and the two strip-specific cases rewritten
against BagWithoutLocalHeaders (also asserts the original header is
untouched, pinning the InMemoryOutbox-by-reference invariant).

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
…r contract, pin fallback

Three issues from claude[bot]'s latest review:

1. The new ASB unit test carried [Trait("Fragile", "CI")], copied by
   mistake from the live-ASB integration tests. The CI pipeline uses
   "Fragile!=CI" as its filter, so the trait skipped the only
   transport-level pin for local-header filtering. Drop it.

2. GetProducerLookupTopic uses `producerTopic is string topic`. Concern
   was that persistent outboxes round-trip Header.Bag through JSON, so
   values would come back as JsonElement and the cast would silently
   fail — reproducing the bug the PR fixes.

   Investigation shows Brighter's bag round-trip uses
   JsonSerialisationOptions.Options, which composes
   DictionaryStringObjectJsonConverter with ObjectToInferredTypesConverter
   to preserve string runtime types. Verified RelationDatabaseOutbox
   (covers MsSql, PostgreSQL, MySql, Sqlite, Spanner), MongoDb, and
   DynamoDB all use those options on deserialise.

   Document the contract on GetProducerLookupTopic and add
   BagStringValueRoundTripTests to pin the converter behaviour. If a
   future change drops one of the converters, the round-trip test fails
   loudly instead of GetProducerLookupTopic regressing silently.

3. Add WrapMatchingPublicationTopicTests to pin the fallback path: when
   a mapper does NOT override Header.Topic, no ProducerTopic bag entry
   is written and producer lookup falls back to Header.Topic. The
   existing test trio (reply-mapper-overrides, null-publication-topic,
   matching-topic) now covers all three branches of WrapPipeline.Wrap.

Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
@DevJonny DevJonny force-pushed the fix/reply-producer-lookup branch from a8ccf2c to b4406b9 Compare April 27, 2026 07:55

@codescene-delta-analysis codescene-delta-analysis Bot left a comment

Gates Failed
Enforce advisory code health rules (1 file with Complex Method, Complex Conditional)

Gates Passed
3 Quality Gates Passed

See analysis details in CodeScene

Reason for failure
| Enforce advisory code health rules | Violations | Code Health Impact |
| --- | --- | --- |
| RocketMqMessageProducer.cs | 2 advisory rules | 8.19 → 7.93 |

Quality Gate Profile: Clean Code Collective

@claude

claude Bot commented Apr 27, 2026

Code Review

Overview

This PR fixes a real bug: when a reply message mapper sets Header.Topic to the dynamic reply address, the outbox dispatcher could not find the registered producer because it was looking up by the mapped topic rather than the publication topic. The chosen approach — stashing the publication topic in Header.Bag at wrap time and filtering it at the wire boundary in each transport — is sound and well-scoped.


Correctness

WrapPipeline / WrapPipelineAsync — bag write

The write is correctly guarded for null publication topic, and the if (message.Header.Topic != publication.Topic) condition ensures the entry is only added when there is an actual topic mismatch. The async and sync pipelines are symmetric. ✅
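A minimal sketch of the guarded write described above, under loud assumptions: `SketchHeader`, `WrapSketch`, and `StashProducerTopic` are hypothetical stand-ins, topics are plain strings here rather than Brighter's own types, and only the bag key value is taken from the PR description. It is not the code in `WrapPipeline.Wrap`.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical stand-in for illustration only; not Brighter's MessageHeader.
public sealed class SketchHeader
{
    public string Topic { get; set; } = "";
    public Dictionary<string, object> Bag { get; } = new Dictionary<string, object>();
}

public static class WrapSketch
{
    // Key value as given in the PR description.
    public const string ProducerTopicKey = "paramore.brighter.ProducerTopic";

    // Records the publication topic only when it is known and the mapper
    // has set a different topic on the header.
    public static void StashProducerTopic(SketchHeader header, string? publicationTopic)
    {
        if (publicationTopic is null)
        {
            return; // null publication topic: nothing to record
        }

        if (header.Topic != publicationTopic)
        {
            header.Bag[ProducerTopicKey] = publicationTopic;
        }
    }
}
```

In this sketch a header whose topic already equals the publication topic keeps an empty bag, which corresponds to the matching-topic fallback path pinned by the tests.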

OutboxProducerMediator.GetProducerLookupTopic

The fallback chain (bag entry → Header.Topic) is correct. The comment about is string being safe after JSON round-trip via DictionaryStringObjectJsonConverter is an important invariant — the regression pin in When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options correctly locks this down. ✅
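To illustrate why the `is string` invariant matters, here is a hedged sketch of the fallback chain. `LookupSketch`, `GetLookupTopic`, and `RoundTripWithDefaultOptions` are hypothetical names, and the round trip deliberately uses System.Text.Json's default options (not Brighter's `JsonSerialisationOptions.Options`) to show the failure mode the converters are there to prevent.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Text.Json;

public static class LookupSketch
{
    // Key value as given in the PR description.
    public const string ProducerTopicKey = "paramore.brighter.ProducerTopic";

    // Prefer the stashed publication topic; fall back to the wire topic
    // when the entry is absent or is not a string at runtime.
    public static string GetLookupTopic(string wireTopic, IReadOnlyDictionary<string, object> bag)
    {
        if (bag.TryGetValue(ProducerTopicKey, out var value) && value is string topic)
        {
            return topic;
        }

        return wireTopic;
    }

    // Round-trips a bag with DEFAULT System.Text.Json options. Object-typed
    // values come back as JsonElement, not string, so the pattern match fails.
    public static Dictionary<string, object> RoundTripWithDefaultOptions(Dictionary<string, object> bag)
    {
        string json = JsonSerializer.Serialize(bag);
        return JsonSerializer.Deserialize<Dictionary<string, object>>(json)!;
    }
}
```

With an in-memory bag the helper returns the stashed topic; after the default-options round trip the same entry is a `JsonElement`, so the helper silently falls back to the wire topic. That silent fallback is the regression the round-trip test guards against.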

BulkDispatchAsync grouping key change

Changing the group key from m.Header.Topic to the (WireTopic, LookupTopic) tuple is the right fix for the bulk path. Span keys correctly use Uuid.NewAsString() to avoid collision between reply messages routed to the same wire topic from different producers. ✅
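The composite grouping can be sketched as follows. `SketchMessage` and `BulkGroupingSketch` are hypothetical, the nullable `ProducerTopic` property stands in for the optional bag entry, and topics are plain strings; this shows the idea rather than the body of `BulkDispatchAsync`.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical message shape: ProducerTopic is null when no bag entry was written.
public sealed record SketchMessage(string WireTopic, string? ProducerTopic);

public static class BulkGroupingSketch
{
    // Groups by (wire topic, lookup topic) so that every batch resolves to
    // exactly one producer, even when several batches share a wire topic.
    public static Dictionary<(string WireTopic, string LookupTopic), List<SketchMessage>> GroupForDispatch(
        IEnumerable<SketchMessage> messages)
    {
        return messages
            .GroupBy(m => (WireTopic: m.WireTopic, LookupTopic: m.ProducerTopic ?? m.WireTopic))
            .ToDictionary(g => g.Key, g => g.ToList());
    }
}
```

In this sketch a reply message and a normal message that share a wire topic land in separate batches, because their lookup topics differ.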


Potential Issues

1. AzureServiceBusMessagePublisher.ConvertToServiceBusMessage visibility

AzureServiceBusMessagePublisherLocalHeaderTests calls AzureServiceBusMessagePublisher.ConvertToServiceBusMessage(message) as a static method. In AzureServiceBusScheduler.cs the equivalent method is private static — if AzureServiceBusMessagePublisher.ConvertToServiceBusMessage is also private, this test will not compile. The diff does not show a visibility change to that method. Please verify that either: (a) the method is already internal with an InternalsVisibleTo attribute pointing at the test assembly, or (b) the method was made internal/public as part of this PR and the diff context just doesn't show it.

2. BagWithoutLocalHeaders() allocates a new dictionary on every call

For SNS/SQS/Redis this is called once per message send, which is likely acceptable. However, if the Bag is empty or contains only local headers, the allocation is wasted. A minor optimisation would be to return _emptyBag in the all-local case, but this is not blocking.
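For reference, a filtered copy of this kind might look like the sketch below. `BagFilterSketch.WithoutLocalHeaders` is a hypothetical free-standing helper that takes the local-header predicate as a parameter; the real `BagWithoutLocalHeaders()` is an instance method whose body is not shown in this conversation.

```csharp
using System;
using System.Collections.Generic;

public static class BagFilterSketch
{
    // Returns a new dictionary without the local entries. The source bag is
    // never mutated, so an outbox holding the message by reference keeps
    // its producer hint for retries.
    public static Dictionary<string, object> WithoutLocalHeaders(
        IReadOnlyDictionary<string, object> bag,
        Func<string, bool> isLocalHeader)
    {
        var filtered = new Dictionary<string, object>(bag.Count);
        foreach (var entry in bag)
        {
            if (!isLocalHeader(entry.Key))
            {
                filtered[entry.Key] = entry.Value;
            }
        }

        return filtered;
    }
}
```

The allocation noted above is visible here: one dictionary per call, regardless of how many entries survive the filter.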

3. RegisterLocalHeader is permanently global state

The method mutates static state that persists for the life of the process (including across tests in the same process). The test key in MessageHeaderLocalHeadersTests is namespaced to avoid cross-test pollution ("custom.local.header." + nameof(MessageHeaderLocalHeadersTests)), which is correct. Callers should be aware there is no UnregisterLocalHeader — document or enforce the "startup-only" contract explicitly, otherwise a library author might call it at request time and be surprised.
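The registration semantics in question can be sketched as a copy-on-write set with a CAS loop. `LocalHeaderRegistrySketch` is a hypothetical class written for illustration from the commit description; it is not the `MessageHeader` implementation.

```csharp
using System.Collections.Generic;
using System.Threading;

public static class LocalHeaderRegistrySketch
{
    // The published snapshot is never mutated in place; registrations swap in a new set.
    private static HashSet<string> s_snapshot =
        new HashSet<string> { "paramore.brighter.ProducerTopic" };

    // Lock-free read against whichever snapshot is currently published.
    public static bool IsLocalHeader(string name) =>
        Volatile.Read(ref s_snapshot).Contains(name);

    // Idempotent add-only registration. Retries if another thread published
    // a newer snapshot between the read and the compare-exchange.
    public static void RegisterLocalHeader(string name)
    {
        while (true)
        {
            var current = Volatile.Read(ref s_snapshot);
            if (current.Contains(name))
            {
                return;
            }

            var next = new HashSet<string>(current) { name };
            if (ReferenceEquals(Interlocked.CompareExchange(ref s_snapshot, next, current), current))
            {
                return;
            }
        }
    }
}
```

Because the sketch only ever adds keys, a registration stays in effect for the life of the process, which is the global-state behaviour this item asks to have documented.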

4. Deployment / rolling upgrade gap is only in the PR description

The note that old outbox rows persisted without ProducerTopic will fail on new instances for reply messages is important operational information. Consider adding it to a CHANGELOG entry or UPGRADING.md if the project maintains one, since PR descriptions are not always consulted during incident response.


Code Style

The comment density in OutboxProducerMediator and MessageHeader is higher than the project's "default to no comments" convention. Most of the comments added do explain non-obvious invariants (the is string cast, the InMemoryOutbox-by-reference contract, the CAS loop semantics), which is the correct exception criterion. A few explain what the code does rather than why:

// Log the wire topic (Header.Topic) — where the message is going.

This sentence restates what Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, ...) already makes clear from its arguments. It can be removed.

Similarly, the inline //assert - ... comments in the test methods are helpful as test documentation, but they are more verbose than the project's existing tests (e.g. When_Posting_A_Message_To_The_Command_Processor.cs). Not blocking, but worth noting for consistency.


Test Coverage

Coverage is thorough and well structured:

| Scenario | Test |
| --- | --- |
| Sync post of reply message | When_Posting_A_Reply_Message_To_The_Command_Processor |
| Async post of reply message | When_Posting_A_Reply_Message_To_The_Command_Processor_Async |
| Bulk dispatch of reply messages | When_Bulk_Dispatching_Reply_Messages_Async |
| Wrap pipeline bag write (sync) | When_Wrapping_A_Reply_Message_Mapper |
| Wrap pipeline bag write (async) | When_Wrapping_A_Reply_Message_Mapper_Async |
| No bag entry when topics match | When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication |
| Null publication topic guard | When_Wrapping_With_Null_Publication_Topic + async twin |
| BagWithoutLocalHeaders / IsLocalHeader / idempotency | MessageHeaderLocalHeadersTests |
| ASB wire stripping | AzureServiceBusMessagePublisherLocalHeaderTests |
| JSON round-trip regression pin | When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options |

One gap: there is no test covering the mixed bulk dispatch scenario — a batch containing both a reply message (with ProducerTopic bag entry) and a normal message (without it) that happen to share a wire topic. The grouping logic in BulkDispatchAsync should handle this correctly (they will land in different topicBatch groups since their LookupTopic differs), but a test would pin this edge case explicitly.


Summary

The fix is correct and the design decisions are well-reasoned. The main actionable items before merge:

  1. Verify ConvertToServiceBusMessage visibility is accessible from the ASB test project (likely already internal via InternalsVisibleTo, just not visible in the diff).
  2. Consider documenting the rolling-upgrade caveat in an UPGRADING.md or similar file.
  3. Minor: remove the // Log the wire topic comments that restate the call.
  4. Nice to have: add a mixed-batch test to When_Bulk_Dispatching_Reply_Messages_Async.

Overall this is a well-scoped, well-tested fix. LGTM pending the visibility verification.

Member

@iancooper iancooper left a comment

This looks much better, thanks for taking the time with this.

@Jonny-Freemarket
Contributor

Thank you for the review! This will massively help us.

@DevJonny DevJonny merged commit d593c02 into master Apr 27, 2026
29 of 31 checks passed
@DevJonny DevJonny deleted the fix/reply-producer-lookup branch April 27, 2026 08:55

Labels

2 - In Progress Bug .NET Pull requests that update .net code V10.X

5 participants