KAFKA-20413: Add missing headers-aware ListValueStore for streams-streams join #22156
Open
aliehsaeedii wants to merge 5 commits into apache:trunk from
aliehsaeedii
commented
Apr 27, 2026
// read it from the key (TimestampedKeyAndJoinSide#timestamp). Callers that need the
// headers gate on isHeadersStore(); the lifted headers slot here is unused on the plain
// path.
return new LiftingIterator<>(plainStore.all());
Contributor
Author
This adds some cost for default (plain) store users, but only in memory.
protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> {
    private TimestampedWindowStoreWithHeaders<K, VOther> otherWindowStore;
    private Sensor droppedRecordsSensor;
    private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>>> outerJoinStore = Optional.empty();
Contributor
IMO: Optional makes sense here.
Optional<OuterJoinStoreWrapper<K, VLeft, VRight>> WDYT?
Comment on lines 199 to 205
Contributor
ditto
With the previous version we were forced to provide a store, so it was hard to end up with an NPE.
Now we imply that the developer needs to check that the store is non-null before calling this method.
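The Optional suggestion in this thread can be illustrated with a minimal sketch. The class names `OuterJoinStoreSketch` and `JoinProcessorSketch` are hypothetical stand-ins, not classes from the PR, and the store is reduced to an in-memory map. The point is only that wrapping the store in `Optional` makes absence explicit at every call site, so there is no null check to forget.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the outer-join store wrapper discussed in this
// thread; it is not the class from the PR.
final class OuterJoinStoreSketch {
    private final Map<String, String> entries = new HashMap<>();

    void put(final String key, final String value) {
        if (value == null) {
            entries.remove(key); // a null value acts as a delete
        } else {
            entries.put(key, value);
        }
    }

    int size() {
        return entries.size();
    }
}

final class JoinProcessorSketch {
    // Empty when no outer-join store is configured (assumption for this sketch).
    private final Optional<OuterJoinStoreSketch> outerJoinStore;

    JoinProcessorSketch(final Optional<OuterJoinStoreSketch> outerJoinStore) {
        this.outerJoinStore = outerJoinStore;
    }

    // ifPresent handles the missing-store case, so callers never dereference null.
    void bufferUnmatched(final String key, final String value) {
        outerJoinStore.ifPresent(store -> store.put(key, value));
    }

    int bufferedCount() {
        return outerJoinStore.map(OuterJoinStoreSketch::size).orElse(0);
    }
}
```

With an empty Optional, `bufferUnmatched` is a no-op rather than a potential NPE; with a present store, the same call buffers the record.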
// values in the list.
// we do not use delete() calls since it would incur extra get()
store.put(prevKey, null);
outerJoinStoreWrapper.put(prevKey, null, null, 0L);
Contributor
Shouldn't we propagate headers here? In the case of a plain store they will be ignored.
Ditto in other places.
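A rough sketch of the reviewer's point: if the wrapper's `put` always receives headers, the plain implementation can drop them while the headers-aware one keeps them, and callers need no branching. Everything here is hypothetical: `StoreWrapperSketch`, `PlainStoreSketch`, `HeadersStoreSketch`, and the `headersFor` accessor are illustrative names, and headers are simplified to a `Map<String, String>` rather than Kafka's header type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface: the four-argument put mirrors the call shape in the
// diff, but the names and types here are illustrative only.
interface StoreWrapperSketch {
    void put(String key, String value, Map<String, String> headers, long timestamp);

    Map<String, String> headersFor(String key);
}

// Plain path: headers and timestamp are accepted and silently dropped.
final class PlainStoreSketch implements StoreWrapperSketch {
    private final Map<String, String> values = new HashMap<>();

    @Override
    public void put(final String key, final String value,
                    final Map<String, String> headers, final long timestamp) {
        if (value == null) {
            values.remove(key); // a null value deletes the entry
        } else {
            values.put(key, value);
        }
    }

    @Override
    public Map<String, String> headersFor(final String key) {
        return Collections.emptyMap();
    }
}

// Headers path: the same call retains the headers next to the value.
final class HeadersStoreSketch implements StoreWrapperSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Map<String, String>> headersByKey = new HashMap<>();

    @Override
    public void put(final String key, final String value,
                    final Map<String, String> headers, final long timestamp) {
        if (value == null) {
            values.remove(key);
            headersByKey.remove(key);
        } else {
            final Map<String, String> copy = new HashMap<>();
            if (headers != null) {
                copy.putAll(headers); // defensive copy of the caller's headers
            }
            values.put(key, value);
            headersByKey.put(key, copy);
        }
    }

    @Override
    public Map<String, String> headersFor(final String key) {
        return headersByKey.getOrDefault(key, Collections.emptyMap());
    }
}
```

The trade-off is that the plain path pays for constructing and passing a headers object it never stores, which is an in-memory cost only.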
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testShouldForwardPerRecordHeadersForMultipleBufferedOuterEmits(final boolean withHeaders) {
Contributor
+1 for test. Like it a lot
final InternalMockProcessorContext<Integer, ValueTimestampHeaders<String>> context = new InternalMockProcessorContext<>(
    baseDir,
    Serdes.Integer(),
    null,
Contributor
Why null, not Integer?
Stream-stream outer joins lost per-record headers because ListValueStore had no headers-aware variant (second part of KAFKA-20413). This PR adds optional per-record headers preservation to the stream-stream outer-join store, gated by dsl.store.format:
PLAIN (default): no on-disk change, no changelog change, no behavior change for existing apps.
HEADERS: each list element is serialized as a ValueTimestampHeaders<LeftOrRightValue<…>> blob via ValueTimestampHeadersSerde, so per-entry headers survive the round trip through the store, the changelog, and back into the join's null-side emit.
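To make the HEADERS round trip concrete, here is a small self-contained sketch of serializing a list whose elements each carry a value, a timestamp, and their own headers. The byte layout is invented for illustration and is not the format written by ValueTimestampHeadersSerde; `ListElementCodecSketch` and `Element` are hypothetical names, and headers are simplified to string pairs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical codec: the layout below is an assumption made for this sketch.
final class ListElementCodecSketch {

    static final class Element {
        final String value;
        final long timestamp;
        final Map<String, String> headers;

        Element(final String value, final long timestamp, final Map<String, String> headers) {
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }
    }

    // Writes: element count, then per element timestamp, value, header count, header pairs.
    static byte[] serialize(final List<Element> elements) {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(elements.size());
            for (final Element element : elements) {
                out.writeLong(element.timestamp);
                out.writeUTF(element.value);
                out.writeInt(element.headers.size());
                for (final Map.Entry<String, String> header : element.headers.entrySet()) {
                    out.writeUTF(header.getKey());
                    out.writeUTF(header.getValue());
                }
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the same layout back, restoring each element's own headers.
    static List<Element> deserialize(final byte[] data) {
        final List<Element> elements = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            final int count = in.readInt();
            for (int i = 0; i < count; i++) {
                final long timestamp = in.readLong();
                final String value = in.readUTF();
                final int headerCount = in.readInt();
                final Map<String, String> headers = new LinkedHashMap<>();
                for (int j = 0; j < headerCount; j++) {
                    final String headerKey = in.readUTF();
                    headers.put(headerKey, in.readUTF());
                }
                elements.add(new Element(value, timestamp, headers));
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return elements;
    }
}
```

Because headers are stored per element rather than per list, two buffered records under the same key can come back with different headers, which is the property the HEADERS format is meant to provide.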
Testing
Parametrized the existing ListValueStoreTest
Updated KStreamKStreamOuterJoinTest.testShouldForwardCurrentHeaders