Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify assignToNode to only do initializing #12235

Merged
merged 1 commit into from Jul 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -363,36 +363,15 @@ public String prettyPrint() {
}

/**
* Assign a shard to a node. This will increment the inactiveShardCount counter
* and the inactivePrimaryCount counter if the shard is the primary.
* In case the shard is already assigned and started, it will be marked as
* relocating, which is accounted for, too, so the number of concurrent relocations
* can be retrieved easily.
* This method can be called several times for the same shard, only the first time
* will change the state.
*
* INITIALIZING => INITIALIZING
* UNASSIGNED => INITIALIZING
* STARTED => RELOCATING
* RELOCATING => RELOCATING
*
* @param shard the shard to be assigned
* @param nodeId the nodeId this shard should initialize on or relocate from
* Moves a shard from unassigned to initialize state
*/
public void assign(ShardRouting shard, String nodeId) {
// state will not change if the shard is already initializing.
ShardRoutingState oldState = shard.state();
shard.assignToNode(nodeId);
public void initialize(ShardRouting shard, String nodeId) {
assert shard.unassigned() : shard;
shard.initialize(nodeId);
node(nodeId).add(shard);
if (oldState == ShardRoutingState.UNASSIGNED) {
inactiveShardCount++;
if (shard.primary()) {
inactivePrimaryCount++;
}
}

if (shard.state() == ShardRoutingState.RELOCATING) {
relocatingShards++;
inactiveShardCount++;
if (shard.primary()) {
inactivePrimaryCount++;
}
assignedShardsAdd(shard);
}
Expand All @@ -406,7 +385,8 @@ public ShardRouting relocate(ShardRouting shard, String nodeId) {
relocatingShards++;
shard.relocate(nodeId);
ShardRouting target = shard.buildTargetRelocatingShard();
assign(target, target.currentNodeId());
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
return target;
}

Expand Down
Expand Up @@ -350,32 +350,23 @@ public void writeTo(StreamOutput out) throws IOException {
void moveToUnassigned(UnassignedInfo unassignedInfo) {
ensureNotFrozen();
version++;
assert state != ShardRoutingState.UNASSIGNED;
assert state != ShardRoutingState.UNASSIGNED : this;
state = ShardRoutingState.UNASSIGNED;
currentNodeId = null;
relocatingNodeId = null;
this.unassignedInfo = unassignedInfo;
}

/**
* Assign this shard to a node.
*
* @param nodeId id of the node to assign this shard to
* Initializes an unassigned shard on a node.
*/
void assignToNode(String nodeId) {
void initialize(String nodeId) {
ensureNotFrozen();
version++;
if (currentNodeId == null) {
assert state == ShardRoutingState.UNASSIGNED;
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
relocatingNodeId = null;
} else if (state == ShardRoutingState.STARTED) {
state = ShardRoutingState.RELOCATING;
relocatingNodeId = nodeId;
} else if (state == ShardRoutingState.RELOCATING) {
assert nodeId.equals(relocatingNodeId);
}
assert state == ShardRoutingState.UNASSIGNED : this;
assert relocatingNodeId == null : this;
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
}

/**
Expand All @@ -386,7 +377,7 @@ void assignToNode(String nodeId) {
void relocate(String relocatingNodeId) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.STARTED;
assert state == ShardRoutingState.STARTED : this;
state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId;
}
Expand All @@ -398,9 +389,9 @@ void relocate(String relocatingNodeId) {
void cancelRelocation() {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.RELOCATING;
assert assignedToNode();
assert relocatingNodeId != null;
assert state == ShardRoutingState.RELOCATING : this;
assert assignedToNode() : this;
assert relocatingNodeId != null : this;

state = ShardRoutingState.STARTED;
relocatingNodeId = null;
Expand All @@ -424,7 +415,7 @@ void reinitializeShard() {
void moveToStarted() {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING;
assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : this;
relocatingNodeId = null;
restoreSource = null;
state = ShardRoutingState.STARTED;
Expand Down
Expand Up @@ -688,7 +688,7 @@ public int compare(ShardRouting o1,
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
routingNodes.assign(shard, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId());
changed = true;
continue; // don't add to ignoreUnassigned
} else {
Expand Down Expand Up @@ -783,8 +783,7 @@ private boolean tryRelocateShard(Operation operation, ModelNode minNode, ModelNo
routingNodes.relocate(candidate, lowRoutingNode.nodeId());

} else {
assert candidate.unassigned();
routingNodes.assign(candidate, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId());
}
return true;

Expand Down
Expand Up @@ -225,7 +225,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
continue;
}
it.remove();
routingNodes.assign(shardRouting, routingNode.nodeId());
routingNodes.initialize(shardRouting, routingNode.nodeId());
if (shardRouting.primary()) {
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
// and we want to force allocate it (and create a new index for it)
Expand Down
Expand Up @@ -339,7 +339,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId());
routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();

// found a node, so no throttling, no "no", and break out of the loop
Expand All @@ -359,7 +359,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId());
routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
}
} else {
Expand Down Expand Up @@ -514,7 +514,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
}
// we found a match
changed = true;
routingNodes.assign(shard, lastNodeMatched.nodeId());
routingNodes.initialize(shard, lastNodeMatched.nodeId());
unassignedIterator.remove();
}
} else if (hasReplicaData == false) {
Expand Down
Expand Up @@ -82,7 +82,7 @@ public void testFrozenOnRoutingTable() {
}

try {
routing.assignToNode("boom");
routing.initialize("boom");
fail("must be frozen");
} catch (IllegalStateException ex) {
// expected
Expand Down
Expand Up @@ -189,7 +189,7 @@ public void testStateTransitionMetaHandling() {
ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting mutable = new ShardRouting(shard);
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.assignToNode("test_node");
mutable.initialize("test_node");
assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING));
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.moveToStarted();
Expand Down
Expand Up @@ -369,37 +369,37 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
switch (sr.id()) {
case 0:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node1");
allocation.routingNodes().initialize(sr, "node1");
} else {
allocation.routingNodes().assign(sr, "node0");
allocation.routingNodes().initialize(sr, "node0");
}
break;
case 1:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node1");
allocation.routingNodes().initialize(sr, "node1");
} else {
allocation.routingNodes().assign(sr, "node2");
allocation.routingNodes().initialize(sr, "node2");
}
break;
case 2:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node3");
allocation.routingNodes().initialize(sr, "node3");
} else {
allocation.routingNodes().assign(sr, "node2");
allocation.routingNodes().initialize(sr, "node2");
}
break;
case 3:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node3");
allocation.routingNodes().initialize(sr, "node3");
} else {
allocation.routingNodes().assign(sr, "node1");
allocation.routingNodes().initialize(sr, "node1");
}
break;
case 4:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node2");
allocation.routingNodes().initialize(sr, "node2");
} else {
allocation.routingNodes().assign(sr, "node0");
allocation.routingNodes().initialize(sr, "node0");
}
break;
}
Expand Down