[STORM-886] Automatic Back Pressure (ABP) #700

Merged
merged 3 commits into apache:master on Sep 18, 2015


@zhuoliu
Contributor
zhuoliu commented Aug 26, 2015

This new feature aims at automatic flow control through the topology DAG, since different components may have mismatched tuple-processing speeds. Currently, tuples may get dropped if the downstream components cannot process them quickly enough, wasting network bandwidth and processing capability. In addition, it is difficult to tune the max.spout.pending parameter for the best backpressure performance. Another big motivation is that using max.spout.pending for flow control forces users to enable acking, which does not make sense in scenarios where acking is not needed but flow control is (e.g., at-most-once processing). Therefore, an automatic back pressure scheme is highly desirable.

In this design, spouts throttle not only when max.spout.pending is hit, but also when any bolt has gone over a high water mark in their receive queue, and has not yet gone below a low water mark again. There is a lot of room for potential improvement here around control theory and having spouts only respond to downstream bolts backing up, but a simple bang-bang controller like this is a great start.

Our ABP scheme implements a lightweight yet efficient form of back pressure. It monitors certain queues in executors and workers, and exploits callbacks on ZooKeeper and the disruptor queue for fast-responding (push-based) flow control.

Please check the attached figure for more details about the implementation.
https://issues.apache.org/jira/secure/attachment/12761186/aSimpleExampleOfBackpressure.png
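For readers following the figure, here is a minimal sketch (hypothetical Java with illustrative names, not the code in this PR) of the high/low-watermark hysteresis described above:

```java
// Sketch of the bang-bang (hysteresis) decision: flag turns on above the high
// watermark and only turns off again once the queue drains below the low watermark.
public class QueueBackpressureState {
    private final int capacity;
    private final double highWatermark; // e.g. 0.9 -> flag on at 90% full
    private final double lowWatermark;  // e.g. 0.4 -> flag off below 40% full
    private boolean backpressure = false;

    public QueueBackpressureState(int capacity, double highWatermark, double lowWatermark) {
        this.capacity = capacity;
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    /** Update the flag from the current queue population; returns true if the flag changed. */
    public synchronized boolean update(long population) {
        double ratio = (double) population / capacity;
        if (!backpressure && ratio > highWatermark) {
            backpressure = true;   // congested: ask spouts to throttle
            return true;
        }
        if (backpressure && ratio < lowWatermark) {
            backpressure = false;  // drained below the low watermark: release spouts
            return true;
        }
        return false;              // between the watermarks: keep the previous decision
    }

    public synchronized boolean isBackpressure() {
        return backpressure;
    }
}
```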

@revans2 revans2 commented on an outdated diff Aug 26, 2015
conf/defaults.yaml
@@ -140,6 +140,14 @@ task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
+# now should be null by default
+topology.backpressure.enable: true
+backpressure.worker.high.watermark: 0.9
+backpressure.worker.low.watermark: 0.4
+backpressure.executor.high.watermark: 0.9
+backpressure.executor.low.watermark: 0.4
+backpressure.spout.suspend.time.ms: 100
@revans2
revans2 Aug 26, 2015 Contributor

This seems way too high. It is not that expensive to poll at a 1ms interval in the spout waiting for back-pressure to be turned off.
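For reference, a topology could override these defaults through the usual Config map; the key strings below are copied from this version of defaults.yaml and may still be renamed before the merge:

```java
import backtype.storm.Config;

public class BackpressureConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        // Key strings as they appear in this version of defaults.yaml.
        conf.put("topology.backpressure.enable", true);
        conf.put("backpressure.executor.high.watermark", 0.9); // executor flag on above 90% queue usage
        conf.put("backpressure.executor.low.watermark", 0.4);  // executor flag off below 40% queue usage
        return conf;
        // Pass the result to StormSubmitter.submitTopology(...) as usual.
    }
}
```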

@revans2 revans2 commented on an outdated diff Aug 26, 2015
storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -602,7 +607,15 @@
(log-message "Activating spout " component-id ":" (keys task-datas))
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
- (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
+ (if (and
+ ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
+ @(:throttle-on (:worker executor-data))
+ suspend-time
+ (not= suspend-time 0))
+ (do
@revans2
revans2 Aug 26, 2015 Contributor

Can we move this conditional up to be part of the overflow-buffer/max-spout-pending check? It feels like it fits better there, and we don't want to output anything while back-pressure is on, rather than just slowing down how quickly we output.

Also, instead of logging something, can we look at having a metric that shows how many times we paused for a given reason? Logging is not really going to give us the picture we want and is going to be difficult to follow.
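As a rough illustration of that suggestion (not code from this patch), per-reason pause counters could be registered like any other topology metric, assuming the standard CountMetric/registerMetric APIs:

```java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;

// Hypothetical helper: one CountMetric per pause reason, reported via the normal metrics tick.
public class SpoutPauseMetrics {
    private final Map<String, CountMetric> reasons = new HashMap<>();

    /** Register one counter per known pause reason; call this from ISpout.open(). */
    public SpoutPauseMetrics(TopologyContext context, int bucketSecs, String... reasonNames) {
        for (String reason : reasonNames) {
            reasons.put(reason,
                context.registerMetric("spout-paused-" + reason, new CountMetric(), bucketSecs));
        }
    }

    /** Count one pause, e.g. pausedBecause("throttle-on") or pausedBecause("max-spout-pending"). */
    public void pausedBecause(String reason) {
        CountMetric m = reasons.get(reason);
        if (m != null) {
            m.incr();
        }
    }
}
```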

@revans2 revans2 commented on an outdated diff Aug 26, 2015
storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -671,7 +689,19 @@
user-context (:user-context task-data)
sampler? (sampler)
execute-sampler? (execute-sampler)
- now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
+ now (if (or sampler? execute-sampler?) (System/currentTimeMillis))
+ receive-queue (:receive-queue executor-data)]
+ (if (and ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (> (.population receive-queue) high-watermark)
+ (not @(:backpressure executor-data)))
+ (do (reset! (:backpressure executor-data) true)
+ (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true")
+ (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data)))))
+ (if (and ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (< (.population receive-queue) low-watermark)
+ @(:backpressure executor-data))
+ (do (reset! (:backpressure executor-data) false)
+ (disruptor/notify-backpressure-checker (:backpressure-trigger (:worker executor-data)))))
@revans2
revans2 Aug 26, 2015 Contributor

I am a bit confused. Why are we doing this as part of the metrics tick? What does that have to do with back pressure at all?

@revans2 revans2 commented on an outdated diff Aug 26, 2015
storm-core/src/clj/backtype/storm/daemon/worker.clj
(defn mk-transfer-fn [worker]
(let [local-tasks (-> worker :task-ids set)
local-transfer (:transfer-local-fn worker)
- ^DisruptorQueue transfer-queue (:transfer-queue worker)
+ transfer-queue (:transfer-queue worker)
@revans2
revans2 Aug 26, 2015 Contributor

Why did we drop the type hint?

@revans2 revans2 commented on an outdated diff Aug 26, 2015
storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -114,12 +114,34 @@
(fast-list-iter [[task tuple :as pair] tuple-batch]
(.serialize serializer tuple)))
+(defn- mk-backpressure-handler [executors]
+ "make a handler that checks and updates worker's backpressure flag"
+ (disruptor/backpressure-handler
+ (fn [worker]
+ (let [storm-id (:storm-id worker)
+ assignment-id (:assignment-id worker)
+ port (:port worker)
+ storm-cluster-state (:storm-cluster-state worker)]
+ (if executors
+ (if (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))
+ (reset! (:backpressure worker) true) ;; at least one executor has set backpressure
+ (reset! (:backpressure worker) false))) ;; no executor has backpressure set
+ ;; update the worker's backpressure flag to zookeeper here
+ (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker))
@revans2
revans2 Aug 26, 2015 Contributor

I think we have an opportunity here to not put so much load on ZK. We should be able to cache whether we think back-pressure is on or not for this worker, and only update if we think it is changing, instead of reading the state each time and updating it if it is different. This is being called at least every 100 ms, which is going to put more load on ZK than I like.
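A rough sketch of that caching idea in Java (hypothetical names; the real change would live in the Clojure handler above): remember the last value written and only touch ZooKeeper when it changes.

```java
// Hypothetical wrapper around the worker-backpressure ZK write: only publish on change.
public class CachedBackpressurePublisher {
    public interface ZkWriter {
        void writeWorkerBackpressure(boolean on);
    }

    private final ZkWriter zk;
    private Boolean lastPublished = null; // null until the first write

    public CachedBackpressurePublisher(ZkWriter zk) {
        this.zk = zk;
    }

    /** Called from the frequent backpressure check; writes to ZK only when the flag flips. */
    public synchronized void publish(boolean backpressureOn) {
        if (lastPublished == null || lastPublished.booleanValue() != backpressureOn) {
            zk.writeWorkerBackpressure(backpressureOn);
            lastPublished = backpressureOn;
        }
    }
}
```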

@revans2 revans2 commented on an outdated diff Aug 26, 2015
storm-core/src/clj/backtype/storm/daemon/worker.clj
- (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-changed)))
- (schedule-recurring (:refresh-credentials-timer worker) 0 (conf TASK-CREDENTIALS-POLL-SECS) check-credentials-changed)
+ check-credentials-throttle-changed (fn []
+ (let [callback (fn cb [& ignored]
+ (let [throttle-on (.topology-backpressure (:storm-cluster-state worker) storm-id cb)]
+ (reset! (:throttle-on worker) throttle-on)))
+ new-throttle-on (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (.topology-backpressure (:storm-cluster-state worker) storm-id callback) nil)
+ new-creds (.credentials (:storm-cluster-state worker) storm-id nil)]
+ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
+ (reset! (:throttle-on worker) new-throttle-on))
+ (when-not (= new-creds @credentials) ;;This does not have to be atomic, worst case we update when one is not needed
+ (AuthUtils/updateSubject subject auto-creds new-creds)
+ (dofor [e @executors] (.credentials-changed e new-creds))
+ (reset! credentials new-creds))))]
+ (.credentials (:storm-cluster-state worker) storm-id (fn [args] (check-credentials-throttle-changed)))
@revans2
revans2 Aug 26, 2015 Contributor

I think I would prefer to have two functions instead of just one. Both can be scheduled on the same timer, but keeping them separate seems cleaner.

@revans2
Contributor
revans2 commented Aug 26, 2015

This looks very interesting. I would really like to see some unit tests, especially around the disruptor queue to show that the callback is working. Perhaps we can also handle the corner cases for callback in DisruptorQueue itself instead of having special case code in other locations as a backup.

@vesense
Member
vesense commented Aug 27, 2015

+1 Looks very exciting.

@arunmahadevan
Contributor

It appears that the spouts across all the workers are throttled if an executor queue fills up. For instance, if there are multiple spouts, even the ones not causing the executor receive queue to fill would be throttled. Is this correct, and if so, is it desirable?

Wouldn't it be better to trickle the back pressure up via the immediately preceding component(s) rather than slowing down the entire topology?

@revans2
Contributor
revans2 commented Aug 27, 2015

@arunmahadevan It is the intention to throttle the entire topology, all spouts. This is what Heron does and is intended to be a last resort, which is better than nothing, but not a truly final solution. The reason for this is that from the level of a single queue it is very difficult to know what is causing that congestion. STORM-907 is intended as a follow-on that will analyze the topology, determine where there are loops, and provide more of a true backpressure. But in the case of a loop, and Storm does support loops, you have no way to determine which upstream part is causing the slowness. In fact it could be the bolt itself, and it needs to increase its parallelism.

@knusbaum knusbaum commented on an outdated diff Aug 27, 2015
storm-core/src/clj/backtype/storm/daemon/executor.clj
(disruptor/consumer-started! receive-queue)
(fn []
+ ;; this additional check is necessary because rec-q can be 0 while the executor backpressure flag is forever set
+ (if (and ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE) (< (.population receive-queue) low-watermark) @(:backpressure executor-data))
@knusbaum
knusbaum Aug 27, 2015 Contributor

Can we move the and form's second and third conditions to new lines?

@knusbaum knusbaum commented on an outdated diff Aug 27, 2015
storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -137,9 +159,14 @@
(.add remote (TaskMessage. task (.serialize serializer tuple)))
(log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
))))
- (local-transfer local)
- (disruptor/publish transfer-queue remoteMap)
- ))]
+ ;; each executor itself will do the self setting for the worker's backpressure tag
+ ;; however, when the backpressure is set, the worker still need to check whether all the executors' flag has cleared to unset worker's backpressure
+ (if (and ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) (> (.population transfer-queue) high-watermark))
@knusbaum
knusbaum Aug 27, 2015 Contributor

Here too.

@zhuoliu
Contributor
zhuoliu commented Aug 27, 2015

Addressed comments:
a. Removed the suspend-time config; the spout now reuses the empty emit, with the check placed together with max.spout.pending.
b. Separated the credentials and throttle checks into two functions on the schedule-recurring thread.
c. Updated the worker flag on ZK only when it has changed (to reduce ZK load).
d. Other comments, such as the type hint and the `and` formatting.

To continue work on the remaining items:
a. Add metrics for the executor's suspension counts for throttle-on, inactive, max.spout.pending, etc. Done!
b. Address the concern about possible corner cases from putting the backpressure checks in tuple-action-fn. Done!
c. Unit tests. Done!
d. Performance tests (e.g., overhead on the ZooKeeper nodes)

@redsanket redsanket and 1 other commented on an outdated diff Sep 1, 2015
storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -114,12 +114,38 @@
(fast-list-iter [[task tuple :as pair] tuple-batch]
(.serialize serializer tuple)))
+(defn- mk-backpressure-handler [executors]
+ "make a handler that checks and updates worker's backpressure flag"
+ (disruptor/backpressure-handler
+ (fn [worker]
+ (let [storm-id (:storm-id worker)
+ assignment-id (:assignment-id worker)
+ port (:port worker)
+ storm-cluster-state (:storm-cluster-state worker)
+ prev-backpressure-flag @(:backpressure worker)]
+ (doseq [ed executors] ;; Debug, TODO: delete
+ (log-message "zliu executor" (.get-executor-id ed) " flag is " (.get-backpressure-flag ed)))
@redsanket
redsanket Sep 1, 2015

zliu in log message

@zhuoliu
zhuoliu Sep 1, 2015 Contributor

Thanks for the reminder, I will remove all those debug messages soon after the unit test commit.

@zhuoliu
Contributor
zhuoliu commented Sep 3, 2015

Hi, Bobby @revans2 , all comments and concerns have been addressed. Ready for another round of review. Thanks!

@zhuoliu
Contributor
zhuoliu commented Sep 11, 2015

Initial tests on a 5-node OpenStack cluster (1 ZooKeeper, 1 Nimbus, 3 supervisors).
The results demonstrate that ABP does well for both non-congested and congested topologies.

I. Test with a normal topology (no congestion)
WordCountTopology3 (3 times more workers and executors than the original WordCount).

With no topology running, the ZooKeeper node receives 5 packets/second;
without BP, the ZK load is 20.1 packets/second;
with BP, the ZK load is 20.9 packets/second.
This makes sense: if a topology is running fluently (no congestion), the backpressure code almost never touches ZK. "Almost" because we added a recurring topology-backpressure check alongside the credentials thread (to handle the very rare case where the ZK callback fails to fire), which reads from ZK every 30 seconds.
So 12 workers add 12/30 = 0.4 packets/second of overhead on ZK.

This shows that backpressure adds minimal ZooKeeper overhead for non-congested topologies.

@zhuoliu
Contributor
zhuoliu commented Sep 11, 2015

II. Test with a congested topology.
WordCountTopology2 (3 times more workers and executors than the original WordCount; in every minute, the wordcount bolt sleeps 50 ms before each tuple emit during the first 30 seconds).
https://github.com/zhuoliu/storm/blob/888/examples/storm-starter/src/jvm/storm/starter/WordCountTopology2.java

Interestingly, without ABP this 12-worker (87-executor) topology cannot run successfully in Storm: workers frequently crash because of tuple overflow in the bolt executors.
The ZK receive workload is 20.6 to 21 packets/second.

In contrast, with ABP enabled, the topology runs well,
and we see about 22.5 packets/second of receive workload on the ZooKeeper node.

This test shows the advantage of backpressure for topologies that have congested or slow components: (1) ABP ensures the topology can run successfully, and (2) ABP adds only a small overhead on ZooKeeper.
I will also try to run huge topologies on a big cluster later on, when our cluster is free.

@revans2 @d2r @knusbaum

@revans2
Contributor
revans2 commented Sep 18, 2015

@zhuoliu looks like you missed adding a file. We also need to upmerge.

@revans2 revans2 commented on an outdated diff Sep 18, 2015
storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -602,6 +640,7 @@
(log-message "Activating spout " component-id ":" (keys task-datas))
(fast-list-iter [^ISpout spout spouts] (.activate spout)))
+ ;; (log-message "Spout executor " (:executor-id executor-data) " found throttle-on, now suspends sending tuples")
@revans2
revans2 Sep 18, 2015 Contributor

Can we remove this line? It looks like it is no longer needed.

@revans2 revans2 commented on an outdated diff Sep 18, 2015
storm-core/src/jvm/backtype/storm/Config.java
+ * This signifies the tuple congestion in an executor's receiving queue.
+ * When the used ratio of an executor's receiving queue is higher than the high watermark,
+ * the backpressure scheme, if enabled, should slow down the tuple sending speed of
+ * the spouts until reaching the low watermark.
+ */
+ public static final String BACKPRESSURE_EXECUTOR_HIGH_WATERMARK="backpressure.executor.high.watermark";
+ public static final Object BACKPRESSURE_EXECUTOR_HIGH_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
+
+ /**
+ * This signifies a state that an executor has left the congestion.
+ * If the used ratio of an execuotr's receive queue is lower than the low watermark,
+ * it may notify the worker to check whether all its executors have also left congestion,
+ * if yes, the worker's backpressure flag will be unset on the Zookeeper
+ */
+ public static final String BACKPRESSURE_EXECUTOR_LOW_WATERMARK="backpressure.executor.low.watermark";
+ public static final Object BACKPRESSURE_EXECUTOR_LOW_WATERMARK_SCHEMA =ConfigValidation.PositiveNumberValidator;
@revans2
revans2 Sep 18, 2015 Contributor

Looking through the code, it looks like these two configs are never used:

BACKPRESSURE_EXECUTOR_HIGH_WATERMARK and BACKPRESSURE_EXECUTOR_LOW_WATERMARK. I don't see a reason to have different configs for different queues. Perhaps we can just remove these configs and make the description of the other configs more generic.

@revans2 revans2 commented on an outdated diff Sep 18, 2015
...core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -138,7 +150,21 @@ private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
//TODO: only set this if the consumer cursor has changed?
_consumer.set(cursor);
}
-
+
+ public void registerBackpressureCallback(DisruptorBackpressureCallback cb) {
+ this._cb = cb;
+ }
+
+ static public void notifyBackpressureChecker(Object trigger) {
@revans2
revans2 Sep 18, 2015 Contributor

I don't think this code really belongs here. It would make more sense to move it to the WorkerBackpressureThread itself.
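For context, the notify/trigger pattern being discussed is essentially the standard monitor wake-up. A generic sketch under that assumption (not the actual WorkerBackpressureThread code) might look like this:

```java
// Generic trigger-object wake-up loop using plain Object.wait()/notifyAll().
public class BackpressureCheckLoop implements Runnable {
    private final Object trigger;
    private final Runnable check; // e.g. re-evaluate all executor flags and update ZK

    public BackpressureCheckLoop(Object trigger, Runnable check) {
        this.trigger = trigger;
        this.check = check;
    }

    /** Callers (e.g. a queue crossing a watermark) wake the loop like this. */
    public static void notifyChecker(Object trigger) {
        synchronized (trigger) {
            trigger.notifyAll();
        }
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (trigger) {
                    trigger.wait(100); // also wake periodically as a safety net
                }
                check.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```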

@revans2 revans2 commented on an outdated diff Sep 18, 2015
...core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.Map;
import backtype.storm.metric.api.IStatefulObject;
+import org.jgrapht.graph.DirectedSubgraph;
@revans2
revans2 Sep 18, 2015 Contributor

Where did this come from?

@revans2
Contributor
revans2 commented Sep 18, 2015

For the most part things look really good. I would also love to see something added to the examples, like a word count that just goes as fast as it can. That way we can see that before, with acking disabled, it crashes; after, with acking disabled, it stays up and can run really well. I can help with code for that if you want.

zhuoliu added some commits Aug 18, 2015
@zhuoliu zhuoliu Initial commit for YSTORM-1949 automatic backpressure
Temp version that can do both throttle and back-to-normal, WordCountTopology changed, may revert it later

re-enable the recurring of throttle check to zk in worker

Use ephemeral-node for worker backpressure

Make backpressure configurable

use disruptor/notify-backpressure-checker

Remove debug message for daemons

Address Bobby and Kyle's comments, to continue for the other three

Now we can obtain the latest q size during a batch, addressed the big concern during q check

Add metrics for backpressure and emptyEmit stats

Put disruptor queue checking inside disruptor queue as a new callback

Add unit test for disruptor queue backpressure and callback checks

Remove debug messages

now seq-id is not needed in tuple-action-fn
f64121b
@zhuoliu zhuoliu Add missing file cf50518
@zhuoliu zhuoliu Address the comments on config, notifyBPC and upmerge, etc. 5374036
@zhuoliu
Contributor
zhuoliu commented Sep 18, 2015

Thanks, Bobby @revans2. I have addressed all the comments.
I actually have such an example already written and tested; see:
https://github.com/zhuoliu/storm/blob/888/examples/storm-starter/src/jvm/storm/starter/WordCountTopology2.java
This topology crashes without backpressure enabled and runs smoothly with our ABP.
I can check it in to master if needed.

@revans2
Contributor
revans2 commented Sep 18, 2015

@zhuoliu I looked over the code, and I ran some tests with a word count program that does not sleep and does not have enough split sentence bolts. Back pressure kept the workers from crashing, which I verified happens without it. It was able to keep a throughput similar to setting a decent value for max spout pending, but with a larger latency.

I am +1 for merging this in. Great work.

@asfgit asfgit merged commit 5374036 into apache:master Sep 18, 2015

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
@rsltrifork

Does this solve the problem of tuple timeout when a bolt is completely stalled waiting for an external component to come back up?
I believe waiting too long in a bolt triggers the tuple timeout, which causes the spout to re-emit, which is usually not what we want when a bolt is waiting in a controlled manner before it can resume computation.

Ideally, the tuple timeout should be used as a last resort to detect that internal Storm components don't respond, and back pressure should ensure that the spout doesn't re-emit to temporarily busy/blocked bolts, regardless of the timeout.

@revans2
Contributor
revans2 commented Oct 1, 2015

@rsltrifork, no, this does not solve that issue. The timeout is still a hard-coded value. The backpressure just means that the spout will not be emitting new values; old tuples can still time out and be replayed. The problem here is essentially the halting problem: how do you know that the bolt is waiting in a controlled manner? Even if you do know that, how do you know that, while waiting in a controlled manner, it has not lost track of a tuple? You have to be able to predict the future to truly solve this problem. A hard-coded timeout is a simple solution. There have been a few other proposals to adjust the timeout dynamically, but they all have potentially serious limitations compared to a static timeout.

@rsltrifork

Thanks for the quick reply, revans2.

How do you know that the bolt is waiting in a controlled manner?

A bolt sending to an external system can include a circuit breaker, so it can keep waiting while the CB is tripped and regularly retry sends to check whether the recipient is back up.

While doing this, the bolt could also tell Storm that processing of the current tuple is blocked and that Storm should reset the tuple timeout for it (and for subsequent in-flight tuples):
collector.resetTupleTimeout(tuple);

Even if you do know that how do you know that if it is waiting in a controlled manner that it has not lost track of a tuple?

The bolt is continuously saying that it still has the tuple.
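A minimal sketch of that idea as a bolt (the resetTupleTimeout call is the hypothetical API proposed above and does not exist in Storm; everything else uses the standard bolt interfaces):

```java
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Hypothetical bolt: while an external system is down, keep the tuple alive instead of
// letting it time out and be replayed. resetTupleTimeout(...) does not exist in this PR.
public class ExternalCallBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        while (!trySend(tuple)) {                   // circuit breaker tripped / recipient down
            // collector.resetTupleTimeout(tuple);  // proposed API: tell Storm we still hold it
            sleepQuietly(1000);                     // wait before retrying the external system
        }
        collector.ack(tuple);
    }

    private boolean trySend(Tuple tuple) {
        // Placeholder for the real external call guarded by a circuit breaker.
        return true;
    }

    private void sleepQuietly(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
```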

@revans2
Contributor
revans2 commented Oct 9, 2015

@rsltrifork That does sound like a very interesting approach. I would love to see some numbers on how it would perform.

@acommuni

If the external system is down, it could be interesting to pause the spout for an amount of time. The CB's OPEN state could be directly linked to the spout. I don't know whether the back pressure implementation is manageable through a public API, but it would be a nice enhancement to be able to implement a circuit-breaker algorithm with bolts and spouts.

Error counts and timeouts generated by requests to the external system can be detected by the bolt that uses that component and propagated to the spout. If the number of consecutive errors reaches a defined threshold, the bolt could pause the spout for an amount of time (CLOSED to OPEN state). After sleeping, the spout would be considered in the HALF-OPEN state: if a new error occurs, the spout sleeps for another interval; otherwise it returns to the CLOSED state and continues reading new tuples.

Being able to use a circuit-breaker framework like Hystrix could be a nice enhancement to the back pressure feature.

https://github.com/Netflix/Hystrix
https://github.com/Netflix/Hystrix/wiki/How-it-Works
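A compact sketch of the state machine described above (generic Java, not tied to Storm's internals or Hystrix's API; all names are illustrative):

```java
// Generic circuit breaker: CLOSED -> OPEN after too many consecutive errors,
// OPEN -> HALF_OPEN after a sleep window, HALF_OPEN -> CLOSED on success or back to OPEN on error.
public class CircuitBreaker {
    public enum State { CLOSED, OPEN, HALF_OPEN }

    private final int errorThreshold;
    private final long sleepWindowMs;
    private State state = State.CLOSED;
    private int consecutiveErrors = 0;
    private long openedAt = 0;

    public CircuitBreaker(int errorThreshold, long sleepWindowMs) {
        this.errorThreshold = errorThreshold;
        this.sleepWindowMs = sleepWindowMs;
    }

    /** True if the caller (e.g. a paused spout probing again) may attempt a request now. */
    public synchronized boolean allowRequest() {
        if (state == State.OPEN && System.currentTimeMillis() - openedAt >= sleepWindowMs) {
            state = State.HALF_OPEN; // probe the external system with one request
        }
        return state != State.OPEN;
    }

    public synchronized void onSuccess() {
        consecutiveErrors = 0;
        state = State.CLOSED;
    }

    public synchronized void onError() {
        consecutiveErrors++;
        if (state == State.HALF_OPEN || consecutiveErrors >= errorThreshold) {
            state = State.OPEN;
            openedAt = System.currentTimeMillis();
        }
    }
}
```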
