Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't recover from buggy version #9925

Merged
merged 1 commit into from
Mar 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;

/**
* An allocation decider that prevents relocation or allocation from nodes
Expand All @@ -37,10 +39,12 @@
public class NodeVersionAllocationDecider extends AllocationDecider {

public static final String NAME = "node_version";
private final RecoverySettings recoverySettings;

@Inject
public NodeVersionAllocationDecider(Settings settings) {
public NodeVersionAllocationDecider(Settings settings, RecoverySettings recoverySettings) {
super(settings);
this.recoverySettings = recoverySettings;
}

@Override
Expand All @@ -65,6 +69,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (source.node().version().before(Version.V_1_3_2) && recoverySettings.compress()) { // never recover from pre 1.3.2 with compression enabled
return allocation.decision(Decision.NO, NAME, "source node version [%s] is prone to corruption bugs with %s = true see issue #7210 for details",
source.node().version(), RecoverySettings.INDICES_RECOVERY_COMPRESS);
}
if (target.node().version().onOrAfter(source.node().version())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -149,19 +150,37 @@ protected void retryRecovery(final RecoveryStatus recoveryStatus, final String r
threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryStatus.recoveryId()));
}

// pkd private for testing
Map<String, StoreFileMetaData> existingFiles(DiscoveryNode sourceNode, Store store) throws IOException {
final Version sourceNodeVersion = sourceNode.version();
if (sourceNodeVersion.onOrAfter(Version.V_1_4_0)) {
return store.getMetadataOrEmpty().asMap();
} else {
logger.debug("Force full recovery source node version {}", sourceNodeVersion);
// force full recovery if we recover from nodes < 1.4.0
return Collections.EMPTY_MAP;
}
}

private void doRecovery(final RecoveryStatus recoveryStatus) {
assert recoveryStatus.sourceNode() != null : "can't do a recovery without a source node";

logger.trace("collecting local files for {}", recoveryStatus);
final Map<String, StoreFileMetaData> existingFiles;
try {
existingFiles = recoveryStatus.store().getMetadataOrEmpty().asMap();
existingFiles = existingFiles(recoveryStatus.sourceNode(), recoveryStatus.store());
} catch (Exception e) {
logger.debug("error while listing local files, recovery as if there are none", e);
logger.debug("error while listing local files", e);
onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(),
new RecoveryFailedException(recoveryStatus.state(), "failed to list local files", e), true);
return;
}
final Version sourceNodeVersion = recoveryStatus.sourceNode().version();
if (sourceNodeVersion.before(Version.V_1_3_2) && recoverySettings.compress()) { // don't recover from pre 1.3.2 if compression is on?
throw new ElasticsearchIllegalStateException("Can't recovery from node "
+ recoveryStatus.sourceNode() + " with [" + RecoverySettings.INDICES_RECOVERY_COMPRESS
+ " : true] due to compression bugs - see issue #7210 for details" );
}
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
false, existingFiles, recoveryStatus.state().getType(), recoveryStatus.recoveryId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@

/**
*/
@TestLogging("index.translog.fs:TRACE")
public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompatIntegrationTest {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;

Expand Down Expand Up @@ -83,23 +81,19 @@ public void testDoNotAllocateFromPrimary() {

logger.info("start two nodes and fully start the shards");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(2));

}

logger.info("start all the primary shards, replicas will start initializing");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -109,10 +103,8 @@ public void testDoNotAllocateFromPrimary() {
}

routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -124,10 +116,8 @@ public void testDoNotAllocateFromPrimary() {
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node3", getPreviousVersion())))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -140,10 +130,8 @@ public void testDoNotAllocateFromPrimary() {
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node4")))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand All @@ -153,10 +141,8 @@ public void testDoNotAllocateFromPrimary() {
}

routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();

for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
Expand Down Expand Up @@ -335,7 +321,79 @@ private final void assertRecoveryNodeVersions(RoutingNodes routingNodes) {
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
}
}
}

public void testFailRecoverFromPre132WithCompression() {
final boolean compress = randomBoolean();
AllocationService service = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "INDICES_ALL_ACTIVE")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, compress)
.build());

logger.info("Building initial routing table");

MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(1))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();

ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();

assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
assertEquals(shard.state(), UNASSIGNED);
assertNull(shard.currentNodeId());
}
}
Version version = randomVersion();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("old0", version))).build();
clusterState = stabilize(clusterState, service);
routingTable = clusterState.routingTable();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertEquals(routingTable.index("test").shard(i).shards().size(), 2);
for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
if (shard.primary()) {
assertEquals(shard.state(), STARTED);
assertEquals(shard.currentNodeId(), "old0");
} else {
assertEquals(shard.state(), UNASSIGNED);
assertNull(shard.currentNodeId());
}
}
}

clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("old0", version))
.put(newNode("new0"))).build();

clusterState = stabilize(clusterState, service);
routingTable = clusterState.routingTable();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertEquals(routingTable.index("test").shard(i).shards().size(), 2);
for (ShardRouting shard : routingTable.index("test").shard(i).shards()) {
if (shard.primary()) {
assertEquals(shard.state(), STARTED);
assertEquals(shard.currentNodeId(), "old0");
} else {
if (version.before(Version.V_1_3_2) && compress) { // can't recover from pre 1.3.2 with compression enabled
assertEquals(shard.state(), UNASSIGNED);
assertNull(shard.currentNodeId());
} else {
assertEquals(shard.state(), STARTED);
assertEquals(shard.currentNodeId(), "new0");
}
}
}


}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;

import java.io.IOException;
import java.util.Map;

/**
*
*/
public class RecoveryTargetTests extends ElasticsearchSingleNodeTest {

public void testFullRecoveryFromPre14() throws IOException {
createIndex("test");
int numDocs = scaledRandomIntBetween(10, 100);
for (int j = 0; j < numDocs; ++j) {
String id = Integer.toString(j);
client().prepareIndex("test", "type1", id).setSource("text", "sometext").get();
}
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).setForce(true).get();
RecoveryTarget recoveryTarget = getInstanceFromNode(RecoveryTarget.class);
IndexService idxService = getInstanceFromNode(IndicesService.class).indexService("test");
Store store = idxService.shard(0).store();
store.incRef();
try {
DiscoveryNode discoveryNode = new DiscoveryNode("123", new LocalTransportAddress("123"), Version.CURRENT);
Map<String, StoreFileMetaData> metaDataMap = recoveryTarget.existingFiles(discoveryNode, store);
assertTrue(metaDataMap.size() > 0);
int iters = randomIntBetween(10, 20);
for (int i = 0; i < iters; i++) {
Version version = randomVersion();
DiscoveryNode discoNode = new DiscoveryNode("123", new LocalTransportAddress("123"), version);
Map<String, StoreFileMetaData> map = recoveryTarget.existingFiles(discoNode, store);
if (version.before(Version.V_1_4_0)) {
assertTrue(map.isEmpty());
} else {
assertEquals(map.size(), metaDataMap.size());
}

}
} finally {
store.decRef();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -64,16 +65,21 @@ public static AllocationService createAllocationService(Settings settings, Rando

public static AllocationDeciders randomAllocationDeciders(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
final ImmutableSet<Class<? extends AllocationDecider>> defaultAllocationDeciders = AllocationDecidersModule.DEFAULT_ALLOCATION_DECIDERS;
final RecoverySettings recoverySettings = new RecoverySettings(settings, nodeSettingsService);
final List<AllocationDecider> list = new ArrayList<>();
for (Class<? extends AllocationDecider> deciderClass : defaultAllocationDeciders) {
try {
try {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, NodeSettingsService.class);
list.add(constructor.newInstance(settings, nodeSettingsService));
} catch (NoSuchMethodException e) {
Constructor<? extends AllocationDecider> constructor = null;
constructor = deciderClass.getConstructor(Settings.class);
list.add(constructor.newInstance(settings));
try {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class);
list.add(constructor.newInstance(settings));
} catch (NoSuchMethodException e1) {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, RecoverySettings.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh well :) could be worse

list.add(constructor.newInstance(settings, recoverySettings));
}
}
} catch (Exception ex) {
throw new RuntimeException(ex);
Expand Down