Skip to content

Commit

Permalink
Special case
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Jul 4, 2017
1 parent 5640e2a commit d1e0ec2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
Expand Up @@ -127,6 +127,7 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
* @param checkpoint the local checkpoint to reset this tracker to
*/
synchronized void resetCheckpoint(final long checkpoint) {
assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
firstProcessedSeqNo = checkpoint + 1;
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -2058,12 +2058,19 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint);
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
} else {
localCheckpoint = currentGlobalCheckpoint;
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
getGlobalCheckpoint());
getEngine().seqNoService().resetLocalCheckpoint(getGlobalCheckpoint());
localCheckpoint);
getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint);
getEngine().getTranslog().rollGeneration();
});
globalCheckpointUpdated = true;
Expand Down
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -699,15 +700,15 @@ public void testThrowBackLocalCheckpointOnReplica() throws IOException, Interrup
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

final long globalCheckpointOnReplica =
final long globalCheckpointOnReplica = SequenceNumbersService.UNASSIGNED_SEQ_NO;
randomIntBetween(
Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED),
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);

final int globalCheckpoint =
randomIntBetween(
Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED),
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));
final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquireReplicaOperationPermit(
Expand All @@ -727,7 +728,12 @@ public void onFailure(Exception e) {
ThreadPool.Names.SAME);

latch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO
&& globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
} else {
assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
}

// ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
Expand Down

0 comments on commit d1e0ec2

Please sign in to comment.