Skip to content

Commit

Permalink
Limit shard realocation retries (#90296)
Browse files Browse the repository at this point in the history
This change ensures that elasticsearch would not indefinitely retry relocating shard if operation fails.
  • Loading branch information
idegtiarenko committed Sep 27, 2022
1 parent 09eafed commit 24cf871
Show file tree
Hide file tree
Showing 17 changed files with 466 additions and 139 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/90296.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 90296
summary: Limit shard realocation retries
area: Allocation
type: enhancement
issues: []
35 changes: 28 additions & 7 deletions docs/reference/search/search-shards.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,55 +87,70 @@ The API returns the following result:
{
"index": "my-index-000001",
"node": "JklnKbD7Tyqi9TP3_Q_tBg",
"relocating_node": null,
"primary": true,
"shard": 0,
"state": "STARTED",
"allocation_id": {"id":"0TvkCyF7TAmM1wHP4a42-A"},
"relocating_node": null
"relocation_failure_info" : {
"failed_attempts" : 0
}
}
],
[
{
"index": "my-index-000001",
"node": "JklnKbD7Tyqi9TP3_Q_tBg",
"relocating_node": null,
"primary": true,
"shard": 1,
"state": "STARTED",
"allocation_id": {"id":"fMju3hd1QHWmWrIgFnI4Ww"},
"relocating_node": null
"relocation_failure_info" : {
"failed_attempts" : 0
}
}
],
[
{
"index": "my-index-000001",
"node": "JklnKbD7Tyqi9TP3_Q_tBg",
"relocating_node": null,
"primary": true,
"shard": 2,
"state": "STARTED",
"allocation_id": {"id":"Nwl0wbMBTHCWjEEbGYGapg"},
"relocating_node": null
"relocation_failure_info" : {
"failed_attempts" : 0
}
}
],
[
{
"index": "my-index-000001",
"node": "JklnKbD7Tyqi9TP3_Q_tBg",
"relocating_node": null,
"primary": true,
"shard": 3,
"state": "STARTED",
"allocation_id": {"id":"bU_KLGJISbW0RejwnwDPKw"},
"relocating_node": null
"relocation_failure_info" : {
"failed_attempts" : 0
}
}
],
[
{
"index": "my-index-000001",
"node": "JklnKbD7Tyqi9TP3_Q_tBg",
"relocating_node": null,
"primary": true,
"shard": 4,
"state": "STARTED",
"allocation_id": {"id":"DMs7_giNSwmdqVukF7UydA"},
"relocating_node": null
"relocation_failure_info" : {
"failed_attempts" : 0
}
}
]
]
Expand Down Expand Up @@ -171,22 +186,28 @@ The API returns the following result:
{
"index": "my-index-000001",
"node": "JklnKbD7Tyqi9TP3_Q_tBg",
"relocating_node": null,
"primary": true,
"shard": 2,
"state": "STARTED",
"allocation_id": {"id":"fMju3hd1QHWmWrIgFnI4Ww"},
"relocating_node": null
"relocation_failure_info" : {
"failed_attempts" : 0
}
}
],
[
{
"index": "my-index-000001",
"node": "JklnKbD7Tyqi9TP3_Q_tBg",
"relocating_node": null,
"primary": true,
"shard": 3,
"state": "STARTED",
"allocation_id": {"id":"0TvkCyF7TAmM1wHP4a42-A"},
"relocating_node": null
"relocation_failure_info" : {
"failed_attempts" : 0
}
}
]
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.index.shard.IndexShardState.CLOSED;
Expand Down Expand Up @@ -137,6 +138,32 @@ public void beforeIndexCreated(Index index, Settings indexSettings) {
assertThat(state.nodes().resolveNode(shard.get(0).currentNodeId()).getName(), Matchers.equalTo(node1));
}

public void testRelocationFailureNotRetriedForever() {
String node1 = internalCluster().startNode();
client().admin()
.indices()
.prepareCreate("index1")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
.get();
ensureGreen("index1");
for (int i = 0; i < 2; i++) {
internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, internalCluster().startNode())
.setNewDelegate(new IndexShardStateChangeListener() {
@Override
public void beforeIndexCreated(Index index, Settings indexSettings) {
throw new RuntimeException("FAIL");
}
});
}
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("index1")
.setSettings(Settings.builder().put(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", node1))
);
ensureGreen("index1");
}

public void testIndexStateShardChanged() throws Throwable {
// start with a single node
String node1 = internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Holds additional information as to why the shard failed to relocate.
*/
public record RelocationFailureInfo(int failedRelocations) implements ToXContentFragment, Writeable {

public static final RelocationFailureInfo NO_FAILURES = new RelocationFailureInfo(0);

public RelocationFailureInfo {
assert failedRelocations >= 0 : "Expect non-negative failures count, got: " + failedRelocations;
}

public static RelocationFailureInfo readFrom(StreamInput in) throws IOException {
int failures = in.readVInt();
return failures == 0 ? NO_FAILURES : new RelocationFailureInfo(failures);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(failedRelocations);
}

public RelocationFailureInfo incFailedRelocations() {
return new RelocationFailureInfo(failedRelocations + 1);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("relocation_failure_info");
builder.field("failed_attempts", failedRelocations);
builder.endObject();
return builder;
}

@Override
public String toString() {
return "failed_attempts[" + failedRelocations + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ default void relocationStarted(ShardRouting startedShard, ShardRouting targetRel
*/
default void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {}

/**
* Called when a relocating shard's failure information was updated
*/
default void relocationFailureInfoUpdated(ShardRouting relocatedShard, RelocationFailureInfo relocationFailureInfo) {}

/**
* Called when a shard is failed or cancelled.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,30 +990,6 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
ignored.add(shard);
}

public void resetFailedAllocationCounter(RoutingChangesObserver routingChangesObserver) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shardRouting = unassignedIterator.next();
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
unassignedIterator.updateUnassigned(
new UnassignedInfo(
unassignedInfo.getNumFailedAllocations() > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(),
unassignedInfo.getMessage(),
unassignedInfo.getFailure(),
0,
unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(),
unassignedInfo.isDelayed(),
unassignedInfo.getLastAllocationStatus(),
Collections.emptySet(),
unassignedInfo.getLastAllocatedNodeId()
),
shardRouting.recoverySource(),
routingChangesObserver
);
}
}

public class UnassignedIterator implements Iterator<ShardRouting>, ExistingShardsAllocator.UnassignedAllocationHandler {

private final ListIterator<ShardRouting> iterator;
Expand Down Expand Up @@ -1293,6 +1269,46 @@ private void ensureMutable() {
}
}

public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
final var unassignedIterator = unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shardRouting = unassignedIterator.next();
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
unassignedIterator.updateUnassigned(
new UnassignedInfo(
unassignedInfo.getNumFailedAllocations() > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(),
unassignedInfo.getMessage(),
unassignedInfo.getFailure(),
0,
unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(),
unassignedInfo.isDelayed(),
unassignedInfo.getLastAllocationStatus(),
Collections.emptySet(),
unassignedInfo.getLastAllocatedNodeId()
),
shardRouting.recoverySource(),
routingChangesObserver
);
}

for (RoutingNode routingNode : this) {
var shardsWithRelocationFailures = new ArrayList<ShardRouting>();
for (ShardRouting shardRouting : routingNode) {
if (shardRouting.relocationFailureInfo() != null && shardRouting.relocationFailureInfo().failedRelocations() > 0) {
shardsWithRelocationFailures.add(shardRouting);
}
}

for (ShardRouting original : shardsWithRelocationFailures) {
ShardRouting updated = original.updateRelocationFailure(RelocationFailureInfo.NO_FAILURES);
routingNode.update(original, updated);
assignedShardsRemove(original);
assignedShardsAdd(updated);
}
}
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
Expand Down

0 comments on commit 24cf871

Please sign in to comment.