Skip to content

Commit

Permalink
Support customizing BlockedThreadChecker
Browse files Browse the repository at this point in the history
Signed-off-by: Dan O'Reilly <oreilldf@gmail.com>
  • Loading branch information
dano committed Feb 24, 2022
1 parent 56fed16 commit 13e9571
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.vertx.core.eventbus.impl.EventBusInternal;
import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
import io.vertx.core.file.FileSystem;
import io.vertx.core.impl.btc.BlockedThreadChecker;
import io.vertx.core.spi.file.FileResolver;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/vertx/core/impl/VertxInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.impl.btc.BlockedThreadChecker;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/vertx/core/impl/VertxThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package io.vertx.core.impl;

import io.netty.util.concurrent.FastThreadLocalThread;
import io.vertx.core.impl.btc.BlockedThreadChecker;

import java.util.concurrent.TimeUnit;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand All @@ -9,8 +9,9 @@
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.core.impl;
package io.vertx.core.impl.btc;

import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
Expand Down Expand Up @@ -40,8 +41,11 @@ public interface Task {
private final Map<Thread, Task> threads = new WeakHashMap<>();
private final Timer timer; // Need to use our own timer - can't use event loop for this

BlockedThreadChecker(long interval, TimeUnit intervalUnit, long warningExceptionTime, TimeUnit warningExceptionTimeUnit) {
private Handler<BlockedThreadEvent> blockedThreadHandler;

public BlockedThreadChecker(long interval, TimeUnit intervalUnit, long warningExceptionTime, TimeUnit warningExceptionTimeUnit) {
timer = new Timer("vertx-blocked-thread-checker", true);
blockedThreadHandler = BlockedThreadChecker::defaultBlockedThreadHandler;
timer.schedule(new TimerTask() {
@Override
public void run() {
Expand All @@ -52,24 +56,29 @@ public void run() {
long dur = now - execStart;
final long timeLimit = entry.getValue().maxExecTime();
TimeUnit maxExecTimeUnit = entry.getValue().maxExecTimeUnit();
long val = maxExecTimeUnit.convert(dur, TimeUnit.NANOSECONDS);
if (execStart != 0 && val >= timeLimit) {
final String message = "Thread " + entry.getKey() + " has been blocked for " + (dur / 1_000_000) + " ms, time limit is " + TimeUnit.MILLISECONDS.convert(timeLimit, maxExecTimeUnit) + " ms";
if (warningExceptionTimeUnit.convert(dur, TimeUnit.NANOSECONDS) <= warningExceptionTime) {
log.warn(message);
} else {
VertxException stackTrace = new VertxException("Thread blocked");
stackTrace.setStackTrace(entry.getKey().getStackTrace());
log.warn(message, stackTrace);
}
long maxExecTimeInNanos = TimeUnit.NANOSECONDS.convert(timeLimit, maxExecTimeUnit);
long warningExceptionTimeInNanos = TimeUnit.NANOSECONDS.convert(warningExceptionTime, warningExceptionTimeUnit);
BlockedThreadEvent bts = new BlockedThreadEvent(entry.getKey(), dur, maxExecTimeInNanos, warningExceptionTimeInNanos);
if (execStart != 0 && dur >= maxExecTimeInNanos) {
blockedThreadHandler.handle(bts);
}
}
}
}
}, intervalUnit.toMillis(interval), intervalUnit.toMillis(interval));
}

synchronized void registerThread(Thread thread, Task checked) {
/**
* Specify the handler to run when it is determined a thread has been blocked for longer than allowed.
* Note that the handler will be called on the blocked thread checker thread, not an event loop thread.
*
* @param handler The handler to run
*/
public synchronized void setThreadBlockedHandler(Handler<BlockedThreadEvent> handler) {
this.blockedThreadHandler = handler;
}

public synchronized void registerThread(Thread thread, BlockedThreadChecker.Task checked) {
threads.put(thread, checked);
}

Expand All @@ -82,4 +91,15 @@ public void close() {
}
}

private static void defaultBlockedThreadHandler(BlockedThreadEvent bte) {
final String message = "Thread " + bte.thread() + " has been blocked for " + (bte.duration() / 1_000_000) + " ms, time limit is " + (bte.maxExecTime() / 1_000_000) + " ms";
if (bte.duration() <= bte.warningExceptionTime()) {
log.warn(message);
} else {
VertxException stackTrace = new VertxException("Thread blocked");
stackTrace.setStackTrace(bte.thread().getStackTrace());
log.warn(message, stackTrace);
}
}

}
55 changes: 55 additions & 0 deletions src/main/java/io/vertx/core/impl/btc/BlockedThreadEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.core.impl.btc;

/**
* A class containing status details about how long a particular thread has been blocked,
* and how long it is allowed to be blocked before warnings start getting logged. All times
* and durations are in nanoseconds.
*/
public class BlockedThreadEvent {
private final Thread thread;
private final long maxExecTime;
private final long duration;
private final long warningExceptionTime;

/**
* Create an instance of BlockedThreadEvent
*
* @param thread The thread being checked
* @param duration The duration the thread has been blocked, in nanoseconds
* @param maxExecTime The max execution time the thread is allowed, in nanoseconds
* @param warningExceptionTime The max time a thread can be blocked before stack traces get logged, in nanoseconds
*/
public BlockedThreadEvent(Thread thread, long duration, long maxExecTime, long warningExceptionTime) {
this.thread = thread;
this.duration = duration;
this.maxExecTime = maxExecTime;
this.warningExceptionTime = warningExceptionTime;
}

public Thread thread() {
return thread;
}

public long maxExecTime() {
return maxExecTime;
}

public long duration() {
return duration;
}

public long warningExceptionTime() {
return warningExceptionTime;
}
}
35 changes: 34 additions & 1 deletion src/test/java/io/vertx/core/BlockedThreadCheckerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
package io.vertx.core;

import io.vertx.core.*;
import io.vertx.core.impl.VertxInternal;
import io.vertx.test.core.BlockedThreadWarning;
import io.vertx.test.core.VertxTestBase;
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.TimeUnit.*;

/**
* @author <a href="http://oss.lehmann.cx/">Alexander Lehmann</a>
Expand Down Expand Up @@ -119,4 +120,36 @@ public void start() throws InterruptedException {
await();
blockedThreadWarning.expectMessage("vert.x-worker-thread", maxWorkerExecuteTime, maxWorkerExecuteTimeUnit);
}

@Test
public void testCustomThreadBlockedHandler() throws Exception {
disableThreadChecks();
waitFor(2);
Verticle verticle = new AbstractVerticle() {
@Override
public void start() throws InterruptedException {
Thread.sleep(3000);
complete();
}
};
long maxWorkerExecuteTime = 1000;
long warningExceptionTime = 2;
TimeUnit maxWorkerExecuteTimeUnit = MILLISECONDS;
TimeUnit warningExceptionTimeUnit = SECONDS;
VertxOptions vertxOptions = new VertxOptions();
vertxOptions.setMaxWorkerExecuteTime(maxWorkerExecuteTime);
vertxOptions.setMaxWorkerExecuteTimeUnit(maxWorkerExecuteTimeUnit);
vertxOptions.setWarningExceptionTime(warningExceptionTime);
vertxOptions.setWarningExceptionTimeUnit(warningExceptionTimeUnit);
Vertx newVertx = vertx(vertxOptions);
((VertxInternal) newVertx).blockedThreadChecker().setThreadBlockedHandler(bte -> {
assertEquals(NANOSECONDS.convert(maxWorkerExecuteTime, maxWorkerExecuteTimeUnit), bte.maxExecTime());
assertEquals(NANOSECONDS.convert(warningExceptionTime, warningExceptionTimeUnit), bte.warningExceptionTime());
complete();
});
DeploymentOptions deploymentOptions = new DeploymentOptions();
deploymentOptions.setWorker(true);
newVertx.deployVerticle(verticle, deploymentOptions);
await();
}
}
1 change: 1 addition & 0 deletions src/test/java/io/vertx/core/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.netty.channel.EventLoop;
import io.vertx.core.impl.*;
import io.vertx.core.impl.btc.BlockedThreadChecker;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.test.core.VertxTestBase;
import org.junit.Test;
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/vertx/test/core/BlockedThreadWarning.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package io.vertx.test.core;

import io.vertx.core.impl.BlockedThreadChecker;
import io.vertx.core.impl.btc.BlockedThreadChecker;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
Expand Down

0 comments on commit 13e9571

Please sign in to comment.