Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shards with heavy indexing should get more of the indexing buffer #14121

Merged
merged 24 commits into from Jan 12, 2016
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b3357f0
a start
mikemccand Oct 14, 2015
6ae8ca9
put back active/inactive logic, for sync'd flush
mikemccand Oct 14, 2015
1b9e9ed
fix failing test
mikemccand Oct 14, 2015
77c2445
don't call IMC.forceCheck when going active; remove nocommit/sops
mikemccand Oct 14, 2015
4639657
Merge branch 'master' into fair_indexing_buffers
mikemccand Oct 14, 2015
0a2a7f2
set active to true on indexing ops
mikemccand Oct 14, 2015
c66b05d
force check after X bytes indexed
mikemccand Nov 5, 2015
99e328c
remove 'white lie' and tracking refreshing bytes explicitly
mikemccand Dec 14, 2015
27d8509
merged master
mikemccand Dec 15, 2015
86a0dd0
use IW.flush to move bytes to disk
mikemccand Dec 15, 2015
330bc5b
trigger index throttling if writing indexing buffers to disk can't ke…
mikemccand Dec 16, 2015
52c092e
pull out constant for default 5 minute idle setting
mikemccand Dec 16, 2015
ed5c0e7
factor out exception handling cases
mikemccand Dec 16, 2015
319dc8c
remove dead code; get one test working again; fix docs; remove nocommits
mikemccand Dec 16, 2015
cbb6463
add throttling test case
mikemccand Dec 17, 2015
485f417
a few cleanups
mikemccand Dec 17, 2015
99d6ec5
fold in feedback
mikemccand Jan 5, 2016
1d46a00
move async-ness upwards
mikemccand Jan 5, 2016
5f4afe8
remove nocommits; fix test case
mikemccand Jan 6, 2016
3744fb9
merge master
mikemccand Jan 6, 2016
db832cc
improve logging messages a bit
mikemccand Jan 11, 2016
f3de778
merge master
mikemccand Jan 11, 2016
5e7144f
IMC is now just another IndexingOperationListener
mikemccand Jan 11, 2016
07e8370
feedback
mikemccand Jan 11, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -231,7 +231,6 @@ private void registerBuiltinIndexSettings() {
registerIndexDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
registerIndexDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good! :)

registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
Expand Down Expand Up @@ -324,4 +323,4 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
}
}
}
Expand Up @@ -178,7 +178,7 @@ void refreshConfig() {
boolean isEnabled = getIORateLimitMBPerSec() != Double.POSITIVE_INFINITY;
if (config.isAutoThrottle() && isEnabled == false) {
enableAutoIOThrottle();
} else if (config.isAutoThrottle() == false && isEnabled){
} else if (config.isAutoThrottle() == false && isEnabled) {
disableAutoIOThrottle();
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -361,8 +361,8 @@ protected void writerSegmentStats(SegmentsStats stats) {
stats.addIndexWriterMaxMemoryInBytes(0);
}

/** How much heap Lucene's IndexWriter is using */
abstract public long indexWriterRAMBytesUsed();
/** How much heap is used that would be freed by a refresh */
abstract public long indexBufferRAMBytesUsed();

protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
ensureOpen();
Expand Down Expand Up @@ -460,7 +460,7 @@ public final boolean refreshNeeded() {
}

/**
* Refreshes the engine for new search operations to reflect the latest
* Synchronously refreshes the engine for new search operations to reflect the latest
* changes.
*/
public abstract void refresh(String source) throws EngineException;
Expand Down
Expand Up @@ -54,9 +54,7 @@ public final class EngineConfig {
private final ShardId shardId;
private final TranslogRecoveryPerformer translogRecoveryPerformer;
private final Settings indexSettings;
private volatile ByteSizeValue indexingBufferSize;
private volatile ByteSizeValue versionMapSize;
private volatile String versionMapSizeSetting;
private final ByteSizeValue indexingBufferSize;
private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private volatile boolean enableGcDeletes = true;
Expand Down Expand Up @@ -96,21 +94,16 @@ public final class EngineConfig {
public static final String INDEX_CODEC_SETTING = "index.codec";

/**
* The maximum size the version map should grow to before issuing a refresh. Can be an absolute value or a percentage of
* the current index memory buffer (defaults to 25%)
* Index setting to control the index buffer size.
* This setting is <b>not</b> realtime updateable.
*/
public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size";


/** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";


public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);

public static final String DEFAULT_VERSION_MAP_SIZE = "25%";

private static final String DEFAULT_CODEC_NAME = "default";
private TranslogConfig translogConfig;
private boolean create = false;
Expand All @@ -136,65 +129,24 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
this.similarity = similarity;
this.codecService = codecService;
this.failedEngineListener = failedEngineListener;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
updateVersionMapSize();
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(INDEX_CODEC_SETTING, DEFAULT_CODEC_NAME);
// We give IndexWriter a huge buffer, so it won't flush on its own. Instead, IndexingMemoryController periodically checks
// and refreshes the most heap-consuming shards when total indexing heap usage is too high:
indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, DEFAULT_GC_DELETES).millis();
this.translogRecoveryPerformer = translogRecoveryPerformer;
this.forceNewTranslog = indexSettings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);
this.queryCache = queryCache;
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
}

/** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */
private void updateVersionMapSize() {
if (versionMapSizeSetting.endsWith("%")) {
double percent = Double.parseDouble(versionMapSizeSetting.substring(0, versionMapSizeSetting.length() - 1));
versionMapSize = new ByteSizeValue((long) ((double) indexingBufferSize.bytes() * (percent / 100)));
} else {
versionMapSize = ByteSizeValue.parseBytesSizeValue(versionMapSizeSetting, INDEX_VERSION_MAP_SIZE);
}
}

/**
* Settings the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
*/
public void setVersionMapSizeSetting(String versionMapSizeSetting) {
this.versionMapSizeSetting = versionMapSizeSetting;
updateVersionMapSize();
}

/**
* current setting for the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
*/
public String getVersionMapSizeSetting() {
return versionMapSizeSetting;
}

/** if true the engine will start even if the translog id in the commit point can not be found */
public boolean forceNewTranslog() {
return forceNewTranslog;
}

/**
* returns the size of the version map that should trigger a refresh
*/
public ByteSizeValue getVersionMapSize() {
return versionMapSize;
}

/**
* Sets the indexing buffer
*/
public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
this.indexingBufferSize = indexingBufferSize;
updateVersionMapSize();
}

/**
* Enables / disables gc deletes
*
Expand Down
Expand Up @@ -102,6 +102,8 @@ public class InternalEngine extends Engine {

private volatile SegmentInfos lastCommittedSegmentInfos;

private volatile boolean refreshing;

private final IndexThrottle throttle;

public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
Expand Down Expand Up @@ -296,7 +298,6 @@ private SearcherManager createSearcherManager() throws EngineException {
private void updateIndexWriterSettings() {
try {
final LiveIndexWriterConfig iwc = indexWriter.getConfig();
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().mbFrac());
iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
} catch (AlreadyClosedException ex) {
// ignore
Expand Down Expand Up @@ -347,7 +348,6 @@ public boolean index(Index index) {
maybeFailEngine("index", t);
throw new IndexFailedEngineException(shardId, index.type(), index.id(), t);
}
checkVersionMapRefresh();
return created;
}

Expand Down Expand Up @@ -412,33 +412,6 @@ private boolean innerIndex(Index index) throws IOException {
}
}

/**
* Forces a refresh if the versionMap is using too much RAM
*/
private void checkVersionMapRefresh() {
if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
try {
if (isClosed.get()) {
// no point...
return;
}
// Now refresh to clear versionMap:
engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
@Override
public void run() {
try {
refresh("version_table_full");
} catch (EngineClosedException ex) {
// ignore
}
}
});
} catch (EsRejectedExecutionException ex) {
// that is fine too.. we might be shutting down
}
}
}

@Override
public void delete(Delete delete) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
Expand All @@ -451,7 +424,6 @@ public void delete(Delete delete) throws EngineException {
}

maybePruneDeletedTombstones();
checkVersionMapRefresh();
}

private void maybePruneDeletedTombstones() {
Expand Down Expand Up @@ -517,6 +489,7 @@ public void refresh(String source) throws EngineException {
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
refreshing = true;
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
ensureOpen();
Expand All @@ -526,6 +499,8 @@ public void refresh(String source) throws EngineException {
} catch (Throwable t) {
failEngine("refresh failed", t);
throw new RefreshFailedEngineException(shardId, t);
} finally {
refreshing = false;
}

// TODO: maybe we should just put a scheduled job in threadPool?
Expand Down Expand Up @@ -783,8 +758,12 @@ protected final void writerSegmentStats(SegmentsStats stats) {
}

@Override
public long indexWriterRAMBytesUsed() {
return indexWriter.ramBytesUsed();
public long indexBufferRAMBytesUsed() {
if (refreshing) {
return 0;
} else {
return indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

watch out this can throw a AlreadyClosedException! I really wonder why we need the refreshing invariant... can you explain and add a comment?

}
}

@Override
Expand Down Expand Up @@ -1107,8 +1086,6 @@ private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOE
public void onSettingsChanged() {
mergeScheduler.refreshConfig();
updateIndexWriterSettings();
// config().getVersionMapSize() may have changed:
checkVersionMapRefresh();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletedTombstones();
}
Expand Down
Expand Up @@ -234,8 +234,8 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
}

@Override
public long indexWriterRAMBytesUsed() {
// No IndexWriter
public long indexBufferRAMBytesUsed() {
// No IndexWriter nor version map
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
}