Internal: ClusterInfoService should wipe local cache upon unknown exceptions

The InternalClusterInfoService reaches out to the data nodes to collect information about their disk usage and shard store sizes. Upon a node-level error we currently remove that node's info from the local cache. We should also clear the cache when we run into an error at the action level, wiping the info for all nodes.

This also adds a setting, cluster.info.update.timeout, for the timeout used when waiting for the nodes to respond.

Closes elastic#9449
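
For reference, a minimal sketch (not part of this commit) of how both settings could be adjusted at runtime through the cluster-update-settings API, assuming an Elasticsearch 1.x Java client; the client instance and the example values are placeholders:

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class ClusterInfoSettingsExample {

    // `client` is assumed to be an already-connected 1.x Client (e.g. a TransportClient).
    static void relaxClusterInfoFetchTimeout(Client client) {
        client.admin().cluster().prepareUpdateSettings()
                .setTransientSettings(ImmutableSettings.settingsBuilder()
                        .put("cluster.info.update.interval", "60s") // how often a new ClusterInfo is collected
                        .put("cluster.info.update.timeout", "30s")  // setting introduced by this commit
                        .build())
                .execute().actionGet();
    }
}

Both settings are dynamic (the new timeout is registered in ClusterDynamicSettingsModule below with Validator.TIME_NON_NEGATIVE), so the change takes effect without a restart; the timeout is only consulted on the elected master, where InternalClusterInfoService collects the stats.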
bleskes committed Jan 27, 2015
1 parent 0bece08 commit 4e471dc
Showing 4 changed files with 312 additions and 88 deletions.
@@ -42,6 +42,7 @@
 import org.elasticsearch.monitor.fs.FsStats;
 import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;

 import java.util.*;
 import java.util.concurrent.CountDownLatch;
@@ -61,13 +62,15 @@
 public class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {

     public static final String INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL = "cluster.info.update.interval";
+    public static final String INTERNAL_CLUSTER_INFO_TIMEOUT = "cluster.info.update.timeout";

     private volatile TimeValue updateFrequency;

     private volatile ImmutableMap<String, DiskUsage> usages;
     private volatile ImmutableMap<String, Long> shardSizes;
     private volatile boolean isMaster = false;
     private volatile boolean enabled;
+    private volatile TimeValue fetchTimeout;
     private final TransportNodesStatsAction transportNodesStatsAction;
     private final TransportIndicesStatsAction transportIndicesStatsAction;
     private final ClusterService clusterService;
@@ -87,6 +90,7 @@ public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSet
         this.clusterService = clusterService;
         this.threadPool = threadPool;
         this.updateFrequency = settings.getAsTime(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, TimeValue.timeValueSeconds(30));
+        this.fetchTimeout = settings.getAsTime(INTERNAL_CLUSTER_INFO_TIMEOUT, TimeValue.timeValueSeconds(15));
         this.enabled = settings.getAsBoolean(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true);
         nodeSettingsService.addListener(new ApplySettings());

@@ -113,6 +117,13 @@ public void onRefreshSettings(Settings settings) {
                 }
             }

+            TimeValue newFetchTimeout = settings.getAsTime(INTERNAL_CLUSTER_INFO_TIMEOUT, null);
+            if (newFetchTimeout != null) {
+                logger.info("updating fetch timeout [{}] from [{}] to [{}]", INTERNAL_CLUSTER_INFO_TIMEOUT, fetchTimeout, newFetchTimeout);
+                InternalClusterInfoService.this.fetchTimeout = newFetchTimeout;
+            }
+
+
             // We don't log about enabling it here, because the DiskThresholdDecider will already be logging about enable/disable
             if (newEnabled != null) {
                 InternalClusterInfoService.this.enabled = newEnabled;
@@ -131,7 +142,7 @@ public void onMaster() {
             threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
             if (clusterService.state().getNodes().getDataNodes().size() > 1) {
                 // Submit an info update job to be run immediately
-                threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false));
+                updateOnce();
             }
         } catch (EsRejectedExecutionException ex) {
             if (logger.isDebugEnabled()) {
@@ -140,6 +151,16 @@ public void onMaster() {
         }
     }

+
+    // called from tests as well
+
+    /**
+     * will collect a fresh {@link ClusterInfo} from the nodes, without scheduling a future collection
+     */
+    void updateOnce() {
+        threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false));
+    }
+
     @Override
     public void offMaster() {
         this.isMaster = false;
@@ -169,7 +190,7 @@ public void clusterChanged(ClusterChangedEvent event) {
             if (logger.isDebugEnabled()) {
                 logger.debug("data node was added, retrieving new cluster info");
             }
-            threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false));
+            updateOnce();
         }

         if (this.isMaster && event.nodesRemoved()) {
@@ -227,7 +248,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
         final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
         nodesStatsRequest.clear();
         nodesStatsRequest.fs(true);
-        nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));
+        nodesStatsRequest.timeout(fetchTimeout);

         transportNodesStatsAction.execute(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
         return latch;
@@ -316,12 +337,18 @@ public void onResponse(NodesStatsResponse nodeStatses) {

                @Override
                public void onFailure(Throwable e) {
-                   if (e instanceof ClusterBlockException) {
-                       if (logger.isTraceEnabled()) {
-                           logger.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
-                       }
+                   if (e instanceof ReceiveTimeoutTransportException) {
+                       logger.error("NodeStatsAction timed out for ClusterInfoUpdateJob (reason [{}])", e.getMessage());
                    } else {
-                       logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
+                       if (e instanceof ClusterBlockException) {
+                           if (logger.isTraceEnabled()) {
+                               logger.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
+                           }
+                       } else {
+                           logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
+                       }
+                       // we empty the usages list, to be safe - we don't know what's going on.
+                       usages = ImmutableMap.of();
                    }
                }
            });
@@ -344,24 +371,30 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {

                @Override
                public void onFailure(Throwable e) {
-                   if (e instanceof ClusterBlockException) {
-                       if (logger.isTraceEnabled()) {
-                           logger.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
-                       }
+                   if (e instanceof ReceiveTimeoutTransportException) {
+                       logger.error("IndicesStatsAction timed out for ClusterInfoUpdateJob (reason [{}])", e.getMessage());
                    } else {
-                       logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
+                       if (e instanceof ClusterBlockException) {
+                           if (logger.isTraceEnabled()) {
+                               logger.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
+                           }
+                       } else {
+                           logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
+                       }
+                       // we empty the shard sizes map, to be safe - we don't know what's going on.
+                       shardSizes = ImmutableMap.of();
                    }
                }
            });

            try {
-               nodeLatch.await(15, TimeUnit.SECONDS);
+               nodeLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.warn("Failed to update node information for ClusterInfoUpdateJob within 15s timeout");
            }

            try {
-               indicesLatch.await(15, TimeUnit.SECONDS);
+               indicesLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.warn("Failed to update shard information for ClusterInfoUpdateJob within 15s timeout");
            }
@@ -88,6 +88,7 @@ public ClusterDynamicSettingsModule() {
         clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, Validator.BOOLEAN);
         clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, Validator.TIME_NON_NEGATIVE);
         clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME_NON_NEGATIVE);
+        clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT, Validator.TIME_NON_NEGATIVE);
         clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
         clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
         clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
