Allocate primary shards based on allocation IDs #15281

Merged
merged 1 commit into from Dec 17, 2015

Projects

None yet

2 participants

@ywelsch
Contributor
ywelsch commented Dec 7, 2015
  • Add allocation IDs to TransportNodesListGatewayStartedShards action.
  • Use the above to assign a primary shard on recovery.
  • Also add allocation id to indices shard store response (/some_index/_shard_stores)

Relates to #14739

@bleskes bleskes was assigned by ywelsch Dec 7, 2015
@ywelsch ywelsch referenced this pull request Dec 7, 2015
Closed

Allocate primary shard based on allocation IDs #14739

7 of 7 tasks complete
@bleskes bleskes commented on an outdated diff Dec 8, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
- NodesAndVersions nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexSettings), allocation.getIgnoreNodes(shard.shardId()), shardState);
- logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
+ Set<String> allocationIds = indexMetaData.activeAllocationIds(shard.id());
@bleskes
bleskes Dec 8, 2015 Member

call this lastActiveAllocationIds?

@bleskes bleskes commented on an outdated diff Dec 8, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
@@ -161,11 +243,11 @@ private boolean isEnoughAllocationsFound(ShardRouting shard, IndexMetaData index
/**
* Based on the nodes and versions, build the list of yes/no/throttle nodes that the shard applies to.
@bleskes
bleskes Dec 8, 2015 Member

comment should be - Split the list of nodes to lists of yes/no/throttle nodes based on allocation deciders (versions are now irrelevant)

@ywelsch
Contributor
ywelsch commented Dec 9, 2015

Pushed changes related to our discussion:

  • Use allocation ids and index creation version to determine allocatedPostIndexCreate
  • Added integration test for #15241 and #14671
@bleskes bleskes and 1 other commented on an outdated diff Dec 14, 2015
.../admin/indices/shards/IndicesShardStoresResponse.java
@@ -115,9 +116,10 @@ private void writeTo(StreamOutput out) throws IOException {
private StoreStatus() {
}
- public StoreStatus(DiscoveryNode node, long version, Allocation allocation, Throwable storeException) {
+ public StoreStatus(DiscoveryNode node, long version, String allocationId, Allocation allocation, Throwable storeException) {
@bleskes
bleskes Dec 14, 2015 Member

That Allocation enum is really confusing in this context (I stumbled on this 3 times now). This is unrelated to this change, so I'm not suggesting we change it here - but do you have suggestions for a better name? maybe AllocatedAs ?

@ywelsch
ywelsch Dec 15, 2015 Contributor

What about AllocationStatus? The JSON field should also be adapted.

@bleskes
bleskes Dec 15, 2015 Member

I liked allocatedAs better :) (note that the name of the java JSON field is ALLOCATED :))

@bleskes bleskes commented on the diff Dec 14, 2015
...indices/shards/TransportIndicesShardStoresAction.java
@@ -213,7 +213,7 @@ void finish() {
* A shard exists/existed in a node only if shard state file exists in the node
*/
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
- return response.storeException() != null || response.version() != -1;
+ return response.storeException() != null || response.version() != -1 || response.allocationId() != null;
@bleskes
bleskes Dec 14, 2015 Member

I'm not sure response.allocationId() != null is OK. What happens when we allocate old indices that don't have allocationId serialized?

@ywelsch
ywelsch Dec 14, 2015 Contributor

Note that the condition just requires any of the fields to have a value, so it will work fine in the case where we do not have allocation ids. We say that the shard exists if we get any sensible information about it. At the moment we have a version whenever we have an allocation id. But if we would remove versions in the future, that might not hold anymore. Adding response.allocationId() != null just adds a little more safety.

@bleskes
bleskes Dec 14, 2015 Member

you are right. Brain circuit breaked - interpreted the || as && . All good.

@bleskes bleskes and 1 other commented on an outdated diff Dec 14, 2015
...earch/cluster/metadata/MetaDataIndexStateService.java
@@ -95,7 +94,7 @@ public ClusterState execute(ClusterState currentState) {
IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index);
for (IndexShardRoutingTable shard : indexRoutingTable) {
for (ShardRouting shardRouting : shard) {
- if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate() == false) {
+ if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate(indexMetaData) == false) {
@bleskes
bleskes Dec 14, 2015 Member

I think this protection can go away now? It's a protection against an index staying red when opening it (because now primary can be found). We now do a better job with the active allocation ids and we will just assign new empty primaries.

@ywelsch
ywelsch Dec 14, 2015 Contributor

agree.

@bleskes bleskes commented on an outdated diff Dec 14, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
@@ -58,7 +61,15 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
- if (needToFindPrimaryCopy(shard) == false) {
+ if (shard.primary() == false) {
+ continue;
+ }
+
+ IndexMetaData indexMetaData = metaData.index(shard.getIndex());
@bleskes
bleskes Dec 14, 2015 Member

nit: make these final?

@bleskes bleskes commented on an outdated diff Dec 14, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
@@ -70,13 +81,29 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
continue;
}
- IndexMetaData indexMetaData = metaData.index(shard.getIndex());
- Settings indexSettings = Settings.builder().put(settings).put(indexMetaData.getSettings()).build();
+ final boolean snapshotRestore = shard.restoreSource() != null;
+ Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
+
+ final NodesAndVersions nodesAndVersions;
+ final boolean enoughAllocationsFound;
+
+ if (lastActiveAllocationIds.isEmpty() && indexSettings.getIndexVersionCreated().before(Version.V_3_0_0)) {
+ // when we load an old index (after upgrading cluster) or restore a snapshot of an old index
+ // fall back to old version-based allocation mode
+ // Note that once the shard has been active, lastActiveAllocationIds will be non-empty
+ nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexSettings), allocation.getIgnoreNodes(shard.shardId()), shardState);
+ logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
@bleskes
bleskes Dec 14, 2015 Member

can we add something about this being an old index to the log message?

@bleskes bleskes commented on an outdated diff Dec 14, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
@@ -70,13 +81,29 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
continue;
}
- IndexMetaData indexMetaData = metaData.index(shard.getIndex());
- Settings indexSettings = Settings.builder().put(settings).put(indexMetaData.getSettings()).build();
+ final boolean snapshotRestore = shard.restoreSource() != null;
+ Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
+
+ final NodesAndVersions nodesAndVersions;
+ final boolean enoughAllocationsFound;
+
+ if (lastActiveAllocationIds.isEmpty() && indexSettings.getIndexVersionCreated().before(Version.V_3_0_0)) {
@bleskes
bleskes Dec 14, 2015 Member

do we need the version check here? it's folded into allocatedPostIndexCreate() ? I think it will be simpler to read if we remove this from the if and add an assert on this, explaining why we expect it like that. Something like:

            if (lastActiveAllocationIds.isEmpty()) {
                assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
@bleskes bleskes commented on an outdated diff Dec 14, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
- NodesAndVersions nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexSettings), allocation.getIgnoreNodes(shard.shardId()), shardState);
- logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
+ enoughAllocationsFound = isEnoughAllocationsFound(shard, indexMetaData, nodesAndVersions);
@bleskes
bleskes Dec 14, 2015 Member

can we rename isEnoughAllocationsFound to isEnoughVersionBasedAllocationsFound?

@bleskes bleskes commented on an outdated diff Dec 14, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
- // this is an API allocation, ignore since we know there is no data...
- if (shard.allocatedPostIndexCreate() == false) {
- return false;
+ if (nodeShardState.storeException() == null) {
+ if (allocationId == null) {
+ logger.trace("[{}] on node [{}] has no allocation id in shard meta data", shard, nodeShardState.getNode());
+ } else {
+ logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId);
@bleskes
bleskes Dec 14, 2015 Member

I'm fine with logging "allocation id [null] of shard" . Will save on the if :)

@bleskes bleskes commented on the diff Dec 14, 2015
...search/action/admin/indices/create/CreateIndexIT.java
@@ -285,4 +285,11 @@ public void testMappingConflictRootCause() throws Exception {
assertThat(messages.toString(), containsString("mapper [text] is used by multiple types"));
}
}
+
+ public void testRestartIndexCreationAfterFullClusterRestart() throws Exception {
+ client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get();
+ client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get();
+ internalCluster().fullRestart();
+ ensureGreen("test");
@bleskes
bleskes Dec 14, 2015 Member

nice and clean :) another variant you may want to consider is creating an index with way more shards than nodes and see that we get to yellow (now we will wait for a quorum of copies)

@ywelsch
ywelsch Dec 15, 2015 Contributor

ok, will put this into PrimaryAllocationIT

@bleskes bleskes and 1 other commented on an outdated diff Dec 14, 2015
...org/elasticsearch/cluster/routing/AllocationIdIT.java
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ESIntegTestCase.SuppressLocalMode
+public class AllocationIdIT extends ESIntegTestCase {
+
+ public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
+ logger.info("--> starting 3 nodes, 1 master, 2 data");
+ // we are shutting down nodes - make sure we don't have 2 clusters if we test network
@bleskes
bleskes Dec 14, 2015 Member

I don't get the comment?

@ywelsch
ywelsch Dec 14, 2015 Contributor

copy pasta from another test ;-) I removed it.

@bleskes bleskes commented on an outdated diff Dec 14, 2015
...org/elasticsearch/cluster/routing/AllocationIdIT.java
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ESIntegTestCase.SuppressLocalMode
+public class AllocationIdIT extends ESIntegTestCase {
+
+ public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
+ logger.info("--> starting 3 nodes, 1 master, 2 data");
+ // we are shutting down nodes - make sure we don't have 2 clusters if we test network
+ String master = internalCluster().startMasterOnlyNodeAsync().get();
@bleskes
bleskes Dec 14, 2015 Member

startMasterOnlyNode?

@bleskes bleskes commented on an outdated diff Dec 14, 2015
...org/elasticsearch/cluster/routing/AllocationIdIT.java
+import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ESIntegTestCase.SuppressLocalMode
+public class AllocationIdIT extends ESIntegTestCase {
@bleskes
bleskes Dec 14, 2015 Member

Can we call this PrimaryAllocationIT ?

@bleskes bleskes and 1 other commented on an outdated diff Dec 14, 2015
...org/elasticsearch/cluster/routing/AllocationIdIT.java
+ client(replicaNode).prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
+
+ logger.info("--> shut down node that has new acknowledged document");
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
+
+ ensureStableCluster(1, master);
+
+ partition.stopDisrupting();
+
+ logger.info("--> waiting for node with old primary shard to rejoin the cluster");
+ ensureStableCluster(2, master);
+
+ logger.info("--> check that old primary shard does not get promoted to primary again");
+ // wait 5 seconds and check that we timed out
+ try {
+ client(master).admin().cluster()
@bleskes
bleskes Dec 14, 2015 Member

it's shame the test times out every time (and will this be slow). How about the following sequence:

  • stop distrupting
  • wait for a two node cluster (like now).
  • ensure that the index is still red (I know this is not bullet proof but if we do something wrong it will flag it enough times)
  • start a new data node (which will reuse the old data folder)
  • search and try to find the documents we just indexed.
@ywelsch
ywelsch Dec 15, 2015 Contributor

I'll refine point 3 a bit:

  • kick reroute
  • wait for all shard states to be fetched (assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)));)
  • kick reroute a second time
  • check that all shards are unassigned
@bleskes bleskes commented on an outdated diff Dec 14, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
@@ -59,25 +62,30 @@ public void buildTestAllocator() {
this.testAllocator = new TestAllocator();
}
- /**
- * Verifies that the canProcess method of primary allocation behaves correctly
- * and processes only the applicable shard.
- */
- public void testNoProcessReplica() {
- ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
- assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
- }
-
- public void testNoProcessPrimayNotAllcoatedBefore() {
- ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, true, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
- assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
+ public void testNoProcessPrimaryNotAllocatedBefore() {
+ MetaData metaData = MetaData.builder()
@bleskes
bleskes Dec 14, 2015 Member

maybe reuse the routingAllocationWithOnePrimaryNoReplicas method, but give a param whether it should see it as a recovery or as new? also, I think we want a variant where we are recovering, but the active allocation id is empty?

@bleskes bleskes commented on the diff Dec 14, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
*/
public void testStoreException() {
- RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
- testAllocator.addData(node1, 3, new CorruptIndexException("test", "test"));
+ final RoutingAllocation allocation;
+ if (randomBoolean()) {
@bleskes
bleskes Dec 14, 2015 Member

we can one more legal state here - old index with allocation ids.

@bleskes
bleskes Dec 15, 2015 Member

argh. Hidden by github ui. all good.

@bleskes bleskes commented on the diff Dec 14, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
@@ -112,8 +158,14 @@ public void testStoreException() {
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
*/
public void testFoundAllocationAndAllocating() {
- RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
- testAllocator.addData(node1, 10);
+ final RoutingAllocation allocation;
+ if (randomBoolean()) {
@bleskes
bleskes Dec 14, 2015 Member

same comment about possible state.

@bleskes
bleskes Dec 15, 2015 Member

same here :(

@bleskes bleskes commented on the diff Dec 14, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
@@ -126,8 +178,14 @@ public void testFoundAllocationAndAllocating() {
* it will be moved to ignore unassigned until it can be allocated to.
*/
public void testFoundAllocationButThrottlingDecider() {
- RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders());
- testAllocator.addData(node1, 10);
+ final RoutingAllocation allocation;
+ if (randomBoolean()) {
@bleskes
bleskes Dec 14, 2015 Member

same...

@bleskes bleskes commented on an outdated diff Dec 14, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
@@ -149,11 +213,11 @@ public void testFoundAllocationButNoDecider() {
}
/**
- * Tests that the highest version node is chosed for allocation.
+ * Tests that the highest version node is chosen for allocation.
*/
public void testAllocateToTheHighestVersion() {
@bleskes
bleskes Dec 14, 2015 Member

rename to testAllocateToTheHighestVersionOnLegacyIndex?

@bleskes bleskes commented on an outdated diff Dec 14, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
@@ -190,7 +255,7 @@ public void testRestoreIgnoresNoNodesToAllocate() {
*/
public void testEnoughCopiesFoundForAllocation() {
@bleskes
bleskes Dec 14, 2015 Member

same - add a OnLegacyIndex suffix?

@bleskes bleskes commented on an outdated diff Dec 14, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
+ new AsyncShardFetch.FetchResult(shardId, data, new HashSet<>(), ignoredNodes);
+
+ List<DiscoveryNode> nodes = testAllocator.buildAllocationIdBasedNodes(shard, false, false, ignoredNodes,
+ Collections.singleton("id1"), fetches).nodes;
+ assertThat(nodes.size(), equalTo(1));
+ assertThat(nodes, contains(node1));
+
+ nodes = testAllocator.buildAllocationIdBasedNodes(shard, false, true, ignoredNodes,
+ Collections.singleton("id1"), fetches).nodes;
+ assertThat(nodes.size(), equalTo(2));
+ // node3 should be last here
+ assertThat(nodes, contains(node1, node3));
+ }
+
+ /**
+ * if no activeAllocationIds are provided, we fallback to version-based mode
@bleskes
bleskes Dec 14, 2015 Member

I don't follow the comment?

@bleskes bleskes commented on the diff Dec 14, 2015
...t/java/org/elasticsearch/gateway/QuorumGatewayIT.java
@@ -51,72 +47,12 @@ protected int numberOfReplicas() {
return 2;
}
- public void testChangeInitialShardsRecovery() throws Exception {
@bleskes
bleskes Dec 14, 2015 Member

can we deprecate the INDEX_RECOVERY_INITIAL_SHARDS setting? also add a breaking change note + remove docs for it? I'm fine with doing this in another PR, but we need to update the meta issue

@bleskes
Member
bleskes commented Dec 14, 2015

Thanks @ywelsch . I think this looks great. I left some comments and also want to ping @dakrone to discuss the recover on any node option. I hope we can get this simpler...

@ywelsch
Contributor
ywelsch commented Dec 15, 2015

@bleskes I pushed a new set of changes that address your comments.

@bleskes bleskes commented on an outdated diff Dec 15, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
continue;
}
- AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
+ final IndexMetaData indexMetaData = metaData.index(shard.getIndex());
+ final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings, Collections.emptyList());
+
+ if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
+ // when we create a fresh index
+ continue;
+ }
+
+ if (recoverOnAnyNode(indexSettings)) {
@bleskes
bleskes Dec 15, 2015 Member

can we only skip allocation if we don't find any copy?

@bleskes bleskes commented on an outdated diff Dec 15, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
- return false;
+ if (ignoreNodes.contains(node.id())) {
+ continue;
+ }
+
+ if (nodeShardState.storeException() == null) {
+ logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId);
+ } else {
+ logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId);
+ allocationId = null;
+ }
+
+ if (allocationId != null) {
+ if (lastActiveAllocationIds.contains(allocationId)) {
+ matchingNodes.add(node);
+ if (nodeShardState.version() > highestVersion) {
@bleskes
bleskes Dec 15, 2015 Member

highestVersion = Math.max(highestVersion, nodeShardState.version()); is faster and cleaner

@bleskes bleskes commented on an outdated diff Dec 15, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
@@ -108,23 +142,55 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
return changed;
}
- /**
- * Does the shard need to find a primary copy?
- */
- boolean needToFindPrimaryCopy(ShardRouting shard) {
- if (shard.primary() == false) {
- return false;
- }
+ protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyAllocationId, Set<String> ignoreNodes,
@bleskes
bleskes Dec 15, 2015 Member

can we add java docs? I'm mostly after an explanation of matchAnyAllocationId

@bleskes bleskes commented on an outdated diff Dec 15, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
*/
- private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, NodesAndVersions nodesAndVersions) {
+ NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List<DiscoveryNode> nodes) {
@bleskes
bleskes Dec 15, 2015 Member

why was the private removed?

@bleskes bleskes commented on the diff Dec 15, 2015
...lasticsearch/cluster/routing/PrimaryAllocationIT.java
+ client(replicaNode).prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
+
+ logger.info("--> shut down node that has new acknowledged document");
+ internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
+
+ ensureStableCluster(1, master);
+
+ partition.stopDisrupting();
+
+ logger.info("--> waiting for node with old primary shard to rejoin the cluster");
+ ensureStableCluster(2, master);
+
+ logger.info("--> check that old primary shard does not get promoted to primary again");
+ // kick reroute and wait for all shard states to be fetched
+ client(master).admin().cluster().prepareReroute().get();
+ assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)));
@bleskes
bleskes Dec 15, 2015 Member

nice one.

@bleskes bleskes commented on the diff Dec 15, 2015
...elasticsearch/gateway/PrimaryShardAllocatorTests.java
@@ -59,25 +61,51 @@ public void buildTestAllocator() {
this.testAllocator = new TestAllocator();
}
- /**
- * Verifies that the canProcess method of primary allocation behaves correctly
- * and processes only the applicable shard.
- */
- public void testNoProcessReplica() {
- ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
- assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
+ public void testNoProcessPrimaryNotAllocatedBefore() {
+ final RoutingAllocation allocation;
+ if (randomBoolean()) {
+ allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomBoolean(), Version.CURRENT);
+ } else {
+ allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0);
@bleskes
bleskes Dec 15, 2015 Member

new is not possible with an older version...

@ywelsch
ywelsch Dec 15, 2015 Contributor

Edge case: force allocation of a primary for a pre-3.0 index with reroute command.

@bleskes
bleskes Dec 16, 2015 Member

I think we can work around it on the node side of things - it should expect to find an existing copy.

@bleskes
Member
bleskes commented Dec 15, 2015

This looks great. Left some minor comment and one important one about the recover on any node settings. Also let's have another discussion with @dakrone about the failing test.

@ywelsch
Contributor
ywelsch commented Dec 16, 2015

Pushed another set of changes, dealing with recover_on_any_node.

@bleskes bleskes commented on an outdated diff Dec 17, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
- logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
+ final Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
+ final boolean snapshotRestore = shard.restoreSource() != null;
+ final boolean recoverOnAnyNode = recoverOnAnyNode(indexSettings);
+
+ final NodesAndVersions nodesAndVersions;
+ final boolean enoughAllocationsFound;
+
+ if (lastActiveAllocationIds.isEmpty()) {
+ assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
+ // when we load an old index (after upgrading cluster) or restore a snapshot of an old index
+ // fall back to old version-based allocation mode
+ // Note that once the shard has been active, lastActiveAllocationIds will be non-empty
+ nodesAndVersions = buildNodesAndVersions(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState);
+ if (snapshotRestore || recoverOnAnyNode) {
+ enoughAllocationsFound = nodesAndVersions.nodes.isEmpty() == false;
@bleskes
bleskes Dec 17, 2015 Member

nit: can we be consistent and use allocationsFound > 0?

@bleskes bleskes commented on an outdated diff Dec 17, 2015
.../org/elasticsearch/gateway/PrimaryShardAllocator.java
+ Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
+ List<DiscoveryNode> matchingNodes = new ArrayList<>();
+ List<DiscoveryNode> nonMatchingNodes = new ArrayList<>();
+ long highestVersion = -1;
+ for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
+ DiscoveryNode node = nodeShardState.getNode();
+ String allocationId = nodeShardState.allocationId();
+
+ if (ignoreNodes.contains(node.id())) {
+ continue;
+ }
+
+ if (nodeShardState.storeException() == null) {
+ if (allocationId == null && nodeShardState.version() != -1) {
+ // old shard with no allocation id, assign dummy so that it gets added below in case of matchAnyShard
+ allocationId = "dummy";
@bleskes
bleskes Dec 17, 2015 Member

can we use "_n/a_"?

@bleskes bleskes commented on an outdated diff Dec 17, 2015
docs/reference/migration/migrate_3_0.asciidoc
@@ -472,3 +473,15 @@ from `OsStats.Cpu#getPercent`.
=== Fields option
Only stored fields are retrievable with this option.
The fields option won't be able to load non stored fields from _source anymore.
+
+[[breaking_30_allocation]]
+=== Primary shard allocation
+
+Previously, primary shards were only assigned if a quorum of shard copies were found (configurable using
+`index.recovery.initial_shards`, now deprecated). In case where a primary had only a single replica, quorum was defined
+to be a single shard. This meant that any shard copy of an index with replication factor 1 could become primary, even it
+was a stale copy of the data on disk. This is fixed now by introducing allocation IDs.
@bleskes
bleskes Dec 17, 2015 Member

This is now fixed by using allocation IDs. (the are already there)

@bleskes bleskes commented on an outdated diff Dec 17, 2015
.../reference/modules/cluster/shards_allocation.asciidoc
@@ -22,9 +22,8 @@ Enable or disable allocation for specific kinds of shards:
This setting does not affect the recovery of local primary shards when
restarting a node. A restarted node that has a copy of an unassigned primary
-shard will recover that primary immediately, assuming that the
-<<index.recovery.initial_shards,`index.recovery.initial_shards`>> setting is
-satisfied.
+shard will recover that primary immediately, assuming that its allocation id matches
+one of the active allocation ids in the cluster.
@bleskes
bleskes Dec 17, 2015 Member

cluster state.

@bleskes bleskes commented on an outdated diff Dec 17, 2015
docs/reference/migration/migrate_3_0.asciidoc
@@ -472,3 +473,15 @@ from `OsStats.Cpu#getPercent`.
=== Fields option
Only stored fields are retrievable with this option.
The fields option won't be able to load non stored fields from _source anymore.
+
+[[breaking_30_allocation]]
+=== Primary shard allocation
+
+Previously, primary shards were only assigned if a quorum of shard copies were found (configurable using
+`index.recovery.initial_shards`, now deprecated). In case where a primary had only a single replica, quorum was defined
+to be a single shard. This meant that any shard copy of an index with replication factor 1 could become primary, even it
+was a stale copy of the data on disk. This is fixed now by introducing allocation IDs.
+
+Allocation IDs assign unique identifiers to shard copies. This allows the cluster to differentiate between multiple
+copies of the same data and track which shards have been active, so that after a cluster restart, shard copies
+containing only the most recent data can become primaries.
@bleskes
bleskes Dec 17, 2015 Member

I miss a note here about the change in recover on any node?

@bleskes
Member
bleskes commented Dec 17, 2015

LGTM. Left some extremely minor comments. No need for another review. Just merge after they are addressed. Thanks @ywelsch ! I can't tell you how happy I am for having this.

@ywelsch ywelsch merged commit 8f14b10 into elastic:master Dec 17, 2015

1 check passed

CLA Commit author is a member of Elasticsearch
Details
@bleskes bleskes added a commit that referenced this pull request Apr 7, 2016
@bleskes bleskes Update resliency page
#14252 , #7572 , #15900, #12573, #14671, #15281 and #9126 have all been closed/merged and will be part of 5.0.0.
557a3d1
@bleskes bleskes referenced this pull request Apr 7, 2016
Merged

Update resliency page #17586

@bleskes bleskes added a commit that referenced this pull request Apr 7, 2016
@bleskes bleskes Update resiliency page (#17586)
#14252 , #7572 , #15900, #12573, #14671, #15281 and #9126 have all been closed/merged and will be part of 5.0.0.
8eee28e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment