From e8d97443407c64a0f6f88383f4596bfec66c5aa9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Mar 2019 12:37:49 +0100 Subject: [PATCH] Use Threadpool Time in ClusterApplierService (#39679) (#39685) * Use threadpool's time in `ClusterApplierService` to allow for deterministic tests * This is a part of/requirement for #39504 --- .../cluster/ClusterStateObserver.java | 13 +++++++----- .../service/ClusterApplierService.java | 18 ++++++++++------- .../service/ClusterApplierServiceTests.java | 20 +++++++++---------- 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java index 2da0ff9286f01..3e61c203fd004 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -44,6 +45,7 @@ public class ClusterStateObserver { private final Predicate MATCH_ALL_CHANGES_PREDICATE = state -> true; private final ClusterApplierService clusterApplierService; + private final ThreadPool threadPool; private final ThreadContext contextHolder; volatile TimeValue timeOutValue; @@ -52,7 +54,7 @@ public class ClusterStateObserver { final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener(); // observingContext is not null when waiting on cluster state changes final AtomicReference observingContext = new AtomicReference<>(null); - volatile Long startTimeNS; + volatile Long startTimeMS; volatile boolean timedOut; @@ -81,10 +83,11 @@ public ClusterStateObserver(ClusterState initialState, ClusterService clusterSer public ClusterStateObserver(ClusterState initialState, ClusterApplierService clusterApplierService, @Nullable TimeValue timeout, Logger logger, ThreadContext contextHolder) { this.clusterApplierService = clusterApplierService; + this.threadPool = clusterApplierService.threadPool(); this.lastObservedState = new AtomicReference<>(new StoredState(initialState)); this.timeOutValue = timeout; if (timeOutValue != null) { - this.startTimeNS = System.nanoTime(); + this.startTimeMS = threadPool.relativeTimeInMillis(); } this.logger = logger; this.contextHolder = contextHolder; @@ -134,7 +137,7 @@ public void waitForNextChange(Listener listener, Predicate statePr if (timeOutValue == null) { timeOutValue = this.timeOutValue; if (timeOutValue != null) { - long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS); + long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS; timeoutTimeLeftMS = timeOutValue.millis() - timeSinceStartMS; if (timeoutTimeLeftMS <= 0L) { // things have timeout while we were busy -> notify @@ -150,7 +153,7 @@ public void waitForNextChange(Listener listener, Predicate statePr timeoutTimeLeftMS = null; } } else { - this.startTimeNS = System.nanoTime(); + this.startTimeMS = threadPool.relativeTimeInMillis(); this.timeOutValue = timeOutValue; timeoutTimeLeftMS = timeOutValue.millis(); timedOut = false; @@ -240,7 +243,7 @@ public void onTimeout(TimeValue timeout) { ObservingContext context = observingContext.getAndSet(null); if (context != null) { clusterApplierService.removeTimeoutListener(this); - long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS); + long timeSinceStartMS = threadPool.relativeTimeInMillis() - startTimeMS; logger.trace("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStartMS)); // update to latest, in case people want to retry diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index e254196caa47b..f2b0756d3d81f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -311,6 +311,10 @@ public void runOnApplierThread(final String source, Consumer clust runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH); } + public ThreadPool threadPool() { + return threadPool; + } + @Override public void onNewClusterState(final String source, final Supplier clusterStateSupplier, final ClusterApplyListener listener) { @@ -383,12 +387,12 @@ protected void runTask(UpdateTask task) { logger.debug("processing [{}]: execute", task.source); final ClusterState previousClusterState = state.get(); - long startTimeNS = currentTimeInNanos(); + long startTimeMS = currentTimeInMillis(); final ClusterState newClusterState; try { newClusterState = task.apply(previousClusterState); } catch (Exception e) { - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); logger.trace(() -> new ParameterizedMessage( "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}", executionTime, previousClusterState.version(), task.source, previousClusterState), e); @@ -398,7 +402,7 @@ protected void runTask(UpdateTask task) { } if (previousClusterState == newClusterState) { - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime); warnAboutSlowTaskIfNeeded(executionTime, task.source); task.listener.onSuccess(task.source); @@ -411,14 +415,14 @@ protected void runTask(UpdateTask task) { } try { applyChanges(task, previousClusterState, newClusterState); - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source, executionTime, newClusterState.version(), newClusterState.stateUUID()); warnAboutSlowTaskIfNeeded(executionTime, task.source); task.listener.onSuccess(task.source); } catch (Exception e) { - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS)); if (logger.isTraceEnabled()) { logger.warn(new ParameterizedMessage( "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", @@ -617,8 +621,8 @@ public void run() { } // this one is overridden in tests so we can control time - protected long currentTimeInNanos() { - return System.nanoTime(); + protected long currentTimeInMillis() { + return threadPool.relativeTimeInMillis(); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 7f91513e3439a..740283736a22f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -88,7 +88,7 @@ public void tearDown() throws Exception { super.tearDown(); } - TimedClusterApplierService createTimedClusterService(boolean makeMaster) throws InterruptedException { + TimedClusterApplierService createTimedClusterService(boolean makeMaster) { DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService(Settings.builder().put("cluster.name", @@ -141,9 +141,9 @@ public void testClusterStateUpdateLogging() throws Exception { Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class); Loggers.addAppender(clusterLogger, mockAppender); try { - clusterApplierService.currentTimeOverride = System.nanoTime(); + clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis(); clusterApplierService.runOnApplierThread("test1", - currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(), + currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(), new ClusterApplyListener() { @Override public void onSuccess(String source) { } @@ -155,7 +155,7 @@ public void onFailure(String source, Exception e) { }); clusterApplierService.runOnApplierThread("test2", currentState -> { - clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos(); + clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).millis(); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); }, new ClusterApplyListener() { @@ -214,9 +214,9 @@ public void testLongClusterStateUpdateLogging() throws Exception { try { final CountDownLatch latch = new CountDownLatch(4); final CountDownLatch processedFirstTask = new CountDownLatch(1); - clusterApplierService.currentTimeOverride = System.nanoTime(); + clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis(); clusterApplierService.runOnApplierThread("test1", - currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(), + currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).millis(), new ClusterApplyListener() { @Override public void onSuccess(String source) { @@ -232,7 +232,7 @@ public void onFailure(String source, Exception e) { processedFirstTask.await(); clusterApplierService.runOnApplierThread("test2", currentState -> { - clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos(); + clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).millis(); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); }, new ClusterApplyListener() { @@ -247,7 +247,7 @@ public void onFailure(String source, Exception e) { } }); clusterApplierService.runOnApplierThread("test3", - currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(), + currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).millis(), new ClusterApplyListener() { @Override public void onSuccess(String source) { @@ -510,11 +510,11 @@ static class TimedClusterApplierService extends ClusterApplierService { } @Override - protected long currentTimeInNanos() { + protected long currentTimeInMillis() { if (currentTimeOverride != null) { return currentTimeOverride; } - return super.currentTimeInNanos(); + return super.currentTimeInMillis(); } } }