diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index 49b8f9d3483f2..9a9c7bd0ee869 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -137,11 +137,13 @@ public synchronized Tuple getRetentionPolicy() { .orElse(Long.MAX_VALUE); /* * The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations - * below the global checkpoint to retain (index.soft_deletes.retention.operations). + * below the global checkpoint to retain (index.soft_deletes.retention.operations). The additional increments on the global + * checkpoint and the local checkpoint of the safe commit are due to the fact that we want to retain all operations above + * those checkpoints. */ final long minSeqNoForQueryingChanges = - Math.min(globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber); - final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1; + Math.min(1 + globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber); + final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, 1 + localCheckpointOfSafeCommit); /* * We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index 8257aa99d0486..e4da636deaf6d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -32,6 +32,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -98,7 +99,9 @@ public void testSoftDeletesRetentionLock() { .min() .orElse(Long.MAX_VALUE); long retainedSeqNo = - Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1; + Math.min( + 1 + safeCommitCheckpoint, + Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps)); minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); } assertThat(retentionQuery.getNumDims(), equalTo(1)); @@ -113,7 +116,7 @@ public void testSoftDeletesRetentionLock() { .min() .orElse(Long.MAX_VALUE); long retainedSeqNo = - Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1; + Math.min(1 + safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps)); minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); } @@ -141,4 +144,87 @@ public void testAlwaysFetchLatestRetentionLeases() { assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0]))); } } + + public void testWhenGlobalCheckpointDictatesThePolicy() { + final int retentionOperations = randomIntBetween(0, 1024); + final AtomicLong globalCheckpoint = new AtomicLong(randomLongBetween(0, Long.MAX_VALUE - 2)); + final Collection leases = new ArrayList<>(); + final int numberOfLeases = randomIntBetween(0, 16); + for (int i = 0; i < numberOfLeases; i++) { + // setup leases where the minimum retained sequence number is more than the policy dictated by the global checkpoint + leases.add(new RetentionLease( + Integer.toString(i), + randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE), + randomNonNegativeLong(), "test")); + } + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); + final Supplier leasesSupplier = + () -> new RetentionLeases( + primaryTerm, + version, + Collections.unmodifiableCollection(new ArrayList<>(leases))); + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier); + // set the local checkpoint of the safe commit to more than the policy dicated by the global checkpoint + final long localCheckpointOfSafeCommit = randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE); + policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + globalCheckpoint.get() - retentionOperations)); + } + + public void testWhenLocalCheckpointOfSafeCommitDictatesThePolicy() { + final int retentionOperations = randomIntBetween(0, 1024); + final long localCheckpointOfSafeCommit = randomLongBetween(-1, Long.MAX_VALUE - retentionOperations - 1); + final AtomicLong globalCheckpoint = + new AtomicLong(randomLongBetween(Math.max(0, localCheckpointOfSafeCommit + retentionOperations), Long.MAX_VALUE - 1)); + final Collection leases = new ArrayList<>(); + final int numberOfLeases = randomIntBetween(0, 16); + for (int i = 0; i < numberOfLeases; i++) { + leases.add(new RetentionLease( + Integer.toString(i), + randomLongBetween(1 + localCheckpointOfSafeCommit + 1, Long.MAX_VALUE), // leases are for more than the local checkpoint + randomNonNegativeLong(), "test")); + } + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); + final Supplier leasesSupplier = + () -> new RetentionLeases( + primaryTerm, + version, + Collections.unmodifiableCollection(new ArrayList<>(leases))); + + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier); + policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + localCheckpointOfSafeCommit)); + } + + public void testWhenRetentionLeasesDictateThePolicy() { + final int retentionOperations = randomIntBetween(0, 1024); + final Collection leases = new ArrayList<>(); + final int numberOfLeases = randomIntBetween(1, 16); + for (int i = 0; i < numberOfLeases; i++) { + leases.add(new RetentionLease( + Integer.toString(i), + randomLongBetween(0, Long.MAX_VALUE - retentionOperations - 1), + randomNonNegativeLong(), "test")); + } + final OptionalLong minimumRetainingSequenceNumber = leases.stream().mapToLong(RetentionLease::retainingSequenceNumber).min(); + assert minimumRetainingSequenceNumber.isPresent() : leases; + final long localCheckpointOfSafeCommit = randomLongBetween(minimumRetainingSequenceNumber.getAsLong(), Long.MAX_VALUE - 1); + final AtomicLong globalCheckpoint = + new AtomicLong(randomLongBetween(minimumRetainingSequenceNumber.getAsLong() + retentionOperations, Long.MAX_VALUE - 1)); + final long primaryTerm = randomNonNegativeLong(); + final long version = randomNonNegativeLong(); + final Supplier leasesSupplier = + () -> new RetentionLeases( + primaryTerm, + version, + Collections.unmodifiableCollection(new ArrayList<>(leases))); + final SoftDeletesPolicy policy = + new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier); + policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + assertThat(policy.getMinRetainedSeqNo(), equalTo(minimumRetainingSequenceNumber.getAsLong())); + } + }