Skip to content

fix: Use QoS::AtLeastOnce for shadow/provisioning/OTA control topics#122

Merged
KennethKnudsen97 merged 1 commit into
masterfrom
feat/shadow-qos-at-least-once
May 4, 2026
Merged

fix: Use QoS::AtLeastOnce for shadow/provisioning/OTA control topics#122
KennethKnudsen97 merged 1 commit into
masterfrom
feat/shadow-qos-at-least-once

Conversation

@KennethKnudsen97
Copy link
Copy Markdown
Contributor

Summary

  • Bump request/response topic pairs (shadow get/update/delete, fleet-provisioning register, OTA streaming control) from AtMostOnce to AtLeastOnce.
  • Switch the matching publishes to publish_with_options(...QoS::AtLeastOnce) so the publish QoS matches the subscription.

A transient drop between SUBSCRIBE and the broker's accepted/rejected reply silently loses the response with QoS 0, parking the caller on next_message().await. AtLeastOnce gives the broker a chance to redeliver.

Split out from the broader subscription/disconnect work (rustot #121) so it can land independently.

Test plan

  • cargo check (default features) passes
  • Existing shadow + provisioning integration tests still pass
  • Hardware: shadow round-trips work normally; no regressions during reconnect cycles

…opics

Bump the QoS on the request/response topic pairs from AtMostOnce to
AtLeastOnce so a transient drop between SUBSCRIBE and the broker's reply
doesn't silently lose the response and park the caller forever.
@KennethKnudsen97 KennethKnudsen97 merged commit da4c4b0 into master May 4, 2026
5 checks passed
@KennethKnudsen97 KennethKnudsen97 deleted the feat/shadow-qos-at-least-once branch May 4, 2026 09:18
MathiasKoch pushed a commit that referenced this pull request May 5, 2026
…age (#123)

* fix: Use QoS::AtLeastOnce for shadow, provisioning, and OTA control topics

Bump the QoS on the request/response topic pairs from AtMostOnce to
AtLeastOnce so a transient drop between SUBSCRIBE and the broker's reply
doesn't silently lose the response and park the caller forever.

* wip: Surface MQTT disconnect on Subscription::next_message

Watch the broker connection state from each Subscription so an
ungraceful drop wakes parked next_message() callers with None instead
of hanging forever. rumqttc impl uses tokio::sync::watch.

Builds on #122. Issue #121.

* fix(mqtt): Tighten clean-session semantic across backends

Only invalidate Subscriptions when the broker actually drops our subs
(CONNACK with session_present=false). Transient disconnects and
session-resume reconnects keep subs valid.

- mqttrust backend: bump rev to pick up FactbirdHQ/mqttrust#XX, which
  surfaces the same signal via clean_session_count.
- rumqttc backend: track a watch::Sender<u8> bumped only on
  session_present=false; subscriptions race recv against changed().
  Replaces the over-aggressive "any state change → None" behavior.
- Trait doc clarified.

Refs #121

---------

Co-authored-by: Kenneth Sylvest Knudsen <ksk@factbird.com>
MathiasKoch added a commit that referenced this pull request May 6, 2026
…125)

Same pattern as #122 — bump remaining request/response topic pairs
from AtMostOnce to AtLeastOnce so a transient broker disconnect
between SUBSCRIBE and the broker's accepted/rejected reply doesn't
silently drop the response, and so QoS-1-spooled publishes survive
a brief reconnect window instead of being dropped.

* jobs/stream.rs: bump JobAgent::subscribe (notify-next +
  describe-accepted), the describe publish, report_progress publish,
  and publish_and_wait's update/accepted+rejected subscriptions.
* commands/stream.rs: bump CommandAgent::subscribe (executions/+/request)
  and report_in_progress publish.
* defender_metrics/mod.rs: bump publish_and_subscribe's accepted+rejected
  subs and the metric publish.

Transfer's data_interface stays at QoS 0 — those are high-volume OTA
data blocks with their own retry semantics, same as before #122.

Discovered during factbird-edge factory_reset end-to-end testing:
job-manager's report_progress publish ~5–10s after a deployment-driven
cleanup returned `JobError::Mqtt` because the broker had briefly
dropped the connection during the cleanup, and QoS 0 publishes were
silently lost.
KennethKnudsen97 added a commit that referenced this pull request May 6, 2026
…ums (#124)

* fix: Use QoS::AtLeastOnce for shadow, provisioning, and OTA control topics

Bump the QoS on the request/response topic pairs from AtMostOnce to
AtLeastOnce so a transient drop between SUBSCRIBE and the broker's reply
doesn't silently lose the response and park the caller forever.

* wip: Surface MQTT disconnect on Subscription::next_message

Watch the broker connection state from each Subscription so an
ungraceful drop wakes parked next_message() callers with None instead
of hanging forever. rumqttc impl uses tokio::sync::watch.

Builds on #122. Issue #121.

* fix(mqtt): Tighten clean-session semantic across backends

Only invalidate Subscriptions when the broker actually drops our subs
(CONNACK with session_present=false). Transient disconnects and
session-resume reconnects keep subs valid.

- mqttrust backend: bump rev to pick up FactbirdHQ/mqttrust#XX, which
  surfaces the same signal via clean_session_count.
- rumqttc backend: track a watch::Sender<u8> bumped only on
  session_present=false; subscriptions race recv against changed().
  Replaces the over-aggressive "any state change → None" behavior.
- Trait doc clarified.

Refs #121

* fix(shadows): support opaque newtype variants in adjacently/externally-tagged enums

Marking a newtype enum variant with `#[shadow_attr(opaque)]` previously
failed to compile (E0119: conflicting From impls) and, even with that
worked around, the generated Reported serializer wrote the content as an
empty nested map instead of the scalar value.

- Skip the `From<<T as ShadowNode>::Reported> for Reported<Enum>` and
  matching Delta impls when the variant field is opaque. For leaf types
  where Delta = Reported = Self, the projection collapses to
  `From<X> for X` and collides with `impl<T> From<T> for T`.
- Emit a flat serialize arm for opaque variants that writes the inner
  value directly under the content key, producing
  `{"mode":"manual","apn":"onomondo"}` instead of `{"…":"…","apn":{}}`.
- Drop opaque variants from the inactive-variant null-padding list so
  unit variants no longer emit an empty content map alongside opaque
  siblings.
- Apply the same opaque gating to the externally-tagged enum codegen
  for symmetry.

Tests cover flat opaque serialize, unit-variant-only serialize, parse +
apply round-trip, mixed opaque/struct enums, and a compile-only check
that From impls remain for non-opaque siblings.

* fix(shadows): detect leaf newtype variants transparently

Replace the codegen-time `#[shadow_attr(opaque)]` gate on adjacently- and
externally-tagged enum newtype variants with a const-time dispatch on
`<T as ReportedFields>::FIELD_NAMES`. Any inner type whose ShadowNode
impl sets `Reported = Self` (e.g. anything reachable via `impl_opaque!`
or the leaf impls under `src/shadows/impls/`) now works without
annotation.

- Serializer dispatches scalar vs nested map per-variant via
  `FIELD_NAMES.is_empty()`. Unit variants additionally const-fold over
  every data sibling to decide whether to emit a (possibly null-only)
  content map at all — all-leaf siblings → no content key.
- Drop the From-impl gate. The earlier collision claim was wrong:
  `<T as ShadowNode>::Reported` does not normalize during coherence, so
  the projection-typed impl never overlaps with `impl<T> From<T> for T`
  even when `Reported = Self`. Leaf variants now also gain
  `#[builder(into)]` ergonomics.
- Stop tracking `is_opaque` in `variants_with_data`; everything the flag
  used to gate is now driven by the trait projections at the use site.

Tests reuse generic fixtures (`LeafEnum`, `MixedEnum`, `IntEnum`,
`ToggleConfig`) instead of project-specific fields, and exercise the
transparent path with bare `heapless::String<N>` and `u32` newtype
variants — no `#[shadow_attr(opaque)]`.

* test/codegen: revert leaf→opaque rename

Match the existing terminology used by `#[shadow_attr(opaque)]`,
`impl_opaque!`, and `src/shadows/impls/opaque.rs` — there is no reason
to introduce a parallel "leaf" vocabulary in the codegen comments and
test names. The structural signal is still
`<T as ReportedFields>::FIELD_NAMES.is_empty()`; only the wording
changes.

* fix(shadows): gate adj-tagged From impls on shadow_attr(opaque)

Opaque newtype variants caused E0119 in external crates because the
projection <T as ShadowNode>::Reported collapses to T, making the
emitted impl overlap core's blanket From<T> for T. Mark the variant
field #[shadow_attr(opaque)] to emit From<#inner_ty> directly.

* fix(shadows): require shadow_attr(opaque) for adj-tagged opaque variants

Drop the const-eval trait-based detection of opaque inner types in the
serialize arm. Both the From-impl gating and the serialize arm now
dispatch on `#[shadow_attr(opaque)]` at macro-expansion time, removing
the sharp edge where in-crate consumers worked but external consumers
hit E0119 because rustc only normalizes the projection through local
trait impls.

- Collapse `__INNER_IS_OPAQUE` const-eval into a codegen-time if/else.
- Collapse `__HAS_NON_OPAQUE_SIBLING` const-eval into a plain bool;
  filter `inactive_variant_field_nulls` to non-opaque siblings up front.
- Drop the dead ContentWrapper definition from opaque arms.
- Annotate the affected test fixtures (Label, Named, Number) with
  `#[shadow_attr(opaque)]` to exercise the opaque path explicitly.
- Document the variant-field meaning of `#[shadow_attr(opaque)]` on
  OpaqueSpec.

---------

Co-authored-by: Kenneth Sylvest Knudsen <ksk@factbird.com>
Co-authored-by: Mathias Koch <mk@factbird.com>
MathiasKoch added a commit that referenced this pull request May 6, 2026
* fix: Use QoS::AtLeastOnce for jobs/commands/defender control topics

Same pattern as #122 — bump remaining request/response topic pairs
from AtMostOnce to AtLeastOnce so a transient broker disconnect
between SUBSCRIBE and the broker's accepted/rejected reply doesn't
silently drop the response, and so QoS-1-spooled publishes survive
a brief reconnect window instead of being dropped.

* jobs/stream.rs: bump JobAgent::subscribe (notify-next +
  describe-accepted), the describe publish, report_progress publish,
  and publish_and_wait's update/accepted+rejected subscriptions.
* commands/stream.rs: bump CommandAgent::subscribe (executions/+/request)
  and report_in_progress publish.
* defender_metrics/mod.rs: bump publish_and_subscribe's accepted+rejected
  subs and the metric publish.

Transfer's data_interface stays at QoS 0 — those are high-volume OTA
data blocks with their own retry semantics, same as before #122.

Discovered during factbird-edge factory_reset end-to-end testing:
job-manager's report_progress publish ~5–10s after a deployment-driven
cleanup returned `JobError::Mqtt` because the broker had briefly
dropped the connection during the cleanup, and QoS 0 publishes were
silently lost.

* feat: introduce MaxJsonSize trait for sizing JSON encode buffers

`Update::max_size()` (jobs and commands) and the provisioning register
buffer were hardcoded values that overflow on non-trivial payloads —
`serde_json_core::to_slice` returns `PayloadError::BufferSize` and the
publish surfaces as `JobError::Mqtt` / `Error::BufferSize`, so what's
really a buffer-size bug looks like an MQTT/broker problem.

Observed on factbird-edge factory_reset: the post-cleanup `IN_PROGRESS`
update carries a 3-participant report (~700 bytes JSON) and overflows
the hardcoded 512-byte buffer in `jobs::Update::max_size()` every time.

Bumping the constants high penalises resource-constrained mqttrust
users who pay for an embedded buffer they don't need. Make the size
caller-known via the type system instead:

* New `MaxJsonSize: Serialize` super-trait in `mqtt/mod.rs`. Implementer
  declares `const MAX_JSON_SIZE: usize` — the worst-case JSON-encoded
  size of `Self`. `Serialize` is a super-trait because a max-size
  hint only makes sense for a JSON-encodable type; this gives use sites
  a single tighter bound (`S: MaxJsonSize`) instead of the compound
  `S: Serialize + MaxJsonSize`.

* `jobs::Update<'a, S>` bound changes `S: Serialize` → `S: MaxJsonSize`.
  `ToPayload::max_size` returns `S::MAX_JSON_SIZE + framing`. Same for
  `report_progress`, `succeed_job`, `fail_job`, `publish_and_wait`.

* `commands::Update<'a, R>` bound changes `R: Serialize` →
  `R: MaxJsonSize`. Same for `succeed`, `publish_and_wait`. Replaces
  the hardcoded 2048 with `R::MAX_JSON_SIZE + framing`.

* `provisioning::FleetProvisioner::provision*` parameters bound changes
  `impl Serialize` → `impl MaxJsonSize`. The DeferredPayload buffer is
  sized from `P::MAX_JSON_SIZE + framing` instead of a hardcoded 1024.

* `transfer::status_details::StatusDetailsExt` gains
  `const MAX_EXTRA_JSON_SIZE: usize`. `StatusDetails` and
  `CombinedStatusDetails<E>` impl `MaxJsonSize` — the latter delegates
  to `E::MAX_EXTRA_JSON_SIZE` so users only declare the size of their
  own contributed fields.

* Built-in impls: `() => 4` (serializes to "null"). Test types and
  `RejectDetails` get explicit impls.

Breaking change for callers who pass a custom `status_details` /
command `result` / provisioning `parameters` type — they need to add
`impl MaxJsonSize for MyType { const MAX_JSON_SIZE: usize = N; }`.

* Formatting

* fix(tests): add MaxJsonSize impls for integration test types

The bound bump on `Update<S>` / `provision*` ripples through the
integration tests, which use their own custom Serialize types.

* `tests/common/file_handler.rs`: TestStatusDetails gains
  MAX_EXTRA_JSON_SIZE (firmware_version is short).
* `tests/provisioning.rs`: Parameters<'a> gets MaxJsonSize, and the
  FleetProvisioner::provision[_cbor] turbofish gains a third inferred
  generic for the new P type parameter.
* commands::ResultMap moves its MaxJsonSize impl from a private
  test-only block to data_types.rs so integration tests can use it.

* fix(tests): bump commands test TX buffer for ResultMap max_size

`Update<'_, ResultMap>::max_size()` is now `R::MAX_JSON_SIZE +
UPDATE_FRAMING_OVERHEAD` = 4096 + 1280 = 5376, which exceeded the test
client's 4096-byte TX ring buffer. mqttrust's `grant_async` future stays
pending forever when the requested size exceeds buffer capacity, so the
publish would silently retry 3× (5+10+15s) before returning
`Error::Timeout` — surfaced to the test as `succeed: Mqtt`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants