Skip to content

Conversation

@mjsax
Copy link
Member

@mjsax mjsax commented Jul 19, 2016

  • added Kafka Stream Application Reset Tool

@mjsax
Copy link
Member Author

mjsax commented Jul 19, 2016

Still working on some test to be added.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

May only be called [either] before instance is started or after instance is closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps move this into a separate method (for which we could also add unit tests if needed), like isInternalStreamsTopic(String topicName)?

@dguy
Copy link
Contributor

dguy commented Jul 21, 2016

Overall LGTM - @miguno has some good suggestions for some of the Sysout messages.

@guozhangwang
Copy link
Contributor

@mjsax @miguno Some context about packaging: currently we have an AdminClient that uses ZkUtils under kafka-core mainly for topics manipulation etc, which I assume would be removed after KIP-4. According to @ijuma the new AdminClient will only depend on our protocol, so it should be in kafka-clients under org.apache.kafka.* (the actual package name is up for discussion, there are a few options).

In that case, maybe we should, in the very-near future, move this client from kafka-tools to kafka-streams as well in trunk once we remove the ZK dependency, while let it stay in 0.10.0 branch, and state in the upgrade docs that this class is moved. WDYT?

@mjsax
Copy link
Member Author

mjsax commented Jul 21, 2016

Sounds ok to me.

@ijuma
Copy link
Member

ijuma commented Jul 21, 2016

@guozhangwang, one thing to consider is whether the new AdminClient will have all the functionality you need for this tool. It won't necessarily support everything supported by the existing one. Also, the existing one will probably continue to exist (the broker still needs to make the various changes to ZK), but we may want to rename it to avoid confusion (this hasn't been discussed much yet).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to recycle the existing constants like REPARTITION_TOPIC_SUFFIX?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need a dependency from kafka-tools to kafka-streams -- I would rather not add this dependency.

}

@Test(timeout = 120000)
public void testCleanUp() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we find a more descriptive name for this test case? e.g. "cleaningUpAfterRunShouldLeadToSameResultsOnRerun" (just saying)

@miguno
Copy link
Contributor

miguno commented Jul 26, 2016

Thanks @mjsax. Added some minor comments.

@mjsax
Copy link
Member Author

mjsax commented Jul 26, 2016

@miguno Updated

streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CONSUMER_TIMEOUT);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the "" + number?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To get data type String -- without it, there is an exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. I was confused because streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); is not using type String for 100 either.

@mjsax
Copy link
Member Author

mjsax commented Jul 27, 2016

@miguno Updated according to last discussion about re-nameing. Also addressed all other comments.

@miguno
Copy link
Contributor

miguno commented Jul 27, 2016

@mjsax: Question: rename bin/kafka-streams-reset.sh to bin/kafka-streams-application-reset.sh?

@mjsax
Copy link
Member Author

mjsax commented Jul 27, 2016

Already done like this.

@mjsax
Copy link
Member Author

mjsax commented Jul 27, 2016

@guozhangwang Please review to get this merged by Friday.

@guozhangwang
Copy link
Contributor

LGTM overall. Is the Jenkins failure related?

Also I will merge in both trunk and 0.10.0 now, but moving forward we may move it to a different package but only in trunk. Just FYI.

@mjsax
Copy link
Member Author

mjsax commented Jul 27, 2016

@guozhangwang just added one commit to remove the test timeout -- hope Jenkins passes now.

@asfgit asfgit closed this in 8deedca Jul 27, 2016
@guozhangwang
Copy link
Contributor

LGTM. Merged to trunk. Will use a separate PR against 0.10.0 branch.

@mjsax mjsax deleted the kafka-3185 branch July 28, 2016 23:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants