Skip to content

Commit

Permalink
Soft-deletes policy should always fetch latest leases (#37940)
Browse files Browse the repository at this point in the history
If a new retention lease is added while a primary's soft-deletes policy
is locked for peer-recovery, that lease won't be baked into the Lucene
commit.

Relates #37165
Relates #37375
  • Loading branch information
dnhatn committed Jan 31, 2019
1 parent 4cdc4bd commit aebf197
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
Expand Up @@ -46,7 +46,6 @@ final class SoftDeletesPolicy {
private long retentionOperations;
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
private Collection<RetentionLease> retentionLeases;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;

Expand All @@ -59,7 +58,6 @@ final class SoftDeletesPolicy {
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
retentionLeases = retentionLeasesSupplier.get();
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.retentionLockCount = 0;
}
Expand Down Expand Up @@ -113,6 +111,11 @@ synchronized long getMinRetainedSeqNo() {
}

public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
*/
final Collection<RetentionLease> retentionLeases = retentionLeasesSupplier.get();
// do not advance if the retention lock is held
if (retentionLockCount == 0) {
/*
Expand All @@ -126,7 +129,6 @@ public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy()
*/

// calculate the minimum sequence number to retain based on retention leases
retentionLeases = retentionLeasesSupplier.get();
final long minimumRetainingSequenceNumber = retentionLeases
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
Expand Down
Expand Up @@ -24,18 +24,21 @@
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

Expand All @@ -46,7 +49,7 @@ public class SoftDeletesPolicyTests extends ESTestCase {
*/
public void testSoftDeletesRetentionLock() {
long retainedOps = between(0, 10000);
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)];
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
retainingSequenceNumbers[i] = new AtomicLong();
Expand Down Expand Up @@ -116,4 +119,23 @@ public void testSoftDeletesRetentionLock() {
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
}

public void testAlwaysFetchLatestRetentionLeases() {
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
final Collection<RetentionLease> leases = new ArrayList<>();
final int numLeases = randomIntBetween(0, 10);
for (int i = 0; i < numLeases; i++) {
leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test"));
}
final Supplier<Collection<RetentionLease>> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases));
final SoftDeletesPolicy policy =
new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier);
if (randomBoolean()) {
policy.acquireRetentionLock();
}
if (numLeases == 0) {
assertThat(policy.getRetentionPolicy().v2(), empty());
} else {
assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0])));
}
}
}
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -71,8 +72,11 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer-recovery which locks the soft-deletes policy on the primary.
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();

// check retention leases have been committed on the primary
final Collection<RetentionLease> primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(
Expand Down

0 comments on commit aebf197

Please sign in to comment.