Skip to content

Commit

Permalink
Allow to change concurrent merge scheduling setting dynamically
Browse files Browse the repository at this point in the history
Allow to change the concurrent merge scheduler settings dynamically using the update settings API
closes #6098
  • Loading branch information
kimchy committed May 12, 2014
1 parent 29ab31b commit 78e3988
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 6 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -44,18 +45,23 @@
*/
public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {

private final int maxThreadCount;
private final int maxMergeCount;
private final IndexSettingsService indexSettingsService;
private final ApplySettings applySettings = new ApplySettings();

private volatile int maxThreadCount;
private volatile int maxMergeCount;

private Set<CustomConcurrentMergeScheduler> schedulers = new CopyOnWriteArraySet<>();

@Inject
public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService) {
super(shardId, indexSettings, threadPool);

this.indexSettingsService = indexSettingsService;
this.maxThreadCount = componentSettings.getAsInt("max_thread_count", ConcurrentMergeScheduler.DEFAULT_MAX_THREAD_COUNT);
this.maxMergeCount = componentSettings.getAsInt("max_merge_count", ConcurrentMergeScheduler.DEFAULT_MAX_MERGE_COUNT);
logger.debug("using [concurrent] merge scheduler with max_thread_count[{}]", maxThreadCount);
logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}]", maxThreadCount, maxMergeCount);

indexSettingsService.addListener(applySettings);
}

@Override
Expand Down Expand Up @@ -84,6 +90,11 @@ public Set<OnGoingMerge> onGoingMerges() {
return ImmutableSet.of();
}

@Override
public void close() {
indexSettingsService.removeListener(applySettings);
}

public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {

private final ShardId shardId;
Expand Down Expand Up @@ -128,4 +139,27 @@ protected void afterMerge(OnGoingMerge merge) {
provider.afterMerge(merge);
}
}

class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
int maxThreadCount = settings.getAsInt("index.merge.scheduler.max_thread_count", ConcurrentMergeSchedulerProvider.this.maxThreadCount);
if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
logger.info("updating [max_thread_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
}
}

int maxMergeCount = settings.getAsInt("index.merge.scheduler.max_merge_count", ConcurrentMergeSchedulerProvider.this.maxMergeCount);
if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
logger.info("updating [max_merge_count] from [{}] to [{}]", ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
}
}
}
}
}
Expand Up @@ -127,4 +127,6 @@ public final MergeScheduler newMergeScheduler() {
public abstract MergeStats stats();

public abstract Set<OnGoingMerge> onGoingMerges();

public abstract void close();
}
Expand Up @@ -74,6 +74,11 @@ public Set<OnGoingMerge> onGoingMerges() {
return ImmutableSet.of();
}

@Override
public void close() {

}

public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler {

private final SerialMergeSchedulerProvider provider;
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.PercolatorShardModule;
import org.elasticsearch.index.query.IndexQueryParserService;
Expand Down Expand Up @@ -403,6 +404,12 @@ public synchronized void removeShard(int shardId, String reason) throws Elastics
logger.debug("failed to close engine", e);
// ignore
}
try {
shardInjector.getInstance(MergeSchedulerProvider.class).close();
} catch (Throwable e) {
logger.debug("failed to close merge policy scheduler", e);
// ignore
}
try {
shardInjector.getInstance(MergePolicyProvider.class).close();
} catch (Throwable e) {
Expand Down
Expand Up @@ -324,7 +324,7 @@ public void testSegments() throws Exception {

@Test
public void testSegmentsWithMergeFlag() throws Exception {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool);
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
final AtomicReference<CountDownLatch> waitTillMerge = new AtomicReference<>();
final AtomicReference<CountDownLatch> waitForMerge = new AtomicReference<>();
mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {
Expand Down

0 comments on commit 78e3988

Please sign in to comment.