KAFKA-20318: Add time ordered session store support to the DSL #21794

Merged: mjsax merged 4 commits into apache:trunk from bbejeck:KAFKA-20318_add_time_ordered_session_store_dsl on Mar 19, 2026
Conversation

@bbejeck (Member) commented Mar 17, 2026:

This PR fixes a bug where
RocksDbTimeOrderedSessionBytesStoreSupplier.get with
withHeaders=true created segments using KeyValueSegment (default-CF
only) instead of SessionSegmentWithHeaders (dual-CF with lazy
migration), which would cause the upgrade path from non-headers to
headers format to fail. The fix introduces
RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders, widens
RocksDBTimeOrderedSessionStore to accept either segment type, and
updates the supplier to create the correct bytes store. Upgrade tests
validate lazy migration at both the store layer and DSL supplier path,
plus a TopologyTestDriver end-to-end test exercises ON_WINDOW_CLOSE
with both PLAIN and HEADERS formats.

Reviewers: Matthias J. Sax <matthias@confluent.io>, TengYao Chi <frankvicky@apache.org>

throw new UnsupportedOperationException("This store does not support put with timestamp and seqnum");
}

public byte[] fetch(final Bytes key, final long timestamp, final int seqnum) {
@bbejeck (Member, Author) commented:

Added fetchSession, fetchSessions, remove(Windowed), and put(Windowed) as UnsupportedOperationException defaults to match the existing pattern for put(Bytes,long,int,byte[]) and fetch(Bytes,long,int). This allows
RocksDBTimeOrderedSessionStore to widen its wrapped type to the abstract class while still calling session-specific methods. Both RocksDBTimeOrderedSessionSegmentedBytesStore and the new WithHeaders variant override these.
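
The "unsupported by default" pattern described above can be sketched with a minimal, self-contained example. The class names here (AbstractBytesStore, SessionBytesStore, WindowBytesStore) are made-up stand-ins for illustration, not the real Kafka classes:

```java
import java.util.HashMap;
import java.util.Map;

abstract class AbstractBytesStore {
    // Session-specific operations default to throwing, so subclasses that
    // are not session stores need not implement them.
    byte[] fetchSession(final String key) {
        throw new UnsupportedOperationException("This store does not support fetchSession");
    }
}

class SessionBytesStore extends AbstractBytesStore {
    private final Map<String, byte[]> data = new HashMap<>();

    void put(final String key, final byte[] value) {
        data.put(key, value);
    }

    @Override
    byte[] fetchSession(final String key) {
        return data.get(key); // session stores override the throwing default
    }
}

class WindowBytesStore extends AbstractBytesStore {
    // inherits the throwing default -- calling fetchSession here fails fast
}

public class OptionalOpsDemo {
    public static void main(String[] args) {
        SessionBytesStore session = new SessionBytesStore();
        session.put("k", new byte[] {1});
        System.out.println(session.fetchSession("k").length);

        AbstractBytesStore window = new WindowBytesStore();
        try {
            window.fetchSession("k");
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported");
        }
    }
}
```

This lets callers program against the abstract type and call session-specific methods, while non-session subclasses fail fast instead of silently misbehaving.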


 public class RocksDBTimeOrderedSessionStore
-    extends WrappedStateStore<RocksDBTimeOrderedSessionSegmentedBytesStore, Object, Object>
+    extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment>, Object, Object>
@bbejeck (Member, Author) commented:

Widened from WrappedStateStore<RocksDBTimeOrderedSessionSegmentedBytesStore, ...> to WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment>, ...> so that both the plain and WithHeaders segmented
bytes stores can be wrapped. All wrapped() calls resolve against the abstract type which now has the session-specific methods from the previous change.

* @see WindowSegmentsWithHeaders
* @see WindowSegmentWithHeaders
*/
class RocksDBTimeOrderedWindowSegmentedBytesStoreWithHeaders
@bbejeck (Member, Author) commented:

Mirrors RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders after RocksDBTimeOrderedWindowSegmentedBytesStoreWithHeaders (PR #21780) but for session key schemas. Uses SessionSegmentsWithHeaders which creates
SessionSegmentWithHeaders segments — these extend RocksDBMigratingSessionStoreWithHeaders and handle dual-CF lazy migration from DEFAULT CF (old format) to headers CF (new format) via HeadersBytesStore::convertToHeaderFormat.
No new segment classes needed — reuses existing infrastructure.
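
The dual-CF lazy migration described above can be sketched in a self-contained way, with the two RocksDB column families modeled as plain maps and the header conversion as a simple prefix. All names here (MigratingStore, convertToHeaderFormat's "H|" prefix) are illustrative stand-ins; the real SessionSegmentWithHeaders / HeadersBytesStore logic operates on serialized bytes:

```java
import java.util.HashMap;
import java.util.Map;

class MigratingStore {
    private final Map<String, String> defaultCf = new HashMap<>(); // old, non-headers format
    private final Map<String, String> headersCf = new HashMap<>(); // new, headers format

    MigratingStore(final Map<String, String> preExistingOldData) {
        defaultCf.putAll(preExistingOldData); // data written before the upgrade
    }

    private static String convertToHeaderFormat(final String oldValue) {
        return "H|" + oldValue; // stand-in for the real byte-level conversion
    }

    String get(final String key) {
        String v = headersCf.get(key);
        if (v != null) {
            return v; // already in the new format
        }
        final String old = defaultCf.remove(key); // lazily migrate on first read
        if (old == null) {
            return null;
        }
        v = convertToHeaderFormat(old);
        headersCf.put(key, v);
        return v;
    }

    boolean isMigrated(final String key) {
        return headersCf.containsKey(key) && !defaultCf.containsKey(key);
    }
}

public class LazyMigrationDemo {
    public static void main(String[] args) {
        MigratingStore store = new MigratingStore(Map.of("session-1", "old-bytes"));
        System.out.println(store.get("session-1"));
        System.out.println(store.isMigrated("session-1"));
    }
}
```

The key point of the design: old records are never bulk-rewritten at startup; each record is converted and moved to the headers CF the first time it is read.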


@Override
public SessionStore<Bytes, byte[]> get() {
if (withHeaders) {
@bbejeck (Member, Author) commented:

Previously, get() always created a RocksDBTimeOrderedSessionSegmentedBytesStore (which uses KeyValueSegment / DEFAULT CF only), even when withHeaders=true. On upgrade from non-headers to headers, old
data in DEFAULT CF would not be readable because KeyValueSegment has no migration logic. Now the withHeaders path creates RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders which uses SessionSegmentWithHeaders segments
with dual-CF migration support.
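
The corrected branching can be illustrated with a minimal sketch; PlainStore and StoreWithHeaders are made-up stand-ins for the real segmented bytes stores:

```java
// Before the fix, both branches effectively returned the plain
// (KeyValueSegment-backed) store; now the flag selects the right type.
class PlainStore {}
class StoreWithHeaders extends PlainStore {}

public class SupplierDemo {
    static PlainStore get(final boolean withHeaders) {
        return withHeaders ? new StoreWithHeaders() : new PlainStore();
    }

    public static void main(String[] args) {
        System.out.println(get(true) instanceof StoreWithHeaders);
        System.out.println(get(false) instanceof StoreWithHeaders);
    }
}
```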

name(), metricsScope(), retentionPeriod(), segmentIntervalMs(), true));
}
},
new RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, retentionPeriod, true, true),
@bbejeck (Member, Author) commented Mar 17, 2026:

Removed the anonymous RocksDbTimeOrderedSessionBytesStoreSupplier override hack that was manually constructing a plain RocksDBTimeOrderedSessionSegmentedBytesStore for the WithHeaders test cases. Now uses the supplier's real
withHeaders=true code path directly.

Same for below

* This test validates lazy migration from DEFAULT CF to headers CF and ensures
* the store can read old data after upgrade.
*/
public class TimeOrderedSessionStoreUpgradeTest {
@bbejeck (Member, Author) commented:

Three layers of upgrade testing: (1) Store-level tests validate lazy migration directly using RocksDbTimeOrderedSessionBytesStoreSupplier — write with withHeaders=false, close, reopen with withHeaders=true, verify old data
readable with empty headers. (2) DSL supplier path test uses BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.sessionStore() — the same code path the DSL materializer calls with ON_WINDOW_CLOSE. (3) DSL end-to-end test builds
a real session aggregation topology via TopologyTestDriver with ON_WINDOW_CLOSE + DSL_STORE_FORMAT_HEADERS, verifying the full chain works.

@mjsax mjsax added the kip Requires or implements a KIP label Mar 19, 2026
@mjsax mjsax force-pushed the KAFKA-20318_add_time_ordered_session_store_dsl branch from 66a93c8 to 859ccdf on March 19, 2026 00:58
*/
public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);
@mjsax (Member) commented:

Side fix

private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);

abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> {
private long minTimestamp;
@mjsax (Member) commented Mar 19, 2026:

I unified more code (cf other comments) -- this is moved from RocksDBTimeOrderedKeyValueBytesStore to here

-abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> {
+public abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> {
     private long minTimestamp;
@mjsax (Member) commented:

Another side cleanup -- we use many of these classes outside of the internals package, so we should just declare them public.

There are more changes like this in this PR -- if we think we don't want this change, we can also revert it.

* <p>
* This can be reused by both window store implementations (with and without headers).
*/
class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
@mjsax (Member) commented:

This class belongs to RocksDBTimeOrderedWindowSegmentedBytesStore -- moved it there

minTimestamp = Long.MAX_VALUE;
}

Map<S, WriteBatch> getWriteBatches(
@mjsax (Member) commented Mar 19, 2026:

This is a refactoring to allow sharing more code. The different sub-classes of AbstractRocksDBTimeOrderedSegmentedBytesStore all share a 99% common impl of Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[], byte[]>> records), so I add this shared impl which is "customized" by passing corresponding extractors.
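
The extractor-parameterized sharing described above can be sketched as follows. The types are simplified stand-ins (strings instead of ConsumerRecord/WriteBatch), so this only models the shape of the refactoring, not the real Kafka API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

abstract class SegmentedStore {
    // The shared 99% of the impl: group records into per-segment batches.
    // Subclasses "customize" it by passing their own extractors.
    Map<Long, List<String>> getWriteBatches(final List<String> records,
                                            final Function<String, Long> segmentIdExtractor,
                                            final Function<String, String> keyExtractor) {
        final Map<Long, List<String>> batches = new HashMap<>();
        for (final String record : records) {
            batches.computeIfAbsent(segmentIdExtractor.apply(record), id -> new ArrayList<>())
                   .add(keyExtractor.apply(record));
        }
        return batches;
    }
}

class WindowStoreImpl extends SegmentedStore {
    Map<Long, List<String>> batches(final List<String> records) {
        // the subclass-specific part: toy segment-id and key extraction
        return getWriteBatches(records,
            r -> (long) r.length() % 2,
            r -> "w:" + r);
    }
}

public class WriteBatchDemo {
    public static void main(String[] args) {
        System.out.println(new WindowStoreImpl().batches(List.of("a", "bb")));
    }
}
```

Passing behavior as function parameters keeps one copy of the batching loop while letting each subclass supply only what actually differs.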

-    LogicalKeyValueSegment getReservedSegment(final long segmentId) {
-        return reservedSegments.get(segmentId);
+    LogicalKeyValueSegment getReservedSegment() {
+        return reservedSegments.get(-1L);
@mjsax (Member) commented:

Side cleanup -- we always pass -1L.

* <p>
* This store extends {@link RocksDBSessionStore} and returns
-* {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+* {@link QueryResult#forUnknownQueryType(Query, StateStore)} for all queries,
@mjsax (Member) commented:

Side fix (also elsewhere)

* @see SessionSegmentsWithHeaders
* @see SessionSegmentWithHeaders
*/
class RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders
@mjsax (Member) commented:

Added the new class we need -- it is now trivial with the unified code in the super-classes :)


-public class RocksDBTimeOrderedWindowStore
-    extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment>, Object, Object>
+public class RocksDBTimeOrderedWindowStore<S extends Segment>
@mjsax (Member) commented:

This is the follow-up from #21780.

if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
return true;
}
if (stateStore instanceof RocksDBTimeOrderedWindowStoreWithHeaders) {
@mjsax (Member) commented:

RocksDBTimeOrderedWindowStoreWithHeaders extends RocksDBTimeOrderedWindowStore so this second check is unnecessary.

(Contributor) commented:

Great catch
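
Why the second check was redundant can be shown with a two-line sketch; the class names here are made-up stand-ins mirroring the subtype relationship described in the comment:

```java
// instanceof matches subclasses too, so one check on the supertype suffices
class WindowStore {}
class WindowStoreWithHeaders extends WindowStore {}

public class InstanceofDemo {
    static boolean isTimeOrdered(final Object store) {
        return store instanceof WindowStore; // also true for WindowStoreWithHeaders
    }

    public static void main(String[] args) {
        System.out.println(isTimeOrdered(new WindowStore()));
        System.out.println(isTimeOrdered(new WindowStoreWithHeaders()));
    }
}
```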

-    RocksDBTimeOrderedSessionStore(final RocksDBTimeOrderedSessionSegmentedBytesStore store) {
+    RocksDBTimeOrderedSessionStore(final AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment> store) {
         super(store);
         Objects.requireNonNull(store, "store is null");
(Contributor) commented:

Just a random thing that came to mind: there are a bunch of ternary operators used for null checks -- should we consider replacing them with Objects.requireNonNullElse to align the code style?
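
A tiny illustration of that suggestion -- `Objects.requireNonNullElse` (JDK 9+) expresses the same "value or default" intent as the ternary idiom, in one uniform call:

```java
import java.util.Objects;

public class NullCheckStyle {
    // the ternary idiom scattered through the codebase
    static String ternary(final String name) {
        return name != null ? name : "default";
    }

    // the suggested replacement
    static String viaObjects(final String name) {
        return Objects.requireNonNullElse(name, "default");
    }

    public static void main(String[] args) {
        System.out.println(ternary(null));
        System.out.println(viaObjects(null));
        System.out.println(viaObjects("x"));
    }
}
```

Note that `requireNonNullElse` differs from the `Objects.requireNonNull(store, "store is null")` call in the diff above: the latter rejects null outright, while the former substitutes a default.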

@mjsax mjsax merged commit 1ad463d into apache:trunk Mar 19, 2026
26 checks passed
mjsax added a commit that referenced this pull request Mar 19, 2026

Co-authored-by: Matthias J. Sax <matthias@confluent.io>
@mjsax (Member) commented Mar 19, 2026:

Merged to trunk and cherry-picked to 4.3 branch.

Labels: kip (Requires or implements a KIP), streams

3 participants