Skip to content

Commit

Permalink
Introduce translog size and age based retention policies (#25147)
Browse files Browse the repository at this point in the history
This PR extends the TranslogDeletionPolicy to allow keeping the translog files longer than what is needed for recovery from lucene. Specifically, we allow specifying the total size of the files and their maximum age (i.e., keep up to 512MB but no longer than 12 hours). This will allow making ops based recoveries more common. 

Note that the default size and age still set to 0, maintaining current behavior. This is needed as the other components in the system are not yet ready for a longer translog retention. I will adapt those in follow up PRs.

Relates to #10708
  • Loading branch information
bleskes committed Jun 16, 2017
1 parent 50db8cb commit 9ddea53
Show file tree
Hide file tree
Showing 12 changed files with 418 additions and 52 deletions.
Expand Up @@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
43 changes: 43 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
Expand Up @@ -37,6 +37,7 @@

import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -111,6 +112,24 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. A longer retention policy is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic,
Property.IndexScope);

/**
* Controls how many translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
Expand Down Expand Up @@ -168,6 +187,8 @@ public final class IndexSettings {
private final TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
Expand Down Expand Up @@ -265,6 +286,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING);
translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
Expand Down Expand Up @@ -302,6 +325,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
Expand All @@ -311,6 +336,14 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
this.translogRetentionSize = byteSizeValue;
}

private void setTranslogRetentionAge(TimeValue age) {
this.translogRetentionAge = age;
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}
Expand Down Expand Up @@ -469,6 +502,16 @@ public TimeValue getRefreshInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

/**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; }

/**
* Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept around
*/
public TimeValue getTranslogRetentionAge() { return translogRetentionAge; }

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
Expand Down
Expand Up @@ -150,7 +150,10 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
Expand Down Expand Up @@ -1854,6 +1857,10 @@ public void onSettingsChanged() {
// the setting will be re-interpreted if it's set to true
this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
}

public MergeStats getMergeStats() {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/**
Expand Down Expand Up @@ -121,4 +122,8 @@ public int compareTo(BaseTranslogReader o) {
public Path path() {
return path;
}

public long getLastModifiedTime() throws IOException {
return Files.getLastModifiedTime(path).toMillis();
}
}
Expand Up @@ -1522,7 +1522,7 @@ public void trimUnreferencedReaders() throws IOException {
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = deletionPolicy.minTranslogGenRequired();
long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current);
assert minReferencedGen >= getMinFileGeneration() :
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
+ getMinFileGeneration() + "]";
Expand Down
Expand Up @@ -21,13 +21,17 @@

import org.apache.lucene.util.Counter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TranslogDeletionPolicy {

/** Records how many views are held against each
* translog generation */
/**
* Records how many views are held against each
* translog generation
*/
private final Map<Long, Counter> translogRefCounts = new HashMap<>();

/**
Expand All @@ -36,14 +40,31 @@ public class TranslogDeletionPolicy {
*/
private long minTranslogGenerationForRecovery = 1;

private long retentionSizeInBytes;

private long retentionAgeInMillis;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
}

public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
if (newGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
minTranslogGenerationForRecovery+ "]");
minTranslogGenerationForRecovery + "]");
}
minTranslogGenerationForRecovery = newGen;
}

public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}

public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}

/**
* acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
* will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called.
Expand Down Expand Up @@ -74,10 +95,59 @@ synchronized void releaseTranslogGenView(long translogGen) {
/**
* returns the minimum translog generation that is still required by the system. Any generation below
* the returned value may be safely deleted
*
* @param readers current translog readers
* @param writer current translog writer
*/
synchronized long minTranslogGenRequired() {
long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
return Math.min(viewRefs, minTranslogGenerationForRecovery);
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
long minByView = getMinTranslogGenRequiredByViews();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
minByAgeAndSize = Long.MAX_VALUE;
} else {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery));
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
if (retentionSizeInBytes >= 0) {
long totalSize = writer.sizeInBytes();
long minGen = writer.getGeneration();
for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) {
final TranslogReader reader = readers.get(i);
totalSize += reader.sizeInBytes();
minGen = reader.getGeneration();
}
return minGen;
} else {
return Long.MIN_VALUE;
}
}

static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
throws IOException {
if (maxRetentionAgeInMillis >= 0) {
for (TranslogReader reader: readers) {
if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) {
return reader.getGeneration();
}
}
return writer.getGeneration();
} else {
return Long.MIN_VALUE;
}
}

protected long currentTime() {
return System.currentTimeMillis();
}

private long getMinTranslogGenRequiredByViews() {
return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
}

/** returns the translog generation that will be used as a basis of a future store/peer recovery */
Expand Down
Expand Up @@ -116,7 +116,7 @@ public static TranslogReader open(
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
case TranslogWriter.VERSION_CHECKPOINTS:
assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path;
assert checkpoint.numOps >= 0 : "expected at least 0 operatin but got: " + checkpoint.numOps;
assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps;
assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint;
int len = headerStream.readInt();
if (len > channel.size()) {
Expand All @@ -130,8 +130,8 @@ public static TranslogReader open(
throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref +
" this translog file belongs to a different translog. path:" + path);
}
final long firstOperationOffset =
ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES;
final long firstOperationOffset;
firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES;
return new TranslogReader(checkpoint, channel, path, firstOperationOffset);

default:
Expand Down
Expand Up @@ -88,6 +88,9 @@ private TranslogWriter(
final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException {
super(initialCheckpoint.generation, channel, path, channel.position());
assert initialCheckpoint.offset == channel.position() :
"initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion ["
+ channel.position() + "]";
this.shardId = shardId;
this.channelFactory = channelFactory;
this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
Expand Down Expand Up @@ -116,31 +119,24 @@ static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOExcep
out.writeBytes(ref.bytes, ref.offset, ref.length);
}

public static TranslogWriter create(
ShardId shardId,
String translogUUID,
long fileGeneration,
Path file,
ChannelFactory channelFactory,
ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier,
final long initialMinTranslogGen,
final LongSupplier minTranslogGenerationSupplier) throws IOException {
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory,
ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier,
final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier)
throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = getHeaderLength(ref.length);
final int firstOperationOffset = getHeaderLength(ref.length);
final FileChannel channel = channelFactory.open(file);
try {
// This OutputStreamDataOutput is intentionally not closed because
// closing it will close the FileChannel
final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel));
writeHeader(out, ref);
channel.force(true);
final Checkpoint checkpoint =
Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(),
initialMinTranslogGen);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration,
globalCheckpointSupplier.getAsLong(), initialMinTranslogGen);
writeCheckpoint(channelFactory, file.getParent(), checkpoint);
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier,
minTranslogGenerationSupplier);
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
globalCheckpointSupplier, minTranslogGenerationSupplier);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -39,7 +40,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {

public void testPassThrough() throws IOException {
SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class);
CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(),
CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(),
EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
List<IndexCommit> commitList = new ArrayList<>();
long count = randomIntBetween(1, 3);
Expand Down

0 comments on commit 9ddea53

Please sign in to comment.