Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5862: Remove ZK dependency from Streams reset tool #3927

Conversation

bbejeck
Copy link
Contributor

@bbejeck bbejeck commented Sep 20, 2017

No description provided.

@bbejeck
Copy link
Contributor Author

bbejeck commented Sep 20, 2017

ping @mjsax @dguy @guozhangwang

@tedyu
Copy link
Contributor

tedyu commented Sep 20, 2017

Please fix the following:

scala2.12/core/src/main/scala/kafka/tools/StreamsResetter.java:20:1: Disallowed import - org.apache.kafka.clients.admin.AdminClient. [ImportControl]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/core/src/main/scala/kafka/tools/StreamsResetter.java:21:1: Disallowed import - org.apache.kafka.clients.admin.DeleteTopicsResult. [ImportControl]
[ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/core/src/main/scala/kafka/tools/StreamsResetter.java:22:1: Disallowed import - org.apache.kafka.clients.admin.KafkaAdminClient. [ImportControl]

try {
olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include information on the active consumer(s), if possible

try {
entry.getValue().get();
} catch (Exception e) {
System.err.println("ERROR deleting topic " + entry.getKey() + " failed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include exception message

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch. Couple of nits.

30000,
30000,
JaasUtils.isZkSecurityEnabled());
Properties adminClientProperties = new Properties();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

System.out.println("Done.");
}

private void doDelete(List<String> topicsToDelete, KafkaAdminClient adminClient) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final -- single parameter per line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

System.out.println("Done.");
}

private void doDelete(List<String> topicsToDelete, KafkaAdminClient adminClient) {
RuntimeException deleteException = null;
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: final -- also some more below

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: does deleteTopics throw any exceptions? If yes should we handle it?

Copy link
Contributor Author

@bbejeck bbejeck Sep 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang yes, but not directly.

Any exception occurring while deleting the topic is thrown with the KafkaFuture.get call for the given topic name from the 'Map<String, KafkaFuture>` map on line 345.

We check for exceptions by iterating over the map on line 347, and any exceptions are logged out, and the first exception is captured and is propagated up to the run method which handles the exception on line 118.

Same for comment below.

ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS));
assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe here? The future that is returned from KafkaAdminClient ensure that the topic was delete? Ie. "delete topic" is not async anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe so. In StreamsResetter we check each returned KafkaFuture for successful completion (lines 347- 354) if it failed for whatever reason the resetter would return with an error return code and fail the test before confirming deletions. Does this answer your question?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Thanks for confirming.

I was not sure was entry.getValue().get(30, TimeUnit.SECONDS) return -- if I understand you correctly, it either returns "success" or it throws. There is no "failed" return code in the future.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a meta comment about using the new client: currently there is no java doc on the new client about what types of exceptions could possibly be thrown, while from the code it seems certain exceptions are indeed possible. As a user-facing tool we should still consider all the caller functions and see what exceptions may be throwable and whether we should handle it, otherwise it will be thrown all the way to user's face. cc @cmccabe for possible java doc improvements on admin client.

.ofType(String.class)
.defaultsTo("localhost:2181")
.describedAs("url");
zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
"Make sure to stop all running application instances before running the reset tool.");
}
allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could listTopics throw some TimeoutException? Do we need to handle such cases? cc @cmccabe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The get() method can, but it is handled by line 118.

if (zkUtils != null) {
zkUtils.close();
if (kafkaAdminClient != null) {
kafkaAdminClient.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this call is a blocking one, while in the old client it is not.

private void validateNoActiveConsumers(String groupId) {
kafka.admin.AdminClient olderAdminClient = null;
try {
olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a TODO marker to remove this when describeGroup is added to the admin client in case we forget about it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create a JIRA instead of adding a TODO...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.out.println("Done.");
}

private void doDelete(List<String> topicsToDelete, KafkaAdminClient adminClient) {
RuntimeException deleteException = null;
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: does deleteTopics throw any exceptions? If yes should we handle it?

@bbejeck
Copy link
Contributor Author

bbejeck commented Sep 22, 2017

looks like I messed something up with rebase, let me re-do

EDIT: fixed rebase

@bbejeck bbejeck force-pushed the KAFKA-5862_remove_zk_dependency_from_streams_reset_tool branch from 382f1f2 to 6c3ecc5 Compare September 22, 2017 19:30
@bbejeck
Copy link
Contributor Author

bbejeck commented Sep 22, 2017

@mjsax @guozhangwang comments addressed.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more comments.

}
}

return exitCode;
}

private void validateNoActiveConsumers(String groupId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

.ofType(String.class)
.defaultsTo("localhost:2181")
.describedAs("url");
zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool will no longer access Zookeeper directly");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: . missing at the end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();

for (Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

try {
entry.getValue().get(30, TimeUnit.SECONDS);
} catch (Exception e) {
System.err.println("ERROR deleting topic " + entry.getKey() + " failed " + e.getMessage());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Error[:]

As we do print all errors here, do we need to include the first detected exception when failing? This implies, that we log this exception twice. Maybe we should just print the whole Stacktrace for each exception, and go a generic error message at the end?

@@ -343,15 +356,13 @@ private void cleanGlobal(final String intermediateUserTopic) {
parameters = new String[]{
"--application-id", APP_ID + testNo,
"--bootstrap-servers", CLUSTER.bootstrapServers(),
"--zookeeper", CLUSTER.zKConnectString(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add one test, that makes sure we don't fail if --zookeeper is specified? -> doNotFailIfZookeeperIsSpecified ?

Copy link
Contributor Author

@bbejeck bbejeck Sep 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good enough to add the --zookeeper args back in? Reason being most existing users will probably keep using the old command.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should, do. Maybe only add it back for one case so we cover both. And add a comment why we do this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

@@ -374,25 +385,15 @@ private void assertInternalTopicsGotDeleted(final String intermediateUserTopic)
expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");

Set<String> allTopics;
ZkUtils zkUtils = null;
Set<String> allTopics = new HashSet<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add final

if (zkUtils != null) {
zkUtils.close();
}
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

listTopicsOptions.listInternal(true);
allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS));
assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Why do we need to catch an fail here? We can just let the exception bubble out to fail the test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to leave as is because IMHO the Assert.fail is more immediately clear as to why the test failed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what @dguy and @guozhangwang think. From my understanding, one should never catch an exception in a test (with the exception you want to do a negative test, ie, the exception implies that the test passes).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, I'll take it out.

@bbejeck bbejeck force-pushed the KAFKA-5862_remove_zk_dependency_from_streams_reset_tool branch from 6c3ecc5 to 12ac35c Compare September 22, 2017 21:19
@bbejeck
Copy link
Contributor Author

bbejeck commented Sep 22, 2017

@mjsax updates for comments

@bbejeck
Copy link
Contributor Author

bbejeck commented Sep 22, 2017

@mjsax changes as suggested.

@asfgit asfgit closed this in 271f6b5 Sep 23, 2017
Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Merged to trunk.

@bbejeck bbejeck deleted the KAFKA-5862_remove_zk_dependency_from_streams_reset_tool branch July 10, 2024 12:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants