
[improve] Add Option.SequenceKeysDeltas + subscribeSequence #25724

Merged
merlimat merged 1 commit into apache:master from merlimat:mmerli/metadata-sequence-keys
May 8, 2026

@merlimat merlimat commented May 8, 2026

Motivation

Follow-up to #25723. Add atomic, multi-dimensional sequence keys to the MetadataStore API — same model as Oxia's native sequence-keys — and a companion subscription channel for change notifications. PIP-471 (transaction state via metadata) needs both: per-segment append-only logs (/txn-op/<txnId>-<seq>) where the server picks the suffix atomically, and a notification stream so each TB can apply only its own segment's events.

The existing Option.Sequential (single-dim, +1 only, two-write fallback on Oxia) is preserved; this PR adds a richer primitive alongside it.

Modifications

API:

  • New Option.SequenceKeysDeltas(List<Long> deltas) — when present on put(prefix, ...), the actual stored key becomes <prefix>-<seq0:%020d>-<seq1:%020d>-.... Each dimension increments atomically by its delta (first must be > 0, rest >= 0). The returned Stat carries the actual generated path.
  • New MetadataStore.subscribeSequence(prefix, listener, opts) → AutoCloseable — listener receives the latest assigned sequence key under prefix as new records appear. Multiple updates may collapse into a single emission with the highest sequence. Closing the handle unsubscribes.
  • OptionsHelper.sequenceKeysDeltas(Set<Option>) accessor.
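The key layout and delta rules described above can be sketched in a few lines. This is a minimal illustration, not the code from this PR: the class `SequenceKeyFormat` and its methods are hypothetical names, and only the `<prefix>-<seq:%020d>-...` layout and the "first > 0, rest >= 0" rule are taken from the description.

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical helper illustrating the sequence-key layout; not part of the PR. */
final class SequenceKeyFormat {
    private SequenceKeyFormat() {}

    /** Checks the delta rule from the description: first delta > 0, the rest >= 0. */
    static void validateDeltas(List<Long> deltas) {
        if (deltas.isEmpty() || deltas.get(0) <= 0) {
            throw new IllegalArgumentException("first delta must be > 0");
        }
        for (int i = 1; i < deltas.size(); i++) {
            if (deltas.get(i) < 0) {
                throw new IllegalArgumentException("delta " + i + " must be >= 0");
            }
        }
    }

    /** Builds <prefix>-<seq0:%020d>-<seq1:%020d>-... from already-assigned sequence values. */
    static String format(String prefix, List<Long> sequences) {
        StringBuilder sb = new StringBuilder(prefix);
        for (long seq : sequences) {
            // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
            sb.append('-').append(String.format(Locale.ROOT, "%020d", seq));
        }
        return sb.toString();
    }
}
```

Because every dimension is zero-padded to 20 digits, plain string comparison of two keys under the same prefix agrees with numeric comparison of their sequences, which is what lets a listener pick "the highest" key by comparing paths.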

Oxia (native): OxiaMetadataStore translates Option.SequenceKeysDeltas to PutOption.SequenceKeysDeltas; subscribeSequence delegates to client.getSequenceUpdates. supportsNativeSequenceKeys() returns true, so the compat layer is bypassed.

Compatibility layer in AbstractMetadataStore (used by LocalMemory, RocksDB, ZooKeeper, MockZooKeeper):

  • put intercepts Option.SequenceKeysDeltas, runs a CAS-retry loop on a sidecar counter document at <prefix>__seq_counter__ (a fixed-width binary blob of longs), then recurses into put with the synthesized key. Synthesized keys match Oxia's byte format exactly.
  • subscribeSequence registers an internal Notification listener that filters Created events whose paths start with <prefix>-, tracks the highest path seen via CAS, and delivers monotonically. Closing the handle removes the listener.
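The CAS-retry loop on the sidecar counter might look roughly like the sketch below. It is an in-memory stand-in written under assumptions: `SidecarCounterSketch` is a hypothetical class, an `AtomicReference<byte[]>` plays the role of the versioned counter document, and the 8-bytes-per-dimension big-endian encoding is a guess at what "fixed-width binary blob of longs" means, not the PR's actual layout.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-memory model of the <prefix>__seq_counter__ document; not the PR's code. */
final class SidecarCounterSketch {
    // Reference identity stands in for the expected-version check of a real metadata store.
    private final AtomicReference<byte[]> counterDoc = new AtomicReference<>(null);

    /** Assumed encoding: one 8-byte big-endian long per dimension. */
    static byte[] encode(long[] values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * Long.BYTES);
        for (long v : values) {
            buf.putLong(v);
        }
        return buf.array();
    }

    /** Decodes up to dims longs; missing dimensions (or a missing document) read as 0. */
    static long[] decode(byte[] data, int dims) {
        long[] out = new long[dims];
        if (data != null) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            for (int i = 0; i < dims && buf.remaining() >= Long.BYTES; i++) {
                out[i] = buf.getLong();
            }
        }
        return out;
    }

    /** Adds each delta to its dimension and returns the newly assigned sequence values. */
    long[] next(List<Long> deltas) {
        while (true) {
            byte[] current = counterDoc.get();
            long[] seq = decode(current, deltas.size());
            for (int i = 0; i < seq.length; i++) {
                seq[i] += deltas.get(i);
            }
            if (counterDoc.compareAndSet(current, encode(seq))) {
                return seq; // we won the race; these values are ours
            }
            // Another writer updated the counter first: re-read and retry.
        }
    }
}
```

The returned values would then be formatted into the synthesized key before the regular put runs. The trade-off of this design is one extra read-modify-write per sequenced put on backends without native support, in exchange for identical key formats across backends.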

Wrappers: DualMetadataStore and FaultInjectionMetadataStore forward subscribeSequence to the wrapped store.

Verifying this change

Two new test classes:

  • OxiaSequenceKeysTest — 3-shard Oxia container, exercises the native path: single/multi-dim sequence puts, subscribe collapses to highest emission.
  • SequenceKeysTest — runs on every BaseMetadataStoreTest backend (ZK, Memory, RocksDB, Oxia, MockZooKeeper). Both compat and native paths must produce monotonically increasing keys with matching byte-format and deliver subscription updates.

Local results: 15/15 cross-backend tests pass; full metadata test suite (./gradlew :pulsar-metadata:test) green.

Does this pull request potentially affect one of the following parts:

  • The public MetadataStore API — strictly additive: a new Option subtype and a new default method on MetadataStore. Existing callers and external implementations are unaffected.

Matching PR in forked repository

PR in forked repository: https://github.com/merlimat/pulsar/pull/new/mmerli/metadata-sequence-keys

… with Oxia native + compat layer

Add a new Option subtype, Option.SequenceKeysDeltas(List<Long> deltas), that
when present on put(...) requests server-assigned multi-dimensional atomic
sequence-key suffixes — same semantics and key format as Oxia's native
sequence-keys. Add a companion subscription method, MetadataStore.subscribeSequence,
that delivers the latest assigned sequence key to a listener as new sequence
records appear under the prefix.

Oxia: native passthrough. doStorePut translates SequenceKeysDeltas to
PutOption.SequenceKeysDeltas; subscribeSequence delegates to
client.getSequenceUpdates. supportsNativeSequenceKeys() returns true.

Compatibility layer in AbstractMetadataStore: when the backend has no native
sequence-keys, intercept put(prefix, ..., {SequenceKeysDeltas}) at the top of
put(), CAS-increment a sidecar counter document at <prefix>__seq_counter__,
synthesize the actual key as <prefix>-<seq:%020d>-... matching Oxia byte-for-byte,
then recurse into the regular put path with the synthesized key. subscribeSequence
filters Created notifications by prefix and emits monotonically.
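The prefix filter and monotonic emission could be sketched as follows. `MonotonicSequenceFilter` and `onCreated` are hypothetical names used only for illustration; the sketch assumes notifications are dispatched on a single thread, so a real implementation with concurrent dispatch would also have to serialize delivery to keep the order the listener observes monotonic.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical filter illustrating prefix matching plus monotonic delivery; not the PR's code. */
final class MonotonicSequenceFilter {
    private final String keyPrefix;
    private final Consumer<String> listener;
    // Highest key delivered so far; "" sorts before any real key.
    private final AtomicReference<String> highest = new AtomicReference<>("");

    MonotonicSequenceFilter(String prefix, Consumer<String> listener) {
        // Sequence keys are <prefix>-<seq>..., so the counter document
        // <prefix>__seq_counter__ never matches this filter.
        this.keyPrefix = prefix + "-";
        this.listener = listener;
    }

    /** Assumed to be invoked once for every Created notification from the store. */
    void onCreated(String path) {
        if (!path.startsWith(keyPrefix)) {
            return;
        }
        while (true) {
            String seen = highest.get();
            if (path.compareTo(seen) <= 0) {
                return; // not newer than what was already delivered
            }
            if (highest.compareAndSet(seen, path)) {
                listener.accept(path);
                return;
            }
            // Lost the CAS to a concurrent update: re-check against the new highest.
        }
    }
}
```

String comparison is sufficient here only because the sequence suffixes are zero-padded to a fixed width. Dropping keys that are not newer than the current highest is also what allows several updates to collapse into a single emission.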

Wrappers: DualMetadataStore + FaultInjectionMetadataStore forward subscribeSequence
to the wrapped store.

Tests:
- OxiaSequenceKeysTest: 3-shard Oxia cluster — native single/multi-dim + subscribe
- SequenceKeysTest: cross-backend (ZK/Memory/RocksDB/Oxia/MockZooKeeper) — same
  three scenarios. Both layers must produce monotonically increasing keys with
  matching byte-format and deliver subscription updates.
@merlimat merlimat changed the title [improve][metadata] Add Option.SequenceKeysDeltas + subscribeSequence [improve] Add Option.SequenceKeysDeltas + subscribeSequence May 8, 2026

@lhotari lhotari left a comment

LGTM

@merlimat merlimat merged commit d45e405 into apache:master May 8, 2026
81 of 84 checks passed
@merlimat merlimat deleted the mmerli/metadata-sequence-keys branch May 8, 2026 23:37