Skip to content

Commit

Permalink
Add worker threadPool and task metrics
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Add metrics to identify how many jobs started in master.

### Why are the changes needed?

Add a way to monitor worker load info. Include worker read/write thread
pool allowed max size, current size, actively count, and completed task
count


### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
no

pr-link: #14140
change-id: cid-4a669327eecd02f927e09370dceba93afd52ae0e
  • Loading branch information
xichen01 committed Oct 13, 2021
1 parent 92842d8 commit fed20cd
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 8 deletions.
54 changes: 54 additions & 0 deletions core/common/src/main/java/alluxio/metrics/MetricKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,60 @@ public MetricKey build() {
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_READER_THREAD_ACTIVELY_COUNT =
new Builder("Worker.BlockReaderThreadActivelyCount")
.setDescription("The approximate number of block read "
+ "threads that are actively executing tasks in reader thread pool")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_READER_THREAD_CURRENT_COUNT =
new Builder("Worker.BlockReaderThreadCurrentCount")
.setDescription("The current number of read threads in the reader thread pool")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_READER_THREAD_MAX_COUNT =
new Builder("Worker.BlockReaderThreadMaxCount")
.setDescription("The maximum allowed number of block read "
+ "thread in the reader thread pool")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_READER_COMPLETED_TASK_COUNT =
new Builder("Worker.BlockReaderCompleteTaskCount")
.setDescription("The approximate total number of block read tasks "
+ "that have completed execution")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_WRITER_THREAD_ACTIVELY_COUNT =
new Builder("Worker.BlockWriterThreadActivelyCount")
.setDescription("The approximate number of block write "
+ "threads that are actively executing tasks in writer thread pool")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_WRITER_THREAD_CURRENT_COUNT =
new Builder("Worker.BlockWriterThreadCurrentCount")
.setDescription("The current number of write threads in the writer thread pool")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_WRITER_THREAD_MAX_COUNT =
new Builder("Worker.BlockWriterThreadMaxCount")
.setDescription("The maximum allowed number of block write "
+ "thread in the writer thread pool")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey WORKER_BLOCK_WRITER_COMPLETED_TASK_COUNT =
new Builder("Worker.BlockWriterCompleteTaskCount")
.setDescription("The approximate total number of block write tasks "
+ "that have completed execution")
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();

// Client metrics
public static final MetricKey CLIENT_BLOCK_READ_CHUNK_REMOTE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import alluxio.conf.ServerConfiguration;
import alluxio.Constants;
import alluxio.conf.PropertyKey;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.security.User;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.util.ThreadFactoryUtils;
Expand Down Expand Up @@ -45,11 +47,13 @@ public final class GrpcExecutors {
PropertyKey.WORKER_NETWORK_ASYNC_CACHE_MANAGER_QUEUE_MAX)),
ThreadFactoryUtils.build("CacheManagerExecutor-%d", true)));

private static final ThreadPoolExecutor BLOCK_READER_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(THREADS_MIN, ServerConfiguration.getInt(
PropertyKey.WORKER_NETWORK_BLOCK_READER_THREADS_MAX), THREAD_STOP_MS,
TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
ThreadFactoryUtils.build("BlockDataReaderExecutor-%d", true));
public static final ExecutorService BLOCK_READER_EXECUTOR =
new ImpersonateThreadPoolExecutor(new ThreadPoolExecutor(THREADS_MIN,
ServerConfiguration.getInt(PropertyKey.WORKER_NETWORK_BLOCK_READER_THREADS_MAX),
THREAD_STOP_MS, TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
ThreadFactoryUtils.build("BlockDataReaderExecutor-%d", true)));
new ImpersonateThreadPoolExecutor(BLOCK_READER_THREAD_POOL_EXECUTOR);

public static final ExecutorService BLOCK_READER_SERIALIZED_RUNNER_EXECUTOR =
new ImpersonateThreadPoolExecutor(new ThreadPoolExecutor(THREADS_MIN,
Expand All @@ -58,11 +62,41 @@ public final class GrpcExecutors {
ThreadFactoryUtils.build("BlockDataReaderSerializedExecutor-%d", true),
new ThreadPoolExecutor.CallerRunsPolicy()));

private static final ThreadPoolExecutor BLOCK_WRITE_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(THREADS_MIN, ServerConfiguration.getInt(
PropertyKey.WORKER_NETWORK_BLOCK_WRITER_THREADS_MAX), THREAD_STOP_MS,
TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
ThreadFactoryUtils.build("BlockDataWriterExecutor-%d", true));
public static final ExecutorService BLOCK_WRITER_EXECUTOR =
new ImpersonateThreadPoolExecutor(new ThreadPoolExecutor(THREADS_MIN,
ServerConfiguration.getInt(PropertyKey.WORKER_NETWORK_BLOCK_WRITER_THREADS_MAX),
THREAD_STOP_MS, TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
ThreadFactoryUtils.build("BlockDataWriterExecutor-%d", true)));
new ImpersonateThreadPoolExecutor(BLOCK_WRITE_THREAD_POOL_EXECUTOR);

static {
MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_READER_THREAD_ACTIVELY_COUNT.getName()),
BLOCK_READER_THREAD_POOL_EXECUTOR::getActiveCount);
MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_READER_THREAD_CURRENT_COUNT.getName()),
BLOCK_READER_THREAD_POOL_EXECUTOR::getPoolSize);
MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_READER_THREAD_MAX_COUNT.getName()),
BLOCK_READER_THREAD_POOL_EXECUTOR::getMaximumPoolSize);
MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_READER_COMPLETED_TASK_COUNT.getName()),
BLOCK_READER_THREAD_POOL_EXECUTOR::getCompletedTaskCount);

MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_WRITER_THREAD_ACTIVELY_COUNT.getName()),
BLOCK_WRITE_THREAD_POOL_EXECUTOR::getActiveCount);
MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_WRITER_THREAD_CURRENT_COUNT.getName()),
BLOCK_WRITE_THREAD_POOL_EXECUTOR::getPoolSize);
MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_WRITER_THREAD_MAX_COUNT.getName()),
BLOCK_WRITE_THREAD_POOL_EXECUTOR::getMaximumPoolSize);
MetricsSystem.registerGaugeIfAbsent(MetricsSystem.getMetricName(
MetricKey.WORKER_BLOCK_WRITER_COMPLETED_TASK_COUNT.getName()),
BLOCK_WRITE_THREAD_POOL_EXECUTOR::getCompletedTaskCount);
}

/**
* Private constructor.
Expand Down

0 comments on commit fed20cd

Please sign in to comment.