
KIP-221 / Add KStream#repartition operation #7170

Merged

Conversation

lkokhreidze
Contributor

@lkokhreidze lkokhreidze commented Aug 6, 2019

KIP-221: Enhance DSL with Connecting Topic Creation and Repartition Hint

Tickets: KAFKA-6037 KAFKA-8611

Description

This is the PR for KIP-221. The goal of this PR is to introduce the new KStream#repartition operator and the underlying machinery that can be used for repartition configuration on a KStream instance (see the usage sketch below).

Notable Changes

  • Introduced org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode. This node is NOT subject to the optimization algorithm; therefore, each repartition operation is excluded from it.
  • Introduced the org.apache.kafka.streams.processor.internals.InternalTopicProperties class, which captures repartition topic configurations passed via DSL operations.
  • Added the org.apache.kafka.streams.processor.internals.InternalTopologyBuilder#internalTopicNamesWithProperties map for storing the mapping between internal topics and their corresponding configuration. If a configuration is present, RepartitionTopicConfig is enriched with the configurations passed via DSL operations (in this case via the org.apache.kafka.streams.kstream.Repartitioned class).
  • Added KStreamRepartitionIntegrationTest for testing different scenarios of KStream#repartition
  1. Should create a repartition topic if a key-changing operation was NOT performed.
  2. Should perform a key select operation when the repartition operation is used with a key selector.
  3. Should create a repartition topic with the specified number of partitions.
  4. Should inherit the repartition topic partition number from the upstream topic when the number of partitions is not specified.
  5. Should create only one repartition topic when repartition is followed by groupByKey.
  6. Should generate a repartition topic name when a name is not specified.
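
A usage sketch of the new operator (illustrative only: the topic name, types, and partition count below are made up, not taken from the PR):

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> source = builder.stream("input-topic");

    // enforce a repartition step; Streams creates and manages the internal
    // topic, configured via the Repartitioned object
    final KStream<String, String> repartitioned = source.repartition(
        Repartitioned.<String, String>as("my-repartition")
            .withNumberOfPartitions(4));

    repartitioned.groupByKey().count();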

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax
Member

mjsax commented Aug 6, 2019

@lkokhreidze Thanks for the PR. There are some checkstyle errors. Can you please fix them before we review your PR?

@mjsax mjsax added the streams label Aug 6, 2019
@lkokhreidze
Contributor Author

@mjsax done

Member

@mjsax mjsax left a comment

Thanks for the PR.

Made an initial pass. I still need to wrap my head around the optimization layer and how we merge repartition nodes. We need to add more tests to RepartitionTopicNameTest and/or StreamsGraphTest, IMHO, to verify that the new repartition() operator works as intended.

Also, it seems you forgot to update groupBy() and groupByKey().

Finally, thinking about the KIP once more: as we extend groupBy to configure the internal repartition topic, I am wondering if we should extend the KIP and also allow this for join(), which may also create repartition topics? \cc @guozhangwang @bbejeck @vvcephei @ableegoldman @cadonna @abbccdda

@lkokhreidze
Contributor Author

> Thanks for the PR.
>
> Made an initial pass. I still need to wrap my head around the optimization layer and how we merge repartition nodes. We need to add more tests to RepartitionTopicNameTest and/or StreamsGraphTest, IMHO, to verify that the new repartition() operator works as intended.
>
> Also, it seems you forgot to update groupBy() and groupByKey().
>
> Finally, thinking about the KIP once more: as we extend groupBy to configure the internal repartition topic, I am wondering if we should extend the KIP and also allow this for join(), which may also create repartition topics?

@mjsax as written in the PR description, this is the first PR; the KStream#groupBy changes will be part of the next PR if this one passes. I didn't want to create a big PR without validating the underlying machinery and logic first. I'm okay doing all the changes here; I was just thinking it would make the review harder.

@lkokhreidze
Contributor Author

lkokhreidze commented Aug 7, 2019

> Made an initial pass. I still need to wrap my head around the optimization layer and how we merge repartition nodes. We need to add more tests to RepartitionTopicNameTest and/or StreamsGraphTest, IMHO, to verify that the new repartition() operator works as intended.

Agreed, I'll do that. I wanted to tag @bbejeck as it seems he's the main author behind the optimization logic. I'll add tests for the optimization logic to make sure nothing breaks.

@mjsax
Member

mjsax commented Aug 7, 2019

Ah, I guess I skipped the PR description... Sorry for that.

I discussed the proposal with @vvcephei in person, and thinking about the semantics once more, I am actually wondering if it is wise to change groupBy at all (this thought also affects my previous comment about including join() -- I would like to retract that idea :)).

We basically have two dimensions, with two cases each, to consider for groupBy() (and also join()):

  1. repartition not required -- no Repartitioned object provided
  2. repartition not required -- Repartitioned object is provided
  3. repartition required -- no Repartitioned object provided
  4. repartition required -- Repartitioned object is provided

Cases (1) and (3) are straightforward. However, case (2) is somewhat awkward, because we actually want to treat Repartitioned as a configuration object, but specifying it in groupBy should not enforce a repartitioning (if one wants to enforce a repartitioning, they should use the new repartition() operator). Hence, for case (2) the Repartitioned configuration would be ignored.

Therefore, only case (4) is left, in which passing in Repartitioned would have an effect. For this case, it would be possible to change the number of partitions or to specify a different partitioner. (I ignore the serdes and naming because these can be achieved via Grouped anyway.) However, if one wants to change the number of partitions or set a specific partitioner, it seems they want to apply (i.e., enforce) this configuration independent of the upstream topic configuration; i.e., it is actually a case in which the user wants to enforce a repartitioning. Hence, it seems perfectly fine (and actually better, because it's semantically more explicit) for a user to call repartition() instead.

Therefore, I don't see a good use case for which it makes sense to pass Repartitioned into groupBy.
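
To make this concrete, here is a sketch of the explicit style argued for here (assuming a KStream<String, String> named stream; the key selector, topic name, and partition count are hypothetical):

    // case (4) made explicit: instead of passing Repartitioned into groupBy,
    // enforce the repartitioning and then group by the (new) key
    stream
        .selectKey((key, value) -> value.substring(0, 1))    // hypothetical key change
        .repartition(Repartitioned.<String, String>as("pre-aggregate")
            .withNumberOfPartitions(4))
        .groupByKey()
        .count();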

It would be great if you could share your thoughts on this.

A second point I discussed with @vvcephei is about the optimization. We both have the impression that repartition() should not be subject to the repartition topic optimization. Instead, an enforced repartitioning step should be added to the topology in a hard-coded way, similar to a call to through(). Maybe we can add some advanced optimization rules later, but it seems difficult (and potentially unsafe/incorrect) to apply the repartition topic optimization for this case. Hence, we would suggest skipping this optimization in this PR.
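
For readers unfamiliar with the through() analogy: through() routes data via a topic the user must create and manage themselves, while repartition() does the same via a Streams-managed internal topic. A rough sketch (topic names are made up):

    // today: manual repartitioning through a user-managed topic
    stream.through("user-created-topic").groupByKey().count();

    // proposed: enforced repartitioning through an internal topic that Streams
    // creates itself and that is excluded from the repartition-topic optimization
    stream.repartition(Repartitioned.as("managed")).groupByKey().count();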

@ableegoldman
Contributor

> However, if one wants to change the number of partitions or set a specific partitioner, it seems they want to apply (i.e., enforce) this configuration independent of the upstream topic configuration; i.e., it is actually a case in which the user wants to enforce a repartitioning.

Not sure I agree, @mjsax -- maybe you just want to control the parallelism in case a repartition is required? You could force users to step through their whole topology, figure out when/where repartitioning is needed, and use repartition to set the parallelism. Or, you could let Streams do this for you -- as it currently does, way more conveniently and probably less error-prone -- and supply a configuration to be used if Streams figures out you need to repartition.

@mjsax
Member

mjsax commented Aug 7, 2019

> maybe you just want to control the parallelism in case a repartition is required

I don't see this as a use case in practice. Why would one want to change the parallelism? Because the aggregation operation is over- or under-provisioned, and thus one wants to decrease or increase the parallelism. If I am ok with the "default" parallelism in case there is no repartitioning, why would I not be ok with it if data is repartitioned?

> You could force users to step through their whole topology, figure out when/where repartitioning is needed, and use repartition to set the parallelism.

This is less of an issue IMHO, because if I want to scale up, for example, it's sufficient to insert repartition() once upstream, and all downstream auto-created topics would inherit the parallelism implicitly (as they inherit it now from source topics). Hence, I don't need to insert repartition all over the place.
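
A sketch of the inheritance being described (topic name and partition counts are hypothetical):

    // one repartition() inserted once upstream sets the parallelism; downstream
    // auto-created repartition topics inherit its partition count
    builder.<String, String>stream("input")        // say the source topic has 50 partitions
        .repartition(Repartitioned.<String, String>as("scaled-down")
            .withNumberOfPartitions(10))           // enforced: 10 partitions
        .selectKey((key, value) -> value)          // key change triggers auto-repartitioning
        .groupByKey()                              // auto-created topic inherits 10 partitions
        .count();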

@ableegoldman
Contributor

> If I am ok with the "default" parallelism in case there is no repartitioning, why would I not be ok with it if data is repartitioned?

Maybe you're processing data from a topic on your company's cluster, which has a huge number of partitions to begin with. Maybe your workload needs nowhere near this many partitions (you're filtering out most records, it's overpartitioned to begin with, you're just testing). You run your Streams app, which creates some N topics, all of which have this huge number of partitions. Your brokers struggle and your boss gets mad?
Why does anyone ever want to change this (i.e., with repartition)?

> it's sufficient to insert repartition() once upstream

That's a good point, though. So the repartition operator is really an "auto-create topic" + "set parallelism" operator -- we should be sure to document this well. Now, what if someone wants to configure the parallelism of certain repartition topic(s) but would like to continue using the source topic's parallelism as the default?
Not saying we should necessarily support that, but we should at least make it very clear to users how using this will affect downstream topics.

@mjsax
Member

mjsax commented Aug 8, 2019

> Why does anyone ever want to change this (i.e., with repartition)?

My question is, why do you need groupByKey(Repartitioned.with())? If you want to scale down, it seems better to explicitly call repartition(Repartitioned.with()).groupByKey() -- otherwise, you might not scale down if no auto-repartition topic is created, and it seems rather error-prone that we allow specifying the number of partitions and then ignore the config entirely.

Similar to your second comment, if you want to "scale up" again later, you call repartition() again. I agree that we need to document this explicitly if we follow this route. However, it's similar to the behavior we have as of now: if you insert a through(), all downstream operators inherit the parallelism from it.

@ableegoldman
Contributor

Ok, well, I am fine with framing this as a "set parallelism" operation... I don't want to stall this KIP/PR further, but what if this were split into a new pair of setParallelism and repartition operators, where repartition just auto-creates the topic while an upstream setParallelism is responsible for setting the number of partitions?
Just wondering if it's worth making this more explicit, since there are really two new functionalities being introduced here. It doesn't hurt to bundle them into one operator, as long as users know what two things it actually does...

@lkokhreidze
Contributor Author

lkokhreidze commented Aug 8, 2019

Hello @mjsax @ableegoldman @vvcephei
Thanks a lot for the valuable insights and interesting discussion.
@mjsax - your arguments make sense, but I'm leaning more towards @ableegoldman's points.
In addition, in my mind, one other important point we need to take into account is not only parallelism but the general configuration of repartition topics. In my mind, this KIP can be the foundation for actually giving users control over each individual repartition topic's configuration. To be honest, I was tempted to propose deprecating the KStream#groupBy(Grouped) operation altogether. Let me explain my reasoning a bit. After this KIP, I don't see any actual benefit or need for using KStream#groupBy(Grouped). With KStream#groupBy(Repartitioned) a user can do the exact same things, plus more. Right now, in KStream there are the Grouped and Joined configuration classes (and maybe some others that I'm missing) that are used for specifying:
a) topology name (which translates to repartition topic naming)
b) producer serdes
c) partitioner
All those configurations can be encapsulated under Repartitioned, along with any other topic-level configurations a user may want to pass to internal topic creation. Maybe this was discussed before, and there's a good reason why there are separate configuration classes for each operation (besides giving the API a nice look, of course :) ).
One argument that comes to mind for why we may actually want to have Repartitioned for groupBy is simple syntactic sugar. For example, there isn't a fundamental difference between these two topologies:

builder
    .stream()
    .repartition((key, value) -> value.newKey(), Repartitioned.with("by-new-key").withNumberOfPartitions(2))
    .groupByKey()

builder
    .stream()
    .groupBy((key, value) -> value.newKey(), Repartitioned.with("by-new-key").withNumberOfPartitions(2))

For me, as a user, the 2nd option looks much more appealing, similar to how the key selector for KStream#groupBy merges two operations (selectKey().groupBy()).

Again, your arguments are totally valid, and all of this can be achieved just by having the repartition(Repartitioned) operation. But on the other hand, I don't see anything bad about adding a Repartitioned option to groupBy. It won't break API semantics (at least I think it won't) and will give the user extra flexibility around controlling repartition topics.

@vvcephei vvcephei self-requested a review March 14, 2020 18:09
@lkokhreidze
Contributor Author

lkokhreidze commented Mar 15, 2020

Thanks @vvcephei for the update and no worries :)

Contributor

@vvcephei vvcephei left a comment

Haha, well. I did start the review, and made a fair amount of progress before getting sidetracked by a global catastrophe...

It's still in my "actively working on this" bucket, and I'll commit to not starting new work until I finish my review. For now, I'll go ahead and ask this one question, which came up early in my review. I skimmed over the KIP and discussion thread, but didn't see a specific discussion of the overload in question.

@vvcephei
Contributor

test this please

Contributor

@vvcephei vvcephei left a comment

Hey @lkokhreidze, I finally finished my review, and it looks good to me. I'm not sure if @mjsax wants to make another pass.

Member

@mjsax mjsax left a comment

Sorry for the delay in reviewing!! And thanks to @vvcephei for helping push this through.

Overall LGTM. Thanks for writing extensive tests!!!

    }

    @Test
    public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupByKey() throws ExecutionException, InterruptedException {
Member

Similar to above: we should be able to test this via unit tests using Topology#describe().
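
For context, a minimal sketch of the suggested unit-test style (topic names and assertion are illustrative only):

    // build the topology and assert on its description -- no brokers required
    final StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input")
        .repartition(Repartitioned.as("test-repartition"))
        .to("output");

    final String description = builder.build().describe().toString();
    // the internal repartition topic derived from the given name should show up
    assertTrue(description.contains("test-repartition"));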

Contributor Author

Thought about that, but somehow it felt "safer" with integration tests. Mainly because I was more comfortable verifying that topics actually get created when using the repartition operation.

Contributor

I had a similar thought, that it looks like good fodder for unit testing, but I did like the safety blanket of verifying the actual partition counts. I guess I'm fine either way, with a preference for whatever is already in the PR ;)

Member

> Mainly because I was more comfortable verifying that topics actually get created when using the repartition operation.

I guess that is fair. (I just try to keep test runtime short if we can -- let's keep the integration test.)

    }

    @Test
    public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified() throws ExecutionException, InterruptedException {
Member

Seems to be unit-testable via Topology#describe()?

Contributor Author

Thought about that, but somehow it felt "safer" with integration tests. Mainly because I was more comfortable verifying that topics actually get created when using the repartition operation.

    }

    @Test
    public void shouldGoThroughRebalancingCorrectly() throws ExecutionException, InterruptedException {
Member

Not sure what this test is about, i.e., how does it relate to the repartition() feature?

Contributor Author

It's related to this comment #7170 (comment)

Member

Thanks for clarifying!

    }

    @Test
    public void shouldInvokePartitionerWhenSet() {
Member

Not sure what this test actually verifies?

Contributor Author

This was the "easiest" way I could figure out to verify that the custom partitioner is invoked when it's set.

Co-Authored-By: John Roesler <vvcephei@users.noreply.github.com>
@lkokhreidze
Contributor Author

Hi @mjsax, I've addressed your comments; I would appreciate another review.

@lkokhreidze
Contributor Author

lkokhreidze commented Apr 7, 2020

Hi @mjsax @vvcephei

Small update: in commit f2bcdfe I've added the topology optimization option as a test parameter. This PR touches topology optimization (indirectly). In order to make sure that everything works as expected, I thought it would be beneficial to verify both the topology.optimization: all and topology.optimization: none configurations in the integration tests. Hope this makes sense.

Regards,
Levani

@vvcephei
Contributor

vvcephei commented Apr 7, 2020

Wow, that's great. Thanks, @lkokhreidze !

        Arrays.asList(StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION)
            .forEach(x -> values.add(new Object[]{x}));

        return values;
Member

Seems unnecessarily complex? A simple

        return Arrays.asList(new String[][] {
            {StreamsConfig.OPTIMIZE},
            {StreamsConfig.NO_OPTIMIZATION}
        });

would do, too :)

(Feel free to ignore the comment.)

        return values;
    }

    public KStreamRepartitionIntegrationTest(final String topologyOptimization) {
Member

A simple

    @Parameter
    public String topologyOptimization;

would be sufficient instead of adding a constructor, and those lines could go into before().

(As above, feel free to ignore this comment.)

@mjsax mjsax merged commit e131a99 into apache:trunk Apr 9, 2020
@mjsax
Member

mjsax commented Apr 9, 2020

Merged to trunk. Congrats @lkokhreidze! And thanks a lot for your hard work and patience!

@vvcephei
Contributor

Yes, thank you @lkokhreidze for seeing this through!

@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020