Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore translog retention policy if soft-deletes enabled #45473

Merged
merged 12 commits into from
Aug 21, 2019
65 changes: 41 additions & 24 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,24 +195,6 @@ public final class IndexSettings {
new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES),
Property.Dynamic, Property.IndexScope);

/**
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. A longer retention policy is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(12), TimeValue.timeValueMillis(-1),
Property.Dynamic, Property.IndexScope);

/**
* Controls how many translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
Expand Down Expand Up @@ -258,6 +240,23 @@ public final class IndexSettings {
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0,
Property.IndexScope, Property.Dynamic);

/**
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. This setting will be ignored if soft-deletes is enabled.
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age",
settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), TimeValue.MINUS_ONE,
Property.Dynamic, Property.IndexScope);

/**
* Controls how many translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. This setting will be ignored if soft-deletes is enabled.
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
Property.Dynamic, Property.IndexScope);

/**
* Controls the maximum length of time since a retention lease is created or renewed before it is considered expired.
*/
Expand Down Expand Up @@ -466,8 +465,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING);
translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
Expand All @@ -493,6 +490,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
setTranslogRetentionAge(scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING));
setTranslogRetentionSize(scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING));

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
Expand Down Expand Up @@ -553,11 +552,23 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
this.translogRetentionSize = byteSizeValue;
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
logger.info("setting [{}] is ignored for translog is no longer used in peer recoveries with soft-deletes enabled",
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey());
this.translogRetentionSize = new ByteSizeValue(-1);
} else {
this.translogRetentionSize = byteSizeValue;
}
}

private void setTranslogRetentionAge(TimeValue age) {
this.translogRetentionAge = age;
if (softDeleteEnabled && age.nanos() >= 0) {
logger.info("setting [{}] is ignored for translog is no longer used in peer recoveries with soft-deletes enabled",
INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey());
this.translogRetentionAge = TimeValue.MINUS_ONE;
} else {
this.translogRetentionAge = age;
}
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
Expand Down Expand Up @@ -734,13 +745,19 @@ public TimeValue getRefreshInterval() {
/**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; }
public ByteSizeValue getTranslogRetentionSize() {
assert softDeleteEnabled == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
return translogRetentionSize;
}

/**
* Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept
* around
*/
public TimeValue getTranslogRetentionAge() { return translogRetentionAge; }
public TimeValue getTranslogRetentionAge() {
assert softDeleteEnabled == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
return translogRetentionAge;
}

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,19 @@ private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws
final TranslogConfig translogConfig = new TranslogConfig(shardPath.getShardId(), translogPath,
indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardPath.getShardId().id());
final TranslogDeletionPolicy translogDeletionPolicy =
new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis());
// We open translog to check for corruption, do not clean anything.
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(Long.MAX_VALUE, Long.MAX_VALUE) {
@Override
long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) {
long minGen = writer.generation;
for (TranslogReader reader : readers) {
minGen = Math.min(reader.generation, minGen);
}
return minGen;
}
};
try (Translog translog = new Translog(translogConfig, translogUUID,
translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {});
retainAllTranslogPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {});
Translog.Snapshot snapshot = translog.newSnapshot()) {
//noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot
while (snapshot.next() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,20 +390,24 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
IndexService indexService = createIndex(indexName, Settings.builder()
.put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.build());

Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
final Path translogPath = translog.getConfig().getTranslogPath();
final String translogUuid = translog.getTranslogUUID();

int translogOps = 0;
final int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
translogOps++;
if (randomBoolean()) {
client().admin().indices().prepareFlush(indexName).get();
if (indexService.getIndexSettings().isSoftDeleteEnabled()) {
translogOps = 0;
}
}
}
assertThat(translog.totalOperations(), equalTo(numDocs));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(translog.totalOperations(), equalTo(translogOps));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(translogOps));
assertAcked(client().admin().indices().prepareClose("test"));

indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -568,4 +569,64 @@ public void testSoftDeletesDefaultSetting() {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), createdVersion).build();
assertTrue(IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings));
}

public void testIgnoreTranslogRetentionSettingsIfSoftDeletesEnabled() {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomIndexCompatibleVersion(random()));
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());
}
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), between(1, 1024) + "b");
}
IndexMetaData metaData = newIndexMeta("index", settings.build());
IndexSettings indexSettings = new IndexSettings(metaData, Settings.EMPTY);
assertThat(indexSettings.getTranslogRetentionAge().millis(), equalTo(-1L));
assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L));

Settings.Builder newSettings = Settings.builder().put(settings.build());
if (randomBoolean()) {
newSettings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());
}
if (randomBoolean()) {
newSettings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), between(1, 1024) + "b");
}
indexSettings.updateIndexMetaData(newIndexMeta("index", newSettings.build()));
assertThat(indexSettings.getTranslogRetentionAge().millis(), equalTo(-1L));
assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L));
}

public void testUpdateTranslogRetentionSettingsWithSoftDeletesDisabled() {
Settings.Builder settings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT);

TimeValue ageSetting = TimeValue.timeValueHours(12);
if (randomBoolean()) {
ageSetting = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueMillis(randomIntBetween(0, 10000));
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), ageSetting);
}
ByteSizeValue sizeSetting = new ByteSizeValue(512, ByteSizeUnit.MB);
if (randomBoolean()) {
sizeSetting = randomBoolean() ? new ByteSizeValue(-1) : new ByteSizeValue(randomIntBetween(0, 1024));
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), sizeSetting);
}
IndexMetaData metaData = newIndexMeta("index", settings.build());
IndexSettings indexSettings = new IndexSettings(metaData, Settings.EMPTY);
assertThat(indexSettings.getTranslogRetentionAge(), equalTo(ageSetting));
assertThat(indexSettings.getTranslogRetentionSize(), equalTo(sizeSetting));

Settings.Builder newSettings = Settings.builder().put(settings.build());
if (randomBoolean()) {
ageSetting = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueMillis(randomIntBetween(0, 10000));
newSettings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), ageSetting);
}
if (randomBoolean()) {
sizeSetting = randomBoolean() ? new ByteSizeValue(-1) : new ByteSizeValue(randomIntBetween(0, 1024));
newSettings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), sizeSetting);
}
indexSettings.updateIndexMetaData(newIndexMeta("index", newSettings.build()));
assertThat(indexSettings.getTranslogRetentionAge(), equalTo(ageSetting));
assertThat(indexSettings.getTranslogRetentionSize(), equalTo(sizeSetting));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void testTranslogStats() throws IOException {
try (Store store = createStore()) {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);

final boolean softDeletesEnabled = config.getIndexSettings().isSoftDeleteEnabled();
final int numDocs = frequently() ? scaledRandomIntBetween(10, 200) : 0;
int uncommittedDocs = 0;

Expand All @@ -259,16 +259,17 @@ public void testTranslogStats() throws IOException {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
globalCheckpoint.set(i);
if (rarely()) {
engine.flush();
uncommittedDocs = 0;
} else {
uncommittedDocs += 1;
}
globalCheckpoint.set(i);
}

assertThat(engine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(engine.getTranslogStats().estimatedNumberOfOperations(),
equalTo(softDeletesEnabled ? uncommittedDocs : numDocs));
assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs));
assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
Expand All @@ -278,7 +279,7 @@ public void testTranslogStats() throws IOException {
}

try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity())) {
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(numDocs));
assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,12 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
shards.startReplicas(nReplica);
for (IndexShard shard : shards) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
// we flush at the end of peer recovery
if (shard.routingEntry().primary() || shard.indexSettings().isSoftDeleteEnabled() == false) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
} else {
assertThat(snapshot.totalOperations(), equalTo(0));
}
}
try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
Expand All @@ -476,11 +481,16 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
// the failure replicated directly from the replication channel.
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));
Translog.NoOp noop2 = new Translog.NoOp(1, primaryTerm, indexException.toString());
expectedTranslogOps.add(noop2);

for (IndexShard shard : shards) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
if (shard.routingEntry().primary() || shard.indexSettings().isSoftDeleteEnabled() == false) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
} else {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2)));
}
}
try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2172,7 +2172,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException {
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
try (Translog.Snapshot snapshot = getTranslog(newShard).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(2));
assertThat(snapshot.totalOperations(), equalTo(newShard.indexSettings.isSoftDeleteEnabled() ? 0 : 2));
}
}
closeShards(newShard, shard);
Expand Down
Loading