Skip to content

Commit

Permalink
[RECOVERY] Don't recover from buggy version
Browse files Browse the repository at this point in the history
This commit forces a full recovery if the source node is < 1.4.0 and
prevents any recoveries from pre 1.3.2 nodes if compression is enabled to
work around elastic#7210

Closes elastic#9922
  • Loading branch information
s1monw committed Mar 2, 2015
1 parent 3896cf2 commit dd78370
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 23 deletions.
Expand Up @@ -19,13 +19,15 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;

/**
* An allocation decider that prevents relocation or allocation from nodes
Expand All @@ -37,10 +39,12 @@
public class NodeVersionAllocationDecider extends AllocationDecider {

public static final String NAME = "node_version";
private final RecoverySettings recoverySettings;

@Inject
public NodeVersionAllocationDecider(Settings settings) {
public NodeVersionAllocationDecider(Settings settings, RecoverySettings recoverySettings) {
super(settings);
this.recoverySettings = recoverySettings;
}

@Override
Expand All @@ -65,6 +69,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (source.node().version().before(Version.V_1_3_2) && recoverySettings.compress()) { // never recover from pre 1.3.2 with compression enabled
return allocation.decision(Decision.NO, NAME, "source node version [%s] is prone to corruption bugs with %s = true see issue #7210 for details",
source.node().version(), RecoverySettings.INDICES_RECOVERY_COMPRESS);
}
if (target.node().version().onOrAfter(source.node().version())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -149,19 +150,37 @@ protected void retryRecovery(final RecoveryStatus recoveryStatus, final String r
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryStatus.recoveryId()));
}

// pkd private for testing
Map<String, StoreFileMetaData> existingFiles(DiscoveryNode sourceNode, Store store) throws IOException {
final Version sourceNodeVersion = sourceNode.version();
if (sourceNodeVersion.onOrAfter(Version.V_1_4_0)) {
return store.getMetadataOrEmpty().asMap();
} else {
logger.debug("Force full recovery source node version {}", sourceNodeVersion);
// force full recovery if we recover from nodes < 1.4.0
return Collections.EMPTY_MAP;
}
}

private void doRecovery(final RecoveryStatus recoveryStatus) {
assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node";

logger.trace("collecting local files for {}", recoveryStatus);
final Map<String, StoreFileMetaData> existingFiles;
try {
existingFiles = recoveryStatus.store().getMetadataOrEmpty().asMap();
existingFiles = existingFiles(recoveryStatus.sourceNode(), recoveryStatus.store());
} catch (Exception e) {
logger.debug("error while listing local files, recovery as if there are none", e);
logger.debug("error while listing local files", e);
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(),
new RecoveryFailedException(recoveryStatus.state(), "failed to list local files", e), true);
return;
}
final Version sourceNodeVersion = recoveryStatus.sourceNode().version();
if (sourceNodeVersion.before(Version.V_1_3_2) && recoverySettings.compress()) { // don't recover from pre 1.3.2 if compression is on?
throw new ElasticsearchIllegalStateException("Can't recovery from node "
+ recoveryStatus.sourceNode() + " with [" + RecoverySettings.INDICES_RECOVERY_COMPRESS
+ " : true] due to compression bugs - see issue #7210 for details" );
}
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
false, existingFiles, recoveryStatus.state().getType(), recoveryStatus.recoveryId());

Expand Down
Expand Up @@ -81,7 +81,6 @@

/**
*/
@TestLogging("index.translog.fs:TRACE")
public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompatIntegrationTest {

/**
Expand Down
Expand Up @@ -25,13 +25,11 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;

Expand Down Expand Up @@ -83,23 +81,19 @@ public void testDoNotAllocateFromPrimary() {

logger.info("start two nodes and fully start the shards");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(2));

}

logger.info("start all the primary shards, replicas will start initializing");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -109,10 +103,8 @@ public void testDoNotAllocateFromPrimary() {
}

routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -124,10 +116,8 @@ public void testDoNotAllocateFromPrimary() {
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node3", getPreviousVersion())))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -140,10 +130,8 @@ public void testDoNotAllocateFromPrimary() {
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node4")))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -153,10 +141,8 @@ public void testDoNotAllocateFromPrimary() {
}

routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand Down Expand Up @@ -335,7 +321,79 @@ private final void assertRecoveryNodeVersions(RoutingNodes routingNodes) {
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
}
}
}

public void testFailRecoverFromPre132WithCompression() {
final boolean compress = randomBoolean();
AllocationService service = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "INDICES_ALL_ACTIVE")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, compress)
.build());

logger.info("Building initial routing table");

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(1))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();

ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
assertEquals(shard.state(), UNASSIGNED);
assertNull(shard.currentNodeId());
}
}
Version version = randomVersion();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("old0", version))).build();
clusterState = stabilize(clusterState, service);
routingTable = clusterState.routingTable();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertEquals(routingTable.index("test").shard(i).shards().size(), 2);
for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
if (shard.primary()) {
assertEquals(shard.state(), STARTED);
assertEquals(shard.currentNodeId(), "old0");
} else {
assertEquals(shard.state(), UNASSIGNED);
assertNull(shard.currentNodeId());
}
}
}

clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("old0", version))
.put(newNode("new0"))).build();

clusterState = stabilize(clusterState, service);
routingTable = clusterState.routingTable();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertEquals(routingTable.index("test").shard(i).shards().size(), 2);
for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
if (shard.primary()) {
assertEquals(shard.state(), STARTED);
assertEquals(shard.currentNodeId(), "old0");
} else {
if (version.before(Version.V_1_3_2) && compress) { // can't recover from pre 1.3.2 with compression enabled
assertEquals(shard.state(), UNASSIGNED);
assertNull(shard.currentNodeId());
} else {
assertEquals(shard.state(), STARTED);
assertEquals(shard.currentNodeId(), "new0");
}
}
}


}
}
}
@@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;

import java.io.IOException;
import java.util.Map;

/**
*
*/
public class RecoveryTargetTests extends ElasticsearchSingleNodeTest {

public void testFullRecoveryFromPre14() throws IOException {
createIndex("test");
int numDocs = scaledRandomIntBetween(10, 100);
for (int j = 0; j < numDocs; ++j) {
String id = Integer.toString(j);
client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
}
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).get();
RecoveryTarget recoveryTarget = getInstanceFromNode(RecoveryTarget.class);
IndexService idxService = getInstanceFromNode(IndicesService.class).indexService("test");
Store store = idxService.shard(0).store();
store.incRef();
try {
DiscoveryNode discoveryNode = new DiscoveryNode("123", new LocalTransportAddress("123"), Version.CURRENT);
Map<String, StoreFileMetaData> metaDataMap = recoveryTarget.existingFiles(discoveryNode, store);
assertTrue(metaDataMap.size() > 0);
int iters = randomIntBetween(10, 20);
for (int i = 0; i < iters; i++) {
Version version = randomVersion();
DiscoveryNode discoNode = new DiscoveryNode("123", new LocalTransportAddress("123"), version);
Map<String, StoreFileMetaData> map = recoveryTarget.existingFiles(discoNode, store);
if (version.before(Version.V_1_4_0)) {
assertTrue(map.isEmpty());
} else {
assertEquals(map.size(), metaDataMap.size());
}

}
} finally {
store.decRef();
}

}
}
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -64,16 +65,21 @@ public static AllocationService createAllocationService(Settings settings, Rando

public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
final ImmutableSet<Class<? extends AllocationDecider>> defaultAllocationDeciders = AllocationDecidersModule.DEFAULT_ALLOCATION_DECIDERS;
final RecoverySettings recoverySettings = new RecoverySettings(settings, nodeSettingsService);
final List<AllocationDecider> list = new ArrayList<>();
for (Class<? extends AllocationDecider> deciderClass : defaultAllocationDeciders) {
try {
try {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, NodeSettingsService.class);
list.add(constructor.newInstance(settings, nodeSettingsService));
} catch (NoSuchMethodException e) {
Constructor<? extends AllocationDecider> constructor = null;
constructor = deciderClass.getConstructor(Settings.class);
list.add(constructor.newInstance(settings));
try {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class);
list.add(constructor.newInstance(settings));
} catch (NoSuchMethodException e1) {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, RecoverySettings.class);
list.add(constructor.newInstance(settings, recoverySettings));
}
}
} catch (Exception ex) {
throw new RuntimeException(ex);
Expand Down

0 comments on commit dd78370

Please sign in to comment.