Skip to content

Commit

Permalink
policy with retaining seqno
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Dec 19, 2019
1 parent 84d9c01 commit bda80cd
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand Down Expand Up @@ -189,11 +190,7 @@ public InternalEngine(EngineConfig engineConfig) {
final EngineConfig engineConfig,
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
);
final TranslogDeletionPolicy translogDeletionPolicy = newTranslogDeletionPolicy(engineConfig);
store.incRef();
IndexWriter writer = null;
Translog translog = null;
Expand Down Expand Up @@ -488,6 +485,22 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
translog.trimUnreferencedReaders();
}

private static TranslogDeletionPolicy newTranslogDeletionPolicy(EngineConfig config) {
final LongSupplier retainingSeqNo;
if (config.getIndexSettings().isSoftDeleteEnabled()) {
retainingSeqNo = () -> Long.MAX_VALUE;
} else {
retainingSeqNo = () -> config.retentionLeasesSupplier().get().leases().stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.max().orElse(Long.MAX_VALUE);
};
return new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
config.getIndexSettings().getTranslogRetentionTotalFiles(),
retainingSeqNo);
}

private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

public class TranslogDeletionPolicy {

Expand Down Expand Up @@ -65,10 +66,14 @@ public void assertNoOpenTranslogRefs() {

private int retentionTotalFiles;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
private final LongSupplier retainingSeqNoSupplier;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles,
LongSupplier retainingSeqNoSupplier) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
this.retentionTotalFiles = retentionTotalFiles;
this.retainingSeqNoSupplier = retainingSeqNoSupplier;
if (Assertions.ENABLED) {
openTranslogRef = new ConcurrentHashMap<>();
} else {
Expand Down Expand Up @@ -172,7 +177,9 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery));
long minByPolicy = Math.max(minByAgeAndSize, minByNumFiles);
long minByRetainingSeqNo = getMinTranslogGenByRetainingSeqNo(readers, writer, retainingSeqNoSupplier.getAsLong());
return Math.min(Math.max(minByPolicy, minByRetainingSeqNo), Math.min(minByLocks, minTranslogGenerationForRecovery));
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
Expand Down Expand Up @@ -214,6 +221,15 @@ static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, Translog
return minGen;
}

static long getMinTranslogGenByRetainingSeqNo(List<TranslogReader> readers, TranslogWriter writer, long seqNo) {
for (TranslogReader reader : readers) {
if (reader.getCheckpoint().maxEffectiveSeqNo() >= seqNo) {
return reader.generation;
}
}
return writer.generation;
}

protected long currentTime() {
return System.currentTimeMillis();
}
Expand Down

0 comments on commit bda80cd

Please sign in to comment.