Skip to content

Commit

Permalink
Make setting index.translog.sync_interval be dynamic (#37382)
Browse files Browse the repository at this point in the history
Currently, we cannot update index setting index.translog.sync_interval if index is open, because it's
not dynamic which can be updated for closed index only.

Closes #32763
  • Loading branch information
Like authored and ywelsch committed Mar 20, 2019
1 parent 5ec56d7 commit 6f64267
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
34 changes: 21 additions & 13 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Expand Up @@ -199,7 +199,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
updateFsyncTaskIfNecessary();
}

public int numberOfShards() {
Expand Down Expand Up @@ -640,8 +640,6 @@ public IndexMetaData getMetaData() {

@Override
public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) {
final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability();

final boolean updateIndexMetaData = indexSettings.updateIndexMetaData(newIndexMetaData);

if (Assertions.ENABLED
Expand Down Expand Up @@ -693,20 +691,23 @@ public boolean isForceExecution() {
});
rescheduleRefreshTasks();
}
final Translog.Durability durability = indexSettings.getTranslogDurability();
if (durability != oldTranslogDurability) {
rescheduleFsyncTask(durability);
}
updateFsyncTaskIfNecessary();
}
}

private void rescheduleFsyncTask(Translog.Durability durability) {
try {
if (fsyncTask != null) {
fsyncTask.close();
private void updateFsyncTaskIfNecessary() {
if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) {
try {
if (fsyncTask != null) {
fsyncTask.close();
}
} finally {
fsyncTask = null;
}
} finally {
fsyncTask = durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this);
} else if (fsyncTask == null) {
fsyncTask = new AsyncTranslogFSync(this);
} else {
fsyncTask.updateIfNeeded();
}
}

Expand Down Expand Up @@ -868,6 +869,13 @@ protected void runInternal() {
indexService.maybeFSyncTranslogs();
}

void updateIfNeeded() {
final TimeValue newInterval = indexService.getIndexSettings().getTranslogSyncInterval();
if (newInterval.equals(getInterval()) == false) {
setInterval(newInterval);
}
}

@Override
public String toString() {
return "translog_sync";
Expand Down
Expand Up @@ -62,7 +62,7 @@ public final class IndexSettings {
Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, Property.IndexScope);
public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING =
Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100),
Property.IndexScope);
Property.Dynamic, Property.IndexScope);
public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER =
Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30),
TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic);
Expand Down Expand Up @@ -316,7 +316,7 @@ public final class IndexSettings {
private final boolean queryStringAllowLeadingWildcard;
private final boolean defaultAllowUnmappedFields;
private volatile Translog.Durability durability;
private final TimeValue syncInterval;
private volatile TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
Expand Down Expand Up @@ -501,6 +501,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, mergeSchedulerConfig::setMaxThreadAndMergeCount);
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow);
scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters);
Expand Down Expand Up @@ -701,6 +702,10 @@ public TimeValue getTranslogSyncInterval() {
return syncInterval;
}

public void setTranslogSyncInterval(TimeValue translogSyncInterval) {
this.syncInterval = translogSyncInterval;
}

/**
* Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
*/
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -320,9 +321,7 @@ public void testAsyncFsyncActuallyWorks() throws Exception {
assertTrue(indexService.getRefreshTask().mustReschedule());
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
IndexShard shard = indexService.getShard(0);
assertBusy(() -> {
assertFalse(shard.isSyncNeeded());
});
assertBusy(() -> assertFalse(shard.isSyncNeeded()));
}

public void testRescheduleAsyncFsync() throws Exception {
Expand Down Expand Up @@ -394,4 +393,39 @@ public void testIllegalFsyncInterval() {
assertEquals("failed to parse value [0ms] for setting [index.translog.sync_interval], must be >= [100ms]", ex.getMessage());
}
}

public void testUpdateSyncIntervalDynamically() {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s") // very often :)
.build();
IndexService indexService = createIndex("test", settings);
ensureGreen("test");
assertNull(indexService.getFsyncTask());

Settings.Builder builder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "5s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name());

client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(builder)
.get();

assertNotNull(indexService.getFsyncTask());
assertTrue(indexService.getFsyncTask().mustReschedule());

IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
assertEquals("5s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey()));

client().admin().indices().prepareClose("test").get();
client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "20s"))
.get();
indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
assertEquals("20s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey()));
}
}
Expand Up @@ -402,6 +402,7 @@ static String[] extractLeaderShardHistoryUUIDs(Map<String, String> ccrIndexMetaD
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING);
nonReplicatedSettings.add(IndexSettings.INDEX_GC_DELETES_SETTING);
nonReplicatedSettings.add(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD);

Expand Down

0 comments on commit 6f64267

Please sign in to comment.