Skip to content

Commit

Permalink
Replace primaryPostAllocated flag and use UnassignedInfo
Browse files Browse the repository at this point in the history
There is no need to maintain additional state as to if a primary was allocated post api creation on the index routing table, we hold all this information already in the UnassignedInfo class.
closes #12374
  • Loading branch information
kimchy authored and rmuir committed Jul 22, 2015
1 parent 43080bf commit de299b9
Show file tree
Hide file tree
Showing 25 changed files with 92 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -92,8 +93,10 @@ public ClusterState execute(ClusterState currentState) {
if (indexMetaData.state() != IndexMetaData.State.CLOSE) {
IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index);
for (IndexShardRoutingTable shard : indexRoutingTable) {
if (!shard.primaryAllocatedPostApi()) {
throw new IndexPrimaryShardNotAllocatedException(new Index(index));
for (ShardRouting shardRouting : shard) {
if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate() == false) {
throw new IndexPrimaryShardNotAllocatedException(new Index(index));
}
}
}
indicesToClose.add(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,28 +383,28 @@ public static IndexRoutingTable readFrom(StreamInput in) throws IOException {
* Initializes a new empty index, as if it was created from an API.
*/
public Builder initializeAsNew(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
}

/**
* Initializes a new empty index, as if it was created from an API.
*/
public Builder initializeAsRecovery(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
}

/**
* Initializes a new index caused by dangling index imported.
*/
public Builder initializeAsFromDangling(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED, null));
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED, null));
}

/**
* Initializes a new empty index, as as a result of opening a closed index.
*/
public Builder initializeAsFromCloseToOpen(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
}

/**
Expand All @@ -429,7 +429,7 @@ private Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource r
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
}
for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId), asNew ? false : true);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId));
for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) {
if (asNew && ignoreShards.contains(shardId)) {
// This shards wasn't completely snapshotted - restore it as new shard
Expand All @@ -446,12 +446,12 @@ private Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource r
/**
* Initializes a new empty index, with an option to control if its from an API or not.
*/
private Builder initializeEmpty(IndexMetaData indexMetaData, boolean asNew, UnassignedInfo unassignedInfo) {
private Builder initializeEmpty(IndexMetaData indexMetaData, UnassignedInfo unassignedInfo) {
if (!shards.isEmpty()) {
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
}
for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId), asNew ? false : true);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId));
for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) {
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(index, shardId, null, i == 0, unassignedInfo));
}
Expand Down Expand Up @@ -481,7 +481,7 @@ public Builder removeReplica() {
return this;
}
// re-add all the current ones
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(indexShard.shardId(), indexShard.primaryAllocatedPostApi());
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(indexShard.shardId());
for (ShardRouting shardRouting : indexShard) {
builder.addShard(new ShardRouting(shardRouting));
}
Expand Down Expand Up @@ -513,24 +513,14 @@ public Builder addIndexShard(IndexShardRoutingTable indexShard) {
return this;
}

/**
* Clears the post allocation flag for the specified shard
*/
public Builder clearPostAllocationFlag(ShardId shardId) {
assert this.index.equals(shardId.index().name());
IndexShardRoutingTable indexShard = shards.get(shardId.id());
shards.put(indexShard.shardId().id(), new IndexShardRoutingTable(indexShard.shardId(), indexShard.shards(), false));
return this;
}

/**
* Adds a new shard routing (makes a copy of it), with reference data used from the index shard routing table
* if it needs to be created.
*/
public Builder addShard(IndexShardRoutingTable refData, ShardRouting shard) {
IndexShardRoutingTable indexShard = shards.get(shard.id());
if (indexShard == null) {
indexShard = new IndexShardRoutingTable.Builder(refData.shardId(), refData.primaryAllocatedPostApi()).addShard(new ShardRouting(shard)).build();
indexShard = new IndexShardRoutingTable.Builder(refData.shardId()).addShard(new ShardRouting(shard)).build();
} else {
indexShard = new IndexShardRoutingTable.Builder(indexShard).addShard(new ShardRouting(shard)).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,10 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
*/
final ImmutableList<ShardRouting> allInitializingShards;

final boolean primaryAllocatedPostApi;

IndexShardRoutingTable(ShardId shardId, List<ShardRouting> shards, boolean primaryAllocatedPostApi) {
IndexShardRoutingTable(ShardId shardId, List<ShardRouting> shards) {
this.shardId = shardId;
this.shuffler = new RotationShardShuffler(ThreadLocalRandom.current().nextInt());
this.shards = ImmutableList.copyOf(shards);
this.primaryAllocatedPostApi = primaryAllocatedPostApi;

ShardRouting primary = null;
ImmutableList.Builder<ShardRouting> replicas = ImmutableList.builder();
Expand Down Expand Up @@ -144,15 +141,7 @@ public IndexShardRoutingTable normalizeVersions() {
shardRoutings.add(new ShardRouting(shards.get(i), highestVersion));
}
}
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shardRoutings), primaryAllocatedPostApi);
}

/**
* Has this shard group primary shard been allocated post API creation. Will be set to
* <code>true</code> if it was created because of recovery action.
*/
public boolean primaryAllocatedPostApi() {
return primaryAllocatedPostApi;
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shardRoutings));
}

/**
Expand Down Expand Up @@ -434,7 +423,6 @@ public boolean equals(Object o) {

IndexShardRoutingTable that = (IndexShardRoutingTable) o;

if (primaryAllocatedPostApi != that.primaryAllocatedPostApi) return false;
if (!shardId.equals(that.shardId)) return false;
if (!shards.equals(that.shards)) return false;

Expand All @@ -445,7 +433,6 @@ public boolean equals(Object o) {
public int hashCode() {
int result = shardId.hashCode();
result = 31 * result + shards.hashCode();
result = 31 * result + (primaryAllocatedPostApi ? 1 : 0);
return result;
}

Expand Down Expand Up @@ -594,21 +581,16 @@ public List<ShardRouting> shardsWithState(ShardRoutingState state) {
public static class Builder {

private ShardId shardId;

private final List<ShardRouting> shards;

private boolean primaryAllocatedPostApi;

public Builder(IndexShardRoutingTable indexShard) {
this.shardId = indexShard.shardId;
this.shards = newArrayList(indexShard.shards);
this.primaryAllocatedPostApi = indexShard.primaryAllocatedPostApi();
}

public Builder(ShardId shardId, boolean primaryAllocatedPostApi) {
public Builder(ShardId shardId) {
this.shardId = shardId;
this.shards = newArrayList();
this.primaryAllocatedPostApi = primaryAllocatedPostApi;
}

public Builder addShard(ShardRouting shardEntry) {
Expand All @@ -630,15 +612,7 @@ public Builder removeShard(ShardRouting shardEntry) {
}

public IndexShardRoutingTable build() {
// we can automatically set allocatedPostApi to true if the primary is active
if (!primaryAllocatedPostApi) {
for (ShardRouting shardRouting : shards) {
if (shardRouting.primary() && shardRouting.active()) {
primaryAllocatedPostApi = true;
}
}
}
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shards), primaryAllocatedPostApi);
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shards));
}

public static IndexShardRoutingTable readFrom(StreamInput in) throws IOException {
Expand All @@ -648,8 +622,7 @@ public static IndexShardRoutingTable readFrom(StreamInput in) throws IOException

public static IndexShardRoutingTable readFromThin(StreamInput in, String index) throws IOException {
int iShardId = in.readVInt();
boolean allocatedPostApi = in.readBoolean();
Builder builder = new Builder(new ShardId(index, iShardId), allocatedPostApi);
Builder builder = new Builder(new ShardId(index, iShardId));

int size = in.readVInt();
for (int i = 0; i < size; i++) {
Expand All @@ -667,7 +640,6 @@ public static void writeTo(IndexShardRoutingTable indexShard, StreamOutput out)

public static void writeToThin(IndexShardRoutingTable indexShard, StreamOutput out) throws IOException {
out.writeVInt(indexShard.shardId.id());
out.writeBoolean(indexShard.primaryAllocatedPostApi());

out.writeVInt(indexShard.shards.size());
for (ShardRouting entry : indexShard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private int relocatingShards = 0;

private Set<ShardId> clearPostAllocationFlag;

private final Map<String, ObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<>();

public RoutingNodes(ClusterState clusterState) {
Expand Down Expand Up @@ -191,25 +189,6 @@ public RoutingNodesIterator nodes() {
return new RoutingNodesIterator(nodesToShards.values().iterator());
}

/**
* Clears the post allocation flag for the provided shard id. NOTE: this should be used cautiously
* since it will lead to data loss of the primary shard is not allocated, as it will allocate
* the primary shard on a node and *not* expect it to have an existing valid index there.
*/
public void addClearPostAllocationFlag(ShardId shardId) {
if (clearPostAllocationFlag == null) {
clearPostAllocationFlag = Sets.newHashSet();
}
clearPostAllocationFlag.add(shardId);
}

public Iterable<ShardId> getShardsToClearPostAllocationFlag() {
if (clearPostAllocationFlag == null) {
return ImmutableSet.of();
}
return clearPostAllocationFlag;
}

public RoutingNode node(String nodeId) {
return nodesToShards.get(nodeId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,6 @@ public Builder updateNodes(RoutingNodes routingNodes) {
indexBuilder.addShard(refData, shardRoutingEntry);
}

for (ShardId shardId : routingNodes.getShardsToClearPostAllocationFlag()) {
IndexRoutingTable.Builder indexRoutingBuilder = indexRoutingTableBuilders.get(shardId.index().name());
if (indexRoutingBuilder != null) {
indexRoutingBuilder.clearPostAllocationFlag(shardId);
}
}

for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) {
add(indexBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,21 @@ public ShardId shardId() {
return shardIdentifier;
}

public boolean allocatedPostIndexCreate() {
if (active()) {
return true;
}

// unassigned info is only cleared when a shard moves to started, so
// for unassigned and initializing (we checked for active() before),
// we can safely assume it is there
if (unassignedInfo.getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
return false;
}

return true;
}

/**
* A shard iterator with just this shard in it.
*/
Expand Down Expand Up @@ -362,6 +377,11 @@ public void writeTo(StreamOutput out) throws IOException {
writeToThin(out);
}

public void updateUnassignedInfo(UnassignedInfo unassignedInfo) {
ensureNotFrozen();
assert this.unassignedInfo != null : "can only update unassign info if they are already set";
this.unassignedInfo = unassignedInfo;
}

// package private mutators start here

Expand Down Expand Up @@ -431,6 +451,7 @@ void reinitializeShard() {
version++;
state = ShardRoutingState.INITIALIZING;
allocationId = AllocationId.newInitializing();
this.unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ public enum Reason {
/**
* Unassigned as a result of explicit cancel reroute command.
*/
REROUTE_CANCELLED;
REROUTE_CANCELLED,
/**
* When a shard moves from started back to initializing, for example, during shadow replica
*/
REINITIALIZED;
}

private final Reason reason;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
Expand All @@ -35,7 +36,6 @@
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Iterator;

/**
* Allocates an unassigned shard to a specific node. Note, primary allocation will "force"
Expand Down Expand Up @@ -221,15 +221,17 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
}
// go over and remove it from the unassigned
for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
if (it.next() != shardRouting) {
ShardRouting unassigned = it.next();
if (unassigned != shardRouting) {
continue;
}
it.initialize(routingNode.nodeId());
if (shardRouting.primary()) {
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
// and we want to force allocate it (and create a new index for it)
routingNodes.addClearPostAllocationFlag(shardRouting.shardId());
// if we force allocation of a primary, we need to move the unassigned info back to treat it as if
// it was index creation
if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
}
it.initialize(routingNode.nodeId());
break;
}
return new RerouteExplanation(this, decision);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
}
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate() == false) {
// if its primary, and it hasn't been allocated post API (meaning its a "fresh newly created shard"), only disable allocation
// on a special disable allocation flag
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

// a flag for whether the primary shard has been previously allocated
boolean primaryHasBeenAllocated = allocation.routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi();
boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate();

// checks for exact byte comparisons
if (freeBytes < freeBytesThresholdLow.bytes()) {
Expand Down

0 comments on commit de299b9

Please sign in to comment.