New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5958: Global stores access state restore listener #3973
KAFKA-5958: Global stores access state restore listener #3973
Conversation
ping @mjsax @dguy @guozhangwang |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bbejeck
@@ -294,7 +298,7 @@ public void run() { | |||
|
|||
private StateConsumer initialize() { | |||
try { | |||
final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory); | |||
final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory, stateRestoreListener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should probably put the params on separate lines as it has exceeded the 120 char limit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
@@ -179,6 +179,7 @@ public void testStateGlobalThreadClose() throws Exception { | |||
builder.globalTable("anyTopic"); | |||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads); | |||
final KafkaStreams streams = new KafkaStreams(builder.build(), props); | |||
streams.setGlobalStateRestoreListener(new MockStateRestoreListener()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this? The test doesn't actually use or test it so it seems irrelevant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, good catch
@@ -452,7 +475,7 @@ public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir( | |||
public boolean lockGlobalState(final int retry) throws IOException { | |||
throw new IOException("KABOOM!"); | |||
} | |||
}); | |||
}, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be null? It seems that GlobalStateMangerImpl
never expects it to be null.
@@ -96,7 +98,7 @@ public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() { | |||
new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time), | |||
new Metrics(), | |||
new MockTime(), | |||
"clientId"); | |||
"clientId", null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new line for param and I think it should probably not be null
@@ -220,7 +221,7 @@ public ProcessorTopologyTestDriver(final StreamsConfig config, | |||
globalPartitionsByTopic.put(topicName, partition); | |||
offsetsByTopicPartition.put(partition, new AtomicLong()); | |||
} | |||
final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); | |||
final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory, stateRestoreListener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: params on sep line as char limit exceeded?
@dguy thanks for review, updated for comments |
@@ -179,6 +179,7 @@ public void testStateGlobalThreadClose() throws Exception { | |||
builder.globalTable("anyTopic"); | |||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads); | |||
final KafkaStreams streams = new KafkaStreams(builder.build(), props); | |||
streams.setGlobalStateRestoreListener(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just not set it all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
updated this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bbejeck, LGTM
merged to trunk |
No description provided.