Skip to content

Commit

Permalink
Bootstrap a new history_uuid when force allocating a stale primary (#…
Browse files Browse the repository at this point in the history
…33432)

This commit ensures that we bootstrap a new history_uuid when force
allocating a stale primary. A stale primary should never be the source
of an operation-based recovery to another shard which exists before the
forced-allocation.

Closes #26712
  • Loading branch information
dnhatn committed Sep 8, 2018
1 parent f27c3dc commit 94e4cb6
Show file tree
Hide file tree
Showing 27 changed files with 194 additions and 164 deletions.
Expand Up @@ -27,10 +27,11 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RecoverySource.EmptyStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.LocalShardsRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -386,7 +387,7 @@ private Builder initializeAsRestore(IndexMetaData indexMetaData, SnapshotRecover
if (asNew && ignoreShards.contains(shardNumber)) {
// This shards wasn't completely snapshotted - restore it as new shard
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, primary,
primary ? StoreRecoverySource.EMPTY_STORE_INSTANCE : PeerRecoverySource.INSTANCE, unassignedInfo));
primary ? EmptyStoreRecoverySource.INSTANCE : PeerRecoverySource.INSTANCE, unassignedInfo));
} else {
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, primary,
primary ? recoverySource : PeerRecoverySource.INSTANCE, unassignedInfo));
Expand All @@ -410,13 +411,13 @@ private Builder initializeEmpty(IndexMetaData indexMetaData, UnassignedInfo unas
final RecoverySource primaryRecoverySource;
if (indexMetaData.inSyncAllocationIds(shardNumber).isEmpty() == false) {
// we have previous valid copies for this shard. use them for recovery
primaryRecoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
primaryRecoverySource = ExistingStoreRecoverySource.INSTANCE;
} else if (indexMetaData.getResizeSourceIndex() != null) {
// this is a new index but the initial shards should merged from another index
primaryRecoverySource = LocalShardsRecoverySource.INSTANCE;
} else {
// a freshly created index with no restriction
primaryRecoverySource = StoreRecoverySource.EMPTY_STORE_INSTANCE;
primaryRecoverySource = EmptyStoreRecoverySource.INSTANCE;
}
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
for (int i = 0; i <= indexMetaData.getNumberOfReplicas(); i++) {
Expand Down
Expand Up @@ -34,7 +34,8 @@
/**
* Represents the recovery source of a shard. Available recovery types are:
*
* - {@link StoreRecoverySource} recovery from the local store (empty or with existing data)
* - {@link EmptyStoreRecoverySource} recovery from an empty store
* - {@link ExistingStoreRecoverySource} recovery from an existing store
* - {@link PeerRecoverySource} recovery from a primary on another node
* - {@link SnapshotRecoverySource} recovery from a snapshot
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
Expand All @@ -59,8 +60,8 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
public static RecoverySource readFrom(StreamInput in) throws IOException {
Type type = Type.values()[in.readByte()];
switch (type) {
case EMPTY_STORE: return StoreRecoverySource.EMPTY_STORE_INSTANCE;
case EXISTING_STORE: return StoreRecoverySource.EXISTING_STORE_INSTANCE;
case EMPTY_STORE: return EmptyStoreRecoverySource.INSTANCE;
case EXISTING_STORE: return new ExistingStoreRecoverySource(in);
case PEER: return PeerRecoverySource.INSTANCE;
case SNAPSHOT: return new SnapshotRecoverySource(in);
case LOCAL_SHARDS: return LocalShardsRecoverySource.INSTANCE;
Expand Down Expand Up @@ -91,6 +92,10 @@ public enum Type {

public abstract Type getType();

public boolean shouldBootstrapNewHistoryUUID() {
return false;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -107,25 +112,68 @@ public int hashCode() {
}

/**
* recovery from an existing on-disk store or a fresh copy
* Recovery from a fresh copy
*/
public abstract static class StoreRecoverySource extends RecoverySource {
public static final StoreRecoverySource EMPTY_STORE_INSTANCE = new StoreRecoverySource() {
@Override
public Type getType() {
return Type.EMPTY_STORE;
public static final class EmptyStoreRecoverySource extends RecoverySource {
public static final EmptyStoreRecoverySource INSTANCE = new EmptyStoreRecoverySource();

@Override
public Type getType() {
return Type.EMPTY_STORE;
}

@Override
public String toString() {
return "new shard recovery";
}
}

/**
* Recovery from an existing on-disk store
*/
public static final class ExistingStoreRecoverySource extends RecoverySource {
public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false);
public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true);

private final boolean bootstrapNewHistoryUUID;

private ExistingStoreRecoverySource(boolean bootstrapNewHistoryUUID) {
this.bootstrapNewHistoryUUID = bootstrapNewHistoryUUID;
}

private ExistingStoreRecoverySource(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
bootstrapNewHistoryUUID = in.readBoolean();
} else {
bootstrapNewHistoryUUID = false;
}
};
public static final StoreRecoverySource EXISTING_STORE_INSTANCE = new StoreRecoverySource() {
@Override
public Type getType() {
return Type.EXISTING_STORE;
}

@Override
public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException {
builder.field("bootstrap_new_history_uuid", bootstrapNewHistoryUUID);
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(bootstrapNewHistoryUUID);
}
};
}

@Override
public boolean shouldBootstrapNewHistoryUUID() {
return bootstrapNewHistoryUUID;
}

@Override
public Type getType() {
return Type.EXISTING_STORE;
}

@Override
public String toString() {
return getType() == Type.EMPTY_STORE ? "new shard recovery" : "existing recovery";
return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
}
}

Expand Down
Expand Up @@ -19,14 +19,13 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -318,7 +317,7 @@ public ShardRouting moveToUnassigned(UnassignedInfo unassignedInfo) {
final RecoverySource recoverySource;
if (active()) {
if (primary()) {
recoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
recoverySource = ExistingStoreRecoverySource.INSTANCE;
} else {
recoverySource = PeerRecoverySource.INSTANCE;
}
Expand Down
Expand Up @@ -21,7 +21,7 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.EmptyStoreRecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -136,7 +136,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate,
StoreRecoverySource.EMPTY_STORE_INSTANCE);
EmptyStoreRecoverySource.INSTANCE);

return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}
Expand Down
Expand Up @@ -129,7 +129,8 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
"trying to allocate an existing primary shard [" + index + "][" + shardId + "], while no such shard has ever been active");
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting);
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, null,
RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE);
return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}

Expand Down
Expand Up @@ -398,6 +398,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
}
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
Expand Down
Expand Up @@ -377,7 +377,7 @@ static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, Orig
ArrayList<ShardRouting> unassigned = new ArrayList<>();

ShardRouting routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), true,
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
routing = routing.initialize(primaryNode.getId(), i + "p", 0);
routing.started();
started.add(routing);
Expand Down
Expand Up @@ -19,7 +19,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -37,7 +37,7 @@
public class AllocationIdTests extends ESTestCase {
public void testShardToStarted() {
logger.info("-- create unassigned shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
assertThat(shard.allocationId(), nullValue());

logger.info("-- initialize the shard");
Expand All @@ -57,7 +57,7 @@ public void testShardToStarted() {

public void testSuccessfulRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard = shard.initialize("node1", null, -1);
shard = shard.moveToStarted();

Expand All @@ -80,7 +80,7 @@ public void testSuccessfulRelocation() {

public void testCancelRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard = shard.initialize("node1", null, -1);
shard = shard.moveToStarted();

Expand All @@ -100,7 +100,7 @@ public void testCancelRelocation() {

public void testMoveToUnassigned() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard = shard.initialize("node1", null, -1);
shard = shard.moveToStarted();

Expand Down
Expand Up @@ -77,7 +77,7 @@ public void testIterate() {

public ShardRouting newRouting(Index index, int id, boolean started) {
ShardRouting shardRouting = ShardRouting.newUnassigned(new ShardId(index, id), true,
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
shardRouting = ShardRoutingHelper.initialize(shardRouting, "some node");
if (started) {
shardRouting = ShardRoutingHelper.moveToStarted(shardRouting);
Expand Down

0 comments on commit 94e4cb6

Please sign in to comment.