Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3856: Refactoring for KIP-120 #3536

Closed

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Jul 17, 2017

  • extract InternalTopologyBuilder from TopologyBuilder
  • deprecate all "leaking" methods from public TopologyBuilder API
  • changed TopologyDescription and all nested classed into interfaces

@mjsax mjsax force-pushed the kafka-3856-extract-internal-topology-builder branch from b963505 to 9783943 Compare July 17, 2017 09:16
@mjsax
Copy link
Member Author

mjsax commented Jul 17, 2017

Call for review @bbejeck @dguy @enothereska @guozhangwang

@asfgit
Copy link

asfgit commented Jul 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6110/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Jul 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6095/
Test PASSed (JDK 8 and Scala 2.12).

@@ -222,9 +222,9 @@ public void shouldDriveGlobalStore() throws Exception {
}

@Test
public void testDrivingSimpleMultiSourceTopology() {
public void testDrivingSimpleMultiSourceTopology() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which part throws the exception? (same question for the other tests below). Could we use a specific exception, or is it broad?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, this is some "left over" as I cut down this PR to make is smaller... (ie, did remove some other changes that will follow in another RP -- just forgot to undo this changes to -- should be ok to just leave it IMHO) For test in general it's ok to just declare "Exception" as nobody catches it anyway (it's best practice in Java AFAIK).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to leave in. IMHO it makes the test more readable without the try/catch boilerplate.

@enothereska
Copy link
Contributor

Massive cleanup, thanks @mjsax. I couldn't find org.apache.kafka.streams.TopologyDescription as described in the KIP. Is this WiP?

@mjsax
Copy link
Member Author

mjsax commented Jul 17, 2017

@enothereska I'll split up the whole KIP in multiple PRs. Otherwise it's a mess to review. There is already #2301 that does add TopologyDescription -- but there will be at least 2 more PRs (one for adding Topology and one for adding StreamsBuilder). Small PRs FTW!

@bbejeck
Copy link
Contributor

bbejeck commented Jul 18, 2017

Looks good, thanks for the big cleanup.

One question (sorry if this is an obvious one): the TopologyBuilder has the InternalTopologyBuilder marked as deprecated. The deprecation indicates using the InternalTopologyBuilder from within TopologyBuilder is deprecated and not the InternalTopologyBuilder itself, correct?

@guozhangwang
Copy link
Contributor

guozhangwang commented Jul 21, 2017

@mjsax I'm wondering if could you try breaking this PR into to commits for easier reviewing? One commit for just extracting out the InternalTopologyBuilder from TopologyBuilder without the actual impl of the InternalTopologyBuilder, just to show what are the left public APIs of TopologyBuilder and what are the internal interfaces of InternalTopologyBuilder?

EDIT: actually nvm, assuming that the InternalTopologyBuilder did not change any actual logic of the original TopologyBuilder (it is very hard to tell from the diff file) this is okay to review.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments, otherwise LGTM.

import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Sum;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never know how do you find these: good catch :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intellij does -- it automatically sort imports if you enable it :)

@@ -36,7 +36,7 @@
* Factory for creating state stores in Kafka Streams.
*/
@InterfaceStability.Evolving
public class Stores {
public class Stores {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems not intentional.

public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) {
return addSource(offsetReset, name, null, null, null, topics);
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name, final String... topics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if we want to break into multi-lines better do that one parameter per line.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! That was the intent. Just missed this one.

throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add similar NOTE as in 668 as well, also mentioning it has been deprecated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto for all other deprecated functions: mention it should only be used internally and has been deprecated.

@guozhangwang
Copy link
Contributor

@mjsax could you rebase?

@mjsax
Copy link
Member Author

mjsax commented Jul 21, 2017

@bbejeck

One question (sorry if this is an obvious one): the TopologyBuilder has the InternalTopologyBuilder marked as deprecated. The deprecation indicates using the InternalTopologyBuilder from within TopologyBuilder is deprecated and not the InternalTopologyBuilder itself, correct?

Yes.

 - extract InternalTopologyBuilder from TopologyBuilder
 - deprecate all "leaking" methods from public TopologyBuilder API
@mjsax mjsax force-pushed the kafka-3856-extract-internal-topology-builder branch 2 times, most recently from 80653c4 to bfc2527 Compare July 21, 2017 11:53
@asfgit
Copy link

asfgit commented Jul 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6251/
Test PASSed (JDK 7 and Scala 2.11).

@mjsax mjsax force-pushed the kafka-3856-extract-internal-topology-builder branch from bfc2527 to d53157d Compare July 21, 2017 12:58
@mjsax
Copy link
Member Author

mjsax commented Jul 21, 2017

Did rebase this and addressed "dangling comments" from #2301
Also converted TopologyDescription and all its nested classed into interfaces (need to update KIP if we keep it this way).

@asfgit
Copy link

asfgit commented Jul 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6256/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Jul 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6240/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Copy link
Contributor

@mjsax LGTM now. I just left two follow-up comments to your replies on #2301 for 1) package names and 2) printing none. LMK.

@mjsax
Copy link
Member Author

mjsax commented Jul 22, 2017

Updates with regard to both comments on #2301

@asfgit
Copy link

asfgit commented Jul 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6285/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Jul 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6269/
Test PASSed (JDK 8 and Scala 2.12).

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax I left a comment on the public APIs of describe(), as described in https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API. Will be merging as is since I understand this is not the last PR of KIP-120 so you may have some ideas about modifying such a bit later, but just as a reminder..

return false;
}

public TopologyDescription describe() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. The function is aimed to be a public API in Topology in KIP-120 (which I think will be renamed from the current TopologyBuilder?), but here it is only used as the function of the InternalTopologyBuilder which seems incorrect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the function is aimed to be a public API in Topology in KIP-120

correct

which I think will be renamed from the current TopologyBuilder?

No. We need to keep TopologyBuilder for backward compatibility (just deprecating it) and add Topology next to it. And Topology will also have describe() in it's public API and than call InternalTopologyBuilder#describe() (we might also be able to move this method completely from InternalTopologyBuilder to Topology, but this would be just an internal refactoring.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification.

Also one thing I notice from the unit tests is that in StreamThreadTest we are accessing the builder's internalTopologyBuilder object directly which should be deprecated and removed eventually from the TopologyBuilder. I believe you are going to update on that with the next follow-up PRs right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, that we will need to do some refactoring there. Not sure though if this should be part of KIP-120 or its own JIRA. (I would prefer own JIRA.)

With InternalTopologyBuilder being in package internals we can do any kind of refactoring we want without impacting users. There are couple of things we need to think through carefully for this part. I also hope, that we might be able to break some code into more classes that would also simplify testing (hopefully). But this part is hard to design on a whiteboard and must be done in an exploration way "playing" with the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

@asfgit asfgit closed this in 5d79851 Jul 24, 2017
@guozhangwang
Copy link
Contributor

Merged to trunk.

@mjsax mjsax deleted the kafka-3856-extract-internal-topology-builder branch June 5, 2018 23:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants