From 78e39882ee0176946c0da0f6b05a343af6ffc5da Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 8 May 2014 19:13:35 +0200 Subject: [PATCH] Allow to change concurrent merge scheduling setting dynamically Allow to change the concurrent merge scheduler settings dynamically using the update settings API closes #6098 --- .../ConcurrentMergeSchedulerProvider.java | 44 ++++++++++++++++--- .../scheduler/MergeSchedulerProvider.java | 2 + .../SerialMergeSchedulerProvider.java | 5 +++ .../index/service/InternalIndexService.java | 7 +++ .../engine/internal/InternalEngineTests.java | 2 +- 5 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java index 538066cedc8ba..d8117749d9b89 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; @@ -44,18 +45,23 @@ */ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { - private final int maxThreadCount; - private final int maxMergeCount; + private final IndexSettingsService indexSettingsService; + private final ApplySettings applySettings = new ApplySettings(); + + private volatile int maxThreadCount; + private volatile int maxMergeCount; private Set schedulers = new CopyOnWriteArraySet<>(); @Inject - public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) { + public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService) { super(shardId, indexSettings, threadPool); - + this.indexSettingsService = indexSettingsService; this.maxThreadCount = componentSettings.getAsInt("max_thread_count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT); this.maxMergeCount = componentSettings.getAsInt("max_merge_count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT); - logger.debug("using [concurrent] merge scheduler with max_thread_count[{}]", maxThreadCount); + logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}]", maxThreadCount, maxMergeCount); + + indexSettingsService.addListener(applySettings); } @Override @@ -84,6 +90,11 @@ public Set onGoingMerges() { return ImmutableSet.of(); } + @Override + public void close() { + indexSettingsService.removeListener(applySettings); + } + public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler { private final ShardId shardId; @@ -128,4 +139,27 @@ protected void afterMerge(OnGoingMerge merge) { provider.afterMerge(merge); } } + + class ApplySettings implements IndexSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + int maxThreadCount = settings.getAsInt("index.merge.scheduler.max_thread_count", ConcurrentMergeSchedulerProvider.this.maxThreadCount); + if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) { + logger.info("updating [max_thread_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount); + ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount; + for (CustomConcurrentMergeScheduler scheduler : schedulers) { + scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount); + } + } + + int maxMergeCount = settings.getAsInt("index.merge.scheduler.max_merge_count", ConcurrentMergeSchedulerProvider.this.maxMergeCount); + if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) { + logger.info("updating [max_merge_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount); + ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount; + for (CustomConcurrentMergeScheduler scheduler : schedulers) { + scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount); + } + } + } + } } diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java index 92d2e05c640ec..c548c7181fef9 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java @@ -127,4 +127,6 @@ public final MergeScheduler newMergeScheduler() { public abstract MergeStats stats(); public abstract Set onGoingMerges(); + + public abstract void close(); } diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java index 92686a5a824be..553bbf92e7aed 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/SerialMergeSchedulerProvider.java @@ -74,6 +74,11 @@ public Set onGoingMerges() { return ImmutableSet.of(); } + @Override + public void close() { + + } + public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler { private final SerialMergeSchedulerProvider provider; diff --git a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 486acd88ec43c..2f0358f43f565 100644 --- a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.merge.policy.MergePolicyModule; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; +import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.percolator.PercolatorShardModule; import org.elasticsearch.index.query.IndexQueryParserService; @@ -403,6 +404,12 @@ public synchronized void removeShard(int shardId, String reason) throws Elastics logger.debug("failed to close engine", e); // ignore } + try { + shardInjector.getInstance(MergeSchedulerProvider.class).close(); + } catch (Throwable e) { + logger.debug("failed to close merge policy scheduler", e); + // ignore + } try { shardInjector.getInstance(MergePolicyProvider.class).close(); } catch (Throwable e) { diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 4d1c555e8746a..1aff856da2feb 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -324,7 +324,7 @@ public void testSegments() throws Exception { @Test public void testSegmentsWithMergeFlag() throws Exception { - ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool); + ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)); final AtomicReference waitTillMerge = new AtomicReference<>(); final AtomicReference waitForMerge = new AtomicReference<>(); mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {