Skip to content

Commit

Permalink
[Gateway] set a default of 5m to recover_after_time when any to the…
Browse files Browse the repository at this point in the history
… `expected*Nodes` is set

The `recovery_after_time` tells the gateway to wait before starting recovery from disk. The goal here is to allow for more nodes to join the cluster and thus not start potentially unneeded replications. The `expectedNodes` setting (and friends) tells the gateway when it can start recovering even if the `recover_after_time` has not yet elapsed. However, `expectedNodes` is useless if one doesn't set `recovery_after_time`. This commit changes that by setting a sensible default of 5m for `recover_after_time` *if* a `expectedNodes` setting is present.

Closes #6742
  • Loading branch information
bleskes committed Jul 11, 2014
1 parent cdca9f6 commit 84b654a
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 78 deletions.
6 changes: 3 additions & 3 deletions docs/reference/modules/gateway.asciidoc
Expand Up @@ -42,14 +42,14 @@ once all `gateway.recover_after...nodes` conditions are met.

The `gateway.expected_nodes` allows to set how many data and master
eligible nodes are expected to be in the cluster, and once met, the
`recover_after_time` is ignored and recovery starts. The
`gateway.expected_data_nodes` and `gateway.expected_master_nodes`
`gateway.recover_after_time` is ignored and recovery starts.
Setting `gateway.expected_nodes` also defaults `gateway.recovery_after_time` to `5m` coming[1.3.0, before `expected_nodes`
required `recovery_after_time` to be set]. The `gateway.expected_data_nodes` and `gateway.expected_master_nodes`
settings are also supported. For example setting:

[source,js]
--------------------------------------------------
gateway:
recover_after_nodes: 1
recover_after_time: 5m
expected_nodes: 2
--------------------------------------------------
Expand Down
5 changes: 2 additions & 3 deletions docs/reference/modules/gateway/local.asciidoc
Expand Up @@ -18,9 +18,8 @@ For example:
[source,js]
--------------------------------------------------
gateway:
recover_after_nodes: 1
recover_after_time: 5m
expected_nodes: 2
recover_after_nodes: 3
expected_nodes: 5
--------------------------------------------------

[float]
Expand Down
140 changes: 68 additions & 72 deletions src/main/java/org/elasticsearch/gateway/GatewayService.java
Expand Up @@ -50,6 +50,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i

public static final ClusterBlock STATE_NOT_RECOVERED_BLOCK = new ClusterBlock(1, "state not recovered / initialized", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);

public static final TimeValue DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET = TimeValue.timeValueMinutes(5);

private final Gateway gateway;

private final ThreadPool threadPool;
Expand Down Expand Up @@ -81,14 +83,20 @@ public GatewayService(Settings settings, Gateway gateway, AllocationService allo
this.discoveryService = discoveryService;
this.threadPool = threadPool;
// allow to control a delay of when indices will get created
this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null);
this.recoverAfterNodes = componentSettings.getAsInt("recover_after_nodes", -1);
this.expectedNodes = componentSettings.getAsInt("expected_nodes", -1);
this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1);
this.expectedDataNodes = componentSettings.getAsInt("expected_data_nodes", -1);
this.expectedMasterNodes = componentSettings.getAsInt("expected_master_nodes", -1);

TimeValue defaultRecoverAfterTime = null;
if (expectedNodes >= 0 || expectedDataNodes >= 0 || expectedMasterNodes >= 0) {
defaultRecoverAfterTime = DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET;
}

this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", defaultRecoverAfterTime);
this.recoverAfterNodes = componentSettings.getAsInt("recover_after_nodes", -1);
this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1);
// default the recover after master nodes to the minimum master nodes in the discovery
this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", settings.getAsInt("discovery.zen.minimum_master_nodes", -1));
this.expectedMasterNodes = componentSettings.getAsInt("expected_master_nodes", -1);

// Add the not recovered as initial state block, we don't allow anything until
this.clusterService.addInitialStateBlock(STATE_NOT_RECOVERED_BLOCK);
Expand All @@ -101,36 +109,8 @@ protected void doStart() throws ElasticsearchException {
// node from starting until we recovered properly
if (discoveryService.initialStateReceived()) {
ClusterState clusterState = clusterService.state();
DiscoveryNodes nodes = clusterState.nodes();
if (clusterState.nodes().localNodeMaster() && clusterState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
if (clusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
logger.debug("not recovering from gateway, nodes_size (data) [" + nodes.dataNodes().size() + "] < recover_after_data_nodes [" + recoverAfterDataNodes + "]");
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else {
boolean ignoreRecoverAfterTime;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, don't ignore the timeout
ignoreRecoverAfterTime = false;
} else {
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
ignoreRecoverAfterTime = true;
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
}
performStateRecovery(ignoreRecoverAfterTime);
}
checkStateMeetsSettingsAndMaybeRecover(clusterState, false);
}
} else {
logger.debug("can't wait on start for (possibly) reading state from gateway, will do it asynchronously");
Expand Down Expand Up @@ -161,65 +141,75 @@ public void clusterChanged(final ClusterChangedEvent event) {
scheduledRecovery.set(false);
}
if (event.localNodeMaster() && event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
ClusterState clusterState = event.state();
DiscoveryNodes nodes = clusterState.nodes();
if (event.state().blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
logger.debug("not recovering from gateway, nodes_size (data) [" + nodes.dataNodes().size() + "] < recover_after_data_nodes [" + recoverAfterDataNodes + "]");
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
checkStateMeetsSettingsAndMaybeRecover(event.state(), true);
}
}

protected void checkStateMeetsSettingsAndMaybeRecover(ClusterState state, boolean asyncRecovery) {
DiscoveryNodes nodes = state.nodes();
if (state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
} else if (recoverAfterDataNodes != -1 && nodes.dataNodes().size() < recoverAfterDataNodes) {
logger.debug("not recovering from gateway, nodes_size (data) [" + nodes.dataNodes().size() + "] < recover_after_data_nodes [" + recoverAfterDataNodes + "]");
} else if (recoverAfterMasterNodes != -1 && nodes.masterNodes().size() < recoverAfterMasterNodes) {
logger.debug("not recovering from gateway, nodes_size (master) [" + nodes.masterNodes().size() + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]");
} else {
boolean enforceRecoverAfterTime;
String reason;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, honor the setting if they are there
enforceRecoverAfterTime = true;
reason = "recovery_after_time was set to [" + recoverAfterTime + "]";
} else {
boolean ignoreRecoverAfterTime;
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1) {
// no expected is set, don't ignore the timeout
ignoreRecoverAfterTime = false;
} else {
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
ignoreRecoverAfterTime = true;
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
ignoreRecoverAfterTime = false;
}
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
enforceRecoverAfterTime = false;
reason = "";
if (expectedNodes != -1 && (nodes.masterAndDataNodes().size() < expectedNodes)) { // does not meet the expected...
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedNodes + "] nodes, but only have [" + nodes.masterAndDataNodes().size() + "]";
} else if (expectedDataNodes != -1 && (nodes.dataNodes().size() < expectedDataNodes)) { // does not meet the expected...
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.dataNodes().size() + "]";
} else if (expectedMasterNodes != -1 && (nodes.masterNodes().size() < expectedMasterNodes)) { // does not meet the expected...
enforceRecoverAfterTime = true;
reason = "expecting [" + expectedMasterNodes + "] master nodes, but only have [" + nodes.masterNodes().size() + "]";
}
final boolean fIgnoreRecoverAfterTime = ignoreRecoverAfterTime;
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
performStateRecovery(fIgnoreRecoverAfterTime);
}
});
}
performStateRecovery(asyncRecovery, enforceRecoverAfterTime, reason);
}
}

private void performStateRecovery(boolean ignoreRecoverAfterTime) {
private void performStateRecovery(boolean asyncRecovery, boolean enforceRecoverAfterTime, String reason) {
final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener(new CountDownLatch(1));

if (!ignoreRecoverAfterTime && recoverAfterTime != null) {
if (enforceRecoverAfterTime && recoverAfterTime != null) {
if (scheduledRecovery.compareAndSet(false, true)) {
logger.debug("delaying initial state recovery for [{}]", recoverAfterTime);
logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
if (recovered.compareAndSet(false, true)) {
logger.trace("performing state recovery...");
logger.info("recovery_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
gateway.performStateRecovery(recoveryListener);
}
}
});
}
} else {
if (recovered.compareAndSet(false, true)) {
logger.trace("performing state recovery...");
gateway.performStateRecovery(recoveryListener);
if (asyncRecovery) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
gateway.performStateRecovery(recoveryListener);
}
});
} else {
logger.trace("performing state recovery...");
gateway.performStateRecovery(recoveryListener);
}
}
}
}
Expand Down Expand Up @@ -300,4 +290,10 @@ public void onFailure(String message) {
logger.info("metadata state not restored, reason: {}", message);
}
}

// used for testing
public TimeValue recoverAfterTime() {
return recoverAfterTime;
}

}
68 changes: 68 additions & 0 deletions src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway;

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cluster.NoopClusterService;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.IOException;


public class GatewayServiceTests extends ElasticsearchTestCase {


private GatewayService createService(ImmutableSettings.Builder settings) {
return new GatewayService(ImmutableSettings.builder()
.put("http.enabled", "false")
.put("discovery.type", "local")
.put(settings.build()).build(), null, null, new NoopClusterService(), null, null);

}

@Test
public void testDefaultRecoverAfterTime() throws IOException {

// check that the default is not set
GatewayService service = createService(ImmutableSettings.builder());
assertNull(service.recoverAfterTime());

// ensure default is set when setting expected_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_nodes", 1));
assertThat(service.recoverAfterTime(), Matchers.equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET));

// ensure default is set when setting expected_data_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_data_nodes", 1));
assertThat(service.recoverAfterTime(), Matchers.equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET));

// ensure default is set when setting expected_master_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_master_nodes", 1));
assertThat(service.recoverAfterTime(), Matchers.equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET));

// ensure settings override default
TimeValue timeValue = TimeValue.timeValueHours(3);
// ensure default is set when setting expected_nodes
service = createService(ImmutableSettings.builder().put("gateway.expected_nodes", 1).put("gateway.recover_after_time", timeValue.toString()));
assertThat(service.recoverAfterTime().millis(), Matchers.equalTo(timeValue.millis()));
}
}

0 comments on commit 84b654a

Please sign in to comment.