diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
index 1d9c4df75711e..3ec816a2e21c3 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.routing.MutableShardRouting;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
@@ -26,6 +27,7 @@
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.recovery.RecoverySettings;
 
 /**
  * An allocation decider that prevents relocation or allocation from nodes
@@ -37,10 +39,12 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
 
     public static final String NAME = "node_version";
 
+    private final RecoverySettings recoverySettings;
     @Inject
-    public NodeVersionAllocationDecider(Settings settings) {
+    public NodeVersionAllocationDecider(Settings settings, RecoverySettings recoverySettings) {
         super(settings);
+        this.recoverySettings = recoverySettings;
     }
 
     @Override
@@ -65,6 +69,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
 
     private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {
         final RoutingNode source = routingNodes.node(sourceNodeId);
+        if (source.node().version().before(Version.V_1_3_2) && recoverySettings.compress()) { // never recover from pre 1.3.2 with compression enabled
+            return allocation.decision(Decision.NO, NAME, "source node version [%s] is prone to corruption bugs when [%s] is set to true; see issue #7210 for details",
+                    source.node().version(), RecoverySettings.INDICES_RECOVERY_COMPRESS);
+        }
         if (target.node().version().onOrAfter(source.node().version())) {
             /* we can allocate if we can recover from a node that is younger or on the same version
              * if the primary is already running on a newer version that won't work due to possible
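Note (not part of the patch): the new decider check only vetoes allocation while the dynamic indices.recovery.compress setting is true, so a cluster that still has to drain shards off pre-1.3.2 nodes can lift the veto by switching recovery compression off first. Below is a minimal sketch of that workaround, assuming a 1.x Java Client obtained elsewhere; the class name and the client variable are illustrative and are not introduced by this change.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

// Illustrative helper, not part of this patch.
public class DisableRecoveryCompression {
    // Turn indices.recovery.compress (RecoverySettings.INDICES_RECOVERY_COMPRESS) off as a
    // transient cluster setting so NodeVersionAllocationDecider stops returning NO for
    // pre-1.3.2 source nodes.
    public static void disable(Client client) {
        client.admin().cluster().prepareUpdateSettings()
                .setTransientSettings(ImmutableSettings.settingsBuilder()
                        .put("indices.recovery.compress", false)
                        .build())
                .get();
    }
}

A transient setting is lost on a full cluster restart; using setPersistentSettings instead would keep compression disabled until it is explicitly re-enabled.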
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 7f2779b17b026..cd11746bc7a1a 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -26,6 +26,7 @@
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
+import org.elasticsearch.bootstrap.Elasticsearch;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Nullable;
@@ -149,19 +150,37 @@ protected void retryRecovery(final RecoveryStatus recoveryStatus, final String r
         threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryStatus.recoveryId()));
     }
 
+    // package private for testing
+    Map<String, StoreFileMetaData> existingFiles(DiscoveryNode sourceNode, Store store) throws IOException {
+        final Version sourceNodeVersion = sourceNode.version();
+        if (sourceNodeVersion.onOrAfter(Version.V_1_4_0)) {
+            return store.getMetadataOrEmpty().asMap();
+        } else {
+            logger.debug("source node version [{}] is before 1.4.0, forcing a full recovery", sourceNodeVersion);
+            // force full recovery if we recover from nodes < 1.4.0
+            return Collections.EMPTY_MAP;
+        }
+    }
+
     private void doRecovery(final RecoveryStatus recoveryStatus) {
         assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node";
 
         logger.trace("collecting local files for {}", recoveryStatus);
         final Map<String, StoreFileMetaData> existingFiles;
         try {
-            existingFiles = recoveryStatus.store().getMetadataOrEmpty().asMap();
+            existingFiles = existingFiles(recoveryStatus.sourceNode(), recoveryStatus.store());
         } catch (Exception e) {
-            logger.debug("error while listing local files, recovery as if there are none", e);
+            logger.debug("error while listing local files", e);
             onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(),
                     new RecoveryFailedException(recoveryStatus.state(), "failed to list local files", e), true);
             return;
         }
 
+        final Version sourceNodeVersion = recoveryStatus.sourceNode().version();
+        if (sourceNodeVersion.before(Version.V_1_3_2) && recoverySettings.compress()) { // don't recover from pre 1.3.2 nodes if compression is enabled
+            throw new ElasticsearchIllegalStateException("Can't recover from node "
+                    + recoveryStatus.sourceNode() + " with [" + RecoverySettings.INDICES_RECOVERY_COMPRESS
+                    + " : true] due to compression bugs - see issue #7210 for details");
+        }
         final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
                 false, existingFiles, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
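Note (not part of the patch): the version gate above reduces to one rule: only advertise locally available files to the recovery source when that source runs 1.4.0 or later, otherwise report nothing so every file is copied again. A standalone sketch of that rule follows; the class and method names are illustrative and do not exist in the codebase.

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.elasticsearch.Version;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;

// Illustrative only; mirrors the rule applied by RecoveryTarget.existingFiles() above.
final class ReusableFilesRule {
    // Return the local store metadata for 1.4.0+ source nodes, or an empty map
    // (which forces a full copy of the shard) for anything older.
    static Map<String, StoreFileMetaData> reusableFiles(Version sourceNodeVersion, Store store) throws IOException {
        if (sourceNodeVersion.onOrAfter(Version.V_1_4_0)) {
            return store.getMetadataOrEmpty().asMap();
        }
        return Collections.emptyMap();
    }
}

The new RecoveryTargetTests further down exercises exactly this behaviour through the package-private existingFiles hook.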
diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
index c8a2c66067a25..0e06974afc483 100644
--- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
+++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
@@ -81,7 +81,6 @@
 
 /**
  */
-@TestLogging("index.translog.fs:TRACE")
 public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompatIntegrationTest {
 
     /**
diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java
index 0e22d88f3d161..3601d78220eaf 100644
--- a/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java
+++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java
@@ -25,13 +25,11 @@
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.routing.MutableShardRouting;
-import org.elasticsearch.cluster.routing.RoutingNodes;
-import org.elasticsearch.cluster.routing.RoutingTable;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.*;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.test.ElasticsearchAllocationTestCase;
 import org.junit.Test;
 
@@ -83,7 +81,6 @@ public void testDoNotAllocateFromPrimary() {
         logger.info("start two nodes and fully start the shards");
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
 
-        RoutingTable prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
 
@@ -91,15 +88,12 @@ public void testDoNotAllocateFromPrimary() {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
             assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
             assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(2));
-
         }
 
         logger.info("start all the primary shards, replicas will start initializing");
         RoutingNodes routingNodes = clusterState.routingNodes();
-        prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
-        routingNodes = clusterState.routingNodes();
 
         for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
@@ -109,10 +103,8 @@ public void testDoNotAllocateFromPrimary() {
         }
 
         routingNodes = clusterState.routingNodes();
-        prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
-        routingNodes = clusterState.routingNodes();
 
         for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
@@ -124,10 +116,8 @@ public void testDoNotAllocateFromPrimary() {
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
                 .put(newNode("node3", getPreviousVersion())))
                 .build();
-        prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
-        routingNodes = clusterState.routingNodes();
 
         for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
@@ -140,10 +130,8 @@ public void testDoNotAllocateFromPrimary() {
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
                 .put(newNode("node4")))
                 .build();
-        prevRoutingTable = routingTable;
         routingTable = strategy.reroute(clusterState).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
-        routingNodes = clusterState.routingNodes();
 
         for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
@@ -153,10 +141,8 @@ public void testDoNotAllocateFromPrimary() {
         }
 
         routingNodes = clusterState.routingNodes();
-        prevRoutingTable = routingTable;
         routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
-        routingNodes = clusterState.routingNodes();
 
         for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
             assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
@@ -335,7 +321,79 @@ private final void assertRecoveryNodeVersions(RoutingNodes routingNodes) {
                 assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
             }
         }
+    }
+
+    public void testFailRecoverFromPre132WithCompression() {
+        final boolean compress = randomBoolean();
+        AllocationService service = createAllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.concurrent_recoveries", 10)
+                .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "INDICES_ALL_ACTIVE")
+                .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
+                .put(RecoverySettings.INDICES_RECOVERY_COMPRESS, compress)
+                .build());
+
+        logger.info("Building initial routing table");
+
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(1))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+
+        assertThat(routingTable.index("test").shards().size(), equalTo(1));
+        for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+            assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
+            for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
+                assertEquals(shard.state(), UNASSIGNED);
+                assertNull(shard.currentNodeId());
+            }
+        }
+        Version version = randomVersion();
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+                .put(newNode("old0", version))).build();
+        clusterState = stabilize(clusterState, service);
+        routingTable = clusterState.routingTable();
+        for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+            assertEquals(routingTable.index("test").shard(i).shards().size(), 2);
+            for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
+                if (shard.primary()) {
+                    assertEquals(shard.state(), STARTED);
+                    assertEquals(shard.currentNodeId(), "old0");
+                } else {
+                    assertEquals(shard.state(), UNASSIGNED);
+                    assertNull(shard.currentNodeId());
+                }
+            }
+        }
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+                .put(newNode("old0", version))
+                .put(newNode("new0"))).build();
+        clusterState = stabilize(clusterState, service);
+        routingTable = clusterState.routingTable();
+        for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
+            assertEquals(routingTable.index("test").shard(i).shards().size(), 2);
+            for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
+                if (shard.primary()) {
+                    assertEquals(shard.state(), STARTED);
+                    assertEquals(shard.currentNodeId(), "old0");
+                } else {
+                    if (version.before(Version.V_1_3_2) && compress) { // can't recover from pre 1.3.2 with compression enabled
+                        assertEquals(shard.state(), UNASSIGNED);
+                        assertNull(shard.currentNodeId());
+                    } else {
+                        assertEquals(shard.state(), STARTED);
+                        assertEquals(shard.currentNodeId(), "new0");
+                    }
+                }
+            }
+
+
+        }
     }
 }
diff --git a/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java
new file mode 100644
index 0000000000000..ced68e303a227
--- /dev/null
+++ b/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.indices.recovery;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.test.ElasticsearchSingleNodeTest;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ *
+ */
+public class RecoveryTargetTests extends ElasticsearchSingleNodeTest {
+
+    public void testFullRecoveryFromPre14() throws IOException {
+        createIndex("test");
+        int numDocs = scaledRandomIntBetween(10, 100);
+        for (int j = 0; j < numDocs; ++j) {
+            String id = Integer.toString(j);
+            client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
+        }
+        client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).get();
+        RecoveryTarget recoveryTarget = getInstanceFromNode(RecoveryTarget.class);
+        IndexService idxService = getInstanceFromNode(IndicesService.class).indexService("test");
+        Store store = idxService.shard(0).store();
+        store.incRef();
+        try {
+            DiscoveryNode discoveryNode = new DiscoveryNode("123", new LocalTransportAddress("123"), Version.CURRENT);
+            Map<String, StoreFileMetaData> metaDataMap = recoveryTarget.existingFiles(discoveryNode, store);
+            assertTrue(metaDataMap.size() > 0);
+            int iters = randomIntBetween(10, 20);
+            for (int i = 0; i < iters; i++) {
+                Version version = randomVersion();
+                DiscoveryNode discoNode = new DiscoveryNode("123", new LocalTransportAddress("123"), version);
+                Map<String, StoreFileMetaData> map = recoveryTarget.existingFiles(discoNode, store);
+                if (version.before(Version.V_1_4_0)) {
+                    assertTrue(map.isEmpty());
+                } else {
+                    assertEquals(map.size(), metaDataMap.size());
+                }
+
+            }
+        } finally {
+            store.decRef();
+        }
+
+    }
+}
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java
index 9ec94574c4440..0540161751c15 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java
@@ -34,6 +34,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.node.settings.NodeSettingsService;
 
 import java.lang.reflect.Constructor;
@@ -64,6 +65,7 @@ public static AllocationService createAllocationService(Settings settings, Rando
 
     public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
         final ImmutableSet<Class<? extends AllocationDecider>> defaultAllocationDeciders = AllocationDecidersModule.DEFAULT_ALLOCATION_DECIDERS;
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, nodeSettingsService);
         final List<AllocationDecider> list = new ArrayList<>();
         for (Class<? extends AllocationDecider> deciderClass : defaultAllocationDeciders) {
             try {
@@ -71,9 +73,13 @@ public static AllocationDeciders randomAllocationDeciders(Settings settings, Nod
                     Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, NodeSettingsService.class);
                     list.add(constructor.newInstance(settings, nodeSettingsService));
                 } catch (NoSuchMethodException e) {
-                    Constructor<? extends AllocationDecider> constructor = null;
-                    constructor = deciderClass.getConstructor(Settings.class);
-                    list.add(constructor.newInstance(settings));
+                    try {
+                        Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class);
+                        list.add(constructor.newInstance(settings));
+                    } catch (NoSuchMethodException e1) {
+                        Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, RecoverySettings.class);
+                        list.add(constructor.newInstance(settings, recoverySettings));
+                    }
                 }
             } catch (Exception ex) {
                 throw new RuntimeException(ex);