From 9ddea539f509165a4cd9a713d240b9ddedb1ea08 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Fri, 16 Jun 2017 09:09:51 +0200
Subject: [PATCH] Introduce translog size and age based retention policies (#25147)

This PR extends the TranslogDeletionPolicy to allow keeping translog files longer than what is needed for recovery from Lucene. Specifically, we allow specifying the total size of the retained files and their maximum age (e.g., keep up to 512MB, but nothing older than 12 hours). This will make ops-based recoveries more common. Note that by default the extra retention is effectively disabled, maintaining the current behavior. This is needed because the other components in the system are not yet ready for longer translog retention; I will adapt those in follow-up PRs.

Relates to #10708
---
 .../common/settings/IndexScopedSettings.java  |   2 +
 .../elasticsearch/index/IndexSettings.java    |  43 ++++
 .../index/engine/InternalEngine.java          |   9 +-
 .../index/translog/BaseTranslogReader.java    |   5 +
 .../index/translog/Translog.java              |   2 +-
 .../translog/TranslogDeletionPolicy.java      |  82 ++++++-
 .../index/translog/TranslogReader.java        |   6 +-
 .../index/translog/TranslogWriter.java        |  28 +--
 .../engine/CombinedDeletionPolicyTests.java   |   3 +-
 .../index/engine/InternalEngineTests.java     |   5 +-
 .../translog/TranslogDeletionPolicyTests.java | 214 ++++++++++++++++++
 .../index/translog/TranslogTests.java         |  71 ++++--
 12 files changed, 418 insertions(+), 52 deletions(-)
 create mode 100644 core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java

diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9fcafcea3b2f3..ae4cf6cd41a36 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING,
diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 2764ffd38cc17..43ddb09e61f0a 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -37,6 +37,7 @@ import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -111,6 +112,24 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + /** + * Controls how long translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. A longer retention policy is useful to increase + * the chance of ops based recoveries.
+ **/ + public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = + Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic, + Property.IndexScope); + + /** + * Controls how many translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. Keeping more files is useful to increase + * the chance of ops based recoveries. + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = + Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic, + Property.IndexScope); + /** * The maximum size of a translog generation. This is independent of the maximum size of * translog operations that have not been flushed. @@ -168,6 +187,8 @@ public final class IndexSettings { private final TimeValue syncInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; + private volatile TimeValue translogRetentionAge; + private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; @@ -265,6 +286,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); + translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING); + translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); @@ -302,6 +325,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer( INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, this::setGenerationThresholdSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -311,6 +336,14 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } + private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { + this.translogRetentionSize = byteSizeValue; + } + + private void setTranslogRetentionAge(TimeValue age) { + this.translogRetentionAge = age; + } + private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { this.generationThresholdSize = generationThresholdSize; } @@ -469,6 +502,16 @@ public TimeValue getRefreshInterval() { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries + */ + public 
ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; } + + /** + * Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept around + */ + public TimeValue getTranslogRetentionAge() { return translogRetentionAge; } + /** * Returns the generation threshold size. As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f84f76b537e0d..6d10a0290995a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -150,7 +150,10 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { } this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; this.versionMap = new LiveVersionMap(); - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + ); this.deletionPolicy = new CombinedDeletionPolicy( new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); @@ -1854,6 +1857,10 @@ public void onSettingsChanged() { // the setting will be re-interpreted if it's set to true this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } + final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); + final IndexSettings indexSettings = engineConfig.getIndexSettings(); + translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); + translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); } public MergeStats getMergeStats() { diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 6f392c195fd19..7f8b7f3fb2c76 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Files; import java.nio.file.Path; /** @@ -121,4 +122,8 @@ public int compareTo(BaseTranslogReader o) { public Path path() { return path; } + + public long getLastModifiedTime() throws IOException { + return Files.getLastModifiedTime(path).toMillis(); + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index d4a5fe0d99fd7..66d370c121f2c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1522,7 +1522,7 @@ public void trimUnreferencedReaders() throws IOException { // we're shutdown potentially on some tragic event, don't delete anything return; } - long minReferencedGen = deletionPolicy.minTranslogGenRequired(); + long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, 
current); assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 84f61a642cc8e..732b38fcedfe4 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -21,13 +21,17 @@ import org.apache.lucene.util.Counter; +import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; public class TranslogDeletionPolicy { - /** Records how many views are held against each - * translog generation */ + /** + * Records how many views are held against each + * translog generation + */ private final Map translogRefCounts = new HashMap<>(); /** @@ -36,14 +40,31 @@ public class TranslogDeletionPolicy { */ private long minTranslogGenerationForRecovery = 1; + private long retentionSizeInBytes; + + private long retentionAgeInMillis; + + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) { + this.retentionSizeInBytes = retentionSizeInBytes; + this.retentionAgeInMillis = retentionAgeInMillis; + } + public synchronized void setMinTranslogGenerationForRecovery(long newGen) { if (newGen < minTranslogGenerationForRecovery) { throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" + - minTranslogGenerationForRecovery+ "]"); + minTranslogGenerationForRecovery + "]"); } minTranslogGenerationForRecovery = newGen; } + public synchronized void setRetentionSizeInBytes(long bytes) { + retentionSizeInBytes = bytes; + } + + public synchronized void setRetentionAgeInMillis(long ageInMillis) { + retentionAgeInMillis = ageInMillis; + } + /** * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called. @@ -74,10 +95,59 @@ synchronized void releaseTranslogGenView(long translogGen) { /** * returns the minimum translog generation that is still required by the system. 
Any generation below * the returned value may be safely deleted + * + * @param readers current translog readers + * @param writer current translog writer */ - synchronized long minTranslogGenRequired() { - long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); - return Math.min(viewRefs, minTranslogGenerationForRecovery); + synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { + long minByView = getMinTranslogGenRequiredByViews(); + long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); + long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); + final long minByAgeAndSize; + if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { + // both size and age are disabled; + minByAgeAndSize = Long.MAX_VALUE; + } else { + minByAgeAndSize = Math.max(minByAge, minBySize); + } + return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery)); + } + + static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { + if (retentionSizeInBytes >= 0) { + long totalSize = writer.sizeInBytes(); + long minGen = writer.getGeneration(); + for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) { + final TranslogReader reader = readers.get(i); + totalSize += reader.sizeInBytes(); + minGen = reader.getGeneration(); + } + return minGen; + } else { + return Long.MIN_VALUE; + } + } + + static long getMinTranslogGenByAge(List readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now) + throws IOException { + if (maxRetentionAgeInMillis >= 0) { + for (TranslogReader reader: readers) { + if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) { + return reader.getGeneration(); + } + } + return writer.getGeneration(); + } else { + return Long.MIN_VALUE; + } + } + + protected long currentTime() { + return System.currentTimeMillis(); + } + + private long getMinTranslogGenRequiredByViews() { + return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } /** returns the translog generation that will be used as a basis of a future store/peer recovery */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 9057207501cdf..46439afead10a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -116,7 +116,7 @@ public static TranslogReader open( throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); case TranslogWriter.VERSION_CHECKPOINTS: assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path; - assert checkpoint.numOps >= 0 : "expected at least 0 operatin but got: " + checkpoint.numOps; + assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps; assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint; int len = headerStream.readInt(); if (len > channel.size()) { @@ -130,8 +130,8 @@ public static TranslogReader open( throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref + " this translog file belongs to a different translog. 
path:" + path); } - final long firstOperationOffset = - ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; + final long firstOperationOffset; + firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; return new TranslogReader(checkpoint, channel, path, firstOperationOffset); default: diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index d637c9da79f65..2c0bd0c7d89c8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -88,6 +88,9 @@ private TranslogWriter( final ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException { super(initialCheckpoint.generation, channel, path, channel.position()); + assert initialCheckpoint.offset == channel.position() : + "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion [" + + channel.position() + "]"; this.shardId = shardId; this.channelFactory = channelFactory; this.minTranslogGenerationSupplier = minTranslogGenerationSupplier; @@ -116,18 +119,12 @@ static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOExcep out.writeBytes(ref.bytes, ref.offset, ref.length); } - public static TranslogWriter create( - ShardId shardId, - String translogUUID, - long fileGeneration, - Path file, - ChannelFactory channelFactory, - ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, - final long initialMinTranslogGen, - final LongSupplier minTranslogGenerationSupplier) throws IOException { + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, + ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, + final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier) + throws IOException { final BytesRef ref = new BytesRef(translogUUID); - final int headerLength = getHeaderLength(ref.length); + final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because @@ -135,12 +132,11 @@ public static TranslogWriter create( final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); writeHeader(out, ref); channel.force(true); - final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(), - initialMinTranslogGen); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, + globalCheckpointSupplier.getAsLong(), initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); - return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier, - minTranslogGenerationSupplier); + return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, + globalCheckpointSupplier, minTranslogGenerationSupplier); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. 
We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index d21273a7b0335..d1eef05c2efa1 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,7 +40,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testPassThrough() throws IOException { SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); - CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(), + CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); List commitList = new ArrayList<>(); long count = randomIntBetween(1, 3); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 16e746a67f7bd..af18781dfa6e9 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -180,6 +180,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -336,7 +337,7 @@ protected Translog createTranslog() throws IOException { protected Translog createTranslog(Path translogPath) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -2795,7 +2796,7 @@ public void testRecoverFromForeignTranslog() throws IOException { Translog translog = new Translog( new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), - null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); diff --git 
a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java new file mode 100644 index 0000000000000..3ed595543f8ea --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + + +public class TranslogDeletionPolicyTests extends ESTestCase { + + public static TranslogDeletionPolicy createTranslogDeletionPolicy() { + return new TranslogDeletionPolicy( + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() + ); + } + + public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { + return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), + indexSettings.getTranslogRetentionAge().getMillis()); + } + + public void testNoRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0); + assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); + final int committedReader = randomIntBetween(0, allGens.size() - 1); + final long committedGen = allGens.get(committedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + public void testBytesRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + final int 
selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), size), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), -1), + equalTo(Long.MIN_VALUE)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + public void testAgeRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + final int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), maxAge, now), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), -1, now), + equalTo(Long.MIN_VALUE)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + /** + * Tests that age trumps size but recovery trumps both. + */ + public void testRetentionHierarchy() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); + deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); + int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationByAge = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationBySize = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + long committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(size); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); + // make a new policy as committed gen can't go backwards (for now) + deletionPolicy = new MockDeletionPolicy(now, size, maxAge); + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); + long viewGen = deletionPolicy.acquireTranslogGenForView(); + selectedReader = randomIntBetween(selectedReader, allGens.size() - 1); + committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min( + Math.min(committedGen, viewGen), 
+ Math.max(selectedGenerationByAge, selectedGenerationBySize))); + // disable age + deletionPolicy.setRetentionAgeInMillis(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); + // disable size + deletionPolicy.setRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); + // disable both + deletionPolicy.setRetentionAgeInMillis(-1); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + + } + + private void assertMinGenRequired(TranslogDeletionPolicy deletionPolicy, Tuple, TranslogWriter> readersAndWriter, + long expectedGen) throws IOException { + assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(expectedGen)); + } + + private Tuple, TranslogWriter> createReadersAndWriter(final long now) throws IOException { + final Path tempDir = createTempDir(); + Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); + TranslogWriter writer = null; + List readers = new ArrayList<>(); + final int numberOfReaders = randomIntBetween(0, 10); + for (long gen = 1; gen <= numberOfReaders + 1; gen++) { + if (writer != null) { + final TranslogReader reader = Mockito.spy(writer.closeIntoReader()); + Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime(); + readers.add(reader); + } + writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen, + tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L + ); + writer = Mockito.spy(writer); + Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); + + byte[] bytes = new byte[4]; + ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + + for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { + out.reset(bytes); + out.writeInt(ops); + writer.add(new BytesArray(bytes), ops); + } + } + return new Tuple<>(readers, writer); + } + + private static class MockDeletionPolicy extends TranslogDeletionPolicy { + + long now; + + MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) { + super(retentionSizeInBytes, maxRetentionAgeInMillis); + this.now = now; + } + + @Override + protected long currentTime() { + return now; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 4fe97919c38a8..21bc1a14bc55f 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -106,6 +106,7 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -141,7 +142,8 @@ protected void afterIfSuccessful() throws Exception { } protected Translog createTranslog(TranslogConfig config, 
String translogUUID) throws IOException { - return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } private void markCurrentGenAsCommitted(Translog translog) throws IOException { @@ -157,11 +159,6 @@ private void commit(Translog translog, long genToCommit) throws IOException { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit); translog.trimUnreferencedReaders(); - if (deletionPolicy.pendingViewsCount() == 0) { - assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(genToCommit)); - } - // we may have some views closed concurrently causing the deletion policy to increase it's minTranslogGenRequired - assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(deletionPolicy.minTranslogGenRequired())); } @Override @@ -186,7 +183,9 @@ public void tearDown() throws Exception { private Translog create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); - return new Translog(getTranslogConfig(path), null, new TranslogDeletionPolicy(), () -> globalCheckpoint.get()); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + return new Translog(translogConfig, null, deletionPolicy, () -> globalCheckpoint.get()); } private TranslogConfig getTranslogConfig(final Path path) { @@ -1104,7 +1103,12 @@ public void testCloseIntoReader() throws IOException { } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint(); - try (TranslogReader reader = writer.closeIntoReader()) { + TranslogReader reader = writer.closeIntoReader(); + try { + if (randomBoolean()) { + reader.close(); + reader = translog.openReader(reader.path(), writerCheckpoint); + } for (int i = 0; i < numOps; i++) { final ByteBuffer buffer = ByteBuffer.allocate(4); reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); @@ -1114,6 +1118,8 @@ public void testCloseIntoReader() throws IOException { } final Checkpoint readerCheckpoint = reader.getCheckpoint(); assertThat(readerCheckpoint, equalTo(writerCheckpoint)); + } finally { + IOUtils.close(reader); } } } @@ -1376,7 +1382,7 @@ public void testOpenForeignTranslog() throws IOException { final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()); try { - new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { @@ -1602,7 +1608,7 @@ public void testTragicEventCanBeAnyException() throws IOException { Path tempDir = createTempDir(); final FailSwitch fail = new FailSwitch(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy()); + Translog translog = getFailableTranslog(fail, config, false, true, null, createTranslogDeletionPolicy()); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly 
translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); fail.failAlways(); @@ -1697,7 +1703,7 @@ protected void afterAdd() throws IOException { iterator.remove(); } } - try (Translog tlog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog tlog = new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = tlog.newSnapshot(); if (writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { @@ -1740,7 +1746,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -1789,7 +1795,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { // expected... } } - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { // we don't know when things broke exactly @@ -1803,7 +1809,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException { - return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); + return getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy()); } private static class FailSwitch { @@ -1965,7 +1971,7 @@ public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8")))); translog.close(); try { - new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { + new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override protected TranslogWriter createWriter(long fileGeneration) throws IOException { throw new MockDirectoryWrapper.FakeIOException(); @@ -2083,7 +2089,7 @@ public void testWithRandomException() throws IOException { String generationUUID = null; try { boolean committing = false; - final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, new TranslogDeletionPolicy()); + final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, createTranslogDeletionPolicy()); try { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { @@ -2142,7 +2148,7 @@ public void testWithRandomException() throws 
IOException { // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery if (randomBoolean()) { try { - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2153,7 +2159,7 @@ public void testWithRandomException() throws IOException { } fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = translog.newSnapshot(); @@ -2218,7 +2224,7 @@ public void testPendingDelete() throws IOException { translog.rollGeneration(); TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); translog.close(); translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); @@ -2293,7 +2299,14 @@ public void testTranslogOpSerialization() throws Exception { assertEquals("my_id", serializedDelete.id()); } - public void testRollGeneration() throws IOException { + public void testRollGeneration() throws Exception { + // make sure we keep some files around + final boolean longRetention = randomBoolean(); + if (longRetention) { + translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000); + } else { + translog.getDeletionPolicy().setRetentionAgeInMillis(-1); + } final long generation = translog.currentFileGeneration(); final int rolls = randomIntBetween(1, 16); int totalOperations = 0; @@ -2316,8 +2329,22 @@ public void testRollGeneration() throws IOException { commit(translog, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls )); assertThat(translog.totalOperations(), equalTo(0)); - for (int i = 0; i < rolls; i++) { - assertFileDeleted(translog, generation + i); + if (longRetention) { + for (int i = 0; i <= rolls; i++) { + assertFileIsPresent(translog, generation + i); + } + translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1); + assertBusy(() -> { + translog.trimUnreferencedReaders(); + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } + }); + } else { + // immediate cleanup + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } } assertFileIsPresent(translog, generation + rolls); }
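
For a quick way to see how the pieces above fit together: the new TranslogDeletionPolicy#minTranslogGenRequired combines four constraints, namely the oldest generation held by an open view, the generation needed for store/peer recovery, and the new size and age based retention limits. Below is a minimal standalone Java sketch of that combination; it is illustrative only and not part of this patch. GenFile and the method names are invented for the example, whereas the real code operates on TranslogReader/TranslogWriter instances and reads file modification times.

import java.util.List;

public class TranslogRetentionSketch {

    /** Hypothetical stand-in for one translog generation file (a reader, or the current writer). */
    record GenFile(long generation, long sizeInBytes, long lastModifiedMillis) {}

    /**
     * Mirrors getMinTranslogGenBySize: walk generations from newest to oldest, accumulating file
     * sizes until the retention budget is spent. A negative budget disables size-based retention,
     * signalled by Long.MIN_VALUE so it can never win the max() used below.
     * The list is assumed non-empty (the current writer always exists).
     */
    static long minGenBySize(List<GenFile> oldestToNewest, long retentionSizeInBytes) {
        if (retentionSizeInBytes < 0) {
            return Long.MIN_VALUE;
        }
        long totalSize = 0;
        long minGen = oldestToNewest.get(oldestToNewest.size() - 1).generation();
        for (int i = oldestToNewest.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) {
            totalSize += oldestToNewest.get(i).sizeInBytes();
            minGen = oldestToNewest.get(i).generation();
        }
        return minGen;
    }

    /**
     * Mirrors getMinTranslogGenByAge: keep every generation whose file is younger than the
     * retention age; if even the newest one is too old, only the current writer is kept.
     * A negative age disables age-based retention.
     */
    static long minGenByAge(List<GenFile> oldestToNewest, long retentionAgeMillis, long nowMillis) {
        if (retentionAgeMillis < 0) {
            return Long.MIN_VALUE;
        }
        for (GenFile file : oldestToNewest) {
            if (nowMillis - file.lastModifiedMillis() <= retentionAgeMillis) {
                return file.generation();
            }
        }
        return oldestToNewest.get(oldestToNewest.size() - 1).generation();
    }

    /**
     * Mirrors minTranslogGenRequired: size and age are combined with max() (a generation is only
     * retained while it satisfies both limits), while recovery and open views are combined with
     * min() (each of them may independently force older generations to stay around).
     */
    static long minGenRequired(List<GenFile> oldestToNewest, long retentionSizeInBytes, long retentionAgeMillis,
                               long nowMillis, long minGenForRecovery, long minGenHeldByViews) {
        long byAge = minGenByAge(oldestToNewest, retentionAgeMillis, nowMillis);
        long bySize = minGenBySize(oldestToNewest, retentionSizeInBytes);
        long byAgeAndSize = (byAge == Long.MIN_VALUE && bySize == Long.MIN_VALUE)
            ? Long.MAX_VALUE // both disabled: retention keeps nothing beyond recovery/view needs
            : Math.max(byAge, bySize);
        return Math.min(byAgeAndSize, Math.min(minGenHeldByViews, minGenForRecovery));
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        List<GenFile> gens = List.of(
            new GenFile(1, 300, now - 5_000),   // oldest reader
            new GenFile(2, 300, now - 3_000),
            new GenFile(3, 300, now - 1_000));  // current writer
        // A 500 byte budget and a 4s age limit both want to keep generations 2 and 3,
        // but recovery still needs generation 1, so nothing may be trimmed yet -> prints 1.
        System.out.println(minGenRequired(gens, 500, 4_000, now, 1, Long.MAX_VALUE));
        // Once recovery only needs generation 3, retention still holds on to generation 2 -> prints 2.
        System.out.println(minGenRequired(gens, 500, 4_000, now, 3, Long.MAX_VALUE));
    }
}

The design choice worth noting is that size and age restrict each other (keep up to the size budget, but nothing older than the age limit), while recovery needs and open views can each extend retention on their own. Both index.translog.retention.size and index.translog.retention.age are registered as dynamic, index-scoped settings, so they can be changed per index at runtime; negative values disable the corresponding limit, which is the default in this change.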