STORM-1151: Batching in DisruptorQueue #765
Conversation
@revans2 this looks great. I am in favor of batching at the disruptor level. So this means we don't have any issues with acking?
I am just rebasing the code now so you can test that out yourself. This code has no issues with acking, but there are a few real issues.

First, the latency on low-throughput queues is much higher, because a batch has to wait for the flush timeout. That timeout is set to 1 ms by default, so it is not that bad, but in a follow-on JIRA we should be able to dynamically adjust the batch size for each queue to compensate.

Second, the number of threads used is a lot higher: one more per disruptor queue. I expect to reduce the total number of disruptor queues once I have optimized other parts of the code. As it stands right now I don't want to do that, because the two-queues-per-bolt/spout design still improves performance in many cases.

Third, in the worst case it is possible to allocate many more objects than previously. It is not actually that many more, since we already allocate a lot of objects, which needs to be looked at in a separate JIRA at some point.

Also, I don't want to shove this code in without doing a real comparison between the two approaches and the code. This is one way of doing batching, but there are others that may have advantages over this, or may complement this approach as well. I just want storm to eventually be a lot closer to the 1 million sentences/second mark than it is now.
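As a rough mental model of the producer-side batching being discussed (a minimal sketch, not the code in this patch): tuples are buffered per queue and flushed either when the batch fills or when a flush timer fires, which is where the roughly 1 ms worst-case latency on an idle queue and the extra flusher thread per queue come from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: buffers items per queue and flushes on size or on a timer.
public class TimedBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> downstream;          // e.g. publish the batch to the ring buffer
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService flusher =
        Executors.newSingleThreadScheduledExecutor();    // the "1 more thread per queue" cost

    public TimedBatcher(int batchSize, long flushIntervalMs, Consumer<List<T>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
        // An idle queue waits for this timer to fire, which is the added low-throughput latency.
        flusher.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void publish(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flushLocked();
        }
    }

    public synchronized void flush() {
        flushLocked();
    }

    private void flushLocked() {
        if (!buffer.isEmpty()) {
            downstream.accept(new ArrayList<>(buffer)); // hand off a copy as one batch
            buffer.clear();
        }
    }
}
```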
Force-pushed from 5673c54 to a99a047
Curious to see the first performance results :)
@mjsax The difference is not that huge yet; it is 2x in a number of use cases. But we have a lot of slowness in the metrics processing code, which, at least in the case of word count, dominates the performance, and is probably why you were not seeing the 6x performance improvement in your pull that you were seeing before. I am going to be doing some follow-on optimization work on that. The big thing right now is that system CPU utilization went from 15-30%, depending on the OS etc., down to 2-6%. This frees up a huge amount of CPU for other processing, so the overall throughput capability of a node increases significantly.
(delete-node cluster-state path)) ;; delete the znode since the worker is not congested
(if on?
(set-ephemeral-node cluster-state path nil acls))))) ;; create the znode since worker is congested
(when (not on?)
Since there is only one branch to run, is there any special reason for replacing "if" with "when"?
It is mostly that I put in some debugging to fix an issue and didn't want an (if (do (log-message) ...)), and ended up not changing it back after removing the debug logging. If you want me to change it back and add the comments back in, I can.
This is OK, never mind.
Really exciting work! It is very interesting that we move the overflow buffer inside the disruptor queue and use it for batch publishing. I also look forward to seeing how tuning the disruptor queue size and batch interval will impact the throughput of topologies with different tuple-sending/processing speeds.
Ran some sanity tests. LGTM. +1
Force-pushed from a99a047 to 0b628f2
I have some new test results. I did a comparison of several different branches: this branch, the upgraded-disruptor branch #750, STORM-855 #694, and apache-master 0.11.0-SNAPSHOT (04cf3f6). I had to make a few modifications to get my test to work. I applied the following patch https://gist.github.com/revans2/84301ef0fde0dc4fbe44 to each of the branches. For STORM-855 I had to modify the test a bit so it would optionally do batching; in that case batching was enabled on all streams and all spouts and bolts. I then ran the test at various throughputs (100, 200, 400, 800, 1600, 3200, 6400, 10000, 12800, 25600, and possibly a few others while looking for the maximum throughput) and different batch sizes. Each test ran for 5 minutes. Here are the results of that test, excluding the runs where the worker could not keep up with the rate.
I then filtered the list to show the maximum throughput for a given latency, at several cut-offs: 99th percentile, 99.9th percentile, mean latency, and service latency in ms (storm's complete latency). I also looked at roughly the maximum throughput each branch could handle.
I really would like some feedback here, because these numbers seem to contradict STORM-855 using my original speed-of-light test. I don't really like that test, even though I wrote it, because the throughput is limited only by storm, so with acking disabled it is measuring what the latency is when we hit the wall and cannot provide any more throughput. No one should run in production that way. When acking is enabled and we are using max-spout-pending for flow control, the throughput is directly related to the end-to-end latency. This too shouldn't be the common case in production, because it means we cannot keep up with the incoming rate and are falling behind.

This seems to indicate that the only time STORM-855 makes sense is when looking at the 99%-ile latency at a very low throughput, and even then it only seems to have about a 1/20th of a ms advantage over the others. In other cases it looks like the throughput per host it can support is about half of that without the change. This branch, however, has a weakness on the low end: when batching is enabled it is about 12 ms slower, but on the high end it can handle more than 2x the throughput with little change to the latency. If that 12 ms is important, I think we can mitigate it by allowing the batch size to self-adjust on a per-queue basis.

I really would like others to look at my numbers and my test to see if there are issues I am missing, because like I said it seems to contradict the numbers from STORM-855. The only thing I can think of is that the messaging layer is the bottleneck in the speed-of-light test, which is what it was intended to stress, and STORM-855 is giving a significant batching advantage there. If that is the case then we should look at what STORM-855 is doing around that to try and combine it with the batching we are doing here.

@ptgoetz @d2r @rfarivar @mjsax @kishorvpatil @knusbaum please let me know what you think.
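As a rough illustration of the "throughput is directly related to the end-to-end latency" point above: with acking on, max-spout-pending caps how many tuples a spout task keeps in flight, so by Little's law the sustainable rate is roughly the pending count divided by the complete latency. A minimal sketch with made-up numbers (1000 pending and 50 ms complete latency are just examples, not values from the test):

```java
// Little's law applied to max.spout.pending flow control (illustrative numbers only).
public class SpoutThroughputBound {
    public static void main(String[] args) {
        int maxSpoutPending = 1000;        // tuples allowed in flight per spout task
        double completeLatencySec = 0.050; // storm's complete latency, 50 ms here
        double maxRate = maxSpoutPending / completeLatencySec;
        System.out.printf("~%.0f tuples/sec per spout task%n", maxRate); // ~20000
    }
}
```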
Sorry, I forgot to mention that in one test STORM-855-100 was able to run at a throughput of 6000 sentences/second successfully, but in other runs it failed, so I set the number to 5500, even though 6000 is shown in the results.
Force-pushed from 0b628f2 to d3d8e82
I just rebased on the latest fixed disruptor upgrade code, now in pull request #797. The critical path did not change at all, so the numbers should be similar.
I had a look at the numbers. It's a lot of stuff and hard for me to parse... I am not sure what you mean by "contradict the numbers"? Can you explain in more detail? However, I agree with your thought about STORM-855 #694: it basically reduces the contention on the output queue, because fewer calls are made to it. I profiled Storm once and observed that at high-end data rates (when we hit the wall) the contention on the output queue is the bottleneck: the writing and reading threads have to lock the queue, and waiting for the lock consumes a fair share of the time. This waiting-for-the-lock time is reduced significantly by batching as STORM-855 does.
What I saw when testing STORM-855 was that the maximum throughput was cut almost in half, from 10,000 sentences per second to 5,500. But your numbers showed maximum throughput more than doubling, from around 7,960,300 tuples sent in 30 seconds to 16,347,100 in the same time period (no acking), and from 1,832,160 in 30 seconds to 2,323,580, an increase of about 25%, with acking. To me this feels like a contradiction. The only thing I can think of is that the messaging layer is so scary slow that cutting the maximum throughput of a worker in half has no impact on the overall performance if it can double the throughput of the messaging layer by doing more batching. This is likely the case, as on the high end 16,347,100 / 30 seconds / 24 workers is about 22,000 tuples per second per worker, whereas 5,500 sentences per second results in about 181,500 total tuples per second per worker being processed. I'm just looking for feedback from others on this, but it looks like I need to do a distributed apples-to-apples comparison as well to see the impact the messaging layer has.
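Spelling out the arithmetic above (the ~33 tuples-per-sentence multiplier is inferred by dividing the two quoted numbers, not something stated in the test itself):

```java
// Rough per-worker rates implied by the numbers above (illustrative check only).
public class PerWorkerRates {
    public static void main(String[] args) {
        // Speed-of-light test, no acking: 16,347,100 tuples in 30 s across 24 workers.
        double solPerWorker = 16_347_100.0 / 30 / 24;
        System.out.printf("speed-of-light: ~%.0f tuples/sec/worker%n", solPerWorker); // ~22,700, the "about 22,000" above

        // Word count: 5,500 sentences/sec corresponds to ~181,500 tuples/sec per worker,
        // which implies roughly 181,500 / 5,500 = ~33 tuples generated per sentence.
        double impliedFanOut = 181_500.0 / 5_500;
        System.out.printf("implied fan-out: ~%.0f tuples/sentence%n", impliedFanOut);
    }
}
```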
Thanks for the clarification. :)
Force-pushed from d3d8e82 to 4533347
So I have been running a number of tests trying to come to a conclusive decision on how storm should handle batching, and trying to understand the difference between my test results and the test results from #694. I ran the word count test I wrote as a part of #805 on a 35-node storm cluster. This was done against several different storm versions: the baseline in the #805 pull request; this patch + #805 (batch-v2); and #694 + #805 + modifications to use the hybrid approach so acking and batching work in a multi-process topology (STORM-855).

To avoid having all of the numbers be hard to parse I am just going to include some charts, but if anyone wants to see the raw numbers or reproduce it themselves I am happy to provide data and/or branches. The numbers below were collected after the topology had been running for at least 200 seconds, to avoid startup issues like JIT etc. I filtered out any 30-second interval where the measured throughput was not within +/- 10% of the target throughput, on the assumption that if the topology cannot keep up with the desired throughput, or is trying to catch up from previous slowness, it would not be within that range. I did not filter based off of the number of failures, simply because that would have removed all of the STORM-855-with-batching results. None of the other test configurations saw any failures at all during testing.

The first chart shows the 99%-ile latency vs measured throughput. It is not too interesting, except to note that batching in STORM-855 at low throughput resulted in nothing being fully processed: all of the tuples timed out before they could finish. Only at a medium throughput, above 16,000 sentences/second, were we able to maintain enough tuples to complete batches regularly, and even then many tuples would still time out. This should be fixable with a batch timeout, but that is not implemented yet.

To get a better view I adjusted the latency to be a log scale. From this we can see that on the very low end batching-v2 increases the 99%-ile latency from 5-10 ms to 19-21 ms. Most of that you can get back by configuring the batch size to 1 instead of the default 100 tuples. However, once the baseline stops functioning at around 7000 sentences/sec, the batching code is able to continue working with either a batch size of 1 or 100. I believe that this has to do with the automatic backpressure: in the baseline code backpressure does not take the overflow buffer into account, but in the batching code it does. I think this gives the topology more stability in maintaining a throughput, but I don't have any solid evidence for that.

I then zoomed in on the graphs to show what a 2-second SLA and a 100 ms SLA would look like. In both cases batching-v2 with a batch size of 100 was able to handle the highest throughput for the given latency.

Then I wanted to look at memory and CPU utilization. Memory does not show much; the amount used varies a bit from one implementation to another, but if you realize this is for 35 worker processes it is varying from 70 MB/worker to about 200 MB/worker. The numbers simply show that as the throughput increases the memory utilization does too, and it does not vary much from one implementation to another. CPU, however, shows that on the low end we go from 7 or 8 cores worth of CPU time to about 35 cores worth for the batching code. This seems to be the result of the batch flushing threads waking up periodically.

We should be able to mitigate this by making that interval larger, but that would in turn impact the latency. I believe that with further work we should be able to reduce that CPU utilization and the low-end latency by dynamically adjusting the batch size and timeout based off of a specified SLA.

At this point I feel this branch is ready for a formal review and inclusion into storm; the drawbacks of this patch do not seem to outweigh its clear advantages. Additionally, given the stability problems associated with #694 I cannot feel good recommending it at this time. It is clear that some of what it is doing is worthwhile, and I think we should explore the alternative batch serialization between worker processes as a potential standalone piece.

@d2r @knusbaum @kishorvpatil @harshach @HeartSaVioR @ptgoetz if you could please review this, remembering that it is based off of #797, I would like to try and get this in soon.
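For anyone who wants to experiment with the batch-size versus flush-interval trade-off described above, the tuning would look roughly like the sketch below; the config key names are placeholders for illustration, not necessarily what this patch exposes.

```java
import backtype.storm.Config;

public class BatchTuning {
    public static Config batchingConf() {
        // The key names below are illustrative placeholders for whatever knobs
        // the batching code finally exposes; only the shape of the tuning is the point.
        Config conf = new Config();
        conf.put("topology.disruptor.batch.size", 1);            // size 1 recovers most of the low-end latency
        conf.put("topology.disruptor.batch.timeout.millis", 5);  // longer flush interval => less CPU, more latency
        return conf;
    }
}
```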
Great analysis! Btw: I am still wondering why #694 behaves so badly at low throughput. From a conceptual point of view there is no reason for this. Maybe my code is not good enough (as I am a Clojure newbie). I hope I can reproduce your results and get better insight into the problem.
@mjsax If you want to take some time to debug things for a better comparison I am happy to wait. It is entirely possible that I somehow messed things up when I was enabling hybrid mode, so getting an apples-to-apples comparison is important. I want the best final solution no matter what it is, but I also don't want to wait forever. For efficiency and cluster utilization purposes I really would like to pull batching into the clusters that I run, but because the configuration/interfaces of the two approaches are incompatible with one another I don't want to do it until open source has decided on a direction to go in. In the meantime I am going to be working on improving the metrics gathering. You said previously that with your first batching attempt you saw up to 6x the throughput. I suspect a lot of that is because the metrics are so very, very slow.
I am not sure how the metrics relate to it... Furthermore, I also used Aeolus on the current master and got a 6x improvement (with acking disabled -- acking is currently not supported by Aeolus).
@mjsax At least in my profiling, the next largest consumer of CPU in word count (without acking enabled), after adding in batching, is the metrics. They are horribly slow. I assumed that because Aeolus only counts batches, not individual tuples, it mitigated a lot of this slowness.
Makes sense. It might be worth exploring counting in batches (i.e., increasing a counter by the batch size for each batch), so we still get the correct value. Of course the metric will not increase smoothly over time but will "jump" in larger intervals. Not sure if this is acceptable behavior and/or what other metrics there are that cannot be updated in a "batch" fashion... count might be a too-simple example.
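A minimal sketch of the "count in batches" idea (illustrative only, not the existing metrics code): the counter is bumped once per batch by the batch size instead of once per tuple, so the final value stays correct but only advances at batch boundaries.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: update the counter once per batch rather than once per tuple.
public class BatchedCounter {
    private final AtomicLong emitted = new AtomicLong();

    // Per-tuple update: one atomic increment (plus metrics bookkeeping) for every tuple.
    public void recordTuple() {
        emitted.incrementAndGet();
    }

    // Per-batch update: same final value, far fewer updates,
    // but the metric "jumps" by batchSize at each batch boundary.
    public void recordBatch(int batchSize) {
        emitted.addAndGet(batchSize);
    }

    public long get() {
        return emitted.get();
    }
}
```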
Force-pushed from 4533347 to c415d32
Just rebased.
Ran a few topologies with acking and without. I am +1 on merging this in.
I wanted to come up with an alternative to #694 that does batching at the queue level instead of at the spout level. This is the result of that work, although it still needs some testing/cleanup before I consider it fully ready. This is based off of #750, using the latest version of the disruptor, and has a few bug fixes for automatic back-pressure that I need to split out and include as a separate pull request.
The work here is driven by results I got from some micro-benchmarks I wrote, looking at what the bottlenecks are in storm and what the theoretical limit is for storm, in this case doing a simple word count topology as represented by Q_SWC_X_04. With the same batching applied here, the throughput went from 300,000 sentences/second processed to around 1.5 million, as run on my MacBook Pro laptop.
https://github.com/revans2/storm-micro-perf
The reason for queue-level batching is that it gives finer-grained control over latency, which I would like to use for automatic tuning in the future. It is also a smaller code change. If others disagree I am okay with #694, but I do want to do a head-to-head comparison between the two approaches.
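For context on what batching at the queue level means mechanically, this is roughly how a whole batch can be claimed and published on an LMAX Disruptor 3.x ring buffer in one shot (a sketch against the stock Disruptor API, not the DisruptorQueue changes in this patch; ObjectEvent is a stand-in event type):

```java
import com.lmax.disruptor.RingBuffer;
import java.util.List;

// Sketch: publish a whole batch with one claim/publish instead of one per tuple.
public class BatchPublish {
    // Simple mutable event holder; the real DisruptorQueue event type differs.
    public static class ObjectEvent {
        public Object value;
    }

    public static void publishBatch(RingBuffer<ObjectEvent> ring, List<Object> batch) {
        int n = batch.size();
        if (n == 0) {
            return;
        }
        long hi = ring.next(n);          // claim n slots in one call
        long lo = hi - (n - 1);
        try {
            for (int i = 0; i < n; i++) {
                ring.get(lo + i).value = batch.get(i); // fill each claimed slot
            }
        } finally {
            ring.publish(lo, hi);        // make the whole batch visible to the consumer at once
        }
    }
}
```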