Engine: update index buffer size during recovery and allow configuring version map size.

To support real-time gets, the engine keeps an in-memory map of recently indexed docs and their location in the translog. This is needed until documents become visible in Lucene. With 1.3.0, we improved this map and integrated it tightly with Lucene's refresh cycles in order to keep its memory footprint to a bare minimum. On top of that, if the version map grows above 25% of the index buffer size, we proactively refresh in order to be able to trim the version map back to 0 (see elastic#6363). In the same release, we fixed an issue where an update to the indexing buffer size could result in an unwanted exception during recovery (elastic#6667). We solved this by delaying updates to the index buffer size until the shard was fully recovered. Sadly, these two changes together can have a negative impact on the speed of translog recovery.
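
As a rough sketch (simplified names, not the engine's actual internals), the proactive-refresh trigger described above boils down to a single comparison:

// Simplified sketch of the proactive-refresh trigger described above; the class
// and method names are illustrative only.
class VersionMapRefreshTrigger {
    private final long indexingBufferBytes;

    VersionMapRefreshTrigger(long indexingBufferBytes) {
        this.indexingBufferBytes = indexingBufferBytes;
    }

    /** true when the in-memory version map should be trimmed via a refresh */
    boolean shouldRefresh(long versionMapRamBytes) {
        return versionMapRamBytes > (long) (0.25 * indexingBufferBytes); // 25% of the index buffer
    }
}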

During the second phase of recovery we replay all operations that happened on the shard during the first phase of copying files. In parallel, we start indexing new documents into the newly created shard. At some point (phase 3 of the recovery), the translog replay starts to send operations which have already been indexed into the shard. The version map is crucial for quickly detecting this and skipping the replayed operations without hitting Lucene. Sadly, elastic#6667 (only updating the index memory buffer once the shard is started) means that a recovering shard uses the default 64MB for its index buffer, and thus only 16MB (25%) for the version map. This is much less than the default index buffer size of 10% of the machine memory (shared between shards).
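
To make the numbers concrete, a small illustrative calculation (the 8GB machine memory is an assumed example value, not part of this change):

// Illustrative arithmetic for the paragraph above; the 8GB figure is assumed.
class RecoveryBufferArithmetic {
    public static void main(String[] args) {
        long recoveryIndexBuffer = 64L * 1024 * 1024;            // fixed 64MB default while recovering
        long recoveryVersionMapBudget = recoveryIndexBuffer / 4; // 25% => 16MB refresh trigger

        long machineMemory = 8L * 1024 * 1024 * 1024;            // assumed 8GB machine
        long startedShardBuffer = machineMemory / 10;            // default 10% => ~819MB, shared between shards

        System.out.println("recovering shard's version map budget: " + recoveryVersionMapBudget / (1024 * 1024) + "mb");
        System.out.println("started shards' shared index buffer:   " + startedShardBuffer / (1024 * 1024) + "mb");
    }
}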

Since we no longer flush when updating the memory buffer, we can remove elastic#6667 and update recovering shards as well. Also, we make the maximum version map size configurable, with the same default of 25% of the current index buffer.
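
For illustration, a usage sketch with the Java client that bumps the new trigger for one index, mirroring the prepareUpdateSettings(...).setSettings(String) pattern used in the recovery benchmark in this changeset; the index name and the 50% value are arbitrary examples, and an absolute value such as "64mb" also works:

import org.elasticsearch.client.Client;
import org.elasticsearch.index.engine.EngineConfig;

// Usage sketch only; the index name and the 50% value are arbitrary examples.
class VersionMapSettingExample {
    static void raiseVersionMapTrigger(Client client, String index) {
        client.admin().indices().prepareUpdateSettings(index)
                .setSettings(EngineConfig.INDEX_VERSION_MAP_SIZE + ": 50%")
                .get();
    }
}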

Closes elastic#10046
bleskes committed Mar 15, 2015
1 parent 0e91c1a commit 75ddd83
Showing 9 changed files with 179 additions and 20 deletions.
38 changes: 38 additions & 0 deletions src/main/java/org/elasticsearch/cluster/settings/Validator.java
@@ -205,6 +205,44 @@ public String validate(String setting, String value) {
}
};

public static final Validator PERCENTAGE = new Validator() {
@Override
public String validate(String setting, String value) {
try {
if (value == null) {
return "the value of " + setting + " can not be null";
}
if (!value.endsWith("%")) {
return "the value [" + value + "] for " + setting + " must end with %";
}
final double asDouble = Double.parseDouble(value.substring(0, value.length() - 1));
if (asDouble < 0.0 || asDouble > 100.0) {
return "the value [" + value + "] for " + setting + " must be a percentage between 0% and 100%";
}
} catch (NumberFormatException ex) {
return ex.getMessage();
}
return null;
}
};


public static final Validator BYTES_SIZE_OR_PERCENTAGE = new Validator() {
@Override
public String validate(String setting, String value) {
String byteSize = BYTES_SIZE.validate(setting, value);
if (byteSize != null) {
String percentage = PERCENTAGE.validate(setting, value);
if (percentage == null) {
return null;
}
return percentage + " or be a valid bytes size value, like [16mb]";
}
return null;
}
};


public static final Validator MEMORY_SIZE = new Validator() {
@Override
public String validate(String setting, String value) {
47 changes: 46 additions & 1 deletion src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -23,7 +23,6 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -53,6 +52,8 @@ public final class EngineConfig {
private volatile boolean failOnMergeFailure = true;
private volatile boolean failEngineOnCorruption = true;
private volatile ByteSizeValue indexingBufferSize;
private volatile ByteSizeValue versionMapSize;
private volatile String versionMapSizeSetting;
private final int indexConcurrency;
private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
@@ -131,12 +132,20 @@ public final class EngineConfig {
*/
public static final String INDEX_CHECKSUM_ON_MERGE = "index.checksum_on_merge";

/**
* The maximum size the version map should grow to before issuing a refresh. Can be an absolute value or a percentage of
* the current index memory buffer (defaults to 25%)
*/
public static final String INDEX_VERSION_MAP_SIZE = "index.version_map_size";


public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final ByteSizeValue DEFAUTL_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb");

public static final String DEFAULT_VERSION_MAP_SIZE = "25%";

private static final String DEFAULT_CODEC_NAME = "default";


@@ -167,13 +176,49 @@ public EngineConfig(ShardId shardId, boolean optimizeAutoGenerateId, ThreadPool
failEngineOnCorruption = indexSettings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION_SETTING, true);
failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE_SETTING, true);
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
updateVersionMapSize();
}

/** updates {@link #versionMapSize} based on current setting and {@link #indexingBufferSize} */
private void updateVersionMapSize() {
if (versionMapSizeSetting.endsWith("%")) {
double percent = Double.parseDouble(versionMapSizeSetting.substring(0, versionMapSizeSetting.length() - 1));
versionMapSize = new ByteSizeValue((long) (((double) indexingBufferSize.bytes() * (percent / 100))));
} else {
versionMapSize = ByteSizeValue.parseBytesSizeValue(versionMapSizeSetting);
}
}

/**
* Sets the version map size setting that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
*/
public void setVersionMapSizeSetting(String versionMapSizeSetting) {
this.versionMapSizeSetting = versionMapSizeSetting;
updateVersionMapSize();
}

/**
* current setting for the version map size that should trigger a refresh. See {@link #INDEX_VERSION_MAP_SIZE} for details.
*/
public String getVersionMapSizeSetting() {
return versionMapSizeSetting;
}


/**
* returns the size of the version map that should trigger a refresh
*/
public ByteSizeValue getVersionMapSize() {
return versionMapSize;
}

/**
* Sets the indexing buffer
*/
public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
this.indexingBufferSize = indexingBufferSize;
updateVersionMapSize();
}

/**
@@ -26,7 +26,8 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
@@ -370,11 +371,10 @@ public void index(Index index) throws EngineException {
}

/**
* Forces a refresh if the versionMap is using too much RAM (currently > 25% of IndexWriter's RAM buffer).
* Forces a refresh if the versionMap is using too much RAM
*/
private void checkVersionMapRefresh() {
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
if (versionMap.ramBytesUsedForRefresh() > 0.25 * engineConfig.getIndexingBufferSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
if (versionMap.ramBytesUsedForRefresh() > config().getVersionMapSize().bytes() && versionMapRefreshPending.getAndSet(true) == false) {
try {
if (isClosed.get()) {
// no point...
@@ -39,9 +39,9 @@
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.IndicesWarmer;

/**
*/
@@ -87,6 +87,7 @@ public IndexDynamicSettingsModule() {
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CHECKSUM_ON_MERGE, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_VERSION_MAP_SIZE, Validator.BYTES_SIZE_OR_PERCENTAGE);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
13 changes: 11 additions & 2 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -962,7 +962,8 @@ public void addFailedEngineListener(Engine.FailedEngineListener failedEngineList
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
ByteSizeValue preValue = config.getIndexingBufferSize();
config.setIndexingBufferSize(shardIndexingBufferSize);
if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
// update engine if it is already started.
if (preValue.bytes() != shardIndexingBufferSize.bytes() && engineUnsafe() != null) {
// it's inactive, make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
@@ -1050,6 +1051,10 @@ public void onRefreshSettings(Settings settings) {
config.setChecksumOnMerge(checksumOnMerge);
change = true;
}
final String versionMapSize = settings.get(EngineConfig.INDEX_VERSION_MAP_SIZE, config.getVersionMapSizeSetting());
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
config.setVersionMapSizeSetting(versionMapSize);
}
}
if (change) {
refresh("apply settings");
@@ -1198,13 +1203,17 @@ private void checkIndex(boolean throwException) throws IndexShardException {
}

public Engine engine() {
Engine engine = this.currentEngineReference.get();
Engine engine = engineUnsafe();
if (engine == null) {
throw new EngineClosedException(shardId);
}
return engine;
}

protected Engine engineUnsafe() {
return this.currentEngineReference.get();
}

class ShardEngineFailListener implements Engine.FailedEngineListener {
private final CopyOnWriteArrayList<Engine.FailedEngineListener> delegates = new CopyOnWriteArrayList<>();

@@ -28,14 +28,13 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo;
@@ -65,7 +64,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin

private volatile ScheduledFuture scheduler;

private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);

@Inject
public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
@@ -161,6 +161,7 @@ public void run() {

long totalRecoveryTime = 0;
long startTime = System.currentTimeMillis();
long[] recoveryTimes = new long[3];
for (int iteration = 0; iteration < 3; iteration++) {
logger.info("--> removing replicas");
client1.admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS + ": 0").get();
@@ -170,7 +171,9 @@
client1.admin().cluster().prepareHealth(INDEX_NAME).setWaitForGreenStatus().setTimeout("15m").get();
long recoveryTime = System.currentTimeMillis() - recoveryStart;
totalRecoveryTime += recoveryTime;
recoveryTimes[iteration] = recoveryTime;
logger.info("--> recovery done in [{}]", new TimeValue(recoveryTime));

// sleep some to let things clean up
Thread.sleep(10000);
}
@@ -185,7 +188,9 @@

backgroundLogger.join();

logger.info("average doc/s [{}], average relocation time [{}]", (endDocIndexed - startDocIndexed) * 1000.0 / totalTime, new TimeValue(totalRecoveryTime / 3));
logger.info("average doc/s [{}], average relocation time [{}], taking [{}], [{}], [{}]", (endDocIndexed - startDocIndexed) * 1000.0 / totalTime, new TimeValue(totalRecoveryTime / 3),
TimeValue.timeValueMillis(recoveryTimes[0]), TimeValue.timeValueMillis(recoveryTimes[1]), TimeValue.timeValueMillis(recoveryTimes[2])
);

client1.close();
node1.close();
@@ -22,9 +22,7 @@
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;

/**
*
@@ -83,6 +81,24 @@ public void testValidators() throws Exception {
assertThat(Validator.POSITIVE_INTEGER.validate("", "0"), notNullValue());
assertThat(Validator.POSITIVE_INTEGER.validate("", "-1"), notNullValue());
assertThat(Validator.POSITIVE_INTEGER.validate("", "10.2"), notNullValue());

assertThat(Validator.PERCENTAGE.validate("", "asdasd"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "-1"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "20"), notNullValue()); // we expect 20%
assertThat(Validator.PERCENTAGE.validate("", "-1%"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "101%"), notNullValue());
assertThat(Validator.PERCENTAGE.validate("", "100%"), nullValue());
assertThat(Validator.PERCENTAGE.validate("", "99%"), nullValue());
assertThat(Validator.PERCENTAGE.validate("", "0%"), nullValue());

assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "asdasd"), notNullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "20"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "20mb"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "-1%"), notNullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "101%"), notNullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "100%"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "99%"), nullValue());
assertThat(Validator.BYTES_SIZE_OR_PERCENTAGE.validate("", "0%"), nullValue());
}

@Test
