fix(agent-module): correct MqttRelay availability topic emission #142

Draft
ottobolyos wants to merge 29 commits into TrakHound:master from ottobolyos:fix/issue-135

Conversation


@ottobolyos ottobolyos commented Apr 25, 2026

Summary

Fixes #135 — the MqttRelay agent module published Agent Availability on {TopicPrefix}/Probe/{agent-uuid}/Available with a raw AVAILABLE / UNAVAILABLE string, breaking the contract that {TopicPrefix}/Probe/# carries only JSON envelopes.

  • Relocate the Availability last-will-and-testament topic and the on-connect retained-message topic to {TopicPrefix}/Agent/{agent-uuid}/Available, outside the Probe wildcard.
  • Validate AvailabilityTopic configuration inputs and reject empty / whitespace / wildcard-segment values before the relay starts.
  • Guard MqttRelay.OnStop against a NullReferenceException when the relay shuts down in Entity mode.
  • Bound the shutdown disconnect await so a stuck broker cannot hang Agent termination.
  • Make the last-sent-sequence counter atomic on 64-bit access and back the persister with an in-memory store + timed flush so concurrent observation publishing cannot tear the value or thrash the I/O path.
  • Group observations in a single pass during MqttRelay publish so per-tick CPU stays linear in the observation count.
  • Wrap the relay's async-void event handlers with a fault-logging shim and log unexpected Worker faults from the outer catch so silent dead-thread regressions surface in the agent log.
  • Guard the missed-observation counter against integer underflow when a sequence reset arrives before the persister has flushed.
  • Add MTConnect.NET-AgentModule-MqttRelay-Tests with NUnit coverage of the topic-construction helper, a structural guard refusing any future availability topic containing /Probe/, and unit pins for the shutdown / fault-handling / atomic-counter / single-pass-grouping / in-memory-persister contracts.
  • Document the operator migration path in docs/testing/issue-135.md.
  • Breaking: operators that subscribe to {TopicPrefix}/Probe/{agent-uuid}/Available for the raw-string availability state must move their subscription to {TopicPrefix}/Agent/{agent-uuid}/Available. Subscribers of the Probe/# wildcard now receive only JSON envelopes.
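The relocated topic shape can be sketched as a small static helper. The names `AvailabilityTopic`, `Build`, and the segment constants come from this PR's commits; the body below is an illustrative sketch, not the module's actual implementation.

```csharp
// Sketch of the topic-construction helper after the fix: the availability
// topic lives under Agent/, outside the Probe/# wildcard.
public static class AvailabilityTopic
{
    public const string AgentSegment = "Agent";
    public const string AvailableSegment = "Available";

    public static string Build(string topicPrefix, string agentUuid)
    {
        // Emits {TopicPrefix}/Agent/{AgentUuid}/Available, so a subscriber
        // wildcarding {TopicPrefix}/Probe/# receives only JSON envelopes.
        return $"{topicPrefix}/{AgentSegment}/{agentUuid}/{AvailableSegment}";
    }
}
```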

Adds docs/testing/issue-135.md with the per-phase writeup index for
the MqttRelay availability topic fix and seeds the
docs/testing/issue-135/ subfolder with the phase-00 foundation
writeup.
Records the publish-surface investigation, the cppagent reference
posture, and the strategy decision (relocate out of the Probe
wildcard) for the issue-135 fix.
Extracts the agent availability topic-construction logic in the
MqttRelay module to a new public static helper class
AvailabilityTopic.Build(topicPrefix, agentUuid). Module.cs's
GetAgentAvailableTopic delegates to the helper. No behavior change:
the helper still emits the existing
{TopicPrefix}/Probe/{AgentUuid}/Available shape so the broken topic
contract reported in TrakHound#135 is preserved for the
incoming red-test commit. The subsequent fix flips the helper's
output to the corrected shape.
@ottobolyos force-pushed the fix/issue-135 branch 2 times, most recently from 761e52c to 57faf86 on April 28, 2026 at 10:56
Adds tests/MTConnect.NET-AgentModule-MqttRelay-Tests/ NUnit test
project covering the agent module's availability topic-construction
helper. The fixture pins the corrected topic shape that resolves
TrakHound#135:
{TopicPrefix}/Agent/{AgentUuid}/Available rather than the broken
{TopicPrefix}/Probe/{AgentUuid}/Available shape under the Probe
wildcard. The fixture also covers null and empty inputs plus the
public AvailableSegment constant so coverage of AvailabilityTopic.cs
is 100 percent once the fix flips the Probe segment to Agent.

Three of the eight tests are red against the prior commit (the
extracted helper still emits the broken topic shape). The fix in the
following commit makes them green.

Registers the new project in MTConnect.NET.sln so the solution-level
build picks it up.
Relocates the MqttRelay agent module's MQTT Last Will and Testament
topic plus its on-connect retained Available message out of the
{TopicPrefix}/Probe/# wildcard. AvailabilityTopic.Build now emits
{TopicPrefix}/Agent/{AgentUuid}/Available instead of
{TopicPrefix}/Probe/{AgentUuid}/Available. The Probe wildcard
therefore carries only JSON document envelopes, restoring the
contract a subscriber wildcarding Probe/# relies on (the prior shape
crashed JSON parsers on the raw "AVAILABLE" / "UNAVAILABLE" UTF-8
string payload).

Closes TrakHound#135

BREAKING CHANGE: operators that subscribed directly to
{TopicPrefix}/Probe/{AgentUuid}/Available for the raw availability
state must move their subscription to
{TopicPrefix}/Agent/{AgentUuid}/Available.
Adds a regression fixture that guards against re-introduction of the
broken availability topic shape resolved in
TrakHound#135. The fixture covers a parametric matrix
of topic prefixes and agent uuids (including adversarial inputs
where the prefix or the uuid is literally the string "Probe") and
asserts that the dedicated availability segment stays the
AvailabilityTopic.AgentSegment constant rather than collapsing back
to the Probe topic constant.
Adds per-phase writeups for the red tests (phase 02), library fix
(phase 03), regression pins (phase 04), end-to-end validation
posture (phase 05), and the closing summary (phase 06) of the
MqttRelay availability topic fix. Each writeup records executed
work, metrics deltas, deviations from the original plan, and
follow-ups.
Removes a stray reference to a gitignored plans path from the
phase 04 regression-pins writeup. The reference resolved only off
the public tree.
Removes the docs/testing/issue-135/ subtree. It carried phase-by-phase campaign writeups that referenced internal tooling (CONVENTIONS rule-book, internal section numbers, extra-files.user/ paths, internal tracker terminology). Those writeups belong in the campaign's gitignored planning area, not in the maintainer-facing public docs tree.
Add NUnit coverage for the MQTT 3.1.1 reserved-character rules
(section 4.7.1.1) on AvailabilityTopic.Build inputs, plus
canonicalisation of leading and trailing slashes in the topicPrefix.
The current Build implementation does not validate or canonicalise
its inputs, so the new tests fail and pin the corrected contract.
Reject MQTT-reserved characters ('+', '#', '\0') in topicPrefix and
agentUuid per MQTT 3.1.1 §4.7.1.1; reject '/' inside agentUuid since
it is a single topic segment; canonicalise leading and trailing '/'
in topicPrefix so the resulting topic stays canonical and never
emits a stray empty segment.
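The validation and canonicalisation rules above can be sketched as follows. The helper name matches the PR; the exact exception types and messages are assumptions for illustration.

```csharp
using System;

// Illustrative sketch of the validated Build contract: reject MQTT 3.1.1
// reserved characters, reject '/' inside the single-segment uuid, and trim
// leading/trailing '/' from the prefix so no empty segment is emitted.
public static class AvailabilityTopic
{
    private static readonly char[] Reserved = { '+', '#', '\0' };

    public static string Build(string topicPrefix, string agentUuid)
    {
        if (string.IsNullOrWhiteSpace(topicPrefix))
            throw new ArgumentException("Topic prefix is required", nameof(topicPrefix));
        if (string.IsNullOrWhiteSpace(agentUuid))
            throw new ArgumentException("Agent uuid is required", nameof(agentUuid));

        // MQTT 3.1.1 §4.7.1.1: '+', '#', and NUL are reserved in topic names.
        if (topicPrefix.IndexOfAny(Reserved) >= 0)
            throw new ArgumentException("Reserved MQTT character", nameof(topicPrefix));
        if (agentUuid.IndexOfAny(Reserved) >= 0)
            throw new ArgumentException("Reserved MQTT character", nameof(agentUuid));

        // The uuid occupies exactly one topic segment, so '/' is rejected there.
        if (agentUuid.Contains('/'))
            throw new ArgumentException("Uuid must be a single segment", nameof(agentUuid));

        // Canonicalise leading/trailing '/' so the topic stays canonical.
        var prefix = topicPrefix.Trim('/');
        return $"{prefix}/Agent/{agentUuid}/Available";
    }
}
```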
Add NUnit coverage for a RelayBufferDiagnostics.ComputeMissed helper
that exposes the missed-observations diagnostic computation in
RelayBufferedObservations. The Module currently inlines
  long missed = (long)(to - lastSent);
which underflows when lastSent > to. The new helper does not yet
exist so the tests fail and pin the corrected contract.
Extract the missed-observation diagnostic into
RelayBufferDiagnostics.ComputeMissed and route the Module through
the helper. ComputeMissed returns 0 when lastSent >= to so a stale
last-sent-sequence file or a rolled broker sequence no longer
produces a huge spurious "missed" figure in the diagnostic log.
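The underflow-safe computation can be sketched in a few lines. The class and method names come from the PR; the body is an assumption of its shape.

```csharp
// Sketch of the missed-observation diagnostic: a stale last-sent sequence
// (lastSent >= to) yields 0 instead of a wrapped, enormous unsigned
// difference cast to long.
public static class RelayBufferDiagnostics
{
    public static long ComputeMissed(ulong lastSent, ulong to)
    {
        if (lastSent >= to) return 0;
        return (long)(to - lastSent);
    }
}
```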
The MqttRelay agent module used to publish agent Availability under the
{TopicPrefix}/Probe/# wildcard (specifically
{TopicPrefix}/Probe/{AgentUuid}/Available with a raw UTF-8 string
payload). That broke the wildcard's pure-JSON contract for any
subscriber binding {TopicPrefix}/Probe/#. The module now publishes that
same Availability state on a dedicated
{TopicPrefix}/Agent/{AgentUuid}/Available topic, matching the cppagent
reference implementation.

Add a Migration section to the NuGet README so operators upgrading the
package see the topic move and the subscriber action required, rather
than only the raw API-shape contract pinned by the unit tests.
Module.OnStop() unconditionally invoked _documentServer.Stop(). In
Entity-mode the constructor only initialises _entityServer, so
_documentServer was null and the shutdown raised a
NullReferenceException, masking the real shutdown reason in the host
service event log.

Pin the lifecycle policy via a focused helper:

* StopServers must be a total function over (null, null).
* StopServers must invoke whichever stop action is provided.
* A throwing document-server stop must not prevent the entity-server
  stop from running, so a partial shutdown does not leak handlers.

This commit only adds the failing tests against MqttRelayLifecycle so
that the helper is introduced under TDD in the follow-up fix commit.
OnStop() unconditionally called _documentServer.Stop(). When the module
was constructed with TopicStructure=Entity only _entityServer was
initialised, so the call raised NullReferenceException during agent
shutdown and masked the real shutdown reason in the host service event
log.

Introduce MqttRelayLifecycle.StopServers, a small policy helper that
takes optional per-server stop actions, tolerates null targets, and
isolates each stop in its own try/catch so a throw on one path cannot
leave the other server running. Route OnStop() through the helper so
the null-guard is enforced uniformly and is unit-testable without
standing up an MTConnect agent broker. Pinned by
MqttRelayLifecycleStopTests.
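The stop policy above admits a minimal sketch. `MqttRelayLifecycle.StopServers` is the PR's helper name; the per-action signature below is an assumption.

```csharp
using System;

// Sketch of the null-tolerant, fault-isolating stop policy: a total function
// over (null, null), where a throw on one stop path cannot prevent the other
// server from stopping.
public static class MqttRelayLifecycle
{
    public static void StopServers(Action documentServerStop, Action entityServerStop)
    {
        // Each stop runs in its own try/catch so a partial shutdown
        // does not leak handlers on the surviving server.
        try { documentServerStop?.Invoke(); } catch { /* isolated */ }
        try { entityServerStop?.Invoke(); } catch { /* isolated */ }
    }
}
```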
Module.OnStop() previously invoked _mqttClient.DisconnectAsync(...) as a
fire-and-forget task: the returned Task was never awaited, any fault on
the disconnect path was silently dropped, and the host process risked
exiting before the disconnect actually completed. That hid broker
errors at shutdown from the operator.

Pin the lifecycle disconnect policy via a focused helper:

* Successful disconnect must not invoke the fault logger.
* A faulted disconnect Task must surface its exception to the logger.
* A disconnect that never completes must not hang shutdown; the helper
  bounds the wait and treats the timeout as best-effort success.
* A factory that throws synchronously must be caught and routed to the
  fault logger.
* A null disconnect factory must no-op (worker never ran path).

Tests fail to compile until DisconnectWithTimeout is introduced in the
follow-up fix commit.
OnStop() called _mqttClient.DisconnectAsync(...) as a fire-and-forget
task. The returned Task was never awaited, faults were silently
dropped, and the host process risked exiting before the disconnect
completed. Operators diagnosing a hung shutdown could not see the real
broker error.

Add MqttRelayLifecycle.DisconnectWithTimeout: a bounded synchronous
wait on the disconnect Task with a fault-logging callback. Synchronous
throws and Task faults route to the logger; a wait that elapses bails
out best-effort and attaches an OnlyOnFaulted continuation so a late
fault is not swallowed. Route OnStop() through the helper with a five
second bound and a Warning-level log line. Pinned by
MqttRelayLifecycleDisconnectTests.
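The bounded-disconnect policy can be sketched as below. `DisconnectWithTimeout` is the PR's helper name; this exact signature and the five cases it handles are assumptions drawn from the commit message.

```csharp
using System;
using System.Threading.Tasks;

// Sketch of the bounded synchronous wait on the disconnect Task with a
// fault-logging callback.
public static class MqttRelayLifecycle
{
    public static void DisconnectWithTimeout(
        Func<Task> disconnectFactory, TimeSpan timeout, Action<Exception> logFault)
    {
        if (disconnectFactory == null) return; // worker never ran: no-op

        Task task;
        try { task = disconnectFactory(); }
        catch (Exception ex) { logFault?.Invoke(ex); return; } // synchronous throw

        try
        {
            if (!task.Wait(timeout))
            {
                // Timed out: treat as best-effort success, but attach an
                // OnlyOnFaulted continuation so a late fault is not swallowed.
                task.ContinueWith(t => logFault?.Invoke(t.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);
            }
        }
        catch (AggregateException ex)
        {
            logFault?.Invoke(ex.InnerException ?? ex); // faulted disconnect
        }
    }
}
```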
Module.cs read and wrote the 64-bit _lastSentSequence field without
Interlocked. On 32-bit hosts that is not atomic: a concurrent reader
can observe a torn value (one half from the previous value, one from
the new). MqttRelay reads the field from observation event handlers
(multiple ThreadPool threads) while the durable-relay Worker writes it.
A torn read can log a wildly wrong "unsent" figure and propagate to the
persisted last-sent-sequence file, causing buffered observations to be
skipped or re-sent on the next reconnect.

Pin the atomic-access policy via a focused tracker:

* Fresh tracker reads zero.
* Write round-trips through Read.
* Full ulong range (including high-bit values that map to negative long
  on Interlocked.Read) round-trips losslessly.
* Last write wins: the tracker is not a max-watermark.
* Concurrent writer/reader smoke test: the reader never observes a
  half-torn value (high half not as written by the writer).

Tests fail to compile until LastSentSequenceTracker is introduced in the
follow-up fix commit.
Module.cs read and wrote the 64-bit _lastSentSequence field without
Interlocked. On 32-bit hosts a 64-bit field is read and written as two
32-bit halves and a concurrent reader can observe a torn value.
MqttRelay reads the counter from observation event handlers (multiple
ThreadPool threads) while the durable-relay Worker writes it. A torn
read could log a wildly wrong "unsent" figure and propagate to the
persisted last-sent-sequence file.

Introduce LastSentSequenceTracker, a small class that wraps every
read/write in Interlocked.Read / Interlocked.Exchange so the access is
atomic on every supported runtime. Replace the bare _lastSentSequence
field with the tracker and route all three call sites
(RelayBufferedObservations diagnostic compute and AgentObservationAdded
durable-relay path) through Read/Write. Pinned by
LastSentSequenceTrackerTests including a concurrent writer/reader
smoke check that asserts no torn high-half values are observed.
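The tracker described above can be sketched as a thin Interlocked wrapper. The class name comes from the PR; the member shapes are assumptions.

```csharp
using System.Threading;

// Sketch of the atomic last-sent-sequence wrapper. The value is stored as
// long so Interlocked can operate on it; ulong values round-trip through an
// unchecked cast (high-bit values map to negative longs and back losslessly).
public sealed class LastSentSequenceTracker
{
    private long _value;

    // Interlocked.Read is atomic on 32-bit hosts, so no torn reads.
    public ulong Read() => unchecked((ulong)Interlocked.Read(ref _value));

    // Last write wins: the tracker is not a max-watermark.
    public void Write(ulong value) => Interlocked.Exchange(ref _value, unchecked((long)value));
}
```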
Module.cs exposes seven event handlers the agent broker invokes as
async void. Three (AgentDeviceAdded, AgentObservationAdded,
AgentAssetAdded) had no top-level try/catch; the other four guarded
only the inner publish. An async-void method that throws routes its
exception to the synchronization context, which on the ThreadPool
tears down the host process. A formatter throw or a synchronous broker
call (DataItem null-deref, broker shutting down) crashed the agent.

Pin the safety policy via a focused guard helper:

* Successful body must not invoke the fault logger.
* Synchronous throw must route to the logger.
* Async throw (Task fault) must route to the logger.
* Null logger must not rethrow: the guard absorbs the fault either way.
* Throwing logger must not corrupt the guard contract.
* Null body must no-op (defensive against a mis-wired handler).

Tests fail to compile until AsyncVoidGuard is introduced in the
follow-up fix commit.
Module.cs exposes seven event handlers the agent broker invokes as
async void. Three of them (AgentDeviceAdded, AgentObservationAdded,
AgentAssetAdded) had no top-level try/catch; the other four
(ProbeReceived, CurrentReceived, SampleReceived, AssetReceived)
guarded only the inner publish but not the formatter call. An async
void method that throws routes its exception to the synchronization
context, and on the ThreadPool that tears down the host process. So a
formatter throw, a synchronous broker call (DataItem null-deref,
broker shutting down), or an unexpected publish error crashed the
agent.

Add AsyncVoidGuard.Run, a tiny helper that awaits a delegate and
routes any synchronous or async exception to a logging callback,
never rethrowing. Wrap every async void handler with it. The four
formatter handlers are split into a slim async void shell plus an
async Task *Core method so the existing inner try/catch logic stays
intact while the Core call itself is now guarded. Pinned by
AsyncVoidGuardTests.
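The guard contract can be sketched in a few lines. `AsyncVoidGuard.Run` is the PR's name; the signature and body here are assumptions.

```csharp
using System;
using System.Threading.Tasks;

// Sketch of the async-void fault shim: awaiting the delegate surfaces both
// synchronous and asynchronous throws at one catch site, and the guard never
// rethrows, so a fault cannot reach the synchronization context and tear
// down the host process.
public static class AsyncVoidGuard
{
    public static async void Run(Func<Task> body, Action<Exception> logFault)
    {
        if (body == null) return; // defensive against a mis-wired handler
        try
        {
            await body();
        }
        catch (Exception ex)
        {
            // A throwing (or null) logger must not corrupt the contract.
            try { logFault?.Invoke(ex); } catch { /* absorbed */ }
        }
    }
}
```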
The Worker do/while loop in Module.cs previously swallowed any
unexpected exception escaping the inner try/catch with a bare
"catch (Exception) { }". The relay quietly entered the reconnect
delay branch and the underlying defect (a throw inside the inner
finally, or an oversight in connection handling) went undiagnosed for
the lifetime of the agent.

Pin the policy via WorkerLoopExceptionLogger:

* TaskCanceledException is the orderly-shutdown signal; do not log.
* OperationCanceledException (parent type) is also a shutdown signal.
* Any other exception is genuinely unexpected and must be logged with
  the exception type name and message.
* Null exception is a no-op; null callback is tolerated.

Tests fail to compile until the helper is introduced in the follow-up
fix commit.
The concurrent writer/reader smoke test asserted observed > 0 sample
hits as a sanity check, but on a fast machine the writer completed
before the reader took its first sample, failing the assertion
without indicating a real defect. The atomicity assertions inside the
reader loop are the load-bearing checks; the sample-count sanity was
not.

Drop the brittle observed-count assertion. Increase the iteration
count (5_000 -> 100_000) so even a fast writer cannot beat the
reader, and add a ManualResetEventSlim startGate so both tasks begin
under the same observation window. The torn-read assertions inside
the reader loop are unchanged and still load-bearing.
The Worker do/while outer-catch in Module.cs was a bare empty
"catch (Exception) { }". Any unexpected exception escaping the inner
try/catch (a throw inside the inner finally, an oversight in
connection handling) was silently swallowed. The relay quietly
entered the reconnect-delay branch and the underlying defect went
undiagnosed for the lifetime of the agent.

Introduce WorkerLoopExceptionLogger.Log: log any non-cancellation
exception at Warning with the exception type name and message.
TaskCanceledException and OperationCanceledException stay silent
because they are the orderly-shutdown signal. Route the outer-catch
through the helper so the policy is unit-testable. Pinned by
WorkerLoopExceptionLoggerTests.
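The logging policy admits a short sketch. The helper name comes from the PR; the `Log` signature below is an assumption.

```csharp
using System;

// Sketch of the outer-catch policy: cancellation is the orderly-shutdown
// signal and stays silent; anything else is logged with type name and
// message so a dead-thread regression surfaces in the agent log.
public static class WorkerLoopExceptionLogger
{
    public static void Log(Exception ex, Action<string> logWarning)
    {
        if (ex == null || logWarning == null) return; // both tolerated

        // TaskCanceledException derives from OperationCanceledException,
        // so one type check covers both shutdown signals.
        if (ex is OperationCanceledException) return;

        logWarning($"Unexpected Worker fault: {ex.GetType().Name}: {ex.Message}");
    }
}
```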
Module.PublishObservations iterated the input enumerable up to three
times per distinct DataItemId (Distinct, Where, FirstOrDefault). For
n observations across k distinct data items the cost is O(n*k), and
each iteration may re-execute a deferred upstream query (the broker's
observation enumerator). On large agents (thousands of observations
across hundreds of data items) the catch-up after a reconnect was
materially slowed.

Pin the grouping policy via a focused helper:

* Null and empty input return empty.
* Distinct DataItemId values become distinct groups.
* The source enumerable is iterated exactly once (smoke-tested with a
  counting iterator).
* Encounter order is preserved within each group so callers relying
  on sequence-monotonic ordering are not broken.
* Null DataItemId is tolerated as its own (null-keyed) group rather
  than crashing the enumeration.

Tests fail to compile until ObservationGrouper is introduced in the
follow-up fix commit.
Module.PublishObservations iterated the input enumerable up to three
times per distinct DataItemId (Distinct, Where, FirstOrDefault). For
n observations across k distinct data items the cost was O(n*k) and
each iteration could re-execute a deferred upstream broker query. On
large agents that throttled the catch-up after a reconnect.

Introduce ObservationGrouper.GroupByDataItem: materialise the source
once and group by DataItemId, returning IGrouping<string, T> entries
that preserve encounter order within each group. Refactor
PublishObservations to use a foreach over groups, materialise each
group once into a List, and dispatch CONDITION versus single-value
data items off the first item's category. Extract the
IObservationOutput -> Observation copy into a static
CloneAsObservation helper so the condition and non-condition paths
share the projection. Pinned by ObservationGrouperTests.
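The single-pass grouping can be sketched with LINQ's own GroupBy, which enumerates the source exactly once and preserves encounter order. The names match the PR; the body is illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch of the single-pass grouper: materialise the groups once instead of
// re-scanning the source per distinct DataItemId (O(n) rather than O(n*k)).
// A null DataItemId becomes its own null-keyed group instead of crashing.
public static class ObservationGrouper
{
    public static IEnumerable<IGrouping<string, T>> GroupByDataItem<T>(
        IEnumerable<T> source, Func<T, string> dataItemId)
    {
        if (source == null) return Enumerable.Empty<IGrouping<string, T>>();

        // GroupBy preserves encounter order within each group; ToList forces
        // exactly one pass over a possibly deferred upstream query.
        return source.GroupBy(dataItemId).ToList();
    }
}
```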
Module.cs synchronously wrote to disk from every successful
observation publish in AgentObservationAdded:

    File.WriteAllText(path, seq.ToString());

Under high-rate observation arrival under DurableRelay that put a
synchronous disk write on every event-handler invocation, throttling
the relay. The fix is to track the value in memory and flush only on
a timer, on shutdown, and at batch boundaries.

Pin the persister policy:

* Update marks dirty and round-trips through Read.
* Read does not clear dirty: only Flush establishes durable state.
* TryFlush emits exactly one writer call when dirty, and clears dirty
  on success. Returns true if a write happened.
* TryFlush no-ops when clean (avoids burning IOPS on idle ticks).
* TryFlush keeps dirty when the writer throws so the next tick
  retries.
* Initialize seeds the in-memory value from disk without marking
  dirty (no redundant flush after startup).
* Last write wins (not a max-watermark).
* Null writer is tolerated.

Tests fail to compile until LastSentSequencePersister is introduced
in the follow-up fix commit.
Module.cs synchronously read and wrote the last-sent-sequence file
from the AgentObservationAdded handler on every observation under
DurableRelay. On a high-rate stream that put a synchronous disk read
plus a synchronous disk write on every ThreadPool callback,
serialising the relay behind disk IO and crashing the host's IOPS
budget.

Wire LastSentSequencePersister into Module:

* OnStartAfterLoad seeds the persister from ReadLastSentSequenceFromDisk
  (the only remaining hot disk read) and starts a one-second flush
  timer that calls FlushLastSentSequence.
* RecordLastSentSequence (replacing SetLastSentSequence) just updates
  the in-memory persister; no IO on the hot path.
* FlushLastSentSequence runs the actual File.WriteAllText under
  _lastSentSequenceLock and only when the persister is dirty.
* RelayBufferedObservations reads in memory, calls
  RecordLastSentSequence per observation, and flushes once at the
  batch boundary so a crash before the next timer tick does not lose
  the batch's progress.
* OnStop disposes the timer first, then flushes one last time, so a
  clean shutdown does not lose pending progress.
* AgentObservationAdded no longer issues File.ReadAllText per event;
  the durable-relay path becomes pure in-memory work.

Pinned by LastSentSequencePersisterTests.
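The persister policy pinned above can be sketched as a small dirty-flag cache. The class name comes from the PR; the member shapes are assumptions (the real class presumably also coordinates with the module's lock and timer).

```csharp
using System;

// Sketch of the in-memory persister: hot-path updates are pure memory work,
// and disk writes happen only on an explicit flush when the value is dirty.
public sealed class LastSentSequencePersister
{
    private ulong _value;
    private bool _dirty;

    // Seed from disk at startup without marking dirty (no redundant flush).
    public void Initialize(ulong valueFromDisk) { _value = valueFromDisk; _dirty = false; }

    // Hot path: last write wins; no IO.
    public void Update(ulong value) { _value = value; _dirty = true; }

    // Reading does not clear dirty: only Flush establishes durable state.
    public ulong Read() => _value;

    // Emits at most one writer call per invocation; a throwing writer leaves
    // dirty set so the next timer tick retries. Returns true if a write ran.
    public bool TryFlush(Action<ulong> writer)
    {
        if (!_dirty || writer == null) return false;
        writer(_value);
        _dirty = false;
        return true;
    }
}
```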


Development

Successfully merging this pull request may close these issues.

mqtt-relay module publishes Agent Availability on a non-canonical {prefix}/Probe/{agent-uuid}/Available four-segment topic