Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not wait for advancement of checkpoint in recovery #39006

Merged
merged 7 commits into from Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -546,7 +546,6 @@ void phase2(

final AtomicInteger skippedOps = new AtomicInteger();
final AtomicInteger totalSentOps = new AtomicInteger();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
// We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
Expand All @@ -568,7 +567,6 @@ void phase2(
ops.add(operation);
batchSizeInBytes += operation.estimateSize();
totalSentOps.incrementAndGet();
requiredOpsTracker.markSeqNoAsCompleted(seqNo);

// check if this request is past bytes threshold, and if so, send it off
if (batchSizeInBytes >= chunkSizeInBytes) {
Expand All @@ -586,10 +584,10 @@ void phase2(
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
if (targetLocalCheckpoint < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
+ (targetLocalCheckpoint + 1) + "]");
}
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
Expand Down
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -235,18 +236,19 @@ public void testSendSnapshotSendsOps() throws IOException {
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);

final List<Translog.Operation> shippedOps = new ArrayList<>();
final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final AtomicReference<LocalCheckpointTracker> checkpointTrackerHolder = new AtomicReference<>();
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
RetentionLeases retentionLeases, ActionListener<Long> listener) {
shippedOps.addAll(operations);
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
operations.forEach(op -> checkpointTrackerHolder.get().markSeqNoAsCompleted(op.seqNo()));
maybeExecuteAsync(() -> listener.onResponse(checkpointTrackerHolder.get().getCheckpoint()));
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
checkpointTrackerHolder.set(new LocalCheckpointTracker(endingSeqNo, requiredStartingSeqNo - 1));
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
Expand All @@ -257,14 +259,15 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
for (int i = 0; i < shippedOps.size(); i++) {
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
}
assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
assertThat(result.targetLocalCheckpoint, equalTo(checkpointTrackerHolder.get().getCheckpoint()));
if (endingSeqNo >= requiredStartingSeqNo + 1) {
// check that missing ops blows up
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> failedFuture = new PlainActionFuture<>();
expectThrows(IllegalStateException.class, () -> {
checkpointTrackerHolder.set(new LocalCheckpointTracker(endingSeqNo, requiredStartingSeqNo - 1));
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture);
failedFuture.actionGet();
Expand Down
Expand Up @@ -267,9 +267,7 @@ public synchronized int startReplicas(int numOfReplicasToStart) throws IOExcepti
}

public void startPrimary() throws IOException {
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
recoverPrimary(primary);
HashSet<String> activeIds = new HashSet<>();
activeIds.addAll(activeIds());
activeIds.add(primary.routingEntry().allocationId().getId());
Expand Down Expand Up @@ -302,6 +300,11 @@ assert shardRoutings().stream()
updateAllocationIDsOnPrimary();
}

protected synchronized void recoverPrimary(IndexShard primary) {
final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId());
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null));
primary.recoverFromStore();
}

public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException {
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
Expand Down