Skip to content

Commit

Permalink
Move to use serial merge schedule by default
Browse files Browse the repository at this point in the history
Today, we use ConcurrentMergeScheduler, and this can be painful since it is concurrent on a shard level, with a max of 3 threads doing concurrent merges. If there are several shards being indexed, then there will be a minor explosion of threads trying to do merges, all being throttled by our merge throttling.
Moving to serial merge scheduler will still maintain concurrency of merges across shards, as we have the merge thread pool that schedules those merges. It will just be a serial one on a specific shard.
Also, the serial merge scheduler now has a limit on how many merges it will execute in one go, so other shards get their fair chance of merging. We use IndexWriter's pending merges to check whether a shard still needs merging.
Note, that if a merge is happening, it will not block due to a sync on the maybeMerge call at indexing (flush) time, since we wrap our merge scheduler with the EnabledMergeScheduler, where maybeMerge is not activated during indexing, only with explicit calls to IW#maybeMerge (see Merges).
closes #5447
  • Loading branch information
kimchy committed Mar 18, 2014
1 parent 5f9e77a commit 67460c1
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 7 deletions.
6 changes: 5 additions & 1 deletion docs/reference/index-modules/merge.asciidoc
Expand Up @@ -187,7 +187,11 @@ Defaults to unbounded.

The merge schedule controls the execution of merge operations once they
are needed (according to the merge policy). The following types are
supported, with the default being the `ConcurrentMergeScheduler`.
supported, with the default being the `SerialMergeScheduler`.

Note, the default is the serial merge scheduler since there is a merge
thread pool that explicitly schedules merges, and it makes sure that
merges are serial within a shard, yet concurrent across multiple shards.

[float]
==== ConcurrentMergeScheduler
Expand Down
Expand Up @@ -46,8 +46,11 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);

public TrackingSerialMergeScheduler(ESLogger logger) {
private final int maxMergeAtOnce;

/**
 * Creates a serial merge scheduler that tracks ongoing merges and caps how
 * much work a single scheduling pass may do.
 *
 * @param logger         logger used to report merge activity
 * @param maxMergeAtOnce maximum number of merges executed per
 *                       {@code merge(IndexWriter)} invocation, so one shard
 *                       cannot monopolize the shared merge thread pool
 */
public TrackingSerialMergeScheduler(ESLogger logger, int maxMergeAtOnce) {
    this.logger = logger;
    this.maxMergeAtOnce = maxMergeAtOnce;
}

public long totalMerges() {
Expand Down Expand Up @@ -89,7 +92,8 @@ public Set<OnGoingMerge> onGoingMerges() {
*/
@Override
synchronized public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
while (true) {
int cycle = 0;
while (cycle++ < maxMergeAtOnce) {
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null)
break;
Expand Down
Expand Up @@ -674,7 +674,13 @@ public boolean refreshNeeded() {

@Override
public boolean possibleMergeNeeded() {
    // Capture the writer reference once; it may be nulled out when the engine closes.
    final IndexWriter currentWriter = this.indexWriter;
    // Without a writer there is nothing to merge. Otherwise a merge is needed when
    // either our own flag says so, or Lucene still has merges queued up — a merge
    // scheduler may stop before draining every pending merge, so checking the flag
    // alone is not sufficient.
    return currentWriter != null
            && (this.possibleMergeNeeded || currentWriter.hasPendingMerges());
}

@Override
Expand Down
Expand Up @@ -28,7 +28,7 @@
public class MergeSchedulerModule extends AbstractModule {

public static final String MERGE_SCHEDULER_TYPE_KEY = "index.merge.scheduler.type";
public static final Class<? extends MergeSchedulerProvider> DEFAULT = ConcurrentMergeSchedulerProvider.class;
public static final Class<? extends MergeSchedulerProvider> DEFAULT = SerialMergeSchedulerProvider.class;

private final Settings settings;

Expand Down
Expand Up @@ -40,11 +40,13 @@
public class SerialMergeSchedulerProvider extends MergeSchedulerProvider {

private Set<CustomSerialMergeScheduler> schedulers = new CopyOnWriteArraySet<CustomSerialMergeScheduler>();
private final int maxMergeAtOnce;

@Inject
public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
super(shardId, indexSettings, threadPool);
logger.trace("using [serial] merge scheduler");
this.maxMergeAtOnce = componentSettings.getAsInt("max_merge_at_once", 5);
logger.trace("using [serial] merge scheduler, max_merge_at_once [{}]", maxMergeAtOnce);
}

@Override
Expand Down Expand Up @@ -77,7 +79,7 @@ public static class CustomSerialMergeScheduler extends TrackingSerialMergeSchedu
private final SerialMergeSchedulerProvider provider;

/**
 * Creates a serial merge scheduler bound to its owning provider, inheriting
 * the provider's {@code maxMergeAtOnce} per-pass merge limit.
 */
public CustomSerialMergeScheduler(ESLogger logger, SerialMergeSchedulerProvider provider) {
    super(logger, provider.maxMergeAtOnce);
    // keep a back-reference so this scheduler can be deregistered from the provider
    this.provider = provider;
}

Expand Down
@@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.lucene;

import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.index.TrackingSerialMergeScheduler;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.merge.EnableMergeScheduler;
import org.elasticsearch.index.merge.Merges;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.junit.Test;

/**
 * Verifies that {@code TrackingSerialMergeScheduler} honors its
 * {@code maxMergeAtOnce} limit: each explicit maybeMerge pass executes at most
 * that many merges, leaving the remainder pending for later passes.
 */
public class TrackingSerialMergeSchedulerTests extends ElasticsearchLuceneTestCase {

    @Test
    public void testMaxMergeAtOnce() throws Exception {
        Directory dir = newDirectory();
        IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
        // create a tracking merge scheduler, but enabled one, so we can control when it merges
        // (EnableMergeScheduler suppresses merging during indexing; merges run only via Merges.maybeMerge)
        EnableMergeScheduler mergeScheduler = new EnableMergeScheduler(new TrackingSerialMergeScheduler(Loggers.getLogger(getTestClass()), 2));
        iwc.setMergeScheduler(mergeScheduler);
        TieredMergePolicy mergePolicy = new TieredMergePolicy();
        mergePolicy.setMaxMergeAtOnceExplicit(3);
        mergePolicy.setMaxMergeAtOnce(3);
        iwc.setMergePolicy(mergePolicy);
        IndexWriter iw = new IndexWriter(dir, iwc);
        // create 20 segments (one commit per document)
        for (int i = 0; i < 20; i++) {
            Document doc = new Document();
            doc.add(new StringField("id", Integer.toString(i), Field.Store.NO));
            iw.addDocument(doc);
            iw.commit(); // create a segment, no merge will happen, its disabled
        }
        // The scheduler is capped at 2 merges per maybeMerge pass while the policy
        // merges up to 3 segments at a time, and merged segments can themselves
        // become merge candidates. After each of the first 4 passes, merges are
        // expected to still be pending; a 5th pass should drain the queue.
        // NOTE(review): the exact pass count depends on TieredMergePolicy's
        // selection behavior for these settings -- confirm if Lucene is upgraded.
        for (int i = 0; i < 4; i++) {
            assertTrue(iw.hasPendingMerges());
            Merges.maybeMerge(iw);
            assertTrue(iw.hasPendingMerges());
        }
        Merges.maybeMerge(iw);
        assertFalse(iw.hasPendingMerges());

        iw.close(false); // close without waiting for (now nonexistent) merges
        dir.close();
    }
}

0 comments on commit 67460c1

Please sign in to comment.