Core: Lucene merges must run on target shard during recovery
This does not affect 2.0, where we let Lucene launch merges normally
(#8643).

In 1.x, every 1 sec (by default) we ask Lucene to kick off any new
merges, but unfortunately we don't turn that logic on in the target
shard until after recovery has finished.

This means that if you have a large translog and/or a smallish index
buffer, far too many segments can accumulate in the target shard
during recovery, making version lookups slower and slower (O(N^2))
and possibly causing slow-recovery issues like #9226.
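
To see where the quadratic behavior comes from, here is a
back-of-the-envelope sketch (plain Java, not Elasticsearch code; the
flush cadence and op counts are invented): each replayed operation
does a version lookup that may probe every live segment, so if
flushes keep adding segments and merges never run, total probe work
grows roughly quadratically with the number of operations replayed.

    // Toy model: count segment probes during a translog replay in which
    // every flush adds one segment and merging never runs.
    public class VersionLookupCost {
        public static void main(String[] args) {
            final int totalOps = 100_000;   // ops replayed from the translog (made up)
            final int opsPerFlush = 1_000;  // ops between flushes (made up)
            int segments = 0;
            long probes = 0;
            for (int op = 1; op <= totalOps; op++) {
                probes += segments;         // worst case: one probe per live segment
                if (op % opsPerFlush == 0) {
                    segments++;             // each flush adds a segment; none are merged away
                }
            }
            System.out.println("segments: " + segments + ", total probes: " + probes);
        }
    }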

This fix changes IndexShard to launch merges as soon as the shard is
created, so merging runs during recovery.
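
The diff below moves the periodic merge task from
startScheduledTasksIfNeeded() into the constructor. Here is a
condensed sketch of that pattern, using a plain
ScheduledExecutorService instead of Elasticsearch's ThreadPool (the
Engine interface and setEngine() hook are simplifications for the
sketch, not the real API):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class PeriodicMerger implements Runnable {
        interface Engine { void maybeMerge(); }

        private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        private final long intervalMillis;
        private volatile boolean closed = false;
        private volatile Engine engine;    // may still be null while the shard is recovering

        PeriodicMerger(long intervalMillis) {
            this.intervalMillis = intervalMillis;
            // The fix in this commit, in miniature: schedule the task from the
            // constructor (shard creation) rather than from recovery finalization,
            // so merges also run while the translog is being replayed.
            pool.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            if (closed) {
                return;
            }
            Engine e = engine;
            if (e != null) {
                e.maybeMerge();            // ask Lucene to kick off any newly needed merges
            }
            pool.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);    // reschedule
        }

        void setEngine(Engine engine) { this.engine = engine; }

        void close() {
            closed = true;
            pool.shutdown();
        }
    }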

Closes #10463
mikemccand committed Apr 7, 2015
1 parent 9e1143c commit 7003959
Showing 1 changed file with 21 additions and 16 deletions.

src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -185,6 +185,11 @@ public class IndexShard extends AbstractIndexShardComponent {
      */
     public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
 
+    /**
+     * Controls how frequently we automatically refresh the near-real-time searcher.
+     */
+    public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
+
     @Inject
     public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog,
                       ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
@@ -225,12 +230,12 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS
         assert clusterService.localNode() != null : "Local node is null lifecycle state is: " + clusterService.lifecycleState();
         this.localNode = clusterService.localNode();
         state = IndexShardState.CREATED;
+        this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
         this.flushOnClose = indexSettings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
         indexSettingsService.addListener(applyRefreshSettings);
 
-        this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
         this.mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1));
 
         /* create engine config */
         this.config = new EngineConfig(shardId,
                 indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false),
@@ -241,6 +246,16 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexS
         logger.debug("state: [CREATED]");
 
         this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false");
+
+        // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
+        // so, make sure we periodically call it, this need to be a small enough value so merging will actually
+        // happen and reduce the number of segments
+        if (mergeInterval.millis() > 0) {
+            mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
+            logger.debug("scheduling optimizer / merger every {}", mergeInterval);
+        } else {
+            logger.debug("scheduled optimizer / merger disabled");
+        }
     }
 
     public MergeSchedulerProvider mergeScheduler() {
@@ -802,7 +817,7 @@ public void finalizeRecovery() {
         // clear unreferenced files
         translog.clearUnreferenced();
         engine().refresh("recovery_finalization");
-        startScheduledTasksIfNeeded();
+        startEngineRefresher();
         config.setEnableGcDeletes(true);
     }
 
@@ -934,22 +949,13 @@ protected final void verifyStarted() throws IllegalIndexShardStateException {
         }
     }
 
-    private void startScheduledTasksIfNeeded() {
+    private void startEngineRefresher() {
         if (refreshInterval.millis() > 0) {
             refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
             logger.debug("scheduling refresher every {}", refreshInterval);
         } else {
             logger.debug("scheduled refresher disabled");
         }
-        // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
-        // so, make sure we periodically call it, this need to be a small enough value so mergine will actually
-        // happen and reduce the number of segments
-        if (mergeInterval.millis() > 0) {
-            mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
-            logger.debug("scheduling optimizer / merger every {}", mergeInterval);
-        } else {
-            logger.debug("scheduled optimizer / merger disabled");
-        }
     }
 
     private Query filterQueryIfNeeded(Query query, String[] types) {
@@ -960,8 +966,6 @@ private Query filterQueryIfNeeded(Query query, String[] types) {
         return query;
     }
 
-    public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
-
     public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
         this.failedEngineListener.delegates.add(failedEngineListener);
     }
@@ -1122,7 +1126,8 @@ private void reschedule() {
     class EngineMerger implements Runnable {
         @Override
         public void run() {
-            if (!engine().possibleMergeNeeded()) {
+            final Engine engine = engineUnsafe();
+            if (engine == null || engine.possibleMergeNeeded() == false) {
                 synchronized (mutex) {
                     if (state != IndexShardState.CLOSED) {
                         mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
@@ -1134,7 +1139,7 @@ public void run() {
             @Override
             public void run() {
                 try {
-                    engine().maybeMerge();
+                    engine.maybeMerge();
                 } catch (EngineClosedException e) {
                     // we are being closed, ignore
                 } catch (OptimizeFailedEngineException e) {
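Note: EngineMerger now fetches the engine via engineUnsafe() and
tolerates a null result. Because this change starts the merge task at
shard creation, the task can fire before recovery has created the
engine; engine() would presumably fail in that window, so a null
engine simply causes the task to reschedule itself.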
