Skip to content

Commit

Permalink
Revert translog changes introduced for CCR (#31947)
Browse files Browse the repository at this point in the history
We introduced these changes in #26708 because for CCR.
However, CCR now uses Lucene instead of translog.

This commit reverts these changes so that we can
minimize differences between the ccr and the master branch.

Relates ##26708
  • Loading branch information
dnhatn committed Jul 11, 2018
1 parent 815faf3 commit 4d18017
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ public enum SearcherScope {
* Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range.
* The caller has to close the returned snapshot after finishing the reading.
*/
public abstract Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException;
public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException;

public abstract TranslogStats getTranslogStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFrom(localCheckpoint + 1)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
Expand Down Expand Up @@ -480,8 +480,8 @@ public void syncTranslog() throws IOException {
}

@Override
public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo);
public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}

/**
Expand All @@ -493,7 +493,7 @@ public Translog.Snapshot readHistoryOperations(String source, MapperService mapp
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE);
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}
}

Expand Down Expand Up @@ -2483,7 +2483,7 @@ public boolean hasCompleteOperationHistory(String source, MapperService mapperSe
} else {
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,7 @@ public Closeable acquireRetentionLockForPeerRecovery() {
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
// TODO: Remove this method after primary-replica resync use soft-deletes
return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
return getEngine().newSnapshotFromMinSeqNo(minSeqNo);
}

/**
Expand Down
34 changes: 10 additions & 24 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,7 @@ public int totalOperationsByMinGen(long minGeneration) {
public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return readersBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE)
.mapToInt(BaseTranslogReader::totalOperations)
.sum();
return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum();
}
}

Expand Down Expand Up @@ -602,23 +600,11 @@ public Operation readOperation(Location location) throws IOException {
return null;
}

/**
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code>.
*/
public Snapshot newSnapshotFrom(long minSeqNo) throws IOException {
return getSnapshotBetween(minSeqNo, Long.MAX_VALUE);
}

/**
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code> and
* equal to or lesser than <code>maxSeqNo</code>.
*/
public Snapshot getSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
TranslogSnapshot[] snapshots = readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo)
.map(BaseTranslogReader::newSnapshot)
.toArray(TranslogSnapshot[]::new);
TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot)
.toArray(TranslogSnapshot[]::new);
return newMultiSnapshot(snapshots);
}
}
Expand All @@ -644,14 +630,14 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti
}
}

private Stream<? extends BaseTranslogReader> readersBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() ;

private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
return Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> {
final Checkpoint checkpoint = reader.getCheckpoint();
return checkpoint.maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ||
checkpoint.minSeqNo <= maxSeqNo && checkpoint.maxSeqNo >= minSeqNo;
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ protected void doRun() throws Exception {
// these are what we expect the snapshot to return (and potentially some more).
Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView);
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(committedLocalCheckpointAtView + 1L)) {
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
expectedOps.remove(op);
Expand Down Expand Up @@ -2814,7 +2814,7 @@ public void testMinSeqNoBasedAPI() throws IOException {
}
assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps));
int readFromSnapshot = 0;
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(seqNo)) {
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) {
assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps));
Translog.Operation op;
while ((op = snapshot.next()) != null) {
Expand All @@ -2831,38 +2831,6 @@ public void testMinSeqNoBasedAPI() throws IOException {
}
}

public void testGetSnapshotBetween() throws IOException {
final int numOperations = randomIntBetween(2, 8196);
final List<Integer> sequenceNumbers = IntStream.range(0, numOperations).boxed().collect(Collectors.toList());
Collections.shuffle(sequenceNumbers, random());
for (Integer sequenceNumber : sequenceNumbers) {
translog.add(new Translog.NoOp(sequenceNumber, 0, "test"));
if (rarely()) {
translog.rollGeneration();
}
}
translog.rollGeneration();

final int iters = randomIntBetween(8, 32);
for (int iter = 0; iter < iters; iter++) {
int min = randomIntBetween(0, numOperations - 1);
int max = randomIntBetween(min, numOperations);
try (Translog.Snapshot snapshot = translog.getSnapshotBetween(min, max)) {
final List<Translog.Operation> operations = new ArrayList<>();
for (Translog.Operation operation = snapshot.next(); operation != null; operation = snapshot.next()) {
if (operation.seqNo() >= min && operation.seqNo() <= max) {
operations.add(operation);
}
}
operations.sort(Comparator.comparingLong(Translog.Operation::seqNo));
Iterator<Translog.Operation> iterator = operations.iterator();
for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) {
assertThat(iterator.next().seqNo(), equalTo(expectedSeqNo));
}
}
}
}

public void testSimpleCommit() throws IOException {
final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
Expand Down

0 comments on commit 4d18017

Please sign in to comment.