Skip to content

[improve][broker] PIP-473: MetadataPendingAckStore for segment subscriptions#25772

Open
merlimat wants to merge 1 commit into
apache:masterfrom
merlimat:mmerli/pip-473-pending-ack
Open

[improve][broker] PIP-473: MetadataPendingAckStore for segment subscriptions#25772
merlimat wants to merge 1 commit into
apache:masterfrom
merlimat:mmerli/pip-473-pending-ack

Conversation

@merlimat
Copy link
Copy Markdown
Contributor

Summary

P4 of the PIP-473 metadata-driven transactions stack. Adds the ack-side complement to #25768 — a PendingAckStore implementation for subscriptions on segment:// topics that reads truth from the metadata-store layout.

  • MetadataPendingAckStore:
    • appendIndividualAck / appendCumulativeAck write TxnOp(ACK, segment, sub, ledgerId, entryId, cumulative) records via the TxnMetadataStore façade.
    • appendCommitMark / appendAbortMark are no-ops — in v5 the TC owns the lifecycle, the store consumes its events.
    • Subscription-event listener (/txn-subscription-events/<seg>:<sub>-*) uses the same header-truthing reconcile pattern as the P3 TB: re-read headers for every currently-open txn this subscription is involved in, then for terminals call PendingAckHandleImpl.commitTxn / abortTxn (no SPI change) and delete the corresponding /txn-op ack records.
    • Recovery via scan (Option C): subscribe → scan listAcksBySegmentSubscription → group by txnId → fetch headers → seed open-txn set → terminal txns discovered mid-scan are processed inline → drain post-recovery reconcile.
  • MetadataPendingAckStoreProvider + DispatchingTransactionPendingAckStoreProvider (routes by TopicName.isSegment(), falls through to legacy MLPendingAckStore otherwise).

Supporting changes

  • TxnOp gains a nullable Boolean cumulative field (only set on cumulative acks).
  • TxnMetadataStore.deleteAckOpsForSegmentSubscriptionAndTxn mirrors the P3 write-op cleanup helper; the two flavors share a private scanAndDeleteOpsForTxn.

Not in this PR (deferred to P5.4)

  • No default-config flip. transactionPendingAckStoreProviderClassName stays MLPendingAckStoreProvider. The new store is opt-in until the v5 TC is wired up.
  • MLPendingAckStore segment-workaround stays in place. Removing it now would break segment subscriptions while the legacy store is still the default. P5.4 will remove it as part of the same PR that flips all three providers.

Test plan

  • pulsar-broker:test --tests MetadataPendingAckStoreTest — 6 cases on the metadata store (Memory backend, mocked PersistentSubscription + PendingAckHandleImpl): individual ack append, cumulative ack append, no-op marks, commit event drives handle.commitTxn + ack-op cleanup, abort event drives handle.abortTxn + cleanup, recovery rebuilds state + processes terminal txns discovered mid-scan.
  • pulsar-broker:test --tests DispatchingTransactionPendingAckStoreProviderTest — 2 routing tests.
  • P2 + P3 unit tests still green after the TxnOp.cumulative addition.
  • Checkstyle clean on pulsar-broker.

…iptions

P4 of the metadata-driven transactions stack. Adds a PendingAckStore
implementation for subscriptions on segment:// topics that reads truth from
the metadata-store layout shipped in P2 and used by P3.

- MetadataPendingAckStore: appendIndividualAck / appendCumulativeAck write
  TxnOp(ACK, segment, sub, ledgerId, entryId, cumulative) records via the
  TxnMetadataStore facade. appendCommitMark / appendAbortMark are no-ops —
  in v5 the TC owns the lifecycle, the store consumes its events.
- Subscription-event listener applies the same header-truthing reconcile
  pattern as P3: on each notification, re-read headers for every currently-
  open txn this subscription is involved in; for those that went terminal,
  call PendingAckHandleImpl.commitTxn / abortTxn (no SPI-shape change for
  the handle) and then delete the corresponding /txn-op ack records.
- Recovery via scan (Option C): subscribe → scan listAcksBySegmentSubscription
  → group by txnId → fetch headers → seed open-txn set → terminal txns
  discovered mid-scan are processed during recovery → drain post-recovery
  reconcile to catch any events that fired in the window.
- MetadataPendingAckStoreProvider builds the store from the local
  MetadataStore. DispatchingTransactionPendingAckStoreProvider routes by
  TopicName.isSegment() and falls through to the legacy MLPendingAckStore
  for persistent:// subscriptions.
- Default config (transactionPendingAckStoreProviderClassName) is unchanged
  — stays legacy until P5.4 flips everything together (TC v5 + dispatching
  TB provider + dispatching PendingAckStore provider).

Supporting changes:
- TxnOp gains a nullable Boolean cumulative field, only set on cumulative
  acks. WRITE records (and individual acks) leave it null.
- TxnMetadataStore.deleteAckOpsForSegmentSubscriptionAndTxn mirrors the
  P3 write-op cleanup helper. The two flavors share a private
  scanAndDeleteOpsForTxn.

Tests: 6 MetadataPendingAckStoreTest scenarios (individual ack append,
cumulative ack append, no-op marks, commit/abort events drive handle
callbacks + cleanup, recovery rebuilds open-txn set and applies terminal
txns mid-scan) + 2 routing tests for the dispatcher.

The segment-aware pending-ack workaround in MLPendingAckStore stays in
place — segment topics still use the legacy store by default until P5.4
flips and that PR removes the workaround.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant