Skip to content

Commit

Permalink
Fix excessive increments in soft delete policy (#38813)
Browse files Browse the repository at this point in the history
In this case, we were incrementing the policy too much. This means on
every iteration we actually keep increasing the minimum retained
sequence number, even with leases in place. It was a bug from when the
soft deletes policy had retention leases incorporated into it. This
commit fixes this bug by ensuring we only increment in the proper
places, and adds careful tests for the various situations.
  • Loading branch information
jasontedor committed Feb 13, 2019
1 parent acb8b43 commit 9631c1a
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
.orElse(Long.MAX_VALUE);
/*
* The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations
* below the global checkpoint to retain (index.soft_deletes.retention.operations).
* below the global checkpoint to retain (index.soft_deletes.retention.operations). The additional increments on the global
* checkpoint and the local checkpoint of the safe commit are due to the fact that we want to retain all operations above
* those checkpoints.
*/
final long minSeqNoForQueryingChanges =
Math.min(globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber);
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1;
Math.min(1 + globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber);
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, 1 + localCheckpointOfSafeCommit);

/*
* We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand Down Expand Up @@ -98,7 +99,9 @@ public void testSoftDeletesRetentionLock() {
.min()
.orElse(Long.MAX_VALUE);
long retainedSeqNo =
Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1;
Math.min(
1 + safeCommitCheckpoint,
Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps));
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
}
assertThat(retentionQuery.getNumDims(), equalTo(1));
Expand All @@ -113,7 +116,7 @@ public void testSoftDeletesRetentionLock() {
.min()
.orElse(Long.MAX_VALUE);
long retainedSeqNo =
Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1;
Math.min(1 + safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps));
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
}
Expand Down Expand Up @@ -141,4 +144,87 @@ public void testAlwaysFetchLatestRetentionLeases() {
assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0])));
}
}

public void testWhenGlobalCheckpointDictatesThePolicy() {
final int retentionOperations = randomIntBetween(0, 1024);
final AtomicLong globalCheckpoint = new AtomicLong(randomLongBetween(0, Long.MAX_VALUE - 2));
final Collection<RetentionLease> leases = new ArrayList<>();
final int numberOfLeases = randomIntBetween(0, 16);
for (int i = 0; i < numberOfLeases; i++) {
// setup leases where the minimum retained sequence number is more than the policy dictated by the global checkpoint
leases.add(new RetentionLease(
Integer.toString(i),
randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE),
randomNonNegativeLong(), "test"));
}
final long primaryTerm = randomNonNegativeLong();
final long version = randomNonNegativeLong();
final Supplier<RetentionLeases> leasesSupplier =
() -> new RetentionLeases(
primaryTerm,
version,
Collections.unmodifiableCollection(new ArrayList<>(leases)));
final SoftDeletesPolicy policy =
new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier);
// set the local checkpoint of the safe commit to more than the policy dicated by the global checkpoint
final long localCheckpointOfSafeCommit = randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE);
policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + globalCheckpoint.get() - retentionOperations));
}

public void testWhenLocalCheckpointOfSafeCommitDictatesThePolicy() {
final int retentionOperations = randomIntBetween(0, 1024);
final long localCheckpointOfSafeCommit = randomLongBetween(-1, Long.MAX_VALUE - retentionOperations - 1);
final AtomicLong globalCheckpoint =
new AtomicLong(randomLongBetween(Math.max(0, localCheckpointOfSafeCommit + retentionOperations), Long.MAX_VALUE - 1));
final Collection<RetentionLease> leases = new ArrayList<>();
final int numberOfLeases = randomIntBetween(0, 16);
for (int i = 0; i < numberOfLeases; i++) {
leases.add(new RetentionLease(
Integer.toString(i),
randomLongBetween(1 + localCheckpointOfSafeCommit + 1, Long.MAX_VALUE), // leases are for more than the local checkpoint
randomNonNegativeLong(), "test"));
}
final long primaryTerm = randomNonNegativeLong();
final long version = randomNonNegativeLong();
final Supplier<RetentionLeases> leasesSupplier =
() -> new RetentionLeases(
primaryTerm,
version,
Collections.unmodifiableCollection(new ArrayList<>(leases)));

final SoftDeletesPolicy policy =
new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier);
policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + localCheckpointOfSafeCommit));
}

public void testWhenRetentionLeasesDictateThePolicy() {
final int retentionOperations = randomIntBetween(0, 1024);
final Collection<RetentionLease> leases = new ArrayList<>();
final int numberOfLeases = randomIntBetween(1, 16);
for (int i = 0; i < numberOfLeases; i++) {
leases.add(new RetentionLease(
Integer.toString(i),
randomLongBetween(0, Long.MAX_VALUE - retentionOperations - 1),
randomNonNegativeLong(), "test"));
}
final OptionalLong minimumRetainingSequenceNumber = leases.stream().mapToLong(RetentionLease::retainingSequenceNumber).min();
assert minimumRetainingSequenceNumber.isPresent() : leases;
final long localCheckpointOfSafeCommit = randomLongBetween(minimumRetainingSequenceNumber.getAsLong(), Long.MAX_VALUE - 1);
final AtomicLong globalCheckpoint =
new AtomicLong(randomLongBetween(minimumRetainingSequenceNumber.getAsLong() + retentionOperations, Long.MAX_VALUE - 1));
final long primaryTerm = randomNonNegativeLong();
final long version = randomNonNegativeLong();
final Supplier<RetentionLeases> leasesSupplier =
() -> new RetentionLeases(
primaryTerm,
version,
Collections.unmodifiableCollection(new ArrayList<>(leases)));
final SoftDeletesPolicy policy =
new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier);
policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
assertThat(policy.getMinRetainedSeqNo(), equalTo(minimumRetainingSequenceNumber.getAsLong()));
}

}

0 comments on commit 9631c1a

Please sign in to comment.