Put a fake allocation id on allocate stale primary command (#34140)
Removes the fake allocation id again once recovery is done.

Relates to #33432
vladimirdolzhenko committed Nov 7, 2018
1 parent 314b9ca commit f789d49
Showing 8 changed files with 352 additions and 40 deletions.
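In outline, the change works like this: forcing a stale primary resets the shard's in-sync set to a fixed sentinel id, and the sentinel is swapped for the real allocation id only when the shard actually starts. A toy model of that lifecycle (all names except the _forced_allocation_ sentinel are hypothetical, and this is a sketch, not the production code):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Toy model of the in-sync allocation id lifecycle this commit introduces. */
public class FakeAllocationIdLifecycle {
    static final String FORCED_ALLOCATION_ID = "_forced_allocation_";

    public static void main(String[] args) {
        // 1. AllocateStalePrimary resets the in-sync set to the fake id only.
        Set<String> inSync = new HashSet<>(Collections.singleton(FORCED_ALLOCATION_ID));

        // 2. If recovery fails and the node restarts, the fake id matches no real
        //    shard copy, so the primary stays unassigned until the command is reissued.

        // 3. Once the shard starts, the real id is added and the fake id removed
        //    in the same in-sync update.
        String realAllocationId = "P7bdcRzHTYWkGhWDFqbyIA"; // made-up, UUID-like
        inSync.add(realAllocationId);
        inSync.remove(FORCED_ALLOCATION_ID);

        assert inSync.equals(Collections.singleton(realAllocationId));
        System.out.println("in-sync ids after start: " + inSync);
    }
}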
@@ -140,12 +140,19 @@ boolean validate(MetaData metaData) {
}

if (shardRouting.primary() && shardRouting.initializing() &&
shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
"a known allocation id but has no corresponding entry in the in-sync " +
"allocation set " + inSyncAllocationIds);

shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
if (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)) {
if (inSyncAllocationIds.size() != 1) {
throw new IllegalStateException("a primary shard routing " + shardRouting
+ " is a primary that is recovering from a stale primary has unexpected allocation ids in in-sync " +
"allocation set " + inSyncAllocationIds);
}
} else if (inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) {
throw new IllegalStateException("a primary shard routing " + shardRouting
+ " is a primary that is recovering from a known allocation id but has no corresponding entry in the in-sync " +
"allocation set " + inSyncAllocationIds);
}
}
}
}
return true;
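A standalone rendering of the invariant this hunk enforces (a hypothetical helper for illustration, not the production method): a force-allocated stale primary must be the only in-sync entry; otherwise the recovering primary's own id must be present.

import java.util.Set;

/** Sketch of the validation branch above. */
final class StalePrimaryValidationSketch {
    static void validateRecoveringPrimary(String shardAllocationId, Set<String> inSyncIds) {
        if (inSyncIds.contains("_forced_allocation_")) {
            // the fake id must be the only in-sync entry
            if (inSyncIds.size() != 1) {
                throw new IllegalStateException("unexpected allocation ids in in-sync set " + inSyncIds);
            }
        } else if (inSyncIds.contains(shardAllocationId) == false) {
            throw new IllegalStateException("no corresponding entry in in-sync set " + inSyncIds);
        }
    }
}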
@@ -132,6 +132,11 @@ public String toString() {
* Recovery from an existing on-disk store
*/
public static final class ExistingStoreRecoverySource extends RecoverySource {
/**
* Special allocation id that a shard carries while it is being initialized by an allocate_stale_primary command
*/
public static final String FORCED_ALLOCATION_ID = "_forced_allocation_";

public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false);
public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true);

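For context, real allocation ids are random base64 UUIDs (generated, as far as I can tell, via UUIDs.randomBase64UUID()), so a fixed human-readable sentinel can never collide with one. A minimal sketch under that assumption:

import org.elasticsearch.common.UUIDs;

final class SentinelCheckSketch {
    public static void main(String[] args) {
        // a freshly generated allocation id is a random base64 UUID,
        // so it cannot equal the fixed "_forced_allocation_" sentinel
        String realId = UUIDs.randomBase64UUID();
        assert "_forced_allocation_".equals(realId) == false;
        System.out.println("real allocation id: " + realId);
    }
}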
@@ -39,6 +39,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

@@ -68,7 +69,16 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali

@Override
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
addAllocationId(startedShard);
assert Objects.equals(initializingShard.allocationId().getId(), startedShard.allocationId().getId())
: "initializingShard.allocationId [" + initializingShard.allocationId().getId()
+ "] and startedShard.allocationId [" + startedShard.allocationId().getId() + "] have to have the same";
Updates updates = changes(startedShard.shardId());
updates.addedAllocationIds.add(startedShard.allocationId().getId());
if (startedShard.primary()
// a started shard always has a null recoverySource, so the recoverySource has to be picked up from the initializing state
&& (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) {
updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID);
}
}

@Override
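Note that shardStarted compares the recovery source to FORCE_STALE_PRIMARY_INSTANCE by reference (==) rather than with equals; this appears to rely on the constant being used as a shared singleton, as its declaration in the previous hunk suggests. The comparison is made against the initializing shard because, per the comment above, a started shard carries a null recoverySource.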
@@ -144,7 +154,8 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab
oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) {
// we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating
// an empty or a stale primary (see AllocateEmptyPrimaryAllocationCommand or AllocateStalePrimaryAllocationCommand).
RecoverySource.Type recoverySourceType = updates.initializedPrimary.recoverySource().getType();
RecoverySource recoverySource = updates.initializedPrimary.recoverySource();
RecoverySource.Type recoverySourceType = recoverySource.getType();
boolean emptyPrimary = recoverySourceType == RecoverySource.Type.EMPTY_STORE;
assert updates.addedAllocationIds.isEmpty() : (emptyPrimary ? "empty" : "stale") +
" primary is not force-initialized in same allocation round where shards are started";
@@ -156,16 +167,26 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab
// forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate)
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet());
} else {
final String allocationId;
if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
} else {
assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
allocationId = updates.initializedPrimary.allocationId().getId();
}
// forcing allocation resets the in-sync set to a singleton: the fake id for a stale primary, the real id for a snapshot restore
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(),
Collections.singleton(updates.initializedPrimary.allocationId().getId()));
indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.singleton(allocationId));
}
} else {
// standard path for updating in-sync ids
Set<String> inSyncAllocationIds = new HashSet<>(oldInSyncAllocationIds);
inSyncAllocationIds.addAll(updates.addedAllocationIds);
inSyncAllocationIds.removeAll(updates.removedAllocationIds);

assert oldInSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false
|| inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false :
"fake allocation id has to be removed, inSyncAllocationIds:" + inSyncAllocationIds;

// Prevent set of inSyncAllocationIds to grow unboundedly. This can happen for example if we don't write to a primary
// but repeatedly shut down nodes that have active replicas.
// We use number_of_replicas + 1 (= possible active shard copies) to bound the inSyncAllocationIds set
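Concretely, for a force-allocated stale primary the standard path reduces to the following (toy values; only the sentinel is taken from the diff):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Toy run of the standard in-sync update path above. */
final class InSyncUpdateSketch {
    static final String FORCED_ALLOCATION_ID = "_forced_allocation_";

    public static void main(String[] args) {
        Set<String> oldInSync = Collections.singleton(FORCED_ALLOCATION_ID); // forced stale primary
        Set<String> added = Collections.singleton("P7bdcRzHTYWkGhWDFqbyIA"); // queued by shardStarted
        Set<String> removed = Collections.singleton(FORCED_ALLOCATION_ID);   // also queued by shardStarted

        Set<String> inSync = new HashSet<>(oldInSync);
        inSync.addAll(added);
        inSync.removeAll(removed);

        // mirrors the new assertion: the fake id never survives the update
        assert !(oldInSync.contains(FORCED_ALLOCATION_ID) && inSync.contains(FORCED_ALLOCATION_ID));
        System.out.println(inSync); // [P7bdcRzHTYWkGhWDFqbyIA]
    }
}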
@@ -287,13 +308,6 @@ void removeAllocationId(ShardRouting shardRouting) {
}
}

/**
* Add allocation id of this shard to the set of in-sync shard copies
*/
private void addAllocationId(ShardRouting shardRouting) {
changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId());
}

/**
* Increase primary term for this shard id
*/
@@ -0,0 +1,238 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing;

import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class AllocationIdIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
}

public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStalePrimary() throws Exception {
/*
* The allocation id is assigned when the shard starts, while the historyUUID is adjusted only after recovery is done.
*
* If AllocateStalePrimary stored a real allocation id in the in-sync set and recovery then failed,
* the shard restart would skip the stage where the historyUUID is changed.
*
* That would lead to a situation where the allocated stale primary and its replica share the same historyUUID,
* so the replica would receive operations after the local checkpoint even though the documents before the
* checkpoint could be significantly different.
*
* Therefore, on AllocateStalePrimary we put a fake allocation id (one that no real allocation could produce),
* so that any failure during recovery requires an extra AllocateStalePrimary command to be executed.
*/
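
// Timeline of the in-sync allocation id set in this scenario (illustrative):
//   after AllocateStalePrimary #1 : { "_forced_allocation_" }
//   recovery fails, node restarts : { "_forced_allocation_" } -> matches no copy, index stays red
//   after AllocateStalePrimary #2 : { "_forced_allocation_" }
//   shard started                 : { <real allocation id> }  -> recovery completes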

// initial set up
final String indexName = "index42";
final String master = internalCluster().startMasterOnlyNode();
String node1 = internalCluster().startNode();
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum").build());
final int numDocs = indexDocs(indexName, "foo", "bar");
final IndexSettings indexSettings = getIndexSettings(indexName, node1);
final Set<String> allocationIds = getAllocationIds(indexName);
final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
final Path indexPath = getIndexPath(node1, shardId);
assertThat(allocationIds, hasSize(1));
final String historyUUID = historyUUID(node1, indexName);
String node2 = internalCluster().startNode();
ensureGreen(indexName);
internalCluster().assertSameDocIdsOnShards();
// initial set up is done

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));

// index more docs on node2, which marks the copy on node1 as stale
int numExtraDocs = indexDocs(indexName, "foo", "bar2");
assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));

// create a fake corruption marker on node1
putFakeCorruptionMarker(indexSettings, shardId, indexPath);

// thanks to the master, node1's copy is known to be out of sync
node1 = internalCluster().startNode();

// there is only a _stale_ primary copy
checkNoValidShardCopy(indexName, shardId);

// allocate stale primary
client(node1).admin().cluster().prepareReroute()
.add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
.get();

// allocation fails due to the corruption marker
assertBusy(() -> {
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final ShardRouting shardRouting = state.routingTable().index(indexName).shard(shardId.id()).primaryShard();
assertThat(shardRouting.state(), equalTo(ShardRoutingState.UNASSIGNED));
assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
});

try (Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
store.removeCorruptionMarker();
}

// index is red: no shard is allocated (the in-sync allocation id is a fake id that matches nothing)
checkHealthStatus(indexName, ClusterHealthStatus.RED);
checkNoValidShardCopy(indexName, shardId);

internalCluster().restartNode(node1, InternalTestCluster.EMPTY_CALLBACK);

// index is still red due to the allocation id mismatch
checkHealthStatus(indexName, ClusterHealthStatus.RED);
checkNoValidShardCopy(indexName, shardId);

// no valid shard copy is there; AllocateStalePrimary has to be invoked again
client().admin().cluster().prepareReroute()
.add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
.get();

ensureYellow(indexName);

// bring node2 back
node2 = internalCluster().startNode();
ensureGreen(indexName);

assertThat(historyUUID(node1, indexName), not(equalTo(historyUUID)));
assertThat(historyUUID(node1, indexName), equalTo(historyUUID(node2, indexName)));

internalCluster().assertSameDocIdsOnShards();
}

public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus) {
final ClusterHealthStatus indexHealthStatus = client().admin().cluster()
.health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus();
assertThat(indexHealthStatus, is(healthStatus));
}

private int indexDocs(String indexName, Object... source) throws InterruptedException, ExecutionException {
// index some docs in several segments
int numDocs = 0;
for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
final int numExtraDocs = between(10, 100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex(indexName, "type").setSource(source);
}

indexRandom(true, false, true, Arrays.asList(builders));
numDocs += numExtraDocs;
}

return numDocs;
}

private Path getIndexPath(String nodeName, ShardId shardId) {
final Set<Path> indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
assertThat(indexDirs, hasSize(1));
return indexDirs.iterator().next();
}

private Set<String> getAllocationIds(String indexName) {
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final Set<String> allocationIds = state.metaData().index(indexName).inSyncAllocationIds(0);
return allocationIds;
}

private IndexSettings getIndexSettings(String indexName, String nodeName) {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
final IndexService indexService = indicesService.indexService(resolveIndex(indexName));
return indexService.getIndexSettings();
}

private String historyUUID(String node, String indexName) {
final ShardStats[] shards = client(node).admin().indices().prepareStats(indexName).clear().get().getShards();
assertThat(shards.length, greaterThan(0));
final Set<String> historyUUIDs = Arrays.stream(shards)
.map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY))
.collect(Collectors.toSet());
assertThat(historyUUIDs, hasSize(1));
return historyUUIDs.iterator().next();
}

private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardId, Path indexPath) throws IOException {
try (Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
store.markStoreCorrupted(new IOException("fake ioexception"));
}
}

private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception {
final ClusterAllocationExplanation explanation =
client().admin().cluster().prepareAllocationExplain()
.setIndex(indexName).setShard(shardId.id()).setPrimary(true)
.get().getExplanation();

final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
}

}
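For operators, the same force-allocation is exposed through the cluster reroute API as the allocate_stale_primary command; as in the test above, accept_data_loss has to be set to true, since documents present only on the stale copy's missing peers are lost.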
