-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5660: Don't throw TopologyBuilderException during runtime #4605
Conversation
\cc @bbejeck |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Couple of comments.
@guozhangwang Do we need a KIP for this?
*/ | ||
KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) { | ||
this.windowName = windowName; | ||
|
||
if (windowSizeMs > retentionPeriodMs) | ||
throw new TopologyException("The retention period of the join window " | ||
throw new StreamsException("The retention period of the join window " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, that a TopologyException
is correct here. KStreamJoinWindows
is called before KafkaStreams#start()
. Or do I miss anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
@@ -60,7 +60,7 @@ public RecordCollector recordCollector() { | |||
@Override | |||
public StateStore getStateStore(final String name) { | |||
if (currentNode() == null) { | |||
throw new org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an unknown node"); | |||
throw new org.apache.kafka.streams.errors.StreamsException("Accessing from an unknown node"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can import StreamsException
an get rid of the package prefix.
(We just did not import TopologyBuilderException
because it's a deprecated class and for imports we cannot suppress warnings about usage of deprecated code.
Btw: you can also remove @SuppressWarning
from this method.
@@ -69,7 +69,7 @@ public StateStore getStateStore(final String name) { | |||
} | |||
|
|||
if (!currentNode().stateStores.contains(name)) { | |||
throw new org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + currentNode().name() + " has no access to StateStore " + name); | |||
throw new org.apache.kafka.streams.errors.StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above.
@@ -644,7 +644,7 @@ private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitio | |||
continue; | |||
} | |||
if (numPartitions < 0) { | |||
throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); | |||
throw new org.apache.kafka.streams.errors.StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above.
@@ -157,7 +156,7 @@ public void testNamedTopicMatchesAlreadyProvidedPattern() { | |||
} | |||
|
|||
@Test | |||
public void shouldNotAllowToAddProcessorWithSameName() { | |||
public void shoudNotAllowToAddProcessorWithSameName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a typo that can be reverted.
throw new RuntimeException("Did expect different exception. Did catch:", e); | ||
} | ||
} | ||
new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should keep try-catch and check the error message. Also add a fail
if not exception is thrown.
try {
new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
fail("Should have thrown StreamsException");
} catch (final StreamException e) {
...
@@ -590,8 +590,7 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() { | |||
assertEquals("appId-foo", topicConfig.name()); | |||
} | |||
|
|||
|
|||
@Test(expected = TopologyBuilderException.class) | |||
@Test(expected = StreamsException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above.
@@ -600,31 +599,24 @@ public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { | |||
final Properties config = new Properties(); | |||
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); | |||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); | |||
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); | |||
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory() | |||
.getAbsolutePath()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why this?
final TopologyBuilder builder = new TopologyBuilder(); | ||
builder.addSource(sourceNodeName, "topic") | ||
.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), | ||
sourceNodeName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why this line break?
@@ -560,7 +559,7 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception | |||
} | |||
|
|||
@SuppressWarnings("deprecation") | |||
@Test | |||
@Test(expected = StreamsException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above.
assuming @mjsax comments addressed, LGTM. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we would need a KIP: this is to me more like a bug fix than a public interface change.
*/ | ||
KStreamJoinWindow(String windowName, long windowSizeMs, long retentionPeriodMs) { | ||
this.windowName = windowName; | ||
|
||
if (windowSizeMs > retentionPeriodMs) | ||
throw new TopologyException("The retention period of the join window " | ||
throw new StreamsException("The retention period of the join window " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
@@ -157,7 +156,7 @@ public void testNamedTopicMatchesAlreadyProvidedPattern() { | |||
} | |||
|
|||
@Test | |||
public void shouldNotAllowToAddProcessorWithSameName() { | |||
public void shoudNotAllowToAddProcessorWithSameName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a typo that can be reverted.
Thanks for the comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR.
Couple more comments.
@@ -54,13 +55,12 @@ public RecordCollector recordCollector() { | |||
} | |||
|
|||
/** | |||
* @throws org.apache.kafka.streams.errors.TopologyBuilderException if an attempt is made to access this state store from an unknown node | |||
* @throws org.apache.kafka.streams.errors.StreamsException if an attempt is made to access this state store from an unknown node |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: package prefix not needed
} else { | ||
throw new RuntimeException("Did expect different exception. Did catch:", e); | ||
} | ||
final String error = e.getCause().toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to call getCause()
? Does another StreamsException
wrap the actual expected StreamsException
? If yes, this should not happen IMHO.
final String expectedMessage = "Processor " + badNodeName + " has no access to StateStore " + | ||
LocalMockProcessorSupplier.STORE_NAME; | ||
|
||
assertTrue(error.contains(expectedMessage)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not assertEquals
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its possible. I'll have to add the fully qualified reference to StreamsException
because that's part of the error message.
|
||
try { | ||
final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should improve this test by moving this line outside/before try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving outside the try
results in the test failing because this line is throwing the exception
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor badGuy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Than please remove the next line driver.process
and simplify to
new ProcessorTopology(...)
(we don't need variable driver
for this case)
|
||
try { | ||
final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder); | ||
driver.process("topic", null, null); | ||
} catch (final StreamsException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to insert a fail
here to fix this test
} else { | ||
throw new RuntimeException("Did expect different exception. Did catch:", e); | ||
} | ||
final String error = e.getCause().toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above.
final String error = e.getCause().toString(); | ||
final String expectedMessage = "Processor " + badNodeName + " has no access to StateStore "; | ||
|
||
assert error.contains(expectedMessage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should not use Java keyword assert
but Junit assertXXX
methods (assertEquals
for this test)
} | ||
final String expectedMessage = "Processor " + badNodeName + " has no access to StateStore "; | ||
|
||
assertTrue(cause.getMessage().contains(expectedMessage)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
I attempted a merge to my fork to try and resolve the conflict.
and got
|
Can you try to rebase your PR instead of merging? |
|
Not sure... If you can't clean it up, maybe try to squash all your commits, create a new branch from |
TopologyBuilderException is a pre-runtime exception that should only be thrown before KafkaStreams#start() is called.