From 443f31b7aa036ba6b1ba4676526356b7211803ac Mon Sep 17 00:00:00 2001 From: liketic Date: Sat, 12 Jan 2019 19:43:16 +0800 Subject: [PATCH 1/5] Make setting index.translog.sync_interval be dynamic --- .../elasticsearch/index/IndexSettings.java | 2 +- .../index/IndexServiceTests.java | 33 +++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 4d9a8f7d37b70..7422545c675b2 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -62,7 +62,7 @@ public final class IndexSettings { Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, Property.IndexScope); public static final Setting INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), - Property.IndexScope); + Property.Dynamic, Property.IndexScope); public static final Setting INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30), TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic); diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index a47d4db2a2579..c4f8bd3d37785 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; @@ -246,9 +247,7 @@ public void testAsyncFsyncActuallyWorks() throws Exception { assertTrue(indexService.getRefreshTask().mustReschedule()); client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); IndexShard shard = indexService.getShard(0); - assertBusy(() -> { - assertFalse(shard.isSyncNeeded()); - }); + assertBusy(() -> assertFalse(shard.isSyncNeeded())); } public void testRescheduleAsyncFsync() throws Exception { @@ -320,4 +319,32 @@ public void testIllegalFsyncInterval() { assertEquals("failed to parse value [0ms] for setting [index.translog.sync_interval], must be >= [100ms]", ex.getMessage()); } } + + public void testUpdateSyncIntervalDynamically() { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s") // very often :) + .build(); + createIndex("test", settings); + ensureGreen("test"); + + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "5s")) + .get(); + + IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); + assertEquals("5s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey())); + + client().admin().indices().prepareClose("test").get(); + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "20s")) + .get(); + indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); + assertEquals("20s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey())); + } } From f5a6343a7695d36cc81627027378e0f3c3ee5ce2 Mon Sep 17 00:00:00 2001 From: liketic Date: Sat, 19 Jan 2019 22:56:05 +0800 Subject: [PATCH 2/5] Make sync interval updates work --- .../org/elasticsearch/index/IndexService.java | 15 ++++++++++++--- .../org/elasticsearch/index/IndexSettings.java | 7 ++++++- .../elasticsearch/index/IndexServiceTests.java | 5 ++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 54bf5fa1aa18e..02ba2043c0fef 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -619,6 +619,7 @@ public IndexMetaData getMetaData() { @Override public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) { final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability(); + final TimeValue oldSyncInterval = indexSettings.getTranslogSyncInterval(); final boolean updateIndexMetaData = indexSettings.updateIndexMetaData(newIndexMetaData); @@ -651,7 +652,7 @@ public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData // once we change the refresh interval we schedule yet another refresh // to ensure we are in a clean and predictable state. // it doesn't matter if we move from or to -1 in both cases we want - // docs to become visible immediately. This also flushes all pending indexing / search reqeusts + // docs to become visible immediately. This also flushes all pending indexing / search requests // that are waiting for a refresh. threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() { @Override @@ -672,19 +673,27 @@ public boolean isForceExecution() { rescheduleRefreshTasks(); } final Translog.Durability durability = indexSettings.getTranslogDurability(); - if (durability != oldTranslogDurability) { + final TimeValue syncInterval = indexSettings.getTranslogSyncInterval(); + if (syncInterval.equals(oldSyncInterval) == false) { + rescheduleFsyncTask(() -> new AsyncTranslogFSync(IndexService.this)); + } + else if (durability != oldTranslogDurability) { rescheduleFsyncTask(durability); } } } private void rescheduleFsyncTask(Translog.Durability durability) { + rescheduleFsyncTask(() -> durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(IndexService.this)); + } + + private void rescheduleFsyncTask(Supplier fsyncTaskSupplier) { try { if (fsyncTask != null) { fsyncTask.close(); } } finally { - fsyncTask = durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this); + fsyncTask = fsyncTaskSupplier.get(); } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 7422545c675b2..1effc845509d1 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -314,7 +314,7 @@ public final class IndexSettings { private final boolean queryStringAllowLeadingWildcard; private final boolean defaultAllowUnmappedFields; private volatile Translog.Durability durability; - private final TimeValue syncInterval; + private volatile TimeValue syncInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; private volatile TimeValue translogRetentionAge; @@ -495,6 +495,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, mergeSchedulerConfig::setMaxThreadAndMergeCount); scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval); scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters); @@ -694,6 +695,10 @@ public TimeValue getTranslogSyncInterval() { return syncInterval; } + public void setTranslogSyncInterval(TimeValue translogSyncInterval) { + this.syncInterval = translogSyncInterval; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index c4f8bd3d37785..49262d45e876c 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -324,7 +324,7 @@ public void testUpdateSyncIntervalDynamically() { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s") // very often :) .build(); - createIndex("test", settings); + IndexService indexService = createIndex("test", settings); ensureGreen("test"); client() @@ -334,6 +334,9 @@ public void testUpdateSyncIntervalDynamically() { .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "5s")) .get(); + assertNotNull(indexService.getFsyncTask()); + assertTrue(indexService.getFsyncTask().mustReschedule()); + IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test"); assertEquals("5s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey())); From 018da3de9d006f7bc10763ec1c2db141ab8931da Mon Sep 17 00:00:00 2001 From: liketic Date: Tue, 19 Mar 2019 09:07:53 +0800 Subject: [PATCH 3/5] Address comments --- .../org/elasticsearch/index/IndexService.java | 43 +++++++++---------- .../index/IndexServiceTests.java | 8 +++- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 2553b793a432a..cd7e03144b3b3 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -199,7 +199,7 @@ public IndexService( this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); - rescheduleFsyncTask(indexSettings.getTranslogDurability()); + updateFsyncTaskIfNecessary(); } public int numberOfShards() { @@ -640,9 +640,6 @@ public IndexMetaData getMetaData() { @Override public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) { - final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability(); - final TimeValue oldSyncInterval = indexSettings.getTranslogSyncInterval(); - final boolean updateIndexMetaData = indexSettings.updateIndexMetaData(newIndexMetaData); if (Assertions.ENABLED @@ -694,28 +691,23 @@ public boolean isForceExecution() { }); rescheduleRefreshTasks(); } - final Translog.Durability durability = indexSettings.getTranslogDurability(); - final TimeValue syncInterval = indexSettings.getTranslogSyncInterval(); - if (syncInterval.equals(oldSyncInterval) == false) { - rescheduleFsyncTask(() -> new AsyncTranslogFSync(IndexService.this)); - } - else if (durability != oldTranslogDurability) { - rescheduleFsyncTask(durability); - } + updateFsyncTaskIfNecessary(); } } - private void rescheduleFsyncTask(Translog.Durability durability) { - rescheduleFsyncTask(() -> durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(IndexService.this)); - } - - private void rescheduleFsyncTask(Supplier fsyncTaskSupplier) { - try { - if (fsyncTask != null) { - fsyncTask.close(); + private void updateFsyncTaskIfNecessary() { + if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) { + try { + if (fsyncTask != null) { + fsyncTask.close(); + } + } finally { + fsyncTask = null; } - } finally { - fsyncTask = fsyncTaskSupplier.get(); + } else if (fsyncTask == null) { + fsyncTask = new AsyncTranslogFSync(this); + } else { + fsyncTask.updateIfNeeded(); } } @@ -877,6 +869,13 @@ protected void runInternal() { indexService.maybeFSyncTranslogs(); } + void updateIfNeeded() { + final TimeValue newInterval = indexService.getIndexSettings().getTranslogSyncInterval(); + if (newInterval.equals(getInterval()) == false) { + setInterval(newInterval); + } + } + @Override public String toString() { return "translog_sync"; diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index a5b71d4b153e2..a42ddc9925272 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Assert; import java.io.IOException; import java.util.Collection; @@ -51,6 +52,7 @@ import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertNull; /** Unit test(s) for IndexService */ public class IndexServiceTests extends ESSingleNodeTestCase { @@ -400,12 +402,16 @@ public void testUpdateSyncIntervalDynamically() { .build(); IndexService indexService = createIndex("test", settings); ensureGreen("test"); + assertNull(indexService.getFsyncTask()); + + Settings.Builder builder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "5s") + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name()); client() .admin() .indices() .prepareUpdateSettings("test") - .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "5s")) + .setSettings(builder) .get(); assertNotNull(indexService.getFsyncTask()); From 79c1ef19748cbc0c2b179105d8d285915ee03fcd Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 19 Mar 2019 14:44:34 +0100 Subject: [PATCH 4/5] checkstyle fix --- .../test/java/org/elasticsearch/index/IndexServiceTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index a42ddc9925272..2d4030a51ce3d 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -40,7 +40,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.threadpool.ThreadPool; -import org.junit.Assert; import java.io.IOException; import java.util.Collection; @@ -52,7 +51,6 @@ import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertNull; /** Unit test(s) for IndexService */ public class IndexServiceTests extends ESSingleNodeTestCase { From 9eac4737dee9127f26acb0567943e213c1aee67d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 20 Mar 2019 10:13:25 +0100 Subject: [PATCH 5/5] fix test --- .../xpack/ccr/action/TransportResumeFollowAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 1d595e7c95b8c..cf35387c6ece4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -402,6 +402,7 @@ static String[] extractLeaderShardHistoryUUIDs(Map ccrIndexMetaD nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING); + nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING); nonReplicatedSettings.add(IndexSettings.INDEX_GC_DELETES_SETTING); nonReplicatedSettings.add(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD);