Skip to content

Commit

Permalink
Refactor the pool metrics to be reusable for other kind of pools that…
Browse files Browse the repository at this point in the history
… can exposed to the metrics implementation
  • Loading branch information
vietj committed May 4, 2016
1 parent b59cf28 commit b0aaf9d
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 52 deletions.
16 changes: 8 additions & 8 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Expand Up @@ -23,7 +23,7 @@
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.ThreadPoolMetrics; import io.vertx.core.spi.metrics.PoolMetrics;


import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -294,12 +294,12 @@ public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, Handler<


<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler, <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler, Handler<AsyncResult<T>> resultHandler,
Executor exec, ThreadPoolMetrics metrics) { Executor exec, PoolMetrics metrics) {
Object metric = metrics != null ? metrics.taskSubmitted() : null; Object metric = metrics != null ? metrics.taskSubmitted() : null;
try { try {
exec.execute(() -> { exec.execute(() -> {
if (metrics != null) { if (metrics != null) {
metrics.taskExecuting(metric); metrics.taskBegin(metric);
} }
Future<T> res = Future.future(); Future<T> res = Future.future();
try { try {
Expand All @@ -314,7 +314,7 @@ <T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandle
res.fail(e); res.fail(e);
} }
if (metrics != null) { if (metrics != null) {
metrics.taskCompleted(metric, res.succeeded()); metrics.taskEnd(metric, res.succeeded());
} }
if (resultHandler != null) { if (resultHandler != null) {
runOnContext(v -> res.setHandler(resultHandler)); runOnContext(v -> res.setHandler(resultHandler));
Expand All @@ -335,7 +335,7 @@ protected synchronized Map<String, Object> contextData() {
return contextData; return contextData;
} }


protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, ThreadPoolMetrics metrics) { protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean checkThread, PoolMetrics metrics) {
Object metric = metrics != null ? metrics.taskSubmitted() : null; Object metric = metrics != null ? metrics.taskSubmitted() : null;
return () -> { return () -> {
Thread th = Thread.currentThread(); Thread th = Thread.currentThread();
Expand All @@ -351,7 +351,7 @@ protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean chec
} }
} }
if (metrics != null) { if (metrics != null) {
metrics.taskExecuting(metric); metrics.taskBegin(metric);
} }
if (!DISABLE_TIMINGS) { if (!DISABLE_TIMINGS) {
current.executeStart(); current.executeStart();
Expand All @@ -364,7 +364,7 @@ protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean chec
hTask.handle(null); hTask.handle(null);
} }
if (metrics != null) { if (metrics != null) {
metrics.taskCompleted(metric, true); metrics.taskEnd(metric, true);
} }
} catch (Throwable t) { } catch (Throwable t) {
log.error("Unhandled exception", t); log.error("Unhandled exception", t);
Expand All @@ -376,7 +376,7 @@ protected Runnable wrapTask(ContextTask cTask, Handler<Void> hTask, boolean chec
handler.handle(t); handler.handle(t);
} }
if (metrics != null) { if (metrics != null) {
metrics.taskCompleted(metric, false); metrics.taskEnd(metric, false);
} }
} finally { } finally {
// We don't unset the context after execution - this is done later when the context is closed via // We don't unset the context after execution - this is done later when the context is closed via
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -58,7 +58,7 @@
import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.metrics.Metrics; import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider; import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.ThreadPoolMetrics; import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.core.spi.metrics.VertxMetrics;


import java.io.File; import java.io.File;
Expand Down Expand Up @@ -136,12 +136,12 @@ public class VertxImpl implements VertxInternal, MetricsProvider {


metrics = initialiseMetrics(options); metrics = initialiseMetrics(options);


ThreadPoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics("vert.x-worker-thread", options.getWorkerPoolSize()) : null;
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime())); new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
ThreadPoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics("vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime())); new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(internalBlockingExec, "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics); internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
namedWorkerPools = new HashMap<>(); namedWorkerPools = new HashMap<>();
workerPool = new WorkerPool(workerExec, workerPoolMetrics); workerPool = new WorkerPool(workerExec, workerPoolMetrics);
Expand Down Expand Up @@ -872,7 +872,7 @@ class SharedWorkerPool extends WorkerPool {
private final String name; private final String name;
private int refCount = 1; private int refCount = 1;


public SharedWorkerPool(String name, ExecutorService workerExec, ThreadPoolMetrics workerMetrics) { public SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) {
super(workerExec, workerMetrics); super(workerExec, workerMetrics);
this.workerExec = workerExec; this.workerExec = workerExec;
this.name = name; this.name = name;
Expand Down Expand Up @@ -911,8 +911,8 @@ public synchronized NamedWorkerExecutor createWorkerExecutor(String name, int po
} }
SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name); SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name);
if (sharedWorkerPool == null) { if (sharedWorkerPool == null) {
ThreadPoolMetrics workerMetrics = isMetricsEnabled() ? metrics.createMetrics(name, poolSize) : null;
ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime)); ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime));
PoolMetrics workerMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, name, poolSize) : null;
namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics)); namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics));
} else { } else {
sharedWorkerPool.refCount++; sharedWorkerPool.refCount++;
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/io/vertx/core/impl/VertxThreadFactory.java
Expand Up @@ -16,8 +16,6 @@


package io.vertx.core.impl; package io.vertx.core.impl;


import io.vertx.core.spi.metrics.ThreadPoolMetrics;

import java.util.Map; import java.util.Map;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/vertx/core/impl/WorkerPool.java
Expand Up @@ -16,7 +16,7 @@


package io.vertx.core.impl; package io.vertx.core.impl;


import io.vertx.core.spi.metrics.ThreadPoolMetrics; import io.vertx.core.spi.metrics.PoolMetrics;


import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
Expand All @@ -28,9 +28,9 @@ class WorkerPool {


private final OrderedExecutorFactory orderedFact; private final OrderedExecutorFactory orderedFact;
private final ExecutorService pool; private final ExecutorService pool;
private final ThreadPoolMetrics metrics; private final PoolMetrics metrics;


public WorkerPool(ExecutorService pool, ThreadPoolMetrics metrics) { public WorkerPool(ExecutorService pool, PoolMetrics metrics) {
this.orderedFact = new OrderedExecutorFactory(pool); this.orderedFact = new OrderedExecutorFactory(pool);
this.pool = pool; this.pool = pool;
this.metrics = metrics; this.metrics = metrics;
Expand All @@ -44,7 +44,7 @@ Executor createOrderedExecutor() {
return orderedFact.getExecutor(); return orderedFact.getExecutor();
} }


ThreadPoolMetrics metrics() { PoolMetrics metrics() {
return metrics; return metrics;
} }


Expand Down
Expand Up @@ -91,11 +91,10 @@ public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocket
} }


@Override @Override
public ThreadPoolMetrics createMetrics(String name, int poolSize) { public <P> PoolMetrics<?> createMetrics(P pool, String poolName, int maxPoolSize) {
return new DummyWorkerPoolMetrics(); return new DummyWorkerPoolMetrics();
} }



@Override @Override
public void close() { public void close() {
} }
Expand Down Expand Up @@ -345,7 +344,7 @@ public boolean isEnabled() {
} }
} }


private class DummyWorkerPoolMetrics implements ThreadPoolMetrics<Void> { private class DummyWorkerPoolMetrics implements PoolMetrics<Void> {


@Override @Override
public Void taskSubmitted() { public Void taskSubmitted() {
Expand All @@ -357,11 +356,11 @@ public void taskRejected(Void task) {
} }


@Override @Override
public void taskExecuting(Void task) { public void taskBegin(Void task) {
} }


@Override @Override
public void taskCompleted(Void task, boolean succeeded) { public void taskEnd(Void task, boolean succeeded) {
} }


@Override @Override
Expand Down
Expand Up @@ -17,35 +17,35 @@
package io.vertx.core.spi.metrics; package io.vertx.core.spi.metrics;


/** /**
* An SPI used internally by Vert.x to gather metrics on a worker thread (execute blocking, worker verticle). * An SPI used internally by Vert.x to gather metrics on pools used by Vert.x (execute blocking, worker verticle).
* *
* @author <a href="http://escoffier.me">Clement Escoffier</a> * @author <a href="http://escoffier.me">Clement Escoffier</a>
*/ */
public interface ThreadPoolMetrics<T> extends Metrics { public interface PoolMetrics<T> extends Metrics {


/** /**
* A new task has been submitted to the worker queue. * A new task has been submitted to access the resource.
* This method is called from the submitter context. * This method is called from the submitter context.
* *
* @return the submitted task. * @return the submitted task.
*/ */
T taskSubmitted(); T taskSubmitted();


/** /**
* The task has been rejected. The underlying thread pool has probably be shutdown. * The task has been rejected. The underlying resource has probably be shutdown.
*/ */
void taskRejected(T task); void taskRejected(T task);


/** /**
* The submitted task start its execution. * The submitted task start to use the resource.
*/ */
void taskExecuting(T task); void taskBegin(T task);


/** /**
* The submitted tasks has completed its execution. * The submitted tasks has completed its execution and release the resource.
* *
* @param succeeded whether or not the task has gracefully completed * @param succeeded whether or not the task has gracefully completed
*/ */
void taskCompleted(T task, boolean succeeded); void taskEnd(T task, boolean succeeded);


} }
8 changes: 4 additions & 4 deletions src/main/java/io/vertx/core/spi/metrics/VertxMetrics.java
Expand Up @@ -17,7 +17,6 @@
package io.vertx.core.spi.metrics; package io.vertx.core.spi.metrics;


import io.vertx.core.Verticle; import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.datagram.DatagramSocket; import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions; import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.EventBus;
Expand Down Expand Up @@ -168,11 +167,12 @@ default void eventBusInitialized(EventBus bus) {
} }


/** /**
* Provides the thread pool metrics SPI. * Provides the pool metrics SPI.
* *
* @param pool the pool of resource, it can be used by the metrics implementation to gather extra statistics
* @param poolName the name of the thread pool * @param poolName the name of the thread pool
* @param poolSize the size of the pool * @param maxPoolSize the pool max size, or -1 if the number cannot be determined
* @return the thread pool metrics SPI * @return the thread pool metrics SPI
*/ */
ThreadPoolMetrics<?> createMetrics(String poolName, int poolSize); <P> PoolMetrics<?> createMetrics(P pool, String poolName, int maxPoolSize);
} }
10 changes: 5 additions & 5 deletions src/test/java/io/vertx/test/core/MetricsTest.java
Expand Up @@ -27,7 +27,7 @@
import io.vertx.core.http.*; import io.vertx.core.http.*;
import io.vertx.core.metrics.MetricsOptions; import io.vertx.core.metrics.MetricsOptions;
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;
import io.vertx.core.spi.metrics.ThreadPoolMetrics; import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.test.fakemetrics.*; import io.vertx.test.fakemetrics.*;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
Expand Down Expand Up @@ -673,7 +673,7 @@ private void testDatagram(String host, Consumer<PacketMetric> checker) throws Ex


@Test @Test
public void testThreadPoolMetricsWithExecuteBlocking() { public void testThreadPoolMetricsWithExecuteBlocking() {
Map<String, ThreadPoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics(); Map<String, PoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics();


FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("vert.x-worker-thread"); FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("vert.x-worker-thread");


Expand Down Expand Up @@ -723,7 +723,7 @@ public void testThreadPoolMetricsWithExecuteBlocking() {
public void testThreadPoolMetricsWithInternalExecuteBlocking() { public void testThreadPoolMetricsWithInternalExecuteBlocking() {
// Internal blocking thread pool is used by blocking file system actions. // Internal blocking thread pool is used by blocking file system actions.


Map<String, ThreadPoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics(); Map<String, PoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics();
FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("vert.x-internal-blocking"); FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("vert.x-internal-blocking");


assertThat(metrics.getPoolSize(), is(getOptions().getInternalBlockingPoolSize())); assertThat(metrics.getPoolSize(), is(getOptions().getInternalBlockingPoolSize()));
Expand Down Expand Up @@ -784,7 +784,7 @@ public void testThreadPoolMetricsWithWorkerVerticleAndMultiThread() {


private void testWithWorkerVerticle(DeploymentOptions options) { private void testWithWorkerVerticle(DeploymentOptions options) {
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
Map<String, ThreadPoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics(); Map<String, PoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics();
FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("vert.x-worker-thread"); FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("vert.x-worker-thread");


assertThat(metrics.getPoolSize(), is(getOptions().getInternalBlockingPoolSize())); assertThat(metrics.getPoolSize(), is(getOptions().getInternalBlockingPoolSize()));
Expand Down Expand Up @@ -854,7 +854,7 @@ public void testThreadPoolMetricsWithNamedExecuteBlocking() {


WorkerExecutor workerExec = vertx.createWorkerExecutor("my-pool", 10); WorkerExecutor workerExec = vertx.createWorkerExecutor("my-pool", 10);


Map<String, ThreadPoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics(); Map<String, PoolMetrics> all = FakeThreadPoolMetrics.getThreadPoolMetrics();


FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("my-pool"); FakeThreadPoolMetrics metrics = (FakeThreadPoolMetrics) all.get("my-pool");


Expand Down
Expand Up @@ -16,20 +16,20 @@


package io.vertx.test.fakemetrics; package io.vertx.test.fakemetrics;


import io.vertx.core.spi.metrics.ThreadPoolMetrics; import io.vertx.core.spi.metrics.PoolMetrics;


import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


/** /**
* A fake implementation of the {@link ThreadPoolMetrics} SPI. * A fake implementation of the {@link PoolMetrics} SPI.
* *
* @author <a href="http://escoffier.me">Clement Escoffier</a> * @author <a href="http://escoffier.me">Clement Escoffier</a>
*/ */
public class FakeThreadPoolMetrics implements ThreadPoolMetrics<Void> { public class FakeThreadPoolMetrics implements PoolMetrics<Void> {
private final static Map<String, ThreadPoolMetrics> METRICS = new ConcurrentHashMap<>(); private final static Map<String, PoolMetrics> METRICS = new ConcurrentHashMap<>();


private final int poolSize; private final int poolSize;


Expand Down Expand Up @@ -69,14 +69,14 @@ public void taskRejected(Void task) {
} }


@Override @Override
public void taskExecuting(Void task) { public void taskBegin(Void task) {
waiting.decrementAndGet(); waiting.decrementAndGet();
idle.decrementAndGet(); idle.decrementAndGet();
running.incrementAndGet(); running.incrementAndGet();
} }


@Override @Override
public void taskCompleted(Void task, boolean succeeded) { public void taskEnd(Void task, boolean succeeded) {
running.decrementAndGet(); running.decrementAndGet();
idle.incrementAndGet(); idle.incrementAndGet();
completed.incrementAndGet(); completed.incrementAndGet();
Expand Down Expand Up @@ -117,7 +117,7 @@ public int numberOfRunningTasks() {
return running.get(); return running.get();
} }


public static Map<String, ThreadPoolMetrics> getThreadPoolMetrics() { public static Map<String, PoolMetrics> getThreadPoolMetrics() {
return METRICS; return METRICS;
} }


Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/vertx/test/fakemetrics/FakeVertxMetrics.java
Expand Up @@ -32,6 +32,7 @@
import io.vertx.core.net.SocketAddress; import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.*; import io.vertx.core.spi.metrics.*;


import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


/** /**
Expand Down Expand Up @@ -135,8 +136,8 @@ public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocket
} }


@Override @Override
public ThreadPoolMetrics createMetrics(String poolName, int poolSize) { public <P> PoolMetrics<?> createMetrics(P pool, String poolName, int maxPoolSize) {
return new FakeThreadPoolMetrics(poolName, poolSize); return new FakeThreadPoolMetrics(poolName, maxPoolSize);
} }


public boolean isEnabled() { public boolean isEnabled() {
Expand Down

0 comments on commit b0aaf9d

Please sign in to comment.