From 3e32b7aca92bffa6117aee35a65b45dbf418ec5d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 9 Jun 2017 10:43:25 +0200 Subject: [PATCH 01/10] retention settings --- .../common/settings/IndexScopedSettings.java | 2 + .../elasticsearch/index/IndexSettings.java | 34 +++ .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 12 +- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../index/translog/BaseTranslogReader.java | 8 +- .../index/translog/Translog.java | 2 +- .../translog/TranslogDeletionPolicy.java | 73 ++++++- .../index/translog/TranslogReader.java | 23 ++- .../index/translog/TranslogSnapshot.java | 2 +- .../index/translog/TranslogWriter.java | 43 +++- .../translog/TruncateTranslogCommand.java | 2 +- .../engine/CombinedDeletionPolicyTests.java | 3 +- .../index/engine/InternalEngineTests.java | 5 +- .../translog/TranslogDeletionPolicyTests.java | 194 ++++++++++++++++++ .../index/translog/TranslogTests.java | 56 ++--- 16 files changed, 406 insertions(+), 59 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9fcafcea3b2f3..ae4cf6cd41a36 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 2764ffd38cc17..9bcd5cf3aa53c 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -37,6 +37,7 @@ import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -111,6 +112,14 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = + Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(0, ByteSizeUnit.MB), Property.Dynamic, + Property.IndexScope); + + public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = + Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(0), TimeValue.timeValueMillis(-1), Property.Dynamic, + Property.IndexScope); + /** * The maximum size of a translog generation. This is independent of the maximum size of * translog operations that have not been flushed. @@ -168,6 +177,8 @@ public final class IndexSettings { private final TimeValue syncInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; + private volatile TimeValue translogRetentionAge; + private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; @@ -265,6 +276,9 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); + translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING); + translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING); + flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); @@ -302,6 +316,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer( INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, this::setGenerationThresholdSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -311,6 +327,14 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { this.flushThresholdSize = byteSizeValue; } + private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { + this.translogRetentionSize = byteSizeValue; + } + + private void setTranslogRetentionAge(TimeValue age) { + this.translogRetentionAge = age; + } + private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) { this.generationThresholdSize = generationThresholdSize; } @@ -469,6 +493,16 @@ public TimeValue getRefreshInterval() { */ public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; } + /** + * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries + */ + public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; } + + /** + * Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept around + */ + public TimeValue getTranslogRetentionAge() { return translogRetentionAge; } + /** * Returns the generation threshold size. As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 7763c8d04a4e7..9e0dc7ec06fcc 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1402,7 +1402,7 @@ public IndexCommit getIndexCommit() { } } - public void onSettingsChanged() { + public void onSettingsChanged() throws IOException { } /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f84f76b537e0d..40ce1eca50669 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -150,7 +150,10 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { } this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; this.versionMap = new LiveVersionMap(); - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + ); this.deletionPolicy = new CombinedDeletionPolicy( new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); @@ -1844,7 +1847,7 @@ private void ensureCanFlush() { } } - public void onSettingsChanged() { + public void onSettingsChanged() throws IOException { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletedTombstones(); @@ -1854,6 +1857,11 @@ public void onSettingsChanged() { // the setting will be re-interpreted if it's set to true this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } + final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); + final IndexSettings indexSettings = engineConfig.getIndexSettings(); + translogDeletionPolicy.setMaxRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); + translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); + translog.trimUnreferencedReaders(); } public MergeStats getMergeStats() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 18f025c27c374..474604b21e43c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1403,7 +1403,7 @@ boolean shouldRollTranslogGeneration() { return false; } - public void onSettingsChanged() { + public void onSettingsChanged() throws IOException { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { engineOrNull.onSettingsChanged(); @@ -1828,7 +1828,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) { } } - private Engine createNewEngine(EngineConfig config) { + private Engine createNewEngine(EngineConfig config) throws IOException { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new AlreadyClosedException(shardId + " can't create engine - shard is closed"); diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 6f392c195fd19..392449192cc23 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -35,8 +35,10 @@ public abstract class BaseTranslogReader implements Comparable= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 84f61a642cc8e..75885e8a598d6 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -22,12 +22,15 @@ import org.apache.lucene.util.Counter; import java.util.HashMap; +import java.util.List; import java.util.Map; public class TranslogDeletionPolicy { - /** Records how many views are held against each - * translog generation */ + /** + * Records how many views are held against each + * translog generation + */ private final Map translogRefCounts = new HashMap<>(); /** @@ -36,14 +39,31 @@ public class TranslogDeletionPolicy { */ private long minTranslogGenerationForRecovery = 1; + private long retentionSizeInBytes; + + private long maxRetentionAgeInMillis; + + public TranslogDeletionPolicy(long retentionSizeInBytes, long maxRetentionAgeInMillis) { + this.retentionSizeInBytes = retentionSizeInBytes; + this.maxRetentionAgeInMillis = maxRetentionAgeInMillis; + } + public synchronized void setMinTranslogGenerationForRecovery(long newGen) { if (newGen < minTranslogGenerationForRecovery) { throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" + - minTranslogGenerationForRecovery+ "]"); + minTranslogGenerationForRecovery + "]"); } minTranslogGenerationForRecovery = newGen; } + public synchronized void setRetentionSizeInBytes(long bytes) { + retentionSizeInBytes = bytes; + } + + public synchronized void setMaxRetentionAgeInMillis(long ageInMillis) { + maxRetentionAgeInMillis = ageInMillis; + } + /** * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called. @@ -74,10 +94,51 @@ synchronized void releaseTranslogGenView(long translogGen) { /** * returns the minimum translog generation that is still required by the system. Any generation below * the returned value may be safely deleted + * + * @param readers current translog readers + * @param writer current translog writer */ - synchronized long minTranslogGenRequired() { - long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); - return Math.min(viewRefs, minTranslogGenerationForRecovery); + synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) { + long minByView = getMinTranslogGenRequiredByViews(); + long minByAge = getMinTranslogGenByAge(readers, writer); + long minBySize = getMinTranslogGenBySize(readers, writer); + long minByAgeAndSize = Math.max(minByAge, minBySize); + return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery)); + } + + private long getMinTranslogGenBySize(List readers, TranslogWriter writer) { + if (retentionSizeInBytes >= 0) { + long totalSize = writer.sizeInBytes(); + long minGen = writer.getGeneration(); + for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) { + final TranslogReader reader = readers.get(i); + totalSize += reader.sizeInBytes(); + minGen = reader.getGeneration(); + } + return minGen; + } else { + return Long.MIN_VALUE; + } + } + + private long getMinTranslogGenByAge(List readers, TranslogWriter writer) { + if (maxRetentionAgeInMillis >= 0) { + long now = currentTime(); + BaseTranslogReader firstNonExpired = readers.stream().map(r -> (BaseTranslogReader) r).filter( + r -> now - r.getCreationTimeInMillis() <= maxRetentionAgeInMillis + ).findFirst().orElse(writer); + return firstNonExpired.getGeneration(); + } else { + return Long.MIN_VALUE; + } + } + + protected long currentTime() { + return System.currentTimeMillis(); + } + + private long getMinTranslogGenRequiredByViews() { + return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } /** returns the translog generation that will be used as a basis of a future store/peer recovery */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 9057207501cdf..f870e4d2e9a03 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -57,9 +57,11 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { * @param channel the translog file channel to open a translog reader against * @param path the path to the translog * @param firstOperationOffset the offset to the first operation + * @param creationTimeInMillis creation time of the translog file */ - TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final long firstOperationOffset) { - super(checkpoint.generation, channel, path, firstOperationOffset); + TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final long firstOperationOffset, + long creationTimeInMillis) { + super(checkpoint.generation, channel, path, firstOperationOffset, creationTimeInMillis); this.length = checkpoint.offset; this.totalOperations = checkpoint.numOps; this.checkpoint = checkpoint; @@ -115,8 +117,9 @@ public static TranslogReader open( case TranslogWriter.VERSION_CHECKSUMS: throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); case TranslogWriter.VERSION_CHECKPOINTS: + case TranslogWriter.VERSION_TIMESTAMPS: assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path; - assert checkpoint.numOps >= 0 : "expected at least 0 operatin but got: " + checkpoint.numOps; + assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps; assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint; int len = headerStream.readInt(); if (len > channel.size()) { @@ -130,9 +133,17 @@ public static TranslogReader open( throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref + " this translog file belongs to a different translog. path:" + path); } - final long firstOperationOffset = - ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; - return new TranslogReader(checkpoint, channel, path, firstOperationOffset); + final long firstOperationOffset; + final long creationTimeInMillis; + if (version == TranslogWriter.VERSION_TIMESTAMPS) { + creationTimeInMillis = headerStream.readLong(); + firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + + Integer.BYTES + Long.BYTES; + } else { + firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; + creationTimeInMillis = Long.MAX_VALUE; + } + return new TranslogReader(checkpoint, channel, path, firstOperationOffset, creationTimeInMillis); default: throw new TranslogCorruptedException("No known translog stream version: " + version + " path:" + path); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 908cf511db03b..e5611986dcbe3 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -39,7 +39,7 @@ final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snap * Create a snapshot of translog file channel. */ TranslogSnapshot(final BaseTranslogReader reader, final long length) { - super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset); + super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset, reader.getCreationTimeInMillis()); this.length = length; this.totalOperations = reader.totalOperations(); this.checkpoint = reader.getCheckpoint(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index d637c9da79f65..2f43336a99e26 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -52,7 +52,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { public static final String TRANSLOG_CODEC = "translog"; public static final int VERSION_CHECKSUMS = 1; public static final int VERSION_CHECKPOINTS = 2; // since 2.0 we have checkpoints? - public static final int VERSION = VERSION_CHECKPOINTS; + public static final int VERSION_TIMESTAMPS = 3; // since 6.0 we track timestamp of file creation + public static final int VERSION = VERSION_TIMESTAMPS; private final ShardId shardId; private final ChannelFactory channelFactory; @@ -86,8 +87,12 @@ private TranslogWriter( final FileChannel channel, final Path path, final ByteSizeValue bufferSize, + final long creationTimeInMillis, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException { - super(initialCheckpoint.generation, channel, path, channel.position()); + super(initialCheckpoint.generation, channel, path, channel.position(), creationTimeInMillis); + assert initialCheckpoint.offset == channel.position() : + "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion [" + + channel.position() + "]"; this.shardId = shardId; this.channelFactory = channelFactory; this.minTranslogGenerationSupplier = minTranslogGenerationSupplier; @@ -107,15 +112,15 @@ static int getHeaderLength(String translogUUID) { } static int getHeaderLength(int uuidLength) { - return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + Integer.BYTES; + return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + Integer.BYTES + Long.BYTES; } - static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOException { + static void writeHeader(OutputStreamDataOutput out, BytesRef ref, long creationTimeInMillis) throws IOException { CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION); out.writeInt(ref.length); out.writeBytes(ref.bytes, ref.offset, ref.length); + out.writeLong(creationTimeInMillis); } - public static TranslogWriter create( ShardId shardId, String translogUUID, @@ -126,21 +131,36 @@ public static TranslogWriter create( final LongSupplier globalCheckpointSupplier, final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier) throws IOException { + return create(shardId, translogUUID, fileGeneration, file, channelFactory, bufferSize, globalCheckpointSupplier, + initialMinTranslogGen, minTranslogGenerationSupplier, System.currentTimeMillis()); + } + + static TranslogWriter create( + ShardId shardId, + String translogUUID, + long fileGeneration, + Path file, + ChannelFactory channelFactory, + ByteSizeValue bufferSize, + final LongSupplier globalCheckpointSupplier, + final long initialMinTranslogGen, + final LongSupplier minTranslogGenerationSupplier, + final long creationTimeInMillis) throws IOException { final BytesRef ref = new BytesRef(translogUUID); - final int headerLength = getHeaderLength(ref.length); + final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because // closing it will close the FileChannel final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); - writeHeader(out, ref); + writeHeader(out, ref, creationTimeInMillis); channel.force(true); final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(), + Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, globalCheckpointSupplier.getAsLong(), initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); - return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier, - minTranslogGenerationSupplier); + return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, creationTimeInMillis, + globalCheckpointSupplier, minTranslogGenerationSupplier); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition @@ -292,7 +312,8 @@ public TranslogReader closeIntoReader() throws IOException { throw e; } if (closed.compareAndSet(false, true)) { - return new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset()); + return + new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset(), getCreationTimeInMillis()); } else { throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index 408691692cacf..a1d49570f065b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -183,7 +183,7 @@ public static int writeEmptyTranslog(Path filename, String translogUUID) throws final BytesRef translogRef = new BytesRef(translogUUID); try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) { - TranslogWriter.writeHeader(out, translogRef); + TranslogWriter.writeHeader(out, translogRef, System.currentTimeMillis()); fc.force(true); } return TranslogWriter.getHeaderLength(translogRef.length); diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index d21273a7b0335..d1eef05c2efa1 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -39,7 +40,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testPassThrough() throws IOException { SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); - CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, new TranslogDeletionPolicy(), + CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); List commitList = new ArrayList<>(); long count = randomIntBetween(1, 3); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 31a99063fb643..fab2f290ead9e 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -180,6 +180,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -336,7 +337,7 @@ protected Translog createTranslog() throws IOException { protected Translog createTranslog(Path translogPath) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - return new Translog(translogConfig, null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -2795,7 +2796,7 @@ public void testRecoverFromForeignTranslog() throws IOException { Translog translog = new Translog( new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), - null, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java new file mode 100644 index 0000000000000..f5449027e75a6 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -0,0 +1,194 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import com.carrotsearch.randomizedtesting.annotations.Seed; +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + + +public class TranslogDeletionPolicyTests extends ESTestCase { + + public static TranslogDeletionPolicy createTranslogDeletionPolicy() { + return new TranslogDeletionPolicy( + IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), + IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis() + ); + } + + public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) { + return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), + indexSettings.getTranslogRetentionAge().getMillis()); + } + + public void testNoRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0); + assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); + final int committedReader = randomIntBetween(0, allGens.size() - 1); + final long committedGen = allGens.get(committedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + @Seed("8F60F8B33644887D") + public void testBytesRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, Long.MAX_VALUE); + assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); + final int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + deletionPolicy.setRetentionSizeInBytes(size); + deletionPolicy.setMinTranslogGenerationForRecovery(readersAndWriter.v2().generation); + assertMinGenRequired(deletionPolicy, readersAndWriter, selectedGeneration); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + public void testAgeRetention() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); + assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); + final int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getCreationTimeInMillis(); + deletionPolicy.setMaxRetentionAgeInMillis(maxAge); + deletionPolicy.setMinTranslogGenerationForRecovery(readersAndWriter.v2().generation); + assertMinGenRequired(deletionPolicy, readersAndWriter, selectedGeneration); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + + /** + * Tests that age trumps size but recovery trumps both. + */ + public void testRetentionHierarchy() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); + deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); + int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationByAge = allGens.get(selectedReader).generation; + long maxAge = now - allGens.get(selectedReader).getCreationTimeInMillis(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGenerationBySize = allGens.get(selectedReader).generation; + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + selectedReader = randomIntBetween(0, allGens.size() - 1); + final long committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setMaxRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(size); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); + // make a new policy as committed gen can't go backwards (for now0 + deletionPolicy = new MockDeletionPolicy(now, size, maxAge); + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); + + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + + } + + private void assertMinGenRequired(TranslogDeletionPolicy deletionPolicy, Tuple, TranslogWriter> readersAndWriter, + long expectedGen) { + assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(expectedGen)); + } + + private Tuple, TranslogWriter> createReadersAndWriter(long now) throws IOException { + final Path tempDir = createTempDir(); + Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); + TranslogWriter writer = null; + List readers = new ArrayList<>(); + final int numberOfReaders = randomIntBetween(0, 10); + for (long gen = 1; gen <= numberOfReaders + 1; gen++) { + if (writer != null) { + readers.add(writer.closeIntoReader()); + } + writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen, + tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L, + now - (numberOfReaders - gen + 1) * 1000); + byte[] bytes = new byte[4]; + ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + + for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { + out.reset(bytes); + out.writeInt(ops); + writer.add(new BytesArray(bytes), ops); + } + } + return new Tuple<>(readers, writer); + } + + private static class MockDeletionPolicy extends TranslogDeletionPolicy { + + long now; + + public MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis) { + super(retentionSizeInBytes, maxRetentionAgeInMillis); + this.now = now; + } + + @Override + protected long currentTime() { + return now; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index b911e9a5a48ac..db01bed2d2451 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -106,6 +106,7 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -140,7 +141,8 @@ protected void afterIfSuccessful() throws Exception { } protected Translog createTranslog(TranslogConfig config, String translogUUID) throws IOException { - return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), + () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } private void markCurrentGenAsCommitted(Translog translog) throws IOException { @@ -156,10 +158,6 @@ private void commit(Translog translog, long genToCommit) throws IOException { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit); translog.trimUnreferencedReaders(); - if (deletionPolicy.pendingViewsCount() == 0) { - assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(genToCommit)); - } - assertThat(translog.getMinFileGeneration(), equalTo(deletionPolicy.minTranslogGenRequired())); } @Override @@ -184,7 +182,9 @@ public void tearDown() throws Exception { private Translog create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO); - return new Translog(getTranslogConfig(path), null, new TranslogDeletionPolicy(), () -> globalCheckpoint.get()); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + return new Translog(translogConfig, null, deletionPolicy, () -> globalCheckpoint.get()); } private TranslogConfig getTranslogConfig(final Path path) { @@ -347,31 +347,31 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(1L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(97L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(105L)); } translog.add(new Translog.Delete("test", "2", 1, newUid("2"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(139L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(147L)); } translog.add(new Translog.Delete("test", "3", 2, newUid("3"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(3L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(181L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(189L)); } translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(223L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(231L)); } - final long expectedSizeInBytes = 266L; + final long expectedSizeInBytes = 282L; translog.rollGeneration(); { final TranslogStats stats = stats(); @@ -1102,7 +1102,13 @@ public void testCloseIntoReader() throws IOException { } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint(); - try (TranslogReader reader = writer.closeIntoReader()) { + TranslogReader reader = writer.closeIntoReader(); + try { + if (randomBoolean()) { + reader.close(); + reader = translog.openReader(reader.path(), writerCheckpoint); + } + assertThat(reader.getCreationTimeInMillis(), equalTo(writer.getCreationTimeInMillis())); for (int i = 0; i < numOps; i++) { final ByteBuffer buffer = ByteBuffer.allocate(4); reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); @@ -1112,6 +1118,8 @@ public void testCloseIntoReader() throws IOException { } final Checkpoint readerCheckpoint = reader.getCheckpoint(); assertThat(readerCheckpoint, equalTo(writerCheckpoint)); + } finally { + IOUtils.close(reader); } } } @@ -1293,7 +1301,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " + + assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3131, " + "numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2, minTranslogGeneration=0}", ex.getMessage()); } @@ -1374,7 +1382,7 @@ public void testOpenForeignTranslog() throws IOException { final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()); try { - new Translog(config, foreignTranslog, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { @@ -1600,7 +1608,7 @@ public void testTragicEventCanBeAnyException() throws IOException { Path tempDir = createTempDir(); final FailSwitch fail = new FailSwitch(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = getFailableTranslog(fail, config, false, true, null, new TranslogDeletionPolicy()); + Translog translog = getFailableTranslog(fail, config, false, true, null, createTranslogDeletionPolicy()); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); fail.failAlways(); @@ -1695,7 +1703,7 @@ protected void afterAdd() throws IOException { iterator.remove(); } } - try (Translog tlog = new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + try (Translog tlog = new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = tlog.newSnapshot(); if (writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { @@ -1738,7 +1746,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(0, 0); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -1787,7 +1795,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { // expected... } } - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(0, 0); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { // we don't know when things broke exactly @@ -1801,7 +1809,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException { - return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); + return getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy()); } private static class FailSwitch { @@ -1963,7 +1971,7 @@ public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8")))); translog.close(); try { - new Translog(config, translog.getTranslogUUID(), new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { + new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) { @Override protected TranslogWriter createWriter(long fileGeneration) throws IOException { throw new MockDirectoryWrapper.FakeIOException(); @@ -2080,7 +2088,7 @@ public void testWithRandomException() throws IOException { } String generationUUID = null; try { - final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, new TranslogDeletionPolicy()); + final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, createTranslogDeletionPolicy()); try { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { @@ -2128,7 +2136,7 @@ public void testWithRandomException() throws IOException { // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery if (randomBoolean()) { try { - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2139,7 +2147,7 @@ public void testWithRandomException() throws IOException { } fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file - TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { Translog.Snapshot snapshot = translog.newSnapshot(); @@ -2204,7 +2212,7 @@ public void testPendingDelete() throws IOException { translog.rollGeneration(); TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); translog.close(); translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); From da56a0d5386b886c65569d16408868eac3e829af Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 9 Jun 2017 19:17:24 +0200 Subject: [PATCH 02/10] feedback --- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 3 +- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../translog/TranslogDeletionPolicy.java | 13 +++--- .../index/translog/TranslogWriter.java | 14 ++----- .../translog/TranslogDeletionPolicyTests.java | 42 +++++++++++++------ 6 files changed, 44 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index c5449162c1963..6e93d1feed5f8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1402,7 +1402,7 @@ public IndexCommit getIndexCommit() { } } - public void onSettingsChanged() throws IOException { + public void onSettingsChanged() { } /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 40ce1eca50669..3ef9d4b6ecb7b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1847,7 +1847,7 @@ private void ensureCanFlush() { } } - public void onSettingsChanged() throws IOException { + public void onSettingsChanged() { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletedTombstones(); @@ -1861,7 +1861,6 @@ public void onSettingsChanged() throws IOException { final IndexSettings indexSettings = engineConfig.getIndexSettings(); translogDeletionPolicy.setMaxRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); - translog.trimUnreferencedReaders(); } public MergeStats getMergeStats() { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 474604b21e43c..18f025c27c374 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1403,7 +1403,7 @@ boolean shouldRollTranslogGeneration() { return false; } - public void onSettingsChanged() throws IOException { + public void onSettingsChanged() { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null) { engineOrNull.onSettingsChanged(); @@ -1828,7 +1828,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) { } } - private Engine createNewEngine(EngineConfig config) throws IOException { + private Engine createNewEngine(EngineConfig config) { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new AlreadyClosedException(shardId + " can't create engine - shard is closed"); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 75885e8a598d6..9dd72526ca05a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -100,13 +100,17 @@ synchronized void releaseTranslogGenView(long translogGen) { */ synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) { long minByView = getMinTranslogGenRequiredByViews(); - long minByAge = getMinTranslogGenByAge(readers, writer); - long minBySize = getMinTranslogGenBySize(readers, writer); + long minByAge = getMinTranslogGenByAge(readers, writer, maxRetentionAgeInMillis, currentTime()); + long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); long minByAgeAndSize = Math.max(minByAge, minBySize); + if (minByAgeAndSize == Long.MIN_VALUE) { + // both size and age are disabled; + minByAgeAndSize = Long.MAX_VALUE; + } return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery)); } - private long getMinTranslogGenBySize(List readers, TranslogWriter writer) { + static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { if (retentionSizeInBytes >= 0) { long totalSize = writer.sizeInBytes(); long minGen = writer.getGeneration(); @@ -121,9 +125,8 @@ private long getMinTranslogGenBySize(List readers, TranslogWrite } } - private long getMinTranslogGenByAge(List readers, TranslogWriter writer) { + static long getMinTranslogGenByAge(List readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now) { if (maxRetentionAgeInMillis >= 0) { - long now = currentTime(); BaseTranslogReader firstNonExpired = readers.stream().map(r -> (BaseTranslogReader) r).filter( r -> now - r.getCreationTimeInMillis() <= maxRetentionAgeInMillis ).findFirst().orElse(writer); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 2f43336a99e26..bcfdbb9bafac4 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -135,17 +135,9 @@ public static TranslogWriter create( initialMinTranslogGen, minTranslogGenerationSupplier, System.currentTimeMillis()); } - static TranslogWriter create( - ShardId shardId, - String translogUUID, - long fileGeneration, - Path file, - ChannelFactory channelFactory, - ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, - final long initialMinTranslogGen, - final LongSupplier minTranslogGenerationSupplier, - final long creationTimeInMillis) throws IOException { + static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, + ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, final long initialMinTranslogGen, + final LongSupplier minTranslogGenerationSupplier, final long creationTimeInMillis) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 97c840c6eb0a1..463fdf2807207 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -76,14 +76,13 @@ public void testBytesRetention() throws IOException { List allGens = new ArrayList<>(readersAndWriter.v1()); allGens.add(readersAndWriter.v2()); try { - TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, Long.MAX_VALUE); - assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); final int selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGeneration = allGens.get(selectedReader).generation; long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); - deletionPolicy.setRetentionSizeInBytes(size); - deletionPolicy.setMinTranslogGenerationForRecovery(readersAndWriter.v2().generation); - assertMinGenRequired(deletionPolicy, readersAndWriter, selectedGeneration); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), size), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenBySize(readersAndWriter.v1(), readersAndWriter.v2(), -1), + equalTo(Long.MIN_VALUE)); } finally { IOUtils.close(readersAndWriter.v1()); IOUtils.close(readersAndWriter.v2()); @@ -96,14 +95,13 @@ public void testAgeRetention() throws IOException { List allGens = new ArrayList<>(readersAndWriter.v1()); allGens.add(readersAndWriter.v2()); try { - TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); - assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); final int selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGeneration = allGens.get(selectedReader).generation; long maxAge = now - allGens.get(selectedReader).getCreationTimeInMillis(); - deletionPolicy.setMaxRetentionAgeInMillis(maxAge); - deletionPolicy.setMinTranslogGenerationForRecovery(readersAndWriter.v2().generation); - assertMinGenRequired(deletionPolicy, readersAndWriter, selectedGeneration); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), maxAge, now), + equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), -1, now), + equalTo(Long.MIN_VALUE)); } finally { IOUtils.close(readersAndWriter.v1()); IOUtils.close(readersAndWriter.v2()); @@ -128,16 +126,34 @@ public void testRetentionHierarchy() throws IOException { final long selectedGenerationBySize = allGens.get(selectedReader).generation; long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); selectedReader = randomIntBetween(0, allGens.size() - 1); - final long committedGen = allGens.get(selectedReader).generation; + long committedGen = allGens.get(selectedReader).generation; deletionPolicy.setMaxRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(size); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); - // make a new policy as committed gen can't go backwards (for now0 + // make a new policy as committed gen can't go backwards (for now) deletionPolicy = new MockDeletionPolicy(now, size, maxAge); deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); - + long viewGen = deletionPolicy.acquireTranslogGenForView(); + selectedReader = randomIntBetween(selectedReader, allGens.size() - 1); + committedGen = allGens.get(selectedReader).generation; + deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min( + Math.min(committedGen, viewGen), + Math.max(selectedGenerationByAge, selectedGenerationBySize))); + // disable age + deletionPolicy.setMaxRetentionAgeInMillis(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); + // disable size + deletionPolicy.setMaxRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); + // disable both + deletionPolicy.setMaxRetentionAgeInMillis(-1); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); } finally { IOUtils.close(readersAndWriter.v1()); IOUtils.close(readersAndWriter.v2()); From 9d0cf47a362271103248a9c5ed5be8a3b8846f5e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 9 Jun 2017 19:20:13 +0200 Subject: [PATCH 03/10] better indication of disabling size checks --- .../java/org/elasticsearch/index/translog/TranslogTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index ff21b6e2bc265..54859857d1160 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -1746,7 +1746,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(0, 0); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -1795,7 +1795,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { // expected... } } - final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(0, 0); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { // we don't know when things broke exactly From a52040797855792a93dcb4305510d3a0a2956586 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 9 Jun 2017 19:21:30 +0200 Subject: [PATCH 04/10] fix stats --- .../java/org/elasticsearch/index/translog/TranslogTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 54859857d1160..cb66373bc0d5b 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -371,7 +371,7 @@ public void testStats() throws IOException { assertThat(stats.getTranslogSizeInBytes(), equalTo(245L)); } - final long expectedSizeInBytes = 288L; + final long expectedSizeInBytes = 296L; translog.rollGeneration(); { final TranslogStats stats = stats(); From 659fb6cc0c1bd0527dfd0d6e0e0467fb2ea8b58a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 9 Jun 2017 20:14:21 +0200 Subject: [PATCH 05/10] clearer disable value --- core/src/main/java/org/elasticsearch/index/IndexSettings.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 9bcd5cf3aa53c..681b77ae76b3a 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -113,11 +113,11 @@ public final class IndexSettings { Property.IndexScope); public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = - Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(0, ByteSizeUnit.MB), Property.Dynamic, + Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = - Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(0), TimeValue.timeValueMillis(-1), Property.Dynamic, + Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(-1), TimeValue.timeValueMillis(-1), Property.Dynamic, Property.IndexScope); /** From 3b622546e18c3a2415ad40ebc8c0d0fad01793bc Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 15 Jun 2017 11:05:59 +0200 Subject: [PATCH 06/10] moved to relying on the FS mtime for translog age --- .../index/translog/BaseTranslogReader.java | 9 +++-- .../translog/TranslogDeletionPolicy.java | 16 +++++---- .../index/translog/TranslogReader.java | 19 +++------- .../index/translog/TranslogSnapshot.java | 2 +- .../index/translog/TranslogWriter.java | 36 ++++++------------- .../translog/TruncateTranslogCommand.java | 2 +- .../translog/TranslogDeletionPolicyTests.java | 20 +++++++---- .../index/translog/TranslogTests.java | 13 ++++--- 8 files changed, 49 insertions(+), 68 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 392449192cc23..7f8b7f3fb2c76 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.nio.file.Files; import java.nio.file.Path; /** @@ -35,10 +36,8 @@ public abstract class BaseTranslogReader implements Comparable readers, TranslogWriter writer) { + synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { long minByView = getMinTranslogGenRequiredByViews(); long minByAge = getMinTranslogGenByAge(readers, writer, maxRetentionAgeInMillis, currentTime()); long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); @@ -125,12 +126,15 @@ static long getMinTranslogGenBySize(List readers, TranslogWriter } } - static long getMinTranslogGenByAge(List readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now) { + static long getMinTranslogGenByAge(List readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now) + throws IOException { if (maxRetentionAgeInMillis >= 0) { - BaseTranslogReader firstNonExpired = readers.stream().map(r -> (BaseTranslogReader) r).filter( - r -> now - r.getCreationTimeInMillis() <= maxRetentionAgeInMillis - ).findFirst().orElse(writer); - return firstNonExpired.getGeneration(); + for (TranslogReader reader: readers) { + if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) { + return reader.getGeneration(); + } + } + return writer.getGeneration(); } else { return Long.MIN_VALUE; } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index f870e4d2e9a03..46439afead10a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -57,11 +57,9 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { * @param channel the translog file channel to open a translog reader against * @param path the path to the translog * @param firstOperationOffset the offset to the first operation - * @param creationTimeInMillis creation time of the translog file */ - TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final long firstOperationOffset, - long creationTimeInMillis) { - super(checkpoint.generation, channel, path, firstOperationOffset, creationTimeInMillis); + TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final long firstOperationOffset) { + super(checkpoint.generation, channel, path, firstOperationOffset); this.length = checkpoint.offset; this.totalOperations = checkpoint.numOps; this.checkpoint = checkpoint; @@ -117,7 +115,6 @@ public static TranslogReader open( case TranslogWriter.VERSION_CHECKSUMS: throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); case TranslogWriter.VERSION_CHECKPOINTS: - case TranslogWriter.VERSION_TIMESTAMPS: assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path; assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps; assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint; @@ -134,16 +131,8 @@ public static TranslogReader open( " this translog file belongs to a different translog. path:" + path); } final long firstOperationOffset; - final long creationTimeInMillis; - if (version == TranslogWriter.VERSION_TIMESTAMPS) { - creationTimeInMillis = headerStream.readLong(); - firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + - Integer.BYTES + Long.BYTES; - } else { - firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; - creationTimeInMillis = Long.MAX_VALUE; - } - return new TranslogReader(checkpoint, channel, path, firstOperationOffset, creationTimeInMillis); + firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; + return new TranslogReader(checkpoint, channel, path, firstOperationOffset); default: throw new TranslogCorruptedException("No known translog stream version: " + version + " path:" + path); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index e5611986dcbe3..908cf511db03b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -39,7 +39,7 @@ final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snap * Create a snapshot of translog file channel. */ TranslogSnapshot(final BaseTranslogReader reader, final long length) { - super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset, reader.getCreationTimeInMillis()); + super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset); this.length = length; this.totalOperations = reader.totalOperations(); this.checkpoint = reader.getCheckpoint(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index bcfdbb9bafac4..f62a42fbe09c9 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -52,8 +52,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { public static final String TRANSLOG_CODEC = "translog"; public static final int VERSION_CHECKSUMS = 1; public static final int VERSION_CHECKPOINTS = 2; // since 2.0 we have checkpoints? - public static final int VERSION_TIMESTAMPS = 3; // since 6.0 we track timestamp of file creation - public static final int VERSION = VERSION_TIMESTAMPS; + public static final int VERSION = VERSION_CHECKPOINTS; private final ShardId shardId; private final ChannelFactory channelFactory; @@ -87,9 +86,8 @@ private TranslogWriter( final FileChannel channel, final Path path, final ByteSizeValue bufferSize, - final long creationTimeInMillis, final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException { - super(initialCheckpoint.generation, channel, path, channel.position(), creationTimeInMillis); + super(initialCheckpoint.generation, channel, path, channel.position()); assert initialCheckpoint.offset == channel.position() : "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion [" + channel.position() + "]"; @@ -112,32 +110,18 @@ static int getHeaderLength(String translogUUID) { } static int getHeaderLength(int uuidLength) { - return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + Integer.BYTES + Long.BYTES; + return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + Integer.BYTES; } - static void writeHeader(OutputStreamDataOutput out, BytesRef ref, long creationTimeInMillis) throws IOException { + static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOException { CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION); out.writeInt(ref.length); out.writeBytes(ref.bytes, ref.offset, ref.length); - out.writeLong(creationTimeInMillis); - } - public static TranslogWriter create( - ShardId shardId, - String translogUUID, - long fileGeneration, - Path file, - ChannelFactory channelFactory, - ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, - final long initialMinTranslogGen, - final LongSupplier minTranslogGenerationSupplier) throws IOException { - return create(shardId, translogUUID, fileGeneration, file, channelFactory, bufferSize, globalCheckpointSupplier, - initialMinTranslogGen, minTranslogGenerationSupplier, System.currentTimeMillis()); } - static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, - ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, final long initialMinTranslogGen, - final LongSupplier minTranslogGenerationSupplier, final long creationTimeInMillis) throws IOException { + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, + ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, final long initialMinTranslogGen, + final LongSupplier minTranslogGenerationSupplier) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); @@ -145,13 +129,13 @@ static TranslogWriter create(ShardId shardId, String translogUUID, long fileGene // This OutputStreamDataOutput is intentionally not closed because // closing it will close the FileChannel final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); - writeHeader(out, ref, creationTimeInMillis); + writeHeader(out, ref); channel.force(true); final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, globalCheckpointSupplier.getAsLong(), initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); - return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, creationTimeInMillis, + return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier, minTranslogGenerationSupplier); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that @@ -305,7 +289,7 @@ public TranslogReader closeIntoReader() throws IOException { } if (closed.compareAndSet(false, true)) { return - new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset(), getCreationTimeInMillis()); + new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset()); } else { throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index a1d49570f065b..408691692cacf 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -183,7 +183,7 @@ public static int writeEmptyTranslog(Path filename, String translogUUID) throws final BytesRef translogRef = new BytesRef(translogUUID); try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) { - TranslogWriter.writeHeader(out, translogRef, System.currentTimeMillis()); + TranslogWriter.writeHeader(out, translogRef); fc.force(true); } return TranslogWriter.getHeaderLength(translogRef.length); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 463fdf2807207..168a1acca70ba 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.mockito.Mockito; import java.io.IOException; import java.nio.channels.FileChannel; @@ -97,7 +98,7 @@ public void testAgeRetention() throws IOException { try { final int selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGeneration = allGens.get(selectedReader).generation; - long maxAge = now - allGens.get(selectedReader).getCreationTimeInMillis(); + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), maxAge, now), equalTo(selectedGeneration)); assertThat(TranslogDeletionPolicy.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), -1, now), @@ -121,7 +122,7 @@ public void testRetentionHierarchy() throws IOException { deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); int selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGenerationByAge = allGens.get(selectedReader).generation; - long maxAge = now - allGens.get(selectedReader).getCreationTimeInMillis(); + long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGenerationBySize = allGens.get(selectedReader).generation; long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); @@ -162,11 +163,11 @@ public void testRetentionHierarchy() throws IOException { } private void assertMinGenRequired(TranslogDeletionPolicy deletionPolicy, Tuple, TranslogWriter> readersAndWriter, - long expectedGen) { + long expectedGen) throws IOException { assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(expectedGen)); } - private Tuple, TranslogWriter> createReadersAndWriter(long now) throws IOException { + private Tuple, TranslogWriter> createReadersAndWriter(final long now) throws IOException { final Path tempDir = createTempDir(); Files.createFile(tempDir.resolve(Translog.CHECKPOINT_FILE_NAME)); TranslogWriter writer = null; @@ -174,11 +175,16 @@ private Tuple, TranslogWriter> createReadersAndWriter(long final int numberOfReaders = randomIntBetween(0, 10); for (long gen = 1; gen <= numberOfReaders + 1; gen++) { if (writer != null) { - readers.add(writer.closeIntoReader()); + final TranslogReader reader = Mockito.spy(writer.closeIntoReader()); + Mockito.doReturn(writer.getLastModifiedTime()).when(reader).getLastModifiedTime(); + readers.add(reader); } writer = TranslogWriter.create(new ShardId("index", "uuid", 0), "translog_uuid", gen, - tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L, - now - (numberOfReaders - gen + 1) * 1000); + tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, () -> 1L, 1L, () -> 1L + ); + writer = Mockito.spy(writer); + Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); + byte[] bytes = new byte[4]; ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 5735e61bb79c7..2b4ecd0fac409 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -348,31 +348,31 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(1L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(105L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(97L)); } translog.add(new Translog.Delete("test", "2", 1, newUid("2"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(154L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(146L)); } translog.add(new Translog.Delete("test", "3", 2, newUid("3"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(3L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(203L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(195L)); } translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(245L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(237L)); } - final long expectedSizeInBytes = 296L; + final long expectedSizeInBytes = 280L; translog.rollGeneration(); { final TranslogStats stats = stats(); @@ -1109,7 +1109,6 @@ public void testCloseIntoReader() throws IOException { reader.close(); reader = translog.openReader(reader.path(), writerCheckpoint); } - assertThat(reader.getCreationTimeInMillis(), equalTo(writer.getCreationTimeInMillis())); for (int i = 0; i < numOps; i++) { final ByteBuffer buffer = ByteBuffer.allocate(4); reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); @@ -1302,7 +1301,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3131, " + + assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " + "numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2, minTranslogGeneration=0}", ex.getMessage()); } From 1a638cfcebe46141c538d6952f251941e300c3e9 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 15 Jun 2017 11:21:22 +0200 Subject: [PATCH 07/10] feedback --- .../elasticsearch/index/IndexSettings.java | 19 ++++++++++++++----- .../index/engine/InternalEngine.java | 2 +- .../translog/TranslogDeletionPolicy.java | 18 ++++++++++-------- .../translog/TranslogDeletionPolicyTests.java | 8 ++++---- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 681b77ae76b3a..43ddb09e61f0a 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -112,12 +112,22 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); - public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = - Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic, + /** + * Controls how long translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. A longer retention policy is useful to increase + * the chance of ops based recoveries. + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = + Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic, Property.IndexScope); - public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = - Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(-1), TimeValue.timeValueMillis(-1), Property.Dynamic, + /** + * Controls how many translog files that are no longer needed for persistence reasons + * will be kept around before being deleted. Keeping more files is useful to increase + * the chance of ops based recoveries. + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = + Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); /** @@ -278,7 +288,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING); translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING); - flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3ef9d4b6ecb7b..6d10a0290995a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1859,7 +1859,7 @@ public void onSettingsChanged() { } final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); final IndexSettings indexSettings = engineConfig.getIndexSettings(); - translogDeletionPolicy.setMaxRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); + translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis()); translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes()); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 6f11594549563..732b38fcedfe4 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -42,11 +42,11 @@ public class TranslogDeletionPolicy { private long retentionSizeInBytes; - private long maxRetentionAgeInMillis; + private long retentionAgeInMillis; - public TranslogDeletionPolicy(long retentionSizeInBytes, long maxRetentionAgeInMillis) { + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) { this.retentionSizeInBytes = retentionSizeInBytes; - this.maxRetentionAgeInMillis = maxRetentionAgeInMillis; + this.retentionAgeInMillis = retentionAgeInMillis; } public synchronized void setMinTranslogGenerationForRecovery(long newGen) { @@ -61,8 +61,8 @@ public synchronized void setRetentionSizeInBytes(long bytes) { retentionSizeInBytes = bytes; } - public synchronized void setMaxRetentionAgeInMillis(long ageInMillis) { - maxRetentionAgeInMillis = ageInMillis; + public synchronized void setRetentionAgeInMillis(long ageInMillis) { + retentionAgeInMillis = ageInMillis; } /** @@ -101,12 +101,14 @@ synchronized void releaseTranslogGenView(long translogGen) { */ synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { long minByView = getMinTranslogGenRequiredByViews(); - long minByAge = getMinTranslogGenByAge(readers, writer, maxRetentionAgeInMillis, currentTime()); + long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); - long minByAgeAndSize = Math.max(minByAge, minBySize); - if (minByAgeAndSize == Long.MIN_VALUE) { + final long minByAgeAndSize; + if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { // both size and age are disabled; minByAgeAndSize = Long.MAX_VALUE; + } else { + minByAgeAndSize = Math.max(minByAge, minBySize); } return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery)); } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 168a1acca70ba..3ed595543f8ea 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -128,7 +128,7 @@ public void testRetentionHierarchy() throws IOException { long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); selectedReader = randomIntBetween(0, allGens.size() - 1); long committedGen = allGens.get(selectedReader).generation; - deletionPolicy.setMaxRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(size); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); // make a new policy as committed gen can't go backwards (for now) @@ -145,14 +145,14 @@ public void testRetentionHierarchy() throws IOException { Math.min(committedGen, viewGen), Math.max(selectedGenerationByAge, selectedGenerationBySize))); // disable age - deletionPolicy.setMaxRetentionAgeInMillis(-1); + deletionPolicy.setRetentionAgeInMillis(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); // disable size - deletionPolicy.setMaxRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); // disable both - deletionPolicy.setMaxRetentionAgeInMillis(-1); + deletionPolicy.setRetentionAgeInMillis(-1); deletionPolicy.setRetentionSizeInBytes(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); } finally { From f1dd56ff1174e8d6c69a9f56e591e8048ad9aab3 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 15 Jun 2017 11:25:27 +0200 Subject: [PATCH 08/10] line formatting --- .../index/translog/TranslogWriter.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index f62a42fbe09c9..2c0bd0c7d89c8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -120,8 +120,9 @@ static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOExcep } public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, - ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, final long initialMinTranslogGen, - final LongSupplier minTranslogGenerationSupplier) throws IOException { + ByteSizeValue bufferSize, final LongSupplier globalCheckpointSupplier, + final long initialMinTranslogGen, final LongSupplier minTranslogGenerationSupplier) + throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); @@ -131,9 +132,8 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); writeHeader(out, ref); channel.force(true); - final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, globalCheckpointSupplier.getAsLong(), - initialMinTranslogGen); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, + globalCheckpointSupplier.getAsLong(), initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier, minTranslogGenerationSupplier); @@ -288,8 +288,7 @@ public TranslogReader closeIntoReader() throws IOException { throw e; } if (closed.compareAndSet(false, true)) { - return - new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset()); + return new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset()); } else { throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); } From a3c4f2f97b1830daf619fdf3ec452f8a3a728b6d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 15 Jun 2017 11:46:46 +0200 Subject: [PATCH 09/10] add some cleaning testing code that doesn't mock --- .../index/translog/TranslogTests.java | 31 +++++++++++++++++-- .../org/elasticsearch/test/ESTestCase.java | 1 - 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 2b4ecd0fac409..ae11858a3305a 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2299,7 +2299,14 @@ public void testTranslogOpSerialization() throws Exception { assertEquals("my_id", serializedDelete.id()); } - public void testRollGeneration() throws IOException { + public void testRollGeneration() throws Exception { + // make sure we keep some files around + final boolean longRetention = randomBoolean(); + if (longRetention) { + translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000); + } else { + translog.getDeletionPolicy().setRetentionAgeInMillis(-1); + } final long generation = translog.currentFileGeneration(); final int rolls = randomIntBetween(1, 16); int totalOperations = 0; @@ -2322,8 +2329,26 @@ public void testRollGeneration() throws IOException { commit(translog, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls )); assertThat(translog.totalOperations(), equalTo(0)); - for (int i = 0; i < rolls; i++) { - assertFileDeleted(translog, generation + i); + if (longRetention) { + for (int i = 0; i <= rolls; i++) { + assertFileIsPresent(translog, generation + i); + } + translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1); + assertBusy(() -> { + try { + translog.trimUnreferencedReaders(); + } catch (IOException e) { + throw new RuntimeException(e); + } + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } + }); + } else { + // immediate cleanup + for (int i = 0; i < rolls; i++) { + assertFileDeleted(translog, generation + i); + } } assertFileIsPresent(translog, generation + rolls); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 0d3e8131ab25c..99bab0c43bf33 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -29,7 +29,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import com.carrotsearch.randomizedtesting.rules.TestRuleAdapter; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; From 5d97182affe53d712a353628c86dbb320844371d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 15 Jun 2017 13:26:14 +0200 Subject: [PATCH 10/10] remove unneeded runtime exception --- .../org/elasticsearch/index/translog/TranslogTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index ae11858a3305a..21bc1a14bc55f 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2335,11 +2335,7 @@ public void testRollGeneration() throws Exception { } translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1); assertBusy(() -> { - try { - translog.trimUnreferencedReaders(); - } catch (IOException e) { - throw new RuntimeException(e); - } + translog.trimUnreferencedReaders(); for (int i = 0; i < rolls; i++) { assertFileDeleted(translog, generation + i); }