New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4532: StateStores can be connected to the wrong source topic resulting in incorrect metadata returned from Interactive Queries #2250
Conversation
By |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few minor comments, otherwise LGTM.
@@ -692,6 +692,13 @@ private void connectProcessorAndStateStore(String processorName, String stateSto | |||
private void connectStateStoreNameToSourceTopics(final String stateStoreName, | |||
final ProcessorNodeFactory processorNodeFactory) { | |||
|
|||
// we should never update the mapping for a stateStoreName, but this is called |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment here is a bit confusing, could we say
we should never update the mapping from state store names to source topics if the store name already exists in the map; this scenario is possible, for example, that a state store underlying a source KTable is connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
@@ -150,4 +153,34 @@ public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception { | |||
new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null); | |||
} | |||
|
|||
@Test | |||
public void shouldNotModifyStateStoreSourceIfItExistsAndAnotherProcessorConnectingToIt() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we just merge these two test into one? I feel the second is almost a super set coverage of the first.
LGTM |
Refer to this link for build results (access rights to CI server needed): |
@guozhangwang addressed the comments and updated the description |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
…sulting in incorrect metadata returned from Interactive Queries When building a topology with tables and StateStores, the StateStores are mapped to the source topic names. This map is retrieved via TopologyBuilder.stateStoreNameToSourceTopics() and is used in Interactive Queries to find the source topics and partitions when resolving the partitions that particular keys will be in. There is an issue where by this mapping for a table that is originally created with builder.table("topic", "table");, and then is subsequently used in a join, is changed to the internal repartition topic. This is because the mapping is updated during the call to topology.connectProcessorAndStateStores(..). In the case that the stateStoreNameToSourceTopics Map already has a value for the state store name it should not update the Map. Author: Damian Guy <damian.guy@gmail.com> Reviewers: Matthias J. Sax, Guozhang Wang Closes apache#2250 from dguy/kafka-4532
When building a topology with tables and StateStores, the StateStores are mapped to the source topic names. This map is retrieved via TopologyBuilder.stateStoreNameToSourceTopics() and is used in Interactive Queries to find the source topics and partitions when resolving the partitions that particular keys will be in.
There is an issue where by this mapping for a table that is originally created with builder.table("topic", "table");, and then is subsequently used in a join, is changed to the internal repartition topic. This is because the mapping is updated during the call to topology.connectProcessorAndStateStores(..).
In the case that the stateStoreNameToSourceTopics Map already has a value for the state store name it should not update the Map.