
STORM-2678 Improve performance of LoadAwareShuffleGrouping #2261

Merged: 7 commits into apache:master on Sep 7, 2017

Conversation

@HeartSaVioR (Contributor) commented Aug 4, 2017

  • construct a ring that represents the distribution of tasks based on load (see the sketch below)
  • chooseTasks() just walks the ring sequentially
  • port related tests from Clojure to Java
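A minimal sketch of the ring idea in Java (illustrative only, assuming a fixed ring capacity of 1000 and at least one target task; names and details are not the patch's actual code):

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: tasks get slots in a fixed-size ring in proportion to how lightly loaded
// they are, the ring is shuffled, and chooseTasks() just walks it with an atomic index.
class LoadRingSketch {
    private static final int CAPACITY = 1000;              // ring size
    private final int[] ring = new int[CAPACITY];          // each slot holds a position into targetTasks
    private final AtomicInteger current = new AtomicInteger(0);
    private final List<Integer> targetTasks;

    LoadRingSketch(List<Integer> targetTasks) {
        this.targetTasks = targetTasks;                     // assumes at least one target task
    }

    // Called only from the updater; loads[i] is the load of task i in [0, 1].
    void updateRing(double[] loads) {
        double totalWeight = 0;
        double[] weights = new double[loads.length];
        for (int i = 0; i < loads.length; i++) {
            weights[i] = 1.0 - loads[i];                    // less loaded => more slots
            totalWeight += weights[i];
        }
        int slot = 0;
        for (int i = 0; i < loads.length && slot < CAPACITY; i++) {
            int count = (int) Math.round(CAPACITY * weights[i] / totalWeight);
            for (int j = 0; j < count && slot < CAPACITY; j++) {
                ring[slot++] = i;
            }
        }
        for (int i = 0; slot < CAPACITY; i = (i + 1) % loads.length) {
            ring[slot++] = i;                               // fill any rounding gap round-robin
        }
        shuffle(ring);                                      // avoid bursts to the same task
        current.set(0);
    }

    // Hot path: no load math at all, just advance the index and wrap.
    List<Integer> chooseTasks() {
        int idx = current.incrementAndGet();
        if (idx >= CAPACITY) {
            current.set(0);                                 // tolerated race, discussed later in this thread
            idx = 0;
        }
        return Collections.singletonList(targetTasks.get(ring[idx]));
    }

    private static void shuffle(int[] a) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = a.length - 1; i > 0; i--) {
            int j = rnd.nextInt(i + 1);
            int tmp = a[i];
            a[i] = a[j];
            a[j] = tmp;
        }
    }
}
```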

I'm no expert in micro-benchmarking, but I also crafted some simple performance tests, which you can see in LoadAwareShuffleGroupingTest. They are testBenchmarkLoadAwareShuffleGroupingEvenLoad and testBenchmarkLoadAwareShuffleGroupingUnevenLoad, and I annotated them with @Ignore so they only run when we explicitly want to do a performance test.

Here are my observations from running them with the old and new LoadAwareShuffleGrouping:

testBenchmarkLoadAwareShuffleGroupingEvenLoad (old)
Duration: 114470 ms
Duration: 115973 ms
Duration: 114807 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoad (new)
Duration: 106819 ms
Duration: 105857 ms
Duration: 106789 ms

testBenchmarkLoadAwareShuffleGroupingUnevenLoad (old)
Duration: 113484 ms
Duration: 118152 ms
Duration: 112664 ms

testBenchmarkLoadAwareShuffleGroupingUnevenLoad (new)
Duration: 106071 ms
Duration: 105938 ms
Duration: 106115 ms

You can see that the modified LoadAwareShuffleGrouping is faster than before, by 5% or more for single-threaded access. We may also want to do a multi-threaded performance test, keeping in mind that accessing the OutputCollector from a single thread is preferred over multiple threads.

This still respects thread-safety, and I think it respects thread-safety better than before, given that we only allow one thread to update the ring and we publish the new information all at once rather than mutating it in place while other threads are reading it.
We still don't guard the information with mutual exclusion, but I think that is as tolerable as it was before.

I'm planning to explore some more, mostly around reducing the calls to System.currentTimeMillis() in chooseTasks(). I'll push additional commits if I find further improvements; it will be easy to revert some of them if we decide we don't want them.

@HeartSaVioR (Contributor, Author)

The build failure is because the removal of CHANGELOG.md is not reflected in the binary distribution pom. I'll address it and rebase.

* construct ring which represents distribution of tasks based on load
* chooseTasks() just accesses the ring sequentially
* port related tests from Clojure to Java
* add performance tests for multi-threads
@HeartSaVioR (Contributor, Author) commented Aug 4, 2017

In the second commit I added multi-threaded performance tests: testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded and testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded.

I started exploring this with 2 threads, and may explore it with more threads if necessary.

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded (old)
Max duration among threads is : 185274 ms
Max duration among threads is : 189359 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded (new)
Max duration among threads is : 123411 ms
Max duration among threads is : 128937 ms

testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded (old)
Max duration among threads is : 184834 ms
Max duration among threads is : 185551 ms

testBenchmarkLoadAwareShuffleGroupingUnevenLoadAndMultiThreaded (new)
Max duration among threads is : 123978 ms
Max duration among threads is : 124113 ms

The test results clearly show that both take a performance hit with multiple threads, but the new LASG is affected much less than the old LASG.

* introduce 'skip checking update count'
  * we no longer call System.currentTimeMillis() every time
  * but we call AtomicInteger.incrementAndGet() every time
  * this may hurt multi-thread perf. but really faster with single-thread
@HeartSaVioR (Contributor, Author) commented Aug 4, 2017

I've now introduced a 'skip checking update count' to avoid calling System.currentTimeMillis() on every call, but it has a clear trade-off: we have to call AtomicInteger.incrementAndGet() every time instead.
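A minimal sketch of this trade-off (illustrative names such as SkipCheckSketch and maybeUpdateRing are not from the patch): the clock is read only once every SKIP_COUNT calls, at the price of an incrementAndGet() on every call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the 'skip checking update count' idea.
class SkipCheckSketch {
    private static final int SKIP_COUNT = 100;           // the "skip checking update count"
    private static final long UPDATE_INTERVAL_MS = 1000; // load is refreshed about once per second

    private final AtomicInteger callsSinceCheck = new AtomicInteger(0);
    private final AtomicLong lastUpdateMs = new AtomicLong(0);

    void maybeUpdateRing() {
        // incrementAndGet() on every call is the cost we pay for skipping the clock read.
        if (callsSinceCheck.incrementAndGet() >= SKIP_COUNT) {
            callsSinceCheck.set(0);
            long now = System.currentTimeMillis();        // clock read only every SKIP_COUNT calls
            long last = lastUpdateMs.get();
            if (now - last >= UPDATE_INTERVAL_MS && lastUpdateMs.compareAndSet(last, now)) {
                updateRing();                             // at most one thread rebuilds the ring
            }
        }
    }

    private void updateRing() {
        // rebuild the choices ring from the latest LoadMapping (omitted here)
    }
}
```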
I set the skip-checking update count to 10, 100, 1000, 10000, and 100000, and re-ran the tests. The results are below:

10

testBenchmarkLoadAwareShuffleGroupingEvenLoad
Duration: 48650 ms
Duration: 49058 ms
Duration: 48445 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
Max duration among threads is : 159369 ms
Max duration among threads is : 130093 ms
Max duration among threads is : 138557 ms

100

testBenchmarkLoadAwareShuffleGroupingEvenLoad
Duration: 41093 ms
Duration: 40393 ms
Duration: 40524 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
Max duration among threads is : 142575 ms
Max duration among threads is : 139276 ms
Max duration among threads is : 145470 ms

1000

testBenchmarkLoadAwareShuffleGroupingEvenLoad
Duration: 40238 ms
Duration: 39715 ms
Duration: 39242 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
Max duration among threads is : 168089 ms
Max duration among threads is : 161082 ms
Max duration among threads is : 169998 ms

10000

testBenchmarkLoadAwareShuffleGroupingEvenLoad
Duration: 40535 ms
Duration: 39319 ms
Duration: 46815 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
Max duration among threads is : 140426 ms
Max duration among threads is : 166214 ms
Max duration among threads is : 169368 ms

100000

testBenchmarkLoadAwareShuffleGroupingEvenLoad
Duration: 39801 ms
Duration: 39535 ms
Duration: 39537 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
Max duration among threads is : 147115 ms
Max duration among threads is : 140722 ms
Max duration among threads is : 172955 ms

The results fluctuate, but roughly speaking the change looks beneficial.

The multi-threaded numbers now fluctuate more and regress compared to the previous commit, but they are still faster than the old LASG. What we really gain is the single-threaded improvement: the time was cut by more than half.

The value of the 'skip checking update count' should be chosen to answer the question: "Are we OK with delaying the load update if fewer than N (the value) calls occur within 1 second?" We may want to put more effort into finding the right value (the test results were not stable enough), but from these results 100 seems to be a good choice; higher values don't show a linear performance improvement.

By the way, the update interval (M seconds) is another variable to explore. We may also need to check how often the source load information is updated, since it is pointless for LASG to refresh its view more often than the source load information is updated.

UPDATE: the load is updated every 1 second, so the current update interval seems correct.

* add a new way to provide LoadMapping: via push
* refresh load and push updated LoadMapping to all groupers when refreshLoadTimer is activated
* update tests to reflect the change
@HeartSaVioR (Contributor, Author)

I've now taken the opposite approach: pushing the updated load mapping when the load-updater timer fires. We no longer need any tricks or optimizations to reduce checking, and we don't need to check the update interval at all.

This is based on the fact that LoadAwareShuffleGrouping.updateRing() doesn't break other threads calling LoadAwareShuffleGrouping.chooseTasks() concurrently.

I don't think we can easily optimize further unless we change some of the guarantees, such as dropping thread-safety.
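A minimal sketch of the push model (the scheduling code here is illustrative, not the worker's actual timer; only refreshLoad(LoadMapping) comes from the patch, and the LoadMapping/LoadSource types below are placeholders):

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: a single refresh task periodically resolves the latest load and pushes it
// into every grouper, so chooseTasks() never has to check the clock or poll for updates.
class LoadRefreshSketch {
    interface LoadPushGrouping {
        void refreshLoad(LoadMapping loadMapping);        // the method added by the patch
    }

    void start(List<LoadPushGrouping> groupers, LoadSource loadSource) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> {
            LoadMapping latest = loadSource.currentLoad(); // load is refreshed about once per second
            for (LoadPushGrouping grouping : groupers) {
                grouping.refreshLoad(latest);              // only this thread ever rebuilds the ring
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    // Placeholders standing in for Storm's real LoadMapping and its source.
    static class LoadMapping { }
    interface LoadSource { LoadMapping currentLoad(); }
}
```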

testBenchmarkLoadAwareShuffleGroupingEvenLoad
Duration: 27596 ms
Duration: 27772 ms

testBenchmarkLoadAwareShuffleGroupingEvenLoadAndMultiThreaded
Max duration among threads is : 89274 ms
Max duration among threads is : 86466 ms

Given that it changes some interfaces, I would like to have several reviewers look at it and share their opinions.

* change everything to Array and pre-allocate all
* use static length for choices
* prepare backup array for choices pre-allocated and swap to avoid allocating arrays
@HeartSaVioR (Contributor, Author) commented Aug 9, 2017

I explored and applied more changes:

  • change the ArrayLists to pre-allocated arrays (they are only allocated in prepare())
  • fix the length of the choices array at 1000
    • at first I set this to 100, and some tests were failing because of the distribution

This might use a bit more memory, but it gets rid of object allocation on every load update.
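A minimal sketch of the pre-allocate-and-swap approach (the choices/prepareChoices names follow the later diff excerpts; everything else, including the volatile qualifier, is illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: both arrays are allocated once in prepare(), the spare one is filled with
// the new distribution, and then the references are swapped, so no array is allocated
// per load update.
class DoubleBufferSketch {
    private static final int CAPACITY = 1000;
    private volatile int[] choices;          // ring currently read by chooseTasks()
    private int[] prepareChoices;            // spare ring, written only by the updater thread
    private final AtomicInteger current = new AtomicInteger(0);

    void prepare() {
        choices = new int[CAPACITY];
        prepareChoices = new int[CAPACITY];
    }

    // Called only from the single refresh thread.
    void updateRing(double[] loads) {
        fillFromLoads(prepareChoices, loads);  // build the new distribution off to the side

        int[] tempForSwap = choices;           // swap the two pre-allocated arrays
        choices = prepareChoices;
        prepareChoices = tempForSwap;

        current.set(0);                        // restart the walk over the fresh ring
    }

    private void fillFromLoads(int[] ring, double[] loads) {
        // weight tasks by inverse load and shuffle (omitted; see the earlier sketch)
    }
}
```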

@HeartSaVioR (Contributor, Author) commented Aug 22, 2017

I now have more numbers to support this patch.

I took the same approach @Ethanlm used for his patch #2270: performance testing with ConstSpoutNullBoltTopo, ACKing disabled.

  1. Config: topology.message.timeout: 300; topology.max.spout.pending: 5000; topology.acker.executors: 0
  2. 1 VM from AWS c4.xlarge, dedicated, to get more accurate results
  3. Launched 1 worker, 1 spout task and 1 bolt task. ACKing disabled.
  4. All experiments ran for 300 s.
  5. For clarity, only the outputs at 240 s are shown.
  6. Tested 3 times for each case and picked the run showing the median result.
  7. Numbers fluctuate slightly during the experiments.

Used 08038b6 (last commit) for this patch, and 77354fe for baseline (master).

| Grouping | transferred (messages) | transfer rate (message/s) | spout_transferred | spout_acks | spout_throughput (acks/s) |
| --- | --- | --- | --- | --- | --- |
| New LocalOrShuffle (patch) | 167984520 | 2799742 | 167984520 | 167984520 | 2799742 |
| LocalOrShuffle (master) | 130891240 | 2181520 | 130891240 | 130891260 | 2181520 |
| LocalOrShuffle with loadaware disabled (master) | 161410760 | 2690179 | 161410760 | 161410740 | 2690179 |

So the new LoadAwareShuffleGrouping is definitely faster than the current LoadAwareShuffleGrouping (about 28%), and even faster than the current ShuffleGrouping (about 4%).

Logically the performance of LoadAwareShuffleGrouping should be the same as or slower than ShuffleGrouping (the logic behind chooseTasks() is identical and LoadAwareShuffleGrouping has the extra overhead of rebuilding its information), but the test showed the opposite of what I expected.
This suggests a possible improvement for ShuffleGrouping as well: we could replace its List with an array and see how much that helps.

@HeartSaVioR (Contributor, Author) commented Aug 22, 2017

I went back to 8f63d5a, which doesn't touch any interfaces, and ran the same tests:

| Grouping | transferred (messages) | transfer rate (message/s) | spout_transferred | spout_acks | spout_throughput (acks/s) |
| --- | --- | --- | --- | --- | --- |
| New LocalOrShuffle (patch) | 160441600 | 2674026 | 160441600 | 160441580 | 2674026 |

It is a bit slower than ShuffleGrouping but still faster than LoadAwareShuffleGrouping (about 22%).

So we can choose between a bigger improvement that touches multiple parts, or a still substantial improvement that doesn't touch anything else.

I tested one more thing: replacing the List with an array in ShuffleGrouping. The result is below:

| Grouping | transferred (messages) | transfer rate (message/s) | spout_transferred | spout_acks | spout_throughput (acks/s) |
| --- | --- | --- | --- | --- | --- |
| LocalOrShuffle with loadaware disabled (master) | 161437800 | 2690630 | 161437800 | 161437760 | 2690630 |

It doesn't seem to bring noticeable improvement.

The difference may come down to the length of the array: in this test the array is tiny (it would have 1 element), so set() had to be called in addition to incrementAndGet() on every call. Note that the array length in the patch is 1000, so set() is only called once every 1000 calls.

We could grow the array in prepare() when the number of target tasks is below some threshold to get better performance, but that is a micro-optimization and I'm not sure we want to apply it.
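A hypothetical sketch of that micro-optimization, padding the index array in prepare() when there are very few target tasks (not part of the patch; the class and field names are made up for illustration):

```java
import java.util.List;

// Sketch: pad the ShuffleGrouping index array in prepare() so that the wrap-around
// set() is hit rarely even when there are only one or two target tasks.
class PaddedShuffleSketch {
    private static final int MIN_SLOTS = 1000;   // same order as the patch's ring capacity
    private int[] slots;

    void prepare(List<Integer> targetTasks) {
        int tasks = targetTasks.size();
        int copies = Math.max(1, MIN_SLOTS / tasks);
        slots = new int[copies * tasks];
        for (int i = 0; i < slots.length; i++) {
            slots[i] = i % tasks;                // each task index repeated 'copies' times
        }
        // shuffle(slots) and walk it with an atomic counter, as in the other sketches
    }
}
```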

@HeartSaVioR (Contributor, Author)

@revans2 @roshannaik
I know you're busy, but could you find time to take a look at the change? I believe it is a clear improvement, and I've provided raw numbers to show the difference.

if (rightNow < CAPACITY) {
    return rets[choices[rightNow]];
} else if (rightNow == CAPACITY) {
    current.set(0);
Contributor:

If you are trying to make this thread-safe, I suspect this current.set(0) is a race condition. I'm not sure whether it's an acceptable race condition or not.

@HeartSaVioR (Contributor, Author) commented Aug 23, 2017

I borrowed this from ShuffleGrouping. Even though I'm not certain whether it constitutes a race condition, I think it is acceptable: the racy scenarios can't cause an out-of-bounds index, they just let some threads select the same index and maybe skip a few indices.
Moreover, this patch contains tests that address multi-thread safety.

The same reasoning also makes updateRing() thread-safe.

We could still replace set with getAndSet to make it fully thread-safe, but we would need more experiments to see how that affects performance if we wanted to. The same applies to ShuffleGrouping.
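For comparison, one fully race-free shape of the same idea is to drop the reset entirely and derive the index from a monotonically increasing counter. This is not what the patch or ShuffleGrouping does (and it pays for a modulo on every call); it is shown only to illustrate the alternative being weighed here:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: a long counter never needs to be reset in practice, so there is no
// set(0)/getAndSet decision to make at the wrap point.
class ModuloIndexSketch {
    private static final int CAPACITY = 1000;
    private final AtomicLong counter = new AtomicLong(0);

    int nextIndex() {
        // Every caller gets a distinct, strictly increasing ticket, so indices are
        // handed out evenly with no skipped or duplicated slots.
        return (int) (counter.getAndIncrement() % CAPACITY);
    }
}
```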

@roshannaik (Contributor)

The impact of load-aware grouping you show here seems in line with what I have seen. Encouraging to see these improvements.
I reviewed only the core chooseTasks() implementation and left one comment there.

}

shuffleArray(choices);
current = new AtomicInteger(0);
@HeartSaVioR (Contributor, Author):

Logically this should be -1, because we increment and then get, but it doesn't hurt much.
(The same applies to ShuffleGrouping.)

choices = prepareChoices;
prepareChoices = tempForSwap;

current.set(0);
@HeartSaVioR (Contributor, Author):

Again, logically this should be -1 because we increment and then get, and unlike in chooseTasks() we don't read the value in this method, but it doesn't hurt much either way.

* Let chooseTask() read from index 0, not 1
@@ -20,5 +20,6 @@
import java.util.List;

public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
    List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load);
Contributor:

If we are going to refresh the load out of band, then let's delete the LoadMapping from the parameters passed into chooseTasks.

@HeartSaVioR (Contributor, Author):

OK. I was just waiting for the review to see whether we are OK with changing the way the LoadMapping is provided. It looks like you're OK with the push approach, so I'll remove the method.
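Roughly the shape the interface ends up with after the LoadAware-specific chooseTasks(taskId, values, load) overload is removed (a sketch inferred from this thread, not a copy of the merged file): load arrives only via refreshLoad(), and routing goes through the plain chooseTasks(taskId, values) inherited from CustomStreamGrouping.

```java
public interface LoadAwareCustomStreamGrouping extends CustomStreamGrouping {
    void refreshLoad(LoadMapping loadMapping);
}
```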

@HeartSaVioR (Contributor, Author):

Addressed.

@revans2 (Contributor) commented Aug 23, 2017

From a code perspective the changes look fine to me. It would be nice to clean up the load-aware grouping interface some.

From a performance perspective it is a bit of a wash from what I am seeing.

I ran ThroughputVsLatency with 1 spout, 1 splitter, and 1 counter. I also did the same with 2 splitters, so there was a choice to be made.

From a cost perspective I saw that for the 1 splitter case everything appeared to be within the noise range. For the 2 splitter case I saw the new version taking up about 5% more CPU than the old version, so I would like to explore this a bit more.

Similarly for the mean, 99%ile, and 99.9%ile latency measurements: the original was slightly better, around 5%, when there were 2 splitters.

I do want to explore these a bit more, because it seems counter to the benchmarks that others have run.

@revans2 (Contributor) commented Aug 23, 2017

I spent the last few hours running more tests and got the same results. I am not too concerned about it; the overhead appears to be quite low, if there is any.

@HeartSaVioR (Contributor, Author) commented Aug 23, 2017

@revans2
Thanks for running the performance test. I just ran some tests (rate 100000, 1 worker, max spout pending 1000) with TVL and got the results below. I picked only the 240-second mark, but I have the raw numbers as well.

acker executor 1

| kind | uptime | acked | acked/sec | 99% | 99.99% | min | max | mean | stddev | user | sys | gc | mem |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| patch, trial 1 | 240 | 3,014,700 | 100,490.00 | 14,041,087 | 17,924,095 | 3,284,992 | 30,244,863 | 7,576,927.04 | 1,771,441.20 | 120,360 | 5,830 | 1,454 | 135.74 |
| patch, trial 2 | 241 | 3,013,460 | 100,448.67 | 13,623,295 | 16,859,135 | 3,282,944 | 24,559,615 | 7,498,565.53 | 1,626,489.06 | 119,890 | 5,550 | 1,320 | 73.44 |
| master, trial 1 | 240 | 3,013,700 | 100,456.67 | 13,533,183 | 17,563,647 | 3,311,616 | 25,821,183 | 7,470,969.09 | 1,618,231.68 | 118,480 | 5,720 | 1,236 | 97.66 |
| master, trial 2 | 241 | 3,015,020 | 100,500.67 | 13,516,799 | 16,654,335 | 3,284,992 | 22,331,391 | 7,501,903.32 | 1,650,650.14 | 119,170 | 5,320 | 1,364 | 84.73 |

acker executor 0 (acker off)

| kind | uptime | acked | acked/sec | 99% | 99.99% | min | max | mean | stddev | user | sys | gc | mem |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| patch, trial 1 | 240 | 3,013,040 | 100,434.67 | 4,464,639 | 6,959,103 | 283 | 11,902,975 | 634,505.19 | 696,511.91 | 76,820 | 6,470 | 868 | 165.32 |
| patch, trial 2 | 240 | 3,012,520 | 100,417.33 | 4,255,743 | 6,008,831 | 323 | 9,650,175 | 627,677.58 | 651,341.18 | 76,300 | 6,860 | 885 | 111.09 |
| master, trial 1 | 241 | 3,013,420 | 100,447.33 | 2,060,287 | 4,468,735 | 356 | 7,733,247 | 565,479.02 | 417,299.37 | 73,570 | 7,070 | 399 | 237.66 |
| master, trial 2 | 241 | 3,012,800 | 100,426.67 | 3,928,063 | 7,241,727 | 450 | 17,530,879 | 617,198.01 | 658,032.31 | 75,270 | 7,290 | 731 | 241.72 |

I've shown numbers from the grouper performance test and from ConstSpoutNullBoltTopo, which clearly show the new version is (much) faster, so this result is confusing to me. It's probably worth putting more effort into standardizing our approach to performance testing. I'll try to find time to take a look at loadgen.

* remove LoadAware specific chooseTasks method
@revans2 (Contributor) left a review:

+1 These changes look good to me

@asfgit merged commit 4dade36 into apache:master on Sep 7, 2017
asfgit pushed a commit that referenced this pull request Sep 7, 2017
…o STORM-2678

STORM-2678: Improve performance of LoadAwareShuffleGrouping

This closes #2261