From 0d9e7ed34afc8512d3fd94eff8a01f9189ea00f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Fri, 29 Dec 2017 22:21:46 +0100 Subject: [PATCH] STORM-2874: Minor refactoring of some backpressure code --- .../apache/storm/daemon/worker/Worker.java | 9 ++++--- .../utils/WorkerBackpressureCallback.java | 2 +- .../storm/utils/WorkerBackpressureThread.java | 25 +++++++++--------- .../utils/WorkerBackpressureThreadTest.java | 26 +++++++++---------- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 519e7ceab4d..f7b78cb31d7 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -227,7 +227,7 @@ public void start() throws Exception { .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK))); WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf); - backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback); + backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, backpressureCallback); if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) { backpressureThread.start(); stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle); @@ -408,7 +408,7 @@ private WorkerBackpressureCallback mkBackpressureHandler(Map top final List executors = executorsAtom.get(); final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000; return new WorkerBackpressureCallback() { - @Override public void onEvent(Object obj) { + @Override public void onEvent() { if (null != executors) { String topologyId = workerState.topologyId; String assignmentId = workerState.assignmentId; @@ -418,8 +418,9 @@ private WorkerBackpressureCallback mkBackpressureHandler(Map top long currTimestamp = System.currentTimeMillis(); long currBackpressureTimestamp = 0; // the backpressure flag is true if at least one of the disruptor queues has throttle-on - boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream() - .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get()); + boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || + (executors.stream() + .anyMatch(IRunningExecutor::getBackPressureFlag)); if (backpressureFlag) { // update the backpressure timestamp every updateFreqMs ms diff --git a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java index 47c039aebcc..b03bcf05def 100755 --- a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java +++ b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java @@ -21,6 +21,6 @@ public interface WorkerBackpressureCallback { - void onEvent(Object obj); + void onEvent(); } diff --git a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java index 832448e436f..66b22c54bc3 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java +++ b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java @@ -21,17 +21,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WorkerBackpressureThread extends Thread { +public final class WorkerBackpressureThread extends Thread { private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThread.class); private final Object trigger; - private final Object workerData; private final WorkerBackpressureCallback callback; private volatile boolean running = true; - public WorkerBackpressureThread(Object trigger, Object workerData, WorkerBackpressureCallback callback) { + public WorkerBackpressureThread(Object trigger, WorkerBackpressureCallback callback) { this.trigger = trigger; - this.workerData = workerData; this.callback = callback; this.setName("WorkerBackpressureThread"); this.setDaemon(true); @@ -60,21 +58,22 @@ public void run() { synchronized (trigger) { trigger.wait(100); } - callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed + callback.onEvent(); // check all executors and update zk backpressure throttle for the worker if needed } catch (InterruptedException interEx) { // ignored, we are shutting down. } } } -} -class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { - private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class); + private static class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + + private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class); - @Override - public void uncaughtException(Thread t, Throwable e) { - // note that exception that happens during connecting to ZK has been ignored in the callback implementation - LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e); - Runtime.getRuntime().exit(1); + @Override + public void uncaughtException(Thread t, Throwable e) { + // note that exception that happens during connecting to ZK has been ignored in the callback implementation + LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e); + Runtime.getRuntime().exit(1); + } } } diff --git a/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java b/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java index b8e1770cfb2..6b3e3d10691 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/WorkerBackpressureThreadTest.java @@ -18,33 +18,33 @@ package org.apache.storm.utils; -import java.util.concurrent.atomic.AtomicLong; -import org.junit.Assert; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.Test; -import junit.framework.TestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WorkerBackpressureThreadTest extends TestCase { +public class WorkerBackpressureThreadTest { private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThreadTest.class); @Test public void testNormalEvent() throws Exception { + CountDownLatch latch = new CountDownLatch(1); Object trigger = new Object(); - AtomicLong workerData = new AtomicLong(0); WorkerBackpressureCallback callback = new WorkerBackpressureCallback() { @Override - public void onEvent(Object obj) { - ((AtomicLong) obj).getAndDecrement(); + public void onEvent() { + latch.countDown(); } }; - WorkerBackpressureThread workerBackpressureThread = new WorkerBackpressureThread(trigger, workerData, callback); + WorkerBackpressureThread workerBackpressureThread = new WorkerBackpressureThread(trigger, callback); workerBackpressureThread.start(); WorkerBackpressureThread.notifyBackpressureChecker(trigger); - long start = System.currentTimeMillis(); - while (workerData.get() == 0) { - assertTrue("Timeout", (System.currentTimeMillis() - start) < 1000); - Thread.sleep(100); - } + //The callback should be called when the trigger is notified + assertThat(latch.await(1, TimeUnit.SECONDS), is(true)); } }