Make primary-replica resync failures less lenient #28534
Conversation
This is not safe as is, and requires another important prerequisite step, namely for ReplicationOperation not to be lenient against exceptions that match TransportActions.isShardNotAvailableException(e).
Assume that a primary-replica resync fails the replica shard but does not mark it as stale. While the node that has the replica is removing the failed shard, a replication request comes from the primary, and throws a ShardNotFoundException on the replica. If that exception just gets ignored and the indexing request acknowledged, then this can potentially cause data loss as the replica shard is still in the in-sync set.
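The scenario above can be reduced to a toy simulation (all names here are hypothetical stand-ins, not Elasticsearch code; the point is only the in-sync-set bookkeeping):

```java
import java.util.*;

// Toy model of the data-loss scenario: a replica that was failed by the resync
// but NOT marked stale throws a shard-not-found error on the next write. If
// that error is ignored and the write acknowledged, the still-in-sync replica
// can later be promoted to primary without the acknowledged write.
public class LenientResyncScenario {

    // Returns true if an acknowledged write can be missing from a copy that is
    // still eligible for promotion (i.e. still in the in-sync set).
    static boolean acknowledgedWriteCanBeLost(boolean ignoreShardNotAvailable) {
        Set<String> inSyncSet = new HashSet<>(Arrays.asList("primary", "replica"));
        List<String> replicaDocs = new ArrayList<>();
        boolean replicaBeingRemoved = true; // resync failed the replica; its node is removing it

        boolean acked;
        try {
            if (replicaBeingRemoved) {
                throw new IllegalStateException("ShardNotFoundException"); // stand-in exception
            }
            replicaDocs.add("doc1");
            acked = true;
        } catch (IllegalStateException shardNotFound) {
            if (ignoreShardNotAvailable) {
                acked = true; // lenient: ignore the failure and ack the write anyway
            } else {
                inSyncSet.remove("replica"); // strict: fail the shard and mark it stale
                acked = true;
            }
        }
        // Data loss is possible if the write was acked but a promotable
        // in-sync copy never received it.
        return acked && inSyncSet.contains("replica") && !replicaDocs.contains("doc1");
    }

    public static void main(String[] args) {
        System.out.println("lenient can lose acked write: " + acknowledgedWriteCanBeLost(true));
        System.out.println("strict can lose acked write:  " + acknowledgedWriteCanBeLost(false));
    }
}
```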
@ywelsch Thank you for the great input. I've updated the PR to strictly handle all exceptions in the write and resync proxy. Please have another look.
@dnhatn I'd prefer https://github.com/elastic/elasticsearch/pull/28534/files#diff-adf05e61cbc020ee1ff99be6c50fd11fL189 to be in its own PR. It's a Big Thing ™️.
Currently the master node logs a warning message whenever it receives a failed shard request. However, this can be noisy because:
- Multiple failed shard requests can be issued for a single shard
- Failed shard requests can still be issued for an already failed shard

This commit moves the log-warn to AllocationService, where the shard-failing action actually happens. This is another prerequisite step towards no longer ignoring the shard not-available exceptions in replication. Relates #28534
The shard not-available exceptions are currently ignored in replication as a best effort to avoid failing not-yet-ready shards. However, these exceptions can also come from fully active shards; when that happens, we may have skipped important failures from replicas. Since #28049, only fully initialized shards receive write requests. This restriction allows us to handle all exceptions in replication. There is a side-effect to this change: if a replica retries its peer recovery a second time after being tracked in the replication group, it can receive replication requests even though it's not yet ready. That shard may be failed and allocated to another node even though it has a good Lucene index on that node. This PR does not change the way we report replication errors to users, so the shard not-available exceptions won't be reported, as before. Relates #28049 Relates #28534
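A minimal sketch of the behavioral change this commit message describes, using hypothetical stand-in names (the real logic lives in ReplicationOperation and the replicas proxy):

```java
// Hypothetical stand-ins, not the actual Elasticsearch classes: a sketch of
// the difference between the old lenient replica-failure handling (skip shard
// not-available exceptions) and the strict handling (fail the shard for every
// exception, which is safe once only fully initialized shards receive writes).
public class ReplicaFailureHandlingSketch {
    enum Outcome { IGNORED, SHARD_FAILED }

    // Stand-in for TransportActions.isShardNotAvailableException(e)
    static boolean isShardNotAvailable(Exception e) {
        return e instanceof IllegalStateException;
    }

    // Old behavior: shard not-available exceptions were silently skipped.
    static Outcome lenient(Exception e) {
        return isShardNotAvailable(e) ? Outcome.IGNORED : Outcome.SHARD_FAILED;
    }

    // New behavior: every replica failure fails the shard.
    static Outcome strict(Exception e) {
        return Outcome.SHARD_FAILED;
    }

    public static void main(String[] args) {
        Exception notAvailable = new IllegalStateException("shard not available");
        System.out.println("lenient: " + lenient(notAvailable));
        System.out.println("strict:  " + strict(notAvailable));
    }
}
```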
I've merged the prerequisites. Would you please take another look? Thank you.
Instead of mucking with the engine, it might be nicer to setup a network disruption between the newly promoted primary and the replica, see also PrimaryAllocationIT. Maybe even put the test into that class, I think it would fit there very well.
 * Replica shards which fail to execute these resync operation will be failed but won't be marked as stale
 * (eg. keep in the in sync set). This is a best-effort to avoid marking shards as stale during cluster restart.
 */
class RsyncActionReplicasProxy extends ReplicasProxy {
you mean Resync, not Rsync ;-)
/**
 * A proxy for primary-replica resync operations which are performed on replicas when a new primary is promoted.
 * Replica shards which fail to execute these resync operation will be failed but won't be marked as stale
 * (eg. keep in the in sync set). This is a best-effort to avoid marking shards as stale during cluster restart.
remove the comment "(eg. keep in the in sync set)". Also why is this best-effort now? By failing the shard, we're forcing the shard copies to resync.
@@ -83,8 +85,8 @@ protected ResyncReplicationResponse newResponseInstance() {

     @Override
     protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
-        // We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
-        return new ReplicasProxy(primaryTerm);
+        // We treat the resync as best-effort for now and don't mark unavailable and failed shard copies as stale.
the crucial part is that we're now failing shards, forcing another resync.
final AtomicReference<String> failedNode = new AtomicReference<>();
for (String replicaNode : replicaNodes) {
    getEngine(replicaNode, shardId).setPreIndexingInterceptor((op) -> {
        if (failedNode.compareAndSet(null, replicaNode)) {
Is this not triggered on the new primary?
@@ -384,41 +384,16 @@ void run() {
     @Override
     public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                                   Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {

-        logger.warn((org.apache.logging.log4j.util.Supplier<?>)
interested in creating a PR to remove all those ugly casts? Jason will love it.
Sure, will do :)
@ywelsch I've addressed your comments. Can you have another look? Thank you.
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
final int numberOfReplicas = between(2, 3);
assertAcked(
    prepareCreate("test", Settings.builder().put(indexSettings())
start the data node first? There's no need to wait on yellow then (index creation waits for primaries to be allocated by default).
what about this comment?
@ywelsch Sorry, I did not see your second comment. This was addressed, but the change (i.e. removing ensureYellow) was several lines away from the comment, so GitHub did not pick it up.
I pushed 0329f5b
final Set<String> connectedReplicas = Sets.difference(replicaNodes, isolatedReplicas);
final Set<String> connectedNodes = Sets.newHashSet(master, oldPrimary);
connectedNodes.addAll(connectedReplicas);
NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(connectedNodes, isolatedReplicas), new NetworkDisconnect());
I wonder if it's easier just to isolate some of the replicas among each other, i.e., take a subset of replicas and isolate them from the remaining replicas. This means that all replicas stay connected to the master.
assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(),
    anyOf(hasSize(replicasSide1.size()), hasSize(replicasSide2.size())));
assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1));
}, 2, TimeUnit.MINUTES);
why 2 minutes? This sounds awfully long? Why is this taking so long?
I was pessimistic. This test should not take too long.
assertBusy(() -> {
    ClusterState state = client(master).admin().cluster().prepareState().get().getState();
    assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(),
        anyOf(hasSize(replicasSide1.size()), hasSize(replicasSide2.size())));
maybe, to be more precise, it would be good to check the partition that included the new primary.
I've updated the test.
LGTM
Today, failures from the primary-replica resync are ignored as a best effort to not mark shards as stale during a cluster restart. However, this can be problematic if replicas fail to execute resync operations but are just fine in the subsequent write operations. When this happens, a replica will miss some operations from the new primary. There are several implications if the local checkpoint on a replica can't advance because of the missing operations:
1. The global checkpoint won't advance, which causes both the primary and the replicas to keep many index commits
2. The engine on the replica won't flush periodically because uncommitted stats are calculated based on the local checkpoint
3. The replica may use a large number of bitsets to keep track of operations' seqnos

However, we can prevent this issue and still preserve the best-effort behavior by failing replicas which fail to execute resync operations, without marking them as stale. We prepared the required infrastructure in #28049 and #28054 for this change. Relates #24841
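The checkpoint implications listed above can be illustrated with a toy model (assumed simplification: the local checkpoint is the highest seqno below which every operation has been processed, and the global checkpoint is the minimum local checkpoint over the in-sync copies):

```java
import java.util.*;

// Toy checkpoint model: a replica that missed one resync operation pins its
// local checkpoint, which in turn pins the global checkpoint, so index commits
// and seqno-tracking bitsets accumulate on every copy.
public class CheckpointSketch {

    // Highest seqno such that all seqnos up to and including it are processed.
    static long localCheckpoint(Set<Long> processedSeqNos) {
        long cp = -1;
        while (processedSeqNos.contains(cp + 1)) {
            cp++;
        }
        return cp;
    }

    public static void main(String[] args) {
        Set<Long> primaryOps = new TreeSet<>(Arrays.asList(0L, 1L, 2L, 3L, 4L));
        Set<Long> replicaOps = new TreeSet<>(Arrays.asList(0L, 2L, 3L, 4L)); // missed seqno 1 from the resync

        long primaryLcp = localCheckpoint(primaryOps);
        long replicaLcp = localCheckpoint(replicaOps);
        long globalCp = Math.min(primaryLcp, replicaLcp); // stuck behind the gap

        System.out.println("primary local checkpoint: " + primaryLcp); // 4
        System.out.println("replica local checkpoint: " + replicaLcp); // 0
        System.out.println("global checkpoint:        " + globalCp);   // 0
    }
}
```

Failing (and resyncing) the lagging replica instead of silently skipping it is what lets the gap close and the checkpoints advance.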
* master: (28 commits)
  Maybe die before failing engine (elastic#28973)
  Remove special handling for _all in nodes info
  Remove Booleans use from XContent and ToXContent (elastic#28768)
  Update Gradle Testing Docs (elastic#28970)
  Make primary-replica resync failures less lenient (elastic#28534)
  Remove temporary file 10_basic.yml~
  Use different pipeline id in test. (pipelines do not get removed between tests extending from ESIntegTestCase)
  Use fixture to test the repository-gcs plugin (elastic#28788)
  Use String.join() to describe a list of tasks (elastic#28941)
  Fixed incorrect test try-catch statement
  Plugins: Consolidate plugin and module loading code (elastic#28815)
  percolator: Take `matchAllDocs` and `verified` of the sub result into account when analyzing a function_score query.
  Build: Remove rest tests on archive distribution projects (elastic#28952)
  Remove FastStringReader in favor of vanilla StringReader (elastic#28944)
  Remove FastCharArrayReader and FastCharArrayWriter (elastic#28951)
  Continue registering pipelines after one pipeline parse failure. (elastic#28752)
  Build: Fix ability to ignore when no tests are run (elastic#28930)
  [rest-api-spec] update doc link for /_rank_eval
  Switch XContentBuilder from BytesStreamOutput to ByteArrayOutputStream (elastic#28945)
  Factor UnknownNamedObjectException into its own class (elastic#28931)
  ...