Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce method invocation of reservoir sampling #11257

Conversation

yuanlihan
Copy link
Contributor

Fixes #11256.

Description

Adding a new Reservoir Sampling method to sample K elements each time instead of only one element per method invocation.
A default method pickSegmentsToMove will be added to interface BalancerStrategy to pick K segments to move in a single method invocation.


Key changed/added classes in this PR
  • BalancerStrategy
  • ReservoirSegmentSampler
  • BalanceSegments

This PR has:

  • been self-reviewed.(Remove this item if the PR doesn't have any relation to concurrency.)
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Copy link
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to add a benchmark for this code if possible to show the difference of the new implementation, maybe here: https://github.com/apache/druid/tree/master/benchmarks/src/test/java/org/apache/druid/server/coordinator

final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxIterations = 2 * maxSegmentsToMove;
final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
int moved = 0, unmoved = 0;

Iterator<BalancerSegmentHolder> segmetnsToMove = strategy.pickSegmentsToMove(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: typo segmetnsToMove -> segmentsToMove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

static List<BalancerSegmentHolder> getRandomBalancerSegmentHolders(
final List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
int k
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to retain the percentOfSegmentsToConsider functionality to allow short-circuiting iterateAllSegments across all servers? We update the docs https://github.com/apache/druid/blob/master/docs/configuration/index.md#dynamic-configuration accordingly either way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to me to retain the percentOfSegmentsToConsider and add an another dynamic parameter useBatchedSegmentSampler to enable the new improvement. In this way, the changing will be less aggressive.

Comment on lines +56 to +64
if (numSoFar < k) {
holders.add(new BalancerSegmentHolder(server.getServer(), segment));
numSoFar++;
continue;
}
int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
if (randNum < k) {
holders.set(randNum, new BalancerSegmentHolder(server.getServer(), segment));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it would be worth adding some code comments here to describe that this algorithm has 2 phases, where it picks the first k segments from the servers, in order, then iterates through the server list randomly replacing these picked segments with decreasing frequency the more segments have been iterated.

Im also somewhat curious/nervous about random this pick method is compared to the previous one, though I'm not entirely sure how it would be measured.

I think we should add a new coordinator dynamic config property, https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java, to allow falling back to the old algorithm in case there are any unpredictable strange behavior caused by this new algorithm, maybe useBatchedSegmentSampler or .. i'm not sure, naming is hard.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random picking here is a standard implementation of a reservoir sampling algorithm. It makes all segments have the same probability to be selected. The original code also uses this trick, so no need to worry :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Echoing what @clintropolis suggested, it would be useful to have a new dynamic config to toggle between the old and new implementations. We could keep the new config for a couple of releases which would be enough time for operators to test it out and later on we could do away with the config property.
Could we also consider including percentOfSegmentsToConsider as part of this new approach? This is a useful knob especially for larger clusters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to me to retain the percentOfSegmentsToConsider and add an another dynamic parameter useBatchedSegmentSampler to enable the new improvement. In this way, the changing will be less aggressive.

@@ -71,13 +71,60 @@
* @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
* there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't really seem deprecated so much as no longer called at all, maybe we should remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

@FrankChen021
Copy link
Member

The new algorithm implementation looks good to me. But it drops the percentOfSegmentsToConsider parameter, I have no idea whether it should be kept. What do you think ? @jihoonson

@a2l007
Copy link
Contributor

a2l007 commented Jun 10, 2021

It would be useful to incorporate percentOfSegmentsToConsider into the new algorithm as it is relevant in larger clusters. It could help the balance segments duty perform even better if say only 50% of the total segments are considered for the move. @capistrant What do you think about this?

@capistrant
Copy link
Contributor

capistrant commented Jun 10, 2021

I haven't read the code yet, just reading comments and responding to my tag from @a2l007 ... I do think it is very helpful to keep the percentOfSegmentsToConsider value. Our largest cluster has over 2MM segments with replication and slashing that value down has allowed us to cut a large chunk out of coordination time in this phase by short circuiting at what we have deemed to be an acceptable point in the iteration. I'll try to read through full PR today and update my comment.

Also this patch and the issue it is resolving would help with the underlying performance issues of the design here. My patch to short circuit was more of a bandaid. This seems like a design improvement that visits the root issue of this method being invoked more than needed! So maybe my tuning config is less helpful once this PR is in? Or maybe it is still useful and both together would be great?

@yuanlihan
Copy link
Contributor Author

I haven't read the code yet, just reading comments and responding to my tag from @a2l007 ... I do think it is very helpful to keep the percentOfSegmentsToConsider value. Our largest cluster has over 2MM segments with replication and slashing that value down has allowed us to cut a large chunk out of coordination time in this phase by short circuiting at what we have deemed to be an acceptable point in the iteration. I'll try to read through full PR today and update my comment.

Also this patch and the issue it is resolving would help with the underlying performance issues of the design here. My patch to short circuit was more of a bandaid. This seems like a design improvement that visits the root issue of this method being invoked more than needed! So maybe my tuning config is less helpful once this PR is in? Or maybe it is still useful and both together would be great?

Hi @capistrant, thanks for your comment. It's great to know that we both have tried to solve this issue.

Ideally, this patch should be able to solve this issue fundamentally while it makes sense to me to evolve gradually in engineering. So, I agree to retain the percentOfSegmentsToConsider improvement and add an another parameter useBatchedSegmentSampler to enable to the new improvement as suggested by previous comments.

And it'll be great if you can help to review this PR!

@yuanlihan
Copy link
Contributor Author

It would be useful to add a benchmark for this code if possible to show the difference of the new implementation, maybe here: https://github.com/apache/druid/tree/master/benchmarks/src/test/java/org/apache/druid/server/coordinator

Here's the benchmark data(or see JMH result graph)

Benchmark                                     (maxSegmentsToMove)                                (mode)  (numberOfSegments)  Mode  Cnt      Score      Error  Units
BalancerStrategyBenchmark.pickSegmentsToMove                   10                               default               10000  avgt   10      2.402 ±    0.019  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10                               default              100000  avgt   10     41.890 ±    2.326  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10                               default             1000000  avgt   10    620.377 ±   14.541  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10  50percentOfSegmentsToConsiderPerMove               10000  avgt   10      1.475 ±    0.008  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10  50percentOfSegmentsToConsiderPerMove              100000  avgt   10     15.383 ±    0.342  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10  50percentOfSegmentsToConsiderPerMove             1000000  avgt   10    310.409 ±   18.073  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10              useBatchedSegmentSampler               10000  avgt   10      0.234 ±    0.003  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10              useBatchedSegmentSampler              100000  avgt   10      3.944 ±    0.199  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10              useBatchedSegmentSampler             1000000  avgt   10     59.548 ±    2.327  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100                               default               10000  avgt   10     22.205 ±    0.105  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100                               default              100000  avgt   10    392.765 ±   23.001  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100                               default             1000000  avgt   10   5759.416 ±  171.515  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100  50percentOfSegmentsToConsiderPerMove               10000  avgt   10     13.925 ±    0.026  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100  50percentOfSegmentsToConsiderPerMove              100000  avgt   10    181.052 ±    2.517  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100  50percentOfSegmentsToConsiderPerMove             1000000  avgt   10   2900.901 ±   81.967  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100              useBatchedSegmentSampler               10000  avgt   10      0.276 ±    0.001  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100              useBatchedSegmentSampler              100000  avgt   10      4.330 ±    0.037  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100              useBatchedSegmentSampler             1000000  avgt   10     56.426 ±    1.235  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000                               default               10000  avgt   10    222.513 ±    0.559  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000                               default              100000  avgt   10   4369.863 ±   62.496  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000                               default             1000000  avgt   10  57107.843 ± 1082.160  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000  50percentOfSegmentsToConsiderPerMove               10000  avgt   10    136.567 ±    0.301  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000  50percentOfSegmentsToConsiderPerMove              100000  avgt   10   1766.142 ±   12.927  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000  50percentOfSegmentsToConsiderPerMove             1000000  avgt   10  28437.935 ±  257.949  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000              useBatchedSegmentSampler               10000  avgt   10      0.303 ±    0.002  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000              useBatchedSegmentSampler              100000  avgt   10      4.247 ±    0.057  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000              useBatchedSegmentSampler             1000000  avgt   10     59.161 ±    1.597  ms/op

@yuanlihan yuanlihan force-pushed the reduce-method-invocation-of-reservoir-sampling branch from 6f2ad91 to 3b14ef2 Compare July 7, 2021 09:49
@yuanlihan yuanlihan force-pushed the reduce-method-invocation-of-reservoir-sampling branch from 3b14ef2 to b02a9e6 Compare July 7, 2021 10:05
Copy link
Contributor

@jihoonson jihoonson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuanlihan whoa, the benchmark result looks great 👍 The latest change LGTM.


@Override
public BalancerSegmentHolder next()
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

        if (!hasNext()) {
          throw new NoSuchElementException();
        }

@jihoonson
Copy link
Contributor

@FrankChen021 @a2l007 @clintropolis do you have more comments?

@FrankChen021
Copy link
Member

@yuanlihan Great work. Code change LGTM. For benchmark, could you add a combination of 50percentOfSegmentsToConsiderPerMove and useBatchedSegmentSampler so that we could see how percentOfSegmentsToConsider improves the performance based on the new algorithm. This benchmark could help us to determine whether we need to keep percentOfSegmentsToConsider in future.

@a2l007
Copy link
Contributor

a2l007 commented Jul 30, 2021

Thanks for this patch. Looking forward to using this in our clusters. It would be nice if you could add a few lines in the doc regarding a recommended value for useBatchedSegmentSampler. Cluster operators could use this value as a starting point to see how it impacts their cluster. This is not a blocker for this PR, but can be done as a followup as well.

Copy link
Member

@asdf2014 asdf2014 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for your contribution

@asdf2014 asdf2014 merged commit b837421 into apache:master Jul 30, 2021
@jihoonson jihoonson mentioned this pull request Jul 30, 2021
1 task
@clintropolis clintropolis added this to the 0.22.0 milestone Aug 12, 2021
@capistrant
Copy link
Contributor

It would be useful to add a benchmark for this code if possible to show the difference of the new implementation, maybe here: https://github.com/apache/druid/tree/master/benchmarks/src/test/java/org/apache/druid/server/coordinator

Here's the benchmark data(or see JMH result graph)

Benchmark                                     (maxSegmentsToMove)                                (mode)  (numberOfSegments)  Mode  Cnt      Score      Error  Units
BalancerStrategyBenchmark.pickSegmentsToMove                   10                               default               10000  avgt   10      2.402 ±    0.019  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10                               default              100000  avgt   10     41.890 ±    2.326  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10                               default             1000000  avgt   10    620.377 ±   14.541  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10  50percentOfSegmentsToConsiderPerMove               10000  avgt   10      1.475 ±    0.008  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10  50percentOfSegmentsToConsiderPerMove              100000  avgt   10     15.383 ±    0.342  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10  50percentOfSegmentsToConsiderPerMove             1000000  avgt   10    310.409 ±   18.073  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10              useBatchedSegmentSampler               10000  avgt   10      0.234 ±    0.003  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10              useBatchedSegmentSampler              100000  avgt   10      3.944 ±    0.199  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                   10              useBatchedSegmentSampler             1000000  avgt   10     59.548 ±    2.327  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100                               default               10000  avgt   10     22.205 ±    0.105  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100                               default              100000  avgt   10    392.765 ±   23.001  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100                               default             1000000  avgt   10   5759.416 ±  171.515  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100  50percentOfSegmentsToConsiderPerMove               10000  avgt   10     13.925 ±    0.026  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100  50percentOfSegmentsToConsiderPerMove              100000  avgt   10    181.052 ±    2.517  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100  50percentOfSegmentsToConsiderPerMove             1000000  avgt   10   2900.901 ±   81.967  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100              useBatchedSegmentSampler               10000  avgt   10      0.276 ±    0.001  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100              useBatchedSegmentSampler              100000  avgt   10      4.330 ±    0.037  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                  100              useBatchedSegmentSampler             1000000  avgt   10     56.426 ±    1.235  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000                               default               10000  avgt   10    222.513 ±    0.559  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000                               default              100000  avgt   10   4369.863 ±   62.496  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000                               default             1000000  avgt   10  57107.843 ± 1082.160  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000  50percentOfSegmentsToConsiderPerMove               10000  avgt   10    136.567 ±    0.301  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000  50percentOfSegmentsToConsiderPerMove              100000  avgt   10   1766.142 ±   12.927  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000  50percentOfSegmentsToConsiderPerMove             1000000  avgt   10  28437.935 ±  257.949  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000              useBatchedSegmentSampler               10000  avgt   10      0.303 ±    0.002  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000              useBatchedSegmentSampler              100000  avgt   10      4.247 ±    0.057  ms/op
BalancerStrategyBenchmark.pickSegmentsToMove                 1000              useBatchedSegmentSampler             1000000  avgt   10     59.161 ±    1.597  ms/op

wow, awesome benchmark results! So sorry that I didn't participate more in the review of this PR, I hav been pulled away from Druid more than I'd like lately. Regardless, I see that y'all had it more than covered! So excited to start using this 💯

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reduce method invocation of reservoir sampling
7 participants