Skip to content

Commit

Permalink
STORM-2874: Minor refactoring of some backpressure code
Browse files Browse the repository at this point in the history
  • Loading branch information
srdo committed Dec 29, 2017
1 parent 41d6cdc commit 0d9e7ed
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
Expand Up @@ -227,7 +227,7 @@ public void start() throws Exception {
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));

WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, backpressureCallback);
if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
backpressureThread.start();
stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
Expand Down Expand Up @@ -408,7 +408,7 @@ private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> top
final List<IRunningExecutor> executors = executorsAtom.get();
final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000;
return new WorkerBackpressureCallback() {
@Override public void onEvent(Object obj) {
@Override public void onEvent() {
if (null != executors) {
String topologyId = workerState.topologyId;
String assignmentId = workerState.assignmentId;
Expand All @@ -418,8 +418,9 @@ private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> top
long currTimestamp = System.currentTimeMillis();
long currBackpressureTimestamp = 0;
// the backpressure flag is true if at least one of the disruptor queues has throttle-on
boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
.map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
boolean backpressureFlag = workerState.transferQueue.getThrottleOn() ||
(executors.stream()
.anyMatch(IRunningExecutor::getBackPressureFlag));

if (backpressureFlag) {
// update the backpressure timestamp every updateFreqMs ms
Expand Down
Expand Up @@ -21,6 +21,6 @@

public interface WorkerBackpressureCallback {

void onEvent(Object obj);
void onEvent();

}
Expand Up @@ -21,17 +21,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerBackpressureThread extends Thread {
public final class WorkerBackpressureThread extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThread.class);
private final Object trigger;
private final Object workerData;
private final WorkerBackpressureCallback callback;
private volatile boolean running = true;

public WorkerBackpressureThread(Object trigger, Object workerData, WorkerBackpressureCallback callback) {
public WorkerBackpressureThread(Object trigger, WorkerBackpressureCallback callback) {
this.trigger = trigger;
this.workerData = workerData;
this.callback = callback;
this.setName("WorkerBackpressureThread");
this.setDaemon(true);
Expand Down Expand Up @@ -60,21 +58,22 @@ public void run() {
synchronized (trigger) {
trigger.wait(100);
}
callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
callback.onEvent(); // check all executors and update zk backpressure throttle for the worker if needed
} catch (InterruptedException interEx) {
// ignored, we are shutting down.
}
}
}
}

class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
private static class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);

@Override
public void uncaughtException(Thread t, Throwable e) {
// note that exception that happens during connecting to ZK has been ignored in the callback implementation
LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e);
Runtime.getRuntime().exit(1);
@Override
public void uncaughtException(Thread t, Throwable e) {
// note that exception that happens during connecting to ZK has been ignored in the callback implementation
LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e);
Runtime.getRuntime().exit(1);
}
}
}
Expand Up @@ -18,33 +18,33 @@

package org.apache.storm.utils;

import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerBackpressureThreadTest extends TestCase {
public class WorkerBackpressureThreadTest {
private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThreadTest.class);

@Test
public void testNormalEvent() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Object trigger = new Object();
AtomicLong workerData = new AtomicLong(0);
WorkerBackpressureCallback callback = new WorkerBackpressureCallback() {
@Override
public void onEvent(Object obj) {
((AtomicLong) obj).getAndDecrement();
public void onEvent() {
latch.countDown();
}
};
WorkerBackpressureThread workerBackpressureThread = new WorkerBackpressureThread(trigger, workerData, callback);
WorkerBackpressureThread workerBackpressureThread = new WorkerBackpressureThread(trigger, callback);
workerBackpressureThread.start();
WorkerBackpressureThread.notifyBackpressureChecker(trigger);
long start = System.currentTimeMillis();
while (workerData.get() == 0) {
assertTrue("Timeout", (System.currentTimeMillis() - start) < 1000);
Thread.sleep(100);
}
//The callback should be called when the trigger is notified
assertThat(latch.await(1, TimeUnit.SECONDS), is(true));
}
}

0 comments on commit 0d9e7ed

Please sign in to comment.