Skip to content

Commit

Permalink
Logging: Add logging of slow cluster state tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
imotov committed Apr 30, 2015
1 parent aa968f6 commit c165afb
Show file tree
Hide file tree
Showing 4 changed files with 392 additions and 7 deletions.
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.cluster.service;

import com.google.common.collect.Iterables;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterState.Builder;
Expand Down Expand Up @@ -59,6 +58,9 @@
*/
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {

public static final String SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD = "cluster.service.slow_task_logging_threshold";
public static final String SETTING_CLUSTER_SERVICE_RECONNECT_INTERVAL = "cluster.service.reconnect_interval";

public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
private final ThreadPool threadPool;

Expand All @@ -74,6 +76,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe

private final TimeValue reconnectInterval;

private TimeValue slowTaskLoggingThreshold;

private volatile PrioritizedEsThreadPoolExecutor updateTasksExecutor;

/**
Expand Down Expand Up @@ -115,8 +119,11 @@ public InternalClusterService(Settings settings, DiscoveryService discoveryServi
this.clusterState = ClusterState.builder(clusterName).build();

this.nodeSettingsService.setClusterService(this);
this.nodeSettingsService.addListener(new ApplySettings());

this.reconnectInterval = this.settings.getAsTime(SETTING_CLUSTER_SERVICE_RECONNECT_INTERVAL, TimeValue.timeValueSeconds(10));

this.reconnectInterval = this.settings.getAsTime("cluster.service.reconnect_interval", TimeValue.timeValueSeconds(10));
this.slowTaskLoggingThreshold = this.settings.getAsTime(SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, TimeValue.timeValueSeconds(30));

localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);

Expand Down Expand Up @@ -371,29 +378,34 @@ public void run() {
return;
}
ClusterState newClusterState;
long startTime = System.currentTimeMillis();
try {
newClusterState = updateTask.execute(previousClusterState);
} catch (Throwable e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(previousClusterState.nodes().prettyPrint());
sb.append(previousClusterState.routingTable().prettyPrint());
sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
logger.trace(sb.toString(), e);
}
warnAboutSlowTaskIfNeeded(executionTime, source);
updateTask.onFailure(source, e);
return;
}

if (previousClusterState == newClusterState) {
logger.debug("processing [{}]: no change in cluster_state", source);
if (updateTask instanceof AckedClusterStateUpdateTask) {
//no need to wait for ack if nothing changed, the update can be counted as acknowledged
((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
}
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
return;
}

Expand Down Expand Up @@ -511,9 +523,12 @@ public void run() {
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
}

logger.debug("processing [{}]: done applying updated cluster_state (version: {}, uuid: {})", source, newClusterState.version(), newClusterState.uuid());
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.uuid());
warnAboutSlowTaskIfNeeded(executionTime, source);
} catch (Throwable t) {
StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.uuid()).append("], source [").append(source).append("]\n");
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, System.currentTimeMillis() - startTime));
StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.uuid()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
sb.append(newClusterState.routingTable().prettyPrint());
sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
Expand All @@ -523,6 +538,12 @@ public void run() {
}
}

private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state update task [{}] took {} above the warn threshold of {}", source, executionTime, slowTaskLoggingThreshold);
}
}

class NotifyTimeout implements Runnable {
final TimeoutClusterStateListener listener;
final TimeValue timeout;
Expand Down Expand Up @@ -755,4 +776,13 @@ public void onTimeout() {
}
}
}

class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
final TimeValue slowTaskLoggingThreshold = settings.getAsTime(SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, InternalClusterService.this.slowTaskLoggingThreshold);
InternalClusterService.this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
}
}

}
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.*;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ZenDiscovery;
Expand Down Expand Up @@ -101,6 +102,7 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, Validator.TIME_NON_NEGATIVE);
}

public void addDynamicSettings(String... settings) {
Expand Down
216 changes: 215 additions & 1 deletion src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
Expand Up @@ -21,10 +21,12 @@
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;

import org.elasticsearch.ElasticsearchException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
Expand All @@ -38,6 +40,8 @@
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;

Expand All @@ -48,6 +52,7 @@

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;

/**
Expand Down Expand Up @@ -721,6 +726,215 @@ public void testPrioritizedTasks() throws Exception {
}
}

@Test
@TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level
public void testClusterStateUpdateLogging() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", "cluster.service", Level.DEBUG, "*processing [test1]: took * no change in cluster_state"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.TRACE, "*failed to execute cluster state update in *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.DEBUG, "*processing [test3]: took * done applying updated cluster_state (version: *, uuid: *)"));

Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(4);
clusterService1.submitStateUpdateTask("test1", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService1.submitStateUpdateTask("test2", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}

@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test4", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}

@Test
@TestLogging("cluster:WARN") // To ensure that we log cluster state events on WARN level
public void testLongClusterStateUpdateLogging() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.put(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, "10s")
.build();
internalCluster().startNode(settings);
ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test1 shouldn't see because setting is too low", "cluster.service", Level.WARN, "*cluster state update task [test1] took * above the warn threshold of *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.WARN, "*cluster state update task [test2] took * above the warn threshold of 10ms"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.WARN, "*cluster state update task [test3] took * above the warn threshold of 10ms"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", "cluster.service", Level.WARN, "*cluster state update task [test4] took * above the warn threshold of 10ms"));

Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test1", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
processedFirstTask.countDown();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}
});

processedFirstTask.await(1, TimeUnit.SECONDS);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put(InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD, "10ms")));

clusterService1.submitStateUpdateTask("test2", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}

@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return ClusterState.builder(currentState).incrementVersion().build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService1.submitStateUpdateTask("test4", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test5", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}

private static class BlockingTask extends ClusterStateUpdateTask {
private final CountDownLatch latch = new CountDownLatch(1);

Expand Down

0 comments on commit c165afb

Please sign in to comment.