From e486e17d17738cf44e3622d51b10ed5926244f40 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Tue, 20 Jun 2023 11:02:29 +0200 Subject: [PATCH 1/3] Track total task execution time in threadpool --- .../common/util/concurrent/EsExecutors.java | 6 +-- ...tionTimeTrackingEsThreadPoolExecutor.java} | 23 +++++++++-- .../search/query/QueryPhase.java | 6 +-- .../threadpool/FixedExecutorBuilder.java | 22 ++++++---- .../util/concurrent/EsExecutorsTests.java | 4 +- ...imeTrackingEsThreadPoolExecutorTests.java} | 40 ++++++++++++++----- .../threadpool/ThreadPoolTests.java | 6 +-- 7 files changed, 74 insertions(+), 33 deletions(-) rename server/src/main/java/org/elasticsearch/common/util/concurrent/{EWMATrackingEsThreadPoolExecutor.java => TaskExecutionTimeTrackingEsThreadPoolExecutor.java} (80%) rename server/src/test/java/org/elasticsearch/common/util/concurrent/{EWMATrackingEsThreadPoolExecutorTests.java => TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java} (68%) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 43e6e90548b7f..5d17be06e8ce6 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -122,7 +122,7 @@ public static EsThreadPoolExecutor newFixed( int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder, - boolean trackEWMA + boolean trackExecutionTime ) { BlockingQueue queue; if (queueCapacity < 0) { @@ -130,8 +130,8 @@ public static EsThreadPoolExecutor newFixed( } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.newBlockingQueue(), queueCapacity); } - if (trackEWMA) { - return new EWMATrackingEsThreadPoolExecutor( + if (trackExecutionTime) { + return new TaskExecutionTimeTrackingEsThreadPoolExecutor( name, size, size, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java similarity index 80% rename from server/src/main/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutor.java rename to server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java index 269d2f09ab4ae..1b5de374a011f 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EWMATrackingEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java @@ -15,20 +15,22 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; /** - * An extension to thread pool executor, which tracks the exponentially weighted moving average of the task execution time. + * An extension to thread pool executor, which tracks statistics for the task execution time. */ -public final class EWMATrackingEsThreadPoolExecutor extends EsThreadPoolExecutor { +public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThreadPoolExecutor { // This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable public static double EWMA_ALPHA = 0.3; private final Function runnableWrapper; private final ExponentiallyWeightedMovingAverage executionEWMA; + private final LongAdder totalExecutionTime = new LongAdder(); - EWMATrackingEsThreadPoolExecutor( + TaskExecutionTimeTrackingEsThreadPoolExecutor( String name, int corePoolSize, int maximumPoolSize, @@ -67,6 +69,13 @@ public double getTaskExecutionEWMA() { return executionEWMA.getAverage(); } + /** + * Returns the total time (in nanoseconds) spend executing tasks in this executor. + */ + public long getTotalTaskExecutionTime() { + return totalExecutionTime.sum(); + } + /** * Returns the current queue size (operations that are queued) */ @@ -93,12 +102,18 @@ protected void afterExecute(Runnable r, Throwable t) { if (taskExecutionNanos != -1) { // taskExecutionNanos may be -1 if the task threw an exception executionEWMA.addValue(taskExecutionNanos); + totalExecutionTime.add(taskExecutionNanos); } } @Override protected void appendThreadPoolExecutorDetails(StringBuilder sb) { - sb.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())).append(", "); + sb.append("task execution EWMA = ") + .append(TimeValue.timeValueNanos((long) executionEWMA.getAverage())) + .append(", ") + .append("total task execution time = ") + .append(TimeValue.timeValueNanos(getTotalTaskExecutionTime())) + .append(", "); } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 338087c112812..4b76958852c92 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -30,8 +30,8 @@ import org.elasticsearch.common.lucene.MinimumScoreCollector; import org.elasticsearch.common.lucene.search.FilteredCollector; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; -import org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.lucene.queries.SearchAfterSortedDocQuery; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchContextSourcePrinter; @@ -268,10 +268,10 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas searchWithCollectorManager(searchContext, searcher, query, collectorManager, timeoutRunnable != null); queryResult.topDocs(topDocsFactory.topDocsAndMaxScore(), topDocsFactory.sortValueFormats); ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - assert executor instanceof EWMATrackingEsThreadPoolExecutor + assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); - if (executor instanceof EWMATrackingEsThreadPoolExecutor rExecutor) { + if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java index c9432012d179e..b61e7e43d8259 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java @@ -28,7 +28,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder sizeSetting; private final Setting queueSizeSetting; - private final boolean trackEWMA; + private final boolean trackExecutionTime; /** * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name. @@ -37,10 +37,16 @@ public final class FixedExecutorBuilder extends ExecutorBuilder executor: {}", executor); assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L)); + assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L)); executeTask(executor, 1); - assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); }); + assertBusy(() -> { + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); + assertThat(executor.getTotalTaskExecutionTime(), equalTo(100L)); + }); executeTask(executor, 1); - assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); }); + assertBusy(() -> { + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(51L)); + assertThat(executor.getTotalTaskExecutionTime(), equalTo(200L)); + }); executeTask(executor, 1); - assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); }); + assertBusy(() -> { + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(65L)); + assertThat(executor.getTotalTaskExecutionTime(), equalTo(300L)); + }); executeTask(executor, 1); - assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); }); + assertBusy(() -> { + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(75L)); + assertThat(executor.getTotalTaskExecutionTime(), equalTo(400L)); + }); executeTask(executor, 1); - assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); }); + assertBusy(() -> { + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(83L)); + assertThat(executor.getTotalTaskExecutionTime(), equalTo(500L)); + }); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); @@ -59,7 +75,7 @@ public void testExecutionEWMACalculation() throws Exception { /** Use a runnable wrapper that simulates a task with unknown failures. */ public void testExceptionThrowingTask() throws Exception { ThreadContext context = new ThreadContext(Settings.EMPTY); - EWMATrackingEsThreadPoolExecutor executor = new EWMATrackingEsThreadPoolExecutor( + TaskExecutionTimeTrackingEsThreadPoolExecutor executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor( "test-threadpool", 1, 1, @@ -75,7 +91,11 @@ public void testExceptionThrowingTask() throws Exception { logger.info("--> executor: {}", executor); assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L)); - executeTask(executor, 1); + int taskCount = randomIntBetween(1, 100); + executeTask(executor, taskCount); + assertBusy(() -> assertThat(executor.getCompletedTaskCount(), equalTo((long) taskCount))); + assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L)); + assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L)); executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS); } @@ -94,7 +114,7 @@ private Function exceptionalWrapper() { } /** Execute a blank task {@code times} times for the executor */ - private void executeTask(EWMATrackingEsThreadPoolExecutor executor, int times) { + private void executeTask(TaskExecutionTimeTrackingEsThreadPoolExecutor executor, int times) { logger.info("--> executing a task [{}] times", times); for (int i = 0; i < times; i++) { executor.execute(() -> {}); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index b60ab25169bff..d452e03f9ee9a 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -14,9 +14,9 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; @@ -351,10 +351,10 @@ public void testGetMaxSnapshotCores() { assertThat(getMaxSnapshotThreadPoolSize(allocatedProcessors, ByteSizeValue.ofGb(4)), equalTo(10)); } - public void testWriteThreadPoolUsesEWMATrackingEsThreadPoolExecutor() { + public void testWriteThreadPoolUsesTaskExecutionTimeTrackingEsThreadPoolExecutor() { final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY); try { - assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(EWMATrackingEsThreadPoolExecutor.class)); + assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); } finally { assertTrue(terminate(threadPool)); } From a356400f5ba079ab49308f2c1e23548187df412e Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 21 Jun 2023 09:43:28 +0200 Subject: [PATCH 2/3] address review comments --- ...ecutionTimeTrackingEsThreadPoolExecutorTests.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java index d04f065646b4f..7d419f965519b 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java @@ -32,7 +32,7 @@ public void testExecutionEWMACalculation() throws Exception { 1000, TimeUnit.MILLISECONDS, ConcurrentCollections.newBlockingQueue(), - fastWrapper(), + settableWrapper(TimeUnit.NANOSECONDS.toNanos(100)), EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context @@ -42,6 +42,7 @@ public void testExecutionEWMACalculation() throws Exception { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L)); assertThat(executor.getTotalTaskExecutionTime(), equalTo(0L)); + // Using the settableWrapper each task would take 100ns executeTask(executor, 1); assertBusy(() -> { assertThat((long) executor.getTaskExecutionEWMA(), equalTo(30L)); @@ -90,6 +91,7 @@ public void testExceptionThrowingTask() throws Exception { executor.prestartAllCoreThreads(); logger.info("--> executor: {}", executor); + // Using the exceptionalWrapper each task's execution time is -1 to simulate unknown failures/rejections. assertThat((long) executor.getTaskExecutionEWMA(), equalTo(0L)); int taskCount = randomIntBetween(1, 100); executeTask(executor, taskCount); @@ -100,8 +102,12 @@ public void testExceptionThrowingTask() throws Exception { executor.awaitTermination(10, TimeUnit.SECONDS); } - private Function fastWrapper() { - return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false); + /** + * The returned function outputs a WrappedRunnabled that simulates the case + * where {@link TimedRunnable#getTotalExecutionNanos()} always returns {@code timeTakenNanos}. + */ + private Function settableWrapper(long timeTakenNanos) { + return (runnable) -> new SettableTimedRunnable(timeTakenNanos, false); } /** From d18c4030ada8344cea24e046c07a0b2a868ecd93 Mon Sep 17 00:00:00 2001 From: Pooya Salehi Date: Wed, 21 Jun 2023 10:15:04 +0200 Subject: [PATCH 3/3] Use TaskExecutionTimeTrackingEsThreadPoolExecutor for system write --- .../main/java/org/elasticsearch/threadpool/ThreadPool.java | 4 ++-- .../java/org/elasticsearch/threadpool/ThreadPoolTests.java | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 6568f4583b1cb..961a79a2c59a3 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -234,14 +234,14 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false) ); builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false)); - builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false)); + builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, true)); builders.put( Names.SYSTEM_CRITICAL_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_CRITICAL_READ, halfProcMaxAt5, 2000, false) ); builders.put( Names.SYSTEM_CRITICAL_WRITE, - new FixedExecutorBuilder(settings, Names.SYSTEM_CRITICAL_WRITE, halfProcMaxAt5, 1500, false) + new FixedExecutorBuilder(settings, Names.SYSTEM_CRITICAL_WRITE, halfProcMaxAt5, 1500, true) ); for (final ExecutorBuilder builder : customBuilders) { diff --git a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java index d452e03f9ee9a..5faf926c3b9a2 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -355,6 +355,11 @@ public void testWriteThreadPoolUsesTaskExecutionTimeTrackingEsThreadPoolExecutor final ThreadPool threadPool = new TestThreadPool("test", Settings.EMPTY); try { assertThat(threadPool.executor(ThreadPool.Names.WRITE), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); + assertThat(threadPool.executor(ThreadPool.Names.SYSTEM_WRITE), instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class)); + assertThat( + threadPool.executor(ThreadPool.Names.SYSTEM_CRITICAL_WRITE), + instanceOf(TaskExecutionTimeTrackingEsThreadPoolExecutor.class) + ); } finally { assertTrue(terminate(threadPool)); }