Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce translog size and age based retention policies #25147

Merged
merged 17 commits into from Jun 16, 2017
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
43 changes: 43 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
Expand Up @@ -37,6 +37,7 @@

import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -111,6 +112,24 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. A longer retention policy is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic,
Property.IndexScope);

/**
* Controls how many translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
Expand Down Expand Up @@ -168,6 +187,8 @@ public final class IndexSettings {
private final TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
Expand Down Expand Up @@ -265,6 +286,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING);
translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
Expand Down Expand Up @@ -302,6 +325,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
Expand All @@ -311,6 +336,14 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
this.translogRetentionSize = byteSizeValue;
}

private void setTranslogRetentionAge(TimeValue age) {
this.translogRetentionAge = age;
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}
Expand Down Expand Up @@ -469,6 +502,16 @@ public TimeValue getRefreshInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

/**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; }

/**
* Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept around
*/
public TimeValue getTranslogRetentionAge() { return translogRetentionAge; }

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
Expand Down
Expand Up @@ -150,7 +150,10 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
Expand Down Expand Up @@ -1854,6 +1857,10 @@ public void onSettingsChanged() {
// the setting will be re-interpreted if it's set to true
this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
}

public MergeStats getMergeStats() {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/**
Expand Down Expand Up @@ -121,4 +122,8 @@ public int compareTo(BaseTranslogReader o) {
public Path path() {
return path;
}

public long getLastModifiedTime() throws IOException {
return Files.getLastModifiedTime(path).toMillis();
}
}
Expand Up @@ -1522,7 +1522,7 @@ public void trimUnreferencedReaders() throws IOException {
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = deletionPolicy.minTranslogGenRequired();
long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current);
assert minReferencedGen >= getMinFileGeneration() :
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
+ getMinFileGeneration() + "]";
Expand Down
Expand Up @@ -21,13 +21,17 @@

import org.apache.lucene.util.Counter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TranslogDeletionPolicy {

/** Records how many views are held against each
* translog generation */
/**
* Records how many views are held against each
* translog generation
*/
private final Map<Long, Counter> translogRefCounts = new HashMap<>();

/**
Expand All @@ -36,14 +40,31 @@ public class TranslogDeletionPolicy {
*/
private long minTranslogGenerationForRecovery = 1;

private long retentionSizeInBytes;

private long retentionAgeInMillis;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) {
this.retentionSizeInBytes = retentionSizeInBytes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no max on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question. Will add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will actually remove the max from the other one. Shorter.

this.retentionAgeInMillis = retentionAgeInMillis;
}

public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
if (newGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
minTranslogGenerationForRecovery+ "]");
minTranslogGenerationForRecovery + "]");
}
minTranslogGenerationForRecovery = newGen;
}

public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}

public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}

/**
* acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
* will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called.
Expand Down Expand Up @@ -74,10 +95,59 @@ synchronized void releaseTranslogGenView(long translogGen) {
/**
* returns the minimum translog generation that is still required by the system. Any generation below
* the returned value may be safely deleted
*
* @param readers current translog readers
* @param writer current translog writer
*/
synchronized long minTranslogGenRequired() {
long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
return Math.min(viewRefs, minTranslogGenerationForRecovery);
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
long minByView = getMinTranslogGenRequiredByViews();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
minByAgeAndSize = Long.MAX_VALUE;
} else {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery));
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
if (retentionSizeInBytes >= 0) {
long totalSize = writer.sizeInBytes();
long minGen = writer.getGeneration();
for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) {
final TranslogReader reader = readers.get(i);
totalSize += reader.sizeInBytes();
minGen = reader.getGeneration();
}
return minGen;
} else {
return Long.MIN_VALUE;
}
}

static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
throws IOException {
if (maxRetentionAgeInMillis >= 0) {
for (TranslogReader reader: readers) {
if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) {
return reader.getGeneration();
}
}
return writer.getGeneration();
} else {
return Long.MIN_VALUE;
}
}

protected long currentTime() {
return System.currentTimeMillis();
}

private long getMinTranslogGenRequiredByViews() {
return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
}

/** returns the translog generation that will be used as a basis of a future store/peer recovery */
Expand Down
Expand Up @@ -116,7 +116,7 @@ public static TranslogReader open(
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
case TranslogWriter.VERSION_CHECKPOINTS:
assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path;
assert checkpoint.numOps >= 0 : "expected at least 0 operatin but got: " + checkpoint.numOps;
assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just an idea, should we use the last modification time on the file instead? I mean it should be ok to do that no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tja, maybe. I tried to avoid the entire what metadata is supported where discussion. I mean we walk to great length to avoid directory traversals. I think this is simple, solid and entirely self contained?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I agree with this. it's redundant information and no traversal needed. you have the file you just need to read the time created. I am really careful with any kind of added metadata here you know why.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am really careful with any kind of added metadata here you know why

I totally hear you. It's clearly and edge case. One other aspect of they way I set it up now is that it's easy to test. With FS based metadata we'd probably have to mock something, which is also doable but messier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I lean towards using the filesystem mtime here. I do not think this would be messy to mock, only a method that you can override in tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I personally feel it might end up messier but I trust your judgement. I'll roll back the extra inlined creation date and move to using FS mtime. We can always re-add the inline creation date if we run into trouble. Do-The-Minimal-Change-Needed™️

assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint;
int len = headerStream.readInt();
if (len > channel.size()) {
Expand All @@ -130,8 +130,8 @@ public static TranslogReader open(
throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref +
" this translog file belongs to a different translog. path:" + path);
}
final long firstOperationOffset =
ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES;
final long firstOperationOffset;
firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES;
return new TranslogReader(checkpoint, channel, path, firstOperationOffset);

default:
Expand Down
Expand Up @@ -88,6 +88,9 @@ private TranslogWriter(
final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException {
super(initialCheckpoint.generation, channel, path, channel.position());
assert initialCheckpoint.offset == channel.position() :
"initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion ["
+ channel.position() + "]";
this.shardId = shardId;
this.channelFactory = channelFactory;
this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
Expand Down Expand Up @@ -116,31 +119,24 @@ static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOExcep
out.writeBytes(ref.bytes, ref.offset, ref.length);
}

public static TranslogWriter create(
ShardId shardId,
String translogUUID,
long fileGeneration,
Path file,
ChannelFactory channelFactory,
ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier,
final long initialMinTranslogGen,
final LongSupplier minTranslogGenerationSupplier) throws IOException {
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory,
ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier,
final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier)
throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = getHeaderLength(ref.length);
final int firstOperationOffset = getHeaderLength(ref.length);
final FileChannel channel = channelFactory.open(file);
try {
// This OutputStreamDataOutput is intentionally not closed because
// closing it will close the FileChannel
final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel));
writeHeader(out, ref);
channel.force(true);
final Checkpoint checkpoint =
Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(),
initialMinTranslogGen);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration,
globalCheckpointSupplier.getAsLong(), initialMinTranslogGen);
writeCheckpoint(channelFactory, file.getParent(), checkpoint);
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier,
minTranslogGenerationSupplier);
return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
globalCheckpointSupplier, minTranslogGenerationSupplier);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -39,7 +40,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {

public void testPassThrough() throws IOException {
SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class);
CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(),
CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(),
EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
List<IndexCommit> commitList = new ArrayList<>();
long count = randomIntBetween(1, 3);
Expand Down