Skip to content

KAFKA-20464: Fix topology corruption when ConnectedStoreProviders share a StoreBuilder#22102

Open
ChoMinGi wants to merge 1 commit intoapache:trunkfrom
ChoMinGi:kafka-20464-connected-store-provider-subtopology
Open

KAFKA-20464: Fix topology corruption when ConnectedStoreProviders share a StoreBuilder#22102
ChoMinGi wants to merge 1 commit intoapache:trunkfrom
ChoMinGi:kafka-20464-connected-store-provider-subtopology

Conversation

@ChoMinGi
Copy link
Copy Markdown
Contributor

@ChoMinGi ChoMinGi commented Apr 20, 2026

KAFKA-20464

ConnectedStoreProvider Javadoc documents that multiple providers may return the same StoreBuilder instance to share a store. Currently this places the processors in separate subtopologies because addStateStore() overwrites the stateFactories entry, losing the previous connectedProcessorNames.

The fix is a one-line conditional: skip the put when a compatible factory already exists. Tests added in StreamsBuilderTest and InternalTopologyBuilderTest.

On upgrade, affected topologies will see subtopology regrouping — task IDs from the previously separate subtopology will no longer exist, and their state will be restored from changelog topics on first restart.

@github-actions github-actions Bot added streams triage PRs from the community small Small PRs labels Apr 20, 2026
Copy link
Copy Markdown
Contributor

@UladzislauBlok UladzislauBlok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +714 to +727
@Test
public void shouldUniteProcessorsWhenAddStateStoreCalledMultipleTimesWithSameBuilder() {
final StoreBuilder<?> sharedStore = new MockKeyValueStoreBuilder("shared-store", false);

builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
builder.addStateStore(sharedStore, "processor-1");

builder.addSource(null, "source-2", null, null, null, "topic-2");
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2");
builder.addStateStore(sharedStore, "processor-2");

assertEquals(1, builder.describe().subtopologies().size());
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have very similar test: shouldAllowAddingSameStoreBuilderMultipleTimes
This one imo is better because it uses addStateStore api
I think we can remove shouldAllowAddingSameStoreBuilderMultipleTimes in favor of this one

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@UladzislauBlok
Removed, thanks for catching that :)

@ChoMinGi ChoMinGi force-pushed the kafka-20464-connected-store-provider-subtopology branch from 70d1794 to ae85398 Compare April 21, 2026 01:46
@github-actions github-actions Bot removed the triage PRs from the community label Apr 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants