Skip to content

Commit

Permalink
Avoid unnecessary persistence of retention leases (#42299)
Browse files Browse the repository at this point in the history
Today we are persisting the retention leases at least every thirty
seconds by a scheduled background sync. This sync causes an fsync to
disk and when there are a large number of shards allocated to slow
disks, these fsyncs can pile up and can severely impact the system. This
commit addresses this by only persisting and fsyncing the retention
leases if they have changed since the last time that we persisted and
fsynced the retention leases.
  • Loading branch information
jasontedor committed May 21, 2019
1 parent ae5e171 commit 8e933f5
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 11 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -246,15 +247,16 @@ long findMaxStateId(final String prefix, Path... locations) throws IOException {
}

/**
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by
* the states version from one or more data directories and if none of the latest states can be loaded an exception
* is thrown to prevent accidentally loading a previous state and silently omitting the latest state.
* Tries to load the latest state from the given data-locations.
*
* @param logger a logger instance
* @param logger a logger instance.
* @param dataLocations the data-locations to try.
* @return the latest state or <code>null</code> if no state was found.
* @return tuple of the latest state and generation. (-1, null) if no state is found.
*/
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException {
public Tuple<T, Long> loadLatestStateWithGeneration(
final Logger logger,
final NamedXContentRegistry namedXContentRegistry,
final Path... dataLocations) throws IOException {
List<PathAndStateId> files = new ArrayList<>();
long maxStateId = -1;
if (dataLocations != null) { // select all eligible files first
Expand Down Expand Up @@ -293,7 +295,7 @@ public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegis
try {
T state = read(namedXContentRegistry, pathAndStateId.file);
logger.trace("state id [{}] read from [{}]", pathAndStateId.id, pathAndStateId.file.getFileName());
return state;
return Tuple.tuple(state, pathAndStateId.id);
} catch (Exception e) {
exceptions.add(new IOException("failed to read " + pathAndStateId.toString(), e));
logger.debug(() -> new ParameterizedMessage(
Expand All @@ -309,6 +311,19 @@ public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegis
return null;
}

/**
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by
* the states version from one or more data directories and if none of the latest states can be loaded an exception
* is thrown to prevent accidentally loading a previous state and silently omitting the latest state.
*
* @param logger a logger instance
* @param dataLocations the data-locations to try.
* @return the latest state or <code>null</code> if no state was found.
*/
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException {
return loadLatestStateWithGeneration(logger, namedXContentRegistry, dataLocations).v1();
}

/**
* Internal struct-like class that holds the parsed state id and the file
*/
Expand Down
Expand Up @@ -179,6 +179,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesPrimaryTerm;

/**
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesVersion;

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -341,7 +353,8 @@ public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
private final Object retentionLeasePersistenceLock = new Object();

/**
* Persists the current retention leases to their dedicated state file.
* Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted
* then persistence is skipped.
*
* @param path the path to the directory containing the state file
* @throws IOException if an exception occurs writing the state file
Expand All @@ -350,10 +363,16 @@ public void persistRetentionLeases(final Path path) throws IOException {
synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases;
synchronized (this) {
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
return;
}
currentRetentionLeases = retentionLeases;
}
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.write(currentRetentionLeases, path);
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
persistedRetentionLeasesVersion = currentRetentionLeases.version();
}
}

Expand Down
Expand Up @@ -69,13 +69,27 @@ public long version() {

/**
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
*
* @param that the retention leases collection to test against
* @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false
*/
public boolean supersedes(final RetentionLeases that) {
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version;
boolean supersedes(final RetentionLeases that) {
return supersedes(that.primaryTerm, that.version);
}

/**
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
* terms its version is higher.
*
* @param primaryTerm the primary term
* @param version the version
* @return true if this retention leases collection would supercedes a retention lease collection with the specified primary term and
* version
*/
boolean supersedes(final long primaryTerm, final long version) {
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
}

private final Map<String, RetentionLease> leases;
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -498,6 +499,49 @@ public void testLoadAndPersistRetentionLeases() throws IOException {
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
}

public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
}

final Path path = createTempDir();
replicationTracker.persistRetentionLeases(path);

final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);

replicationTracker.persistRetentionLeases(path);
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);

assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
}

/**
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
* This test can fail without the synchronization block in that method.
Expand Down
Expand Up @@ -60,7 +60,9 @@ public void testSupersedesByPrimaryTerm() {
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}

public void testSupersedesByVersion() {
Expand All @@ -70,7 +72,9 @@ public void testSupersedesByVersion() {
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
}

public void testRetentionLeasesRejectsDuplicates() {
Expand Down

0 comments on commit 8e933f5

Please sign in to comment.