Cleanup MergeScheduler infrastructure
This commit cleans up the MergeScheduler infrastructure and removes all
unneeded abstractions. The MergeScheduler itself is now private to the
Engine, and abstractions such as the Providers that supported multiple
merge schedulers are removed.

Closes #11602
s1monw committed Jun 11, 2015
1 parent 483a15a commit 440580d
Showing 19 changed files with 335 additions and 501 deletions.
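The net effect visible in the files below: the per-shard merge settings now travel through EngineConfig as a plain MergeSchedulerConfig, and the engine builds its own package-private ElasticsearchConcurrentMergeScheduler from it. A rough sketch of the resulting wiring — the InternalEngine side is not part of this excerpt, so the variable names and any getters other than getMergeSchedulerConfig() and getMergePolicy() are assumptions:

    // Sketch only (not the actual InternalEngine code): build the scheduler from the
    // config carried by EngineConfig and install it on Lucene's IndexWriterConfig.
    MergeSchedulerConfig schedulerConfig = engineConfig.getMergeSchedulerConfig();
    ElasticsearchConcurrentMergeScheduler mergeScheduler =
            new ElasticsearchConcurrentMergeScheduler(shardId, indexSettings, schedulerConfig);

    IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); // getAnalyzer() assumed
    iwc.setMergePolicy(engineConfig.getMergePolicy());
    iwc.setMergeScheduler(mergeScheduler);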
30 changes: 30 additions & 0 deletions core/src/main/java/org/apache/lucene/index/OneMergeHelper.java
@@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.lucene.index;

/**
* Allows pkg private access
*/
public class OneMergeHelper {
    private OneMergeHelper() {}
    public static String getSegmentName(MergePolicy.OneMerge merge) {
        return merge.info != null ? merge.info.info.name : "_na_";
    }
}
2 changes: 0 additions & 2 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -41,7 +41,6 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
@@ -370,7 +369,6 @@ private void closeShardInjector(String reason, ShardId sId, Injector shardInject
}
}
closeInjectorResource(sId, shardInjector,
MergeSchedulerProvider.class,
IndexShardGatewayService.class,
PercolatorQueriesRegistry.class);

core/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java → core/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java
@@ -17,15 +17,22 @@
* under the License.
*/

package org.apache.lucene.index;
package org.elasticsearch.index.engine;

import org.apache.lucene.index.*;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Collections;
@@ -36,9 +43,11 @@
* An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
* and current merges.
*/
public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {

protected final ESLogger logger;
private final Settings indexSettings;
private final ShardId shardId;

private final MeanMetric totalMerges = new MeanMetric();
private final CounterMetric totalMergesNumDocs = new CounterMetric();
@@ -51,46 +60,14 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {

private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);

public TrackingConcurrentMergeScheduler(ESLogger logger) {
super();
this.logger = logger;
}

public long totalMerges() {
return totalMerges.count();
}

public long totalMergeTime() {
return totalMerges.sum();
}

public long totalMergeNumDocs() {
return totalMergesNumDocs.count();
}

public long totalMergeSizeInBytes() {
return totalMergesSizeInBytes.count();
}

public long currentMerges() {
return currentMerges.count();
}

public long currentMergesNumDocs() {
return currentMergesNumDocs.count();
}

public long currentMergesSizeInBytes() {
return currentMergesSizeInBytes.count();
}

public long totalMergeStoppedTimeMillis() {
return totalMergeStoppedTime.count();
}

public long totalMergeThrottledTimeMillis() {
return totalMergeThrottledTime.count();
private final MergeSchedulerConfig config;

public ElasticsearchConcurrentMergeScheduler(ShardId shardId, Settings indexSettings, MergeSchedulerConfig config) {
this.config = config;
this.shardId = shardId;
this.indexSettings = indexSettings;
this.logger = Loggers.getLogger(getClass(), indexSettings, shardId);
refreshConfig();
}

public Set<OnGoingMerge> onGoingMerges() {
@@ -110,7 +87,7 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO
onGoingMerges.add(onGoingMerge);

if (logger.isTraceEnabled()) {
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", merge.info == null ? "_na_" : merge.info.info.name, merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", OneMergeHelper.getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
}
try {
beforeMerge(onGoingMerge);
@@ -137,7 +114,7 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO

String message = String.format(Locale.ROOT,
"merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], [%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
merge.info == null ? "_na_" : merge.info.info.name,
OneMergeHelper.getSegmentName(merge),
TimeValue.timeValueMillis(tookMS),
totalSizeInBytes/1024f/1024f,
totalNumDocs,
@@ -157,21 +134,53 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO
/**
* A callback allowing for custom logic before an actual merge starts.
*/
protected void beforeMerge(OnGoingMerge merge) {

}
protected void beforeMerge(OnGoingMerge merge) {}

/**
* A callback allowing for custom logic after the actual merge is done.
*/
protected void afterMerge(OnGoingMerge merge) {

}
protected void afterMerge(OnGoingMerge merge) {}

@Override
public MergeScheduler clone() {
// Lucene IW makes a clone internally but since we hold on to this instance
// the clone will just be the identity.
return this;
}

@Override
protected boolean maybeStall(IndexWriter writer) {
// Don't stall here, because we do our own index throttling (in InternalEngine.IndexThrottle) when merges can't keep up
return true;
}

@Override
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(writer, merge);
thread.setName(EsExecutors.threadName(indexSettings, "[" + shardId.index().name() + "][" + shardId.id() + "]: " + thread.getName()));
return thread;
}

MergeStats stats() {
final MergeStats mergeStats = new MergeStats();
mergeStats.add(totalMerges.count(), totalMerges.sum(), totalMergesNumDocs.count(), totalMergesSizeInBytes.count(),
currentMerges.count(), currentMergesNumDocs.count(), currentMergesSizeInBytes.count(),
totalMergeStoppedTime.count(),
totalMergeThrottledTime.count(),
config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
return mergeStats;
}

void refreshConfig() {
if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
}
boolean isEnabled = getIORateLimitMBPerSec() != Double.POSITIVE_INFINITY;
if (config.isAutoThrottle() && isEnabled == false) {
enableAutoIOThrottle();
} else if (config.isAutoThrottle() == false && isEnabled){
disableAutoIOThrottle();
}
}

}
8 changes: 8 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -45,10 +45,12 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@@ -147,6 +149,10 @@ public final EngineConfig config() {

protected abstract SegmentInfos getLastCommittedSegmentInfos();

public MergeStats getMergeStats() {
return new MergeStats();
}

/** A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
* is enabled
@@ -1186,4 +1192,6 @@ public int hashCode() {
return Arrays.hashCode(id);
}
}

public void onSettingsChanged() {}
}
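These two hooks are presumably wired to the scheduler shown earlier by the concrete engine; the InternalEngine changes are not included in this excerpt, so the following fragment is only an illustrative sketch (the mergeScheduler field name is assumed):

    // Sketch of a concrete Engine delegating to its ElasticsearchConcurrentMergeScheduler.
    @Override
    public MergeStats getMergeStats() {
        return mergeScheduler.stats();
    }

    @Override
    public void onSettingsChanged() {
        // Pick up changes to the scheduler's max merge count, max thread count and auto-throttle settings.
        mergeScheduler.refreshConfig();
    }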
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -22,6 +22,7 @@
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.similarities.Similarity;
@@ -34,7 +35,6 @@
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@@ -70,7 +70,7 @@ public final class EngineConfig {
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final MergePolicy mergePolicy;
private final MergeSchedulerProvider mergeScheduler;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
@@ -142,7 +142,7 @@ public final class EngineConfig {
*/
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerProvider mergeScheduler, Analyzer analyzer,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy, TranslogConfig translogConfig) {
this.shardId = shardId;
Expand All @@ -153,7 +153,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
this.store = store;
this.deletionPolicy = deletionPolicy;
this.mergePolicy = mergePolicy;
this.mergeScheduler = mergeScheduler;
this.mergeSchedulerConfig = mergeSchedulerConfig;
this.analyzer = analyzer;
this.similarity = similarity;
this.codecService = codecService;
@@ -347,11 +347,10 @@ public MergePolicy getMergePolicy() {
}

/**
* Returns the {@link org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider} used to obtain
* a {@link org.apache.lucene.index.MergeScheduler} for the engines {@link org.apache.lucene.index.IndexWriter}
* Returns the {@link MergeSchedulerConfig}
*/
public MergeSchedulerProvider getMergeScheduler() {
return mergeScheduler;
public MergeSchedulerConfig getMergeSchedulerConfig() {
return mergeSchedulerConfig;
}

/**
