Skip to content

Commit

Permalink
Throw back replica local checkpoint on new primary
Browse files Browse the repository at this point in the history
This commit causes a replica to throw back its local checkpoint to the
global checkpoint when learning of a new primary through a replica
operation.

Relates #25452
  • Loading branch information
jasontedor committed Jul 5, 2017
1 parent 7c637a0 commit 7dcd81b
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 20 deletions.
Expand Up @@ -121,6 +121,19 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
 * Moves the checkpoint of this tracker back to the given value. All sequence numbers recorded as processed are
 * forgotten and tracking resumes from the sequence number immediately after the new checkpoint.
 *
 * @param checkpoint the local checkpoint to reset this tracker to; asserted to be assigned and to be no greater than
 *                   the current checkpoint
 */
synchronized void resetCheckpoint(final long checkpoint) {
    assert SequenceNumbersService.UNASSIGNED_SEQ_NO != checkpoint;
    assert this.checkpoint >= checkpoint;
    // drop everything recorded so far; tracking restarts right after the new checkpoint
    processedSeqNo.clear();
    final long nextSeqNoToTrack = checkpoint + 1;
    firstProcessedSeqNo = nextSeqNoToTrack;
    this.checkpoint = checkpoint;
}

/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
Expand Down
Expand Up @@ -106,6 +106,15 @@ public void markSeqNoAsCompleted(final long seqNo) {
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}

/**
* Resets the local checkpoint to the specified value by delegating to the local checkpoint tracker's
* {@code resetCheckpoint}.
*
* @param localCheckpoint the local checkpoint to reset to
*/
public void resetLocalCheckpoint(final long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}

/**
* The current sequence number stats.
*
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -2058,6 +2058,19 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint);
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
} else {
localCheckpoint = currentGlobalCheckpoint;
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint);
getEngine().getTranslog().rollGeneration();
});
globalCheckpointUpdated = true;
Expand Down
Expand Up @@ -38,6 +38,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;

Expand Down Expand Up @@ -236,4 +237,23 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte

thread.join();
}

/**
 * Marks a random subset of sequence numbers as completed, resets the tracker to a random checkpoint at or below the
 * current one, and verifies that the checkpoint moved, the processed set is empty, and the maximum sequence number
 * and sequence number generation are unaffected by the reset.
 */
public void testResetCheckpoint() {
    final int operationCount = 1024 - scaledRandomIntBetween(0, 1024);
    final int noOpsPerformed = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
    int highestCompleted = noOpsPerformed;
    for (int seqNo = 0; seqNo < operationCount; seqNo++) {
        if (rarely()) {
            // leave a hole at this sequence number
            continue;
        }
        tracker.markSeqNoAsCompleted(seqNo);
        highestCompleted = seqNo;
    }

    final int currentCheckpoint = Math.toIntExact(tracker.getCheckpoint());
    final int resetTo = randomIntBetween(noOpsPerformed, currentCheckpoint);
    tracker.resetCheckpoint(resetTo);

    final long expectedCheckpoint = resetTo;
    final long expectedMaxSeqNo = highestCompleted;
    final long expectedNextSeqNo = highestCompleted + 1;
    assertThat(tracker.getCheckpoint(), equalTo(expectedCheckpoint));
    assertThat(tracker.getMaxSeqNo(), equalTo(expectedMaxSeqNo));
    assertThat(tracker.processedSeqNo, empty());
    assertThat(tracker.generateSeqNo(), equalTo(expectedNextSeqNo));
}
}
131 changes: 111 additions & 20 deletions core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -142,7 +143,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

/**
Expand Down Expand Up @@ -405,26 +405,10 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

// most of the time this is large enough that there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
boolean gap = false;
for (int i = 0; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
max = i;
} else {
gap = true;
}
}
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

final int maxSeqNo = max;
if (gap) {
assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
}
final int maxSeqNo = result.maxSeqNo;
final boolean gap = result.gap;

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
Expand Down Expand Up @@ -626,6 +610,12 @@ public void onFailure(Exception e) {
}
newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
}
final long expectedLocalCheckpoint;
if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
} else {
expectedLocalCheckpoint = newGlobalCheckPoint;
}
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
Expand All @@ -637,6 +627,7 @@ public void onFailure(Exception e) {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
onResponse.set(true);
releasable.close();
Expand Down Expand Up @@ -697,6 +688,7 @@ private void finish() {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
}
}
Expand All @@ -707,6 +699,56 @@ private void finish() {
closeShards(indexShard);
}

/**
 * Verifies that a replica throws its local checkpoint back when it acquires a replica operation permit under a
 * higher primary term, and that the local checkpoint advances again when indexing resumes afterwards.
 *
 * @throws IOException          if an I/O exception occurs while indexing on or closing the shard
 * @throws InterruptedException if interrupted while waiting for the permit listener to be notified
 */
public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
    final IndexShard indexShard = newStartedShard(false);

    // this is large enough that most of the time there will be at least one gap
    final int operations = 1024 - scaledRandomIntBetween(0, 1024);
    indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

    final long globalCheckpointOnReplica =
        randomIntBetween(
            Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
            Math.toIntExact(indexShard.getLocalCheckpoint()));
    indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);

    final int globalCheckpoint =
        randomIntBetween(
            Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
            Math.toIntExact(indexShard.getLocalCheckpoint()));
    final CountDownLatch latch = new CountDownLatch(1);
    final java.util.concurrent.atomic.AtomicReference<Exception> failure =
        new java.util.concurrent.atomic.AtomicReference<>();
    indexShard.acquireReplicaOperationPermit(
        indexShard.primaryTerm + 1,
        globalCheckpoint,
        new ActionListener<Releasable>() {
            @Override
            public void onResponse(final Releasable releasable) {
                releasable.close();
                latch.countDown();
            }

            @Override
            public void onFailure(final Exception e) {
                // record the failure and release the waiting test thread; swallowing the exception here would
                // leave the latch at one and make the test hang forever instead of failing
                failure.set(e);
                latch.countDown();
            }
        },
        ThreadPool.Names.SAME);

    latch.await();
    final Exception permitFailure = failure.get();
    if (permitFailure != null) {
        throw new AssertionError("unexpected failure while acquiring replica operation permit", permitFailure);
    }

    if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO
        && globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
        assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
    } else {
        assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
    }

    // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
    final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
    assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));

    closeShards(indexShard);
}

public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);

Expand Down Expand Up @@ -1966,6 +2008,55 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

/**
 * Immutable holder for the three values produced by {@code indexOnReplicaWithGaps}: the local checkpoint, the
 * maximum sequence number, and whether a gap was introduced.
 */
class Result {
    private final int localCheckpoint;
    private final int maxSeqNo;
    private final boolean gap;

    /**
     * @param checkpoint the local checkpoint to record
     * @param max        the maximum sequence number to record
     * @param hasGap     whether a sequence number gap was introduced
     */
    Result(final int checkpoint, final int max, final boolean hasGap) {
        localCheckpoint = checkpoint;
        maxSeqNo = max;
        gap = hasGap;
    }
}

/**
 * Index on the specified shard while introducing sequence number gaps. Each sequence number strictly between
 * {@code offset} and {@code operations} is either indexed or, rarely, skipped. The last sequence number in that
 * range is always indexed: a skipped trailing operation sits above the maximum indexed sequence number and is
 * therefore not a gap, so skipping it would flag a gap while the local checkpoint still equals the maximum.
 *
 * @param indexShard the shard
 * @param operations the exclusive upper bound on the sequence numbers to index
 * @param offset     the sequence number immediately preceding the first one to index; also the initial value of
 *                   the tracked local checkpoint and maximum sequence number
 * @return the expected local checkpoint, the maximum sequence number indexed, and whether or not a gap was
 *         introduced
 * @throws IOException if an I/O exception occurs while indexing on the shard
 */
private Result indexOnReplicaWithGaps(
        final IndexShard indexShard,
        final int operations,
        final int offset) throws IOException {
    int localCheckpoint = offset;
    int max = offset;
    boolean gap = false;
    for (int i = offset + 1; i < operations; i++) {
        // the last operation is never skipped: a trailing skip is not a gap, yet it would set the gap flag and
        // trip the consistency assertion below whenever everything before it was indexed
        if (!rarely() || i == operations - 1) {
            final String id = Integer.toString(i);
            SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
                new BytesArray("{}"), XContentType.JSON);
            indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
                1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
                getMappingUpdater(indexShard, sourceToParse.type()));
            // the expected local checkpoint only advances while no sequence number has been skipped
            if (!gap && i == localCheckpoint + 1) {
                localCheckpoint++;
            }
            max = i;
        } else {
            gap = true;
        }
    }
    assert localCheckpoint == indexShard.getLocalCheckpoint();
    // a gap implies at least one skipped sequence number below the maximum, so the checkpoint must trail it
    assert !gap || (localCheckpoint != max);
    return new Result(localCheckpoint, max, gap);
}

/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;
Expand Down

0 comments on commit 7dcd81b

Please sign in to comment.