Remove PRRLs before performing file-based recovery #43928

Merged
ReplicationTracker.java
@@ -437,6 +437,10 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
addRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener);
}

public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener);
}

/**
* Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
@@ -498,9 +502,18 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
if (retentionLease != null) {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
- renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
- Math.max(0L, checkpointState.globalCheckpoint + 1L),
- PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+ final long newRetainedSequenceNumber = Math.max(0L, checkpointState.globalCheckpoint + 1L);
+ if (retentionLease.retainingSequenceNumber() <= newRetainedSequenceNumber) {
Contributor: What if we want to renew just because of the timestamp (but the copy is not currently tracked)? In that case this condition will be false and we won't renew.

Contributor Author: I don't follow. If this condition is false then we can't renew, because the lease can only advance. Or, put differently, why would we want to renew a lease for a copy that we're not tracking? Leases for inactive copies should be allowed to expire if the copy doesn't become active within the timeout.

Contributor: Should we not explicitly make sure, then, that we do not renew those shards that are not tracked? Given that newRetainedSequenceNumber is always >= 0, could there be a case where we continuously extend a lease for a shard copy that is not tracked but had an existing retainingSequenceNumber of 0?

Contributor Author: Only if there's a shard that stays in the routing table but never becomes tracked, which I don't think can happen (assuming that recoveries eventually terminate, at least, thanks to the MaxRetryAllocationDecider).

Note that we don't expire leases for assigned shards anyway, tracked or not, because we create the lease in peer recovery before initiating tracking, so if we skipped untracked shards here then the lease would continue to exist until it had expired and the shard was no longer in the routing table.

Also, if we did skip untracked shards here then you could end up with a shard on which tracking was just initiated but whose lease is really old; if that shard failed before the next retention lease sync then its lease could expire, triggering another file-based recovery when in fact an ops-based recovery would have been worth attempting.

Contributor: head spin

(A standalone worked example of this renewal guard follows the rest of this file's diff, below.)
+ renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), newRetainedSequenceNumber,
+ PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+ } else {
+ // the retention lease is tied to the node, not the shard copy, so it's possible a copy was removed and now
+ // we are in the process of recovering it again. The recovery process will fix the lease before initiating
+ // tracking on this copy:
+ assert checkpointState.tracked == false
+ && checkpointState.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO :
+ "cannot renew " + retentionLease + " according to " + checkpointState + " for " + shardRouting;
+ }
}
}
}
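To make the review thread above concrete: the new guard only renews a lease when doing so would not move its retaining sequence number backwards, and an untracked copy reports an unassigned global checkpoint, so its stale lease is left alone until recovery replaces it. Below is a minimal, self-contained sketch of just that arithmetic; every name in it is hypothetical (it is not the Elasticsearch API), and it assumes the value -2 for SequenceNumbers.UNASSIGNED_SEQ_NO.

// Standalone sketch of the renewal guard discussed in the review thread above.
// All names here are hypothetical; only the arithmetic mirrors the diff.
public class RenewalGuardSketch {

    // Stand-in for SequenceNumbers.UNASSIGNED_SEQ_NO (-2 in Elasticsearch).
    static final long UNASSIGNED_SEQ_NO = -2L;

    static boolean shouldRenew(long retainingSequenceNumber, long globalCheckpoint) {
        final long newRetainedSequenceNumber = Math.max(0L, globalCheckpoint + 1L);
        // A lease may only advance, so renew only if it would not move backwards.
        return retainingSequenceNumber <= newRetainedSequenceNumber;
    }

    public static void main(String[] args) {
        // Tracked copy that has advanced: lease retains 90, checkpoint is 120 -> renew to 121.
        System.out.println(shouldRenew(90L, 120L));                 // true

        // Copy being re-recovered on a node that previously held it: the old lease retains 100
        // but the new copy's checkpoint is unassigned, so max(0, -2 + 1) = 0 and 100 <= 0 is
        // false -> no renewal; peer recovery removes and recreates the lease instead.
        System.out.println(shouldRenew(100L, UNASSIGNED_SEQ_NO));   // false

        // The reviewer's concern: a lease retaining 0 for an untracked copy would keep being
        // renewed at 0, but that needs a shard that stays in the routing table without ever
        // becoming tracked, which the MaxRetryAllocationDecider is expected to rule out.
        System.out.println(shouldRenew(0L, UNASSIGNED_SEQ_NO));     // true
    }
}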
IndexShard.java
@@ -2503,6 +2503,11 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
}

public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) {
assert assertPrimaryMode();
replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
}

class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

RecoverySourceHandler.java
@@ -53,6 +53,7 @@
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@@ -196,7 +197,30 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
logger.warn("releasing snapshot caused exception", ex);
}
});
- phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
+
+ final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
+ if (shard.indexSettings().isSoftDeleteEnabled()
+ && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
+ runUnderPrimaryPermit(() -> {
+ try {
+ // If the target previously had a copy of this shard then a file-based recovery might move its global
+ // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
+ // new one later on in the recovery.
+ shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep);
+ } catch (RetentionLeaseNotFoundException e) {
+ logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
+ deleteRetentionLeaseStep.onResponse(null);
+ }
+ }, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",
+ shard, cancellableThreads, logger);
+ } else {
+ deleteRetentionLeaseStep.onResponse(null);
+ }
+
+ deleteRetentionLeaseStep.whenComplete(ignored -> {
+ phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
+ }, onFailure);
+
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);
}
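The recoverToTarget change above inserts an asynchronous step ahead of the file copy: when soft deletes are enabled and the index is open, remove any stale peer-recovery retention lease for the target node (treating a missing lease as success), and only start phase1 once that step completes. A rough, self-contained sketch of that ordering follows; it substitutes java.util.concurrent.CompletableFuture for Elasticsearch's StepListener/ActionListener machinery, and every identifier in it is hypothetical.

// Rough sketch of the "remove lease, then copy files" ordering added above, using
// CompletableFuture instead of Elasticsearch's StepListener. Hypothetical names throughout.
import java.util.concurrent.CompletableFuture;

public class RemoveLeaseThenCopyFilesSketch {

    // Stand-in for RetentionLeaseNotFoundException.
    static class LeaseNotFoundException extends RuntimeException {}

    // In the real handler this runs under a primary permit and calls
    // shard.removePeerRecoveryRetentionLease(targetNodeId, listener).
    static CompletableFuture<Void> removePeerRecoveryRetentionLease(String targetNodeId) {
        System.out.println("removing peer-recovery retention lease for " + targetNodeId);
        return CompletableFuture.completedFuture(null);
    }

    static void phase1CopyFiles() {
        System.out.println("phase1: sending segment files");
    }

    static void recover(boolean softDeletesEnabled, boolean indexClosed, String targetNodeId) {
        CompletableFuture<Void> deleteLeaseStep;
        if (softDeletesEnabled && indexClosed == false) {
            try {
                // A file-based recovery may move the target's global checkpoint backwards, so any
                // existing lease for this node must be removed before a fresh one is created later.
                deleteLeaseStep = removePeerRecoveryRetentionLease(targetNodeId);
            } catch (LeaseNotFoundException e) {
                // No lease to remove: treat as success, mirroring the catch block in the handler.
                deleteLeaseStep = CompletableFuture.completedFuture(null);
            }
        } else {
            // Mirrors the handler's condition: only bother when soft deletes are on and the index is open.
            deleteLeaseStep = CompletableFuture.completedFuture(null);
        }

        // Start copying files only after the lease-removal step completes; failures propagate.
        deleteLeaseStep.thenRun(RemoveLeaseThenCopyFilesSketch::phase1CopyFiles)
            .exceptionally(e -> {
                System.err.println("recovery failed: " + e);
                return null;
            });
    }

    public static void main(String[] args) {
        recover(true, false, "node-1");
    }
}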
IndexRecoveryIT.java
@@ -49,6 +49,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.mapper.MapperParsingException;
@@ -70,6 +71,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSIndexStore;
@@ -127,8 +129,12 @@ public class IndexRecoveryIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
- return Arrays.asList(MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class,
- RecoverySettingsChunkSizePlugin.class, TestAnalysisPlugin.class);
+ return Arrays.asList(
+ MockTransportService.TestPlugin.class,
+ MockFSIndexStore.TestPlugin.class,
+ RecoverySettingsChunkSizePlugin.class,
+ TestAnalysisPlugin.class,
+ InternalSettingsPlugin.class);
}

@After
@@ -1015,4 +1021,45 @@ public TokenStream create(TokenStream tokenStream) {
});
}
}

public void testRepeatedRecovery() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);

// Ensures that you can remove a replica and then add it back again without any ill effects, even if it's allocated back to the
// node that held it previously, in case that node hasn't completely cleared it up.

final String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 6))
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
.build());
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));

assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), equalTo(0));

assertBusy(() -> {
final ShardStats[] shardsStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards();
for (final ShardStats shardStats : shardsStats) {
final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
.allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
}
});

logger.info("--> remove replicas");
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.number_of_replicas", 0)));
ensureGreen(indexName);

logger.info("--> index more documents");
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(0, 10))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));

logger.info("--> add replicas again");
assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.number_of_replicas", 1)));
ensureGreen(indexName);
}
}