diff --git a/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java b/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java index 539b25de9b258..445da29995732 100644 --- a/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java +++ b/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java @@ -238,10 +238,7 @@ public boolean equals(Object o) { ByteSizeValue sizeValue = (ByteSizeValue) o; - if (size != sizeValue.size) return false; - if (sizeUnit != sizeValue.sizeUnit) return false; - - return true; + return bytes() == sizeValue.bytes(); } @Override diff --git a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index f1555c85e9197..e4962f0e9e3ec 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -19,8 +19,8 @@ package org.elasticsearch.indices.memory; -import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -48,6 +48,44 @@ */ public class IndexingMemoryController extends AbstractLifecycleComponent { + + /** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */ + public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size"; + + /** Only applies when indices.memory.index_buffer_size is a %, to set a floor on the actual size in bytes (default: 48 MB). */ + public static final String MIN_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_index_buffer_size"; + + /** Only applies when indices.memory.index_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ + public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size"; + + /** Sets a floor on the per-shard index buffer size (default: 4 MB). */ + public static final String MIN_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_shard_index_buffer_size"; + + /** Sets a ceiling on the per-shard index buffer size (default: 512 MB). */ + public static final String MAX_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_shard_index_buffer_size"; + + /** How much heap (% or bytes) we will share across all actively indexing shards for the translog buffer (default: 1%). */ + public static final String TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.translog_buffer_size"; + + /** Only applies when indices.memory.translog_buffer_size is a %, to set a floor on the actual size in bytes (default: 256 KB). */ + public static final String MIN_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_translog_buffer_size"; + + /** Only applies when indices.memory.translog_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ + public static final String MAX_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_translog_buffer_size"; + + /** Sets a floor on the per-shard translog buffer size (default: 2 KB). */ + public static final String MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_shard_translog_buffer_size"; + + /** Sets a ceiling on the per-shard translog buffer size (default: 64 KB). */ + public static final String MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_shard_translog_buffer_size"; + + /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */ + public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time"; + + /** How frequently we check shards to find inactive ones (default: 30 seconds). */ + public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval"; + + private final ThreadPool threadPool; private final IndicesService indicesService; @@ -67,19 +105,26 @@ public class IndexingMemoryController extends AbstractLifecycleComponent CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of( IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); + private final ShardsIndicesStatusChecker statusChecker; + @Inject public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { + this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes()); + } + + // for testing + protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) { super(settings); this.threadPool = threadPool; this.indicesService = indicesService; ByteSizeValue indexingBuffer; - String indexingBufferSetting = componentSettings.get("index_buffer_size", "10%"); + String indexingBufferSetting = settings.get(INDEX_BUFFER_SIZE_SETTING, "10%"); if (indexingBufferSetting.endsWith("%")) { double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1)); - indexingBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().mem().heapMax().bytes()) * (percent / 100))); - ByteSizeValue minIndexingBuffer = componentSettings.getAsBytesSize("min_index_buffer_size", new ByteSizeValue(48, ByteSizeUnit.MB)); - ByteSizeValue maxIndexingBuffer = componentSettings.getAsBytesSize("max_index_buffer_size", null); + indexingBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100))); + ByteSizeValue minIndexingBuffer = settings.getAsBytesSize(MIN_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(48, ByteSizeUnit.MB)); + ByteSizeValue maxIndexingBuffer = settings.getAsBytesSize(MAX_INDEX_BUFFER_SIZE_SETTING, null); if (indexingBuffer.bytes() < minIndexingBuffer.bytes()) { indexingBuffer = minIndexingBuffer; @@ -91,18 +136,17 @@ public IndexingMemoryController(Settings settings, ThreadPool threadPool, Indice indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, null); } this.indexingBuffer = indexingBuffer; - this.minShardIndexBufferSize = componentSettings.getAsBytesSize("min_shard_index_buffer_size", new ByteSizeValue(4, ByteSizeUnit.MB)); + this.minShardIndexBufferSize = settings.getAsBytesSize(MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(4, ByteSizeUnit.MB)); // LUCENE MONITOR: Based on this thread, currently (based on Mike), having a large buffer does not make a lot of sense: https://issues.apache.org/jira/browse/LUCENE-2324?focusedCommentId=13005155&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13005155 - this.maxShardIndexBufferSize = componentSettings.getAsBytesSize("max_shard_index_buffer_size", new ByteSizeValue(512, ByteSizeUnit.MB)); + this.maxShardIndexBufferSize = settings.getAsBytesSize(MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(512, ByteSizeUnit.MB)); ByteSizeValue translogBuffer; - String translogBufferSetting = componentSettings.get("translog_buffer_size", "1%"); + String translogBufferSetting = settings.get(TRANSLOG_BUFFER_SIZE_SETTING, "1%"); if (translogBufferSetting.endsWith("%")) { double percent = Double.parseDouble(translogBufferSetting.substring(0, translogBufferSetting.length() - 1)); - translogBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().mem().heapMax().bytes()) * (percent / 100))); - ByteSizeValue minTranslogBuffer = componentSettings.getAsBytesSize("min_translog_buffer_size", new ByteSizeValue(256, ByteSizeUnit.KB)); - ByteSizeValue maxTranslogBuffer = componentSettings.getAsBytesSize("max_translog_buffer_size", null); - + translogBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100))); + ByteSizeValue minTranslogBuffer = settings.getAsBytesSize(MIN_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.KB)); + ByteSizeValue maxTranslogBuffer = settings.getAsBytesSize(MAX_TRANSLOG_BUFFER_SIZE_SETTING, null); if (translogBuffer.bytes() < minTranslogBuffer.bytes()) { translogBuffer = minTranslogBuffer; } @@ -113,21 +157,26 @@ public IndexingMemoryController(Settings settings, ThreadPool threadPool, Indice translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, null); } this.translogBuffer = translogBuffer; - this.minShardTranslogBufferSize = componentSettings.getAsBytesSize("min_shard_translog_buffer_size", new ByteSizeValue(2, ByteSizeUnit.KB)); - this.maxShardTranslogBufferSize = componentSettings.getAsBytesSize("max_shard_translog_buffer_size", new ByteSizeValue(64, ByteSizeUnit.KB)); + this.minShardTranslogBufferSize = settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB)); + this.maxShardTranslogBufferSize = settings.getAsBytesSize(MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(64, ByteSizeUnit.KB)); - this.inactiveTime = componentSettings.getAsTime("shard_inactive_time", TimeValue.timeValueMinutes(5)); + this.inactiveTime = settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)); // we need to have this relatively small to move a shard from inactive to active fast (enough) - this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(30)); - - logger.debug("using index_buffer_size [{}], with min_shard_index_buffer_size [{}], max_shard_index_buffer_size [{}], shard_inactive_time [{}]", this.indexingBuffer, this.minShardIndexBufferSize, this.maxShardIndexBufferSize, this.inactiveTime); - + this.interval = settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30)); + + this.statusChecker = new ShardsIndicesStatusChecker(); + logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]", + this.indexingBuffer, + MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize, + MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize, + SHARD_INACTIVE_TIME_SETTING, this.inactiveTime, + SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval); } @Override protected void doStart() throws ElasticsearchException { // its fine to run it on the scheduler thread, no busy work - this.scheduler = threadPool.scheduleWithFixedDelay(new ShardsIndicesStatusChecker(), interval); + this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval); } @Override @@ -148,6 +197,89 @@ public ByteSizeValue indexingBufferSize() { return indexingBuffer; } + /** + * returns the current budget for the total amount of translog buffers of + * active shards on this node + */ + public ByteSizeValue translogBufferSize() { + return translogBuffer; + } + + + protected List availableShards() { + ArrayList list = new ArrayList<>(); + + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (shardAvailable(indexShard)) { + list.add(indexShard.shardId()); + } + } + } + return list; + } + + /** returns true if shard exists and is availabe for updates */ + protected boolean shardAvailable(ShardId shardId) { + return shardAvailable(getShard(shardId)); + } + + /** returns true if shard exists and is availabe for updates */ + protected boolean shardAvailable(@Nullable IndexShard shard) { + return shard != null && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state()); + } + + /** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */ + protected IndexShard getShard(ShardId shardId) { + IndexService indexService = indicesService.indexService(shardId.index().name()); + if (indexService != null) { + IndexShard indexShard = indexService.shard(shardId.id()); + return indexShard; + } + return null; + } + + protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + final IndexShard shard = getShard(shardId); + if (shard != null) { + try { + shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize); + } catch (EngineClosedException e) { + // ignore + } catch (FlushNotAllowedEngineException e) { + // ignore + } catch (Exception e) { + logger.warn("failed to set shard {} index buffer to [{}]", shardId, shardIndexingBufferSize); + } + } + } + + + /** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. */ + protected ShardIndexingStatus getTranslogStatus(ShardId shardId) { + final IndexShard indexShard = getShard(shardId); + if (indexShard == null) { + return null; + } + final Translog translog; + try { + translog = indexShard.translog(); + } catch (EngineClosedException e) { + // not ready yet to be checked for activity + return null; + } + + ShardIndexingStatus status = new ShardIndexingStatus(); + status.translogId = translog.currentId(); + status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); + return status; + } + + // used for tests + void forceCheck() { + statusChecker.run(); + } + class ShardsIndicesStatusChecker implements Runnable { private final Map shardsIndicesStatus = new HashMap<>(); @@ -159,17 +291,10 @@ public void run() { changes.addAll(purgeDeletedAndClosedShards()); - final List activeToInactiveIndexingShards = Lists.newArrayList(); + final List activeToInactiveIndexingShards = new ArrayList<>(); final int activeShards = updateShardStatuses(changes, activeToInactiveIndexingShards); - for (IndexShard indexShard : activeToInactiveIndexingShards) { - // update inactive indexing buffer size - try { - indexShard.markAsInactive(); - } catch (EngineClosedException e) { - // ignore - } catch (FlushNotAllowedEngineException e) { - // ignore - } + for (ShardId indexShard : activeToInactiveIndexingShards) { + markShardAsInactive(indexShard); } if (!changes.isEmpty()) { calcAndSetShardBuffers(activeShards, "[" + changes + "]"); @@ -181,57 +306,40 @@ public void run() { * * @return the current count of active shards */ - private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { + private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { int activeShards = 0; - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) { - // not ready to be updated yet. - continue; - } - - final long time = threadPool.estimatedTimeInMillis(); + for (ShardId shardId : availableShards()) { + final ShardIndexingStatus currentStatus = getTranslogStatus(shardId); - Translog translog = indexShard.translog(); - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null) { - status = new ShardIndexingStatus(); - shardsIndicesStatus.put(indexShard.shardId(), status); - changes.add(ShardStatusChangeType.ADDED); - } - // check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time) - if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == status.translogNumberOfOperations) { - if (status.time == -1) { // first time - status.time = time; - } - // inactive? - if (status.activeIndexing) { - // mark it as inactive only if enough time has passed and there are no ongoing merges going on... - if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) { - // inactive for this amount of time, mark it - activeToInactiveIndexingShards.add(indexShard); - status.activeIndexing = false; - changes.add(ShardStatusChangeType.BECAME_INACTIVE); - logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); - } - } - } else { - if (!status.activeIndexing) { - status.activeIndexing = true; - changes.add(ShardStatusChangeType.BECAME_ACTIVE); - logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); - } - status.time = -1; - } - status.translogId = translog.currentId(); - status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); + if (currentStatus == null) { + // shard was closed.. + continue; + } - if (status.activeIndexing) { - activeShards++; + ShardIndexingStatus status = shardsIndicesStatus.get(shardId); + if (status == null) { + status = currentStatus; + shardsIndicesStatus.put(shardId, status); + changes.add(ShardStatusChangeType.ADDED); + } else { + final boolean lastActiveIndexing = status.activeIndexing; + status.updateWith(currentTimeInNanos(), currentStatus, inactiveTime.nanos()); + if (lastActiveIndexing && (status.activeIndexing == false)) { + activeToInactiveIndexingShards.add(shardId); + changes.add(ShardStatusChangeType.BECAME_INACTIVE); + logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", + shardId, + inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); + } else if ((lastActiveIndexing == false) && status.activeIndexing) { + changes.add(ShardStatusChangeType.BECAME_ACTIVE); + logger.debug("marking shard {} as active indexing wise", shardId); } } + if (status.activeIndexing) { + activeShards++; + } } + return activeShards; } @@ -245,26 +353,10 @@ private EnumSet purgeDeletedAndClosedShards() { Iterator statusShardIdIterator = shardsIndicesStatus.keySet().iterator(); while (statusShardIdIterator.hasNext()) { - ShardId statusShardId = statusShardIdIterator.next(); - IndexService indexService = indicesService.indexService(statusShardId.getIndex()); - boolean remove = false; - try { - if (indexService == null) { - remove = true; - continue; - } - IndexShard indexShard = indexService.shard(statusShardId.id()); - if (indexShard == null) { - remove = true; - continue; - } - remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state()); - - } finally { - if (remove) { - changes.add(ShardStatusChangeType.DELETED); - statusShardIdIterator.remove(); - } + ShardId shardId = statusShardIdIterator.next(); + if (shardAvailable(shardId) == false) { + changes.add(ShardStatusChangeType.DELETED); + statusShardIdIterator.remove(); } } return changes; @@ -291,41 +383,80 @@ private void calcAndSetShardBuffers(int activeShards, String reason) { } logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - IndexShardState state = indexShard.state(); - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) { - logger.trace("shard [{}] is not yet ready for index buffer update. index shard state: [{}]", indexShard.shardId(), state); - continue; - } - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null || status.activeIndexing) { - try { - indexShard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize); - } catch (EngineClosedException e) { - // ignore - continue; - } catch (FlushNotAllowedEngineException e) { - // ignore - continue; - } catch (Exception e) { - logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize); - } - } + for (ShardId shardId : availableShards()) { + ShardIndexingStatus status = shardsIndicesStatus.get(shardId); + if (status == null || status.activeIndexing) { + updateShardBuffers(shardId, shardIndexingBufferSize, shardTranslogBufferSize); } } } } + protected long currentTimeInNanos() { + return System.nanoTime(); + } + + // update inactive indexing buffer size + protected void markShardAsInactive(ShardId shardId) { + String ignoreReason = null; + final IndexShard shard = getShard(shardId); + if (shard != null) { + try { + shard.markAsInactive(); + } catch (EngineClosedException e) { + // ignore + ignoreReason = "EngineClosedException"; + } catch (FlushNotAllowedEngineException e) { + // ignore + ignoreReason = "FlushNotAllowedEngineException"; + } + } else { + ignoreReason = "shard not found"; + } + if (ignoreReason != null) { + logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId); + } + } + private static enum ShardStatusChangeType { ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE } - static class ShardIndexingStatus { long translogId = -1; int translogNumberOfOperations = -1; boolean activeIndexing = true; - long time = -1; // contains the first time we saw this shard with no operations done on it + long idleSinceNanoTime = -1; // contains the first time we saw this shard with no operations done on it + + + /** update status based on a new sample. updates all internal variables */ + public void updateWith(long currentNanoTime, ShardIndexingStatus current, long inactiveNanoInterval) { + final boolean idle = (translogId == current.translogId && translogNumberOfOperations == current.translogNumberOfOperations); + if (activeIndexing && idle) { + // no indexing activity detected. + if (idleSinceNanoTime < 0) { + // first time we see this, start the clock. + idleSinceNanoTime = currentNanoTime; + } else if ((currentNanoTime - idleSinceNanoTime) > inactiveNanoInterval) { + // shard is inactive. mark it as such. + activeIndexing = false; + } + } else if (activeIndexing == false // we weren't indexing before + && idle == false // but we do now + && current.translogNumberOfOperations > 0 // but only if we're really sure - see note bellow + ) { + // since we sync flush once a shard becomes inactive, the translog id can change, however that + // doesn't mean the an indexing operation has happened. Note that if we're really unlucky and a flush happens + // immediately after an indexing operation we may not become active immediately. The following + // indexing operation will mark the shard as active, so it's OK. If that one doesn't come, we might as well stay + // inactive + + activeIndexing = true; + idleSinceNanoTime = -1; + } + + translogId = current.translogId; + translogNumberOfOperations = current.translogNumberOfOperations; + } } } diff --git a/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerUnitTests.java b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerUnitTests.java new file mode 100644 index 0000000000000..4f79b745b93b0 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerUnitTests.java @@ -0,0 +1,286 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.memory; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ElasticsearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +public class IndexingMemoryControllerUnitTests extends ElasticsearchTestCase { + + static class MockController extends IndexingMemoryController { + + final static ByteSizeValue INACTIVE = new ByteSizeValue(-1); + + final Map translogIds = new HashMap<>(); + final Map translogOps = new HashMap<>(); + + final Map indexingBuffers = new HashMap<>(); + final Map translogBuffers = new HashMap<>(); + + long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds(); + + public MockController(Settings settings) { + super(ImmutableSettings.builder() + .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it + .put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate + .put(settings) + .build(), + null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb + } + + public void incTranslog(ShardId shard1, int id, int ops) { + setTranslog(shard1, translogIds.get(shard1) + id, translogOps.get(shard1) + ops); + } + + public void setTranslog(ShardId id, long translogId, int ops) { + translogIds.put(id, translogId); + translogOps.put(id, ops); + } + + public void deleteShard(ShardId id) { + translogIds.remove(id); + translogOps.remove(id); + indexingBuffers.remove(id); + translogBuffers.remove(id); + } + + public void assertActive(ShardId id) { + assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE))); + assertThat(translogBuffers.get(id), not(equalTo(INACTIVE))); + } + + public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) { + assertThat(indexingBuffers.get(id), equalTo(indexing)); + assertThat(translogBuffers.get(id), equalTo(translog)); + } + + public void assertInActive(ShardId id) { + assertThat(indexingBuffers.get(id), equalTo(INACTIVE)); + assertThat(translogBuffers.get(id), equalTo(INACTIVE)); + } + + @Override + protected long currentTimeInNanos() { + return TimeValue.timeValueSeconds(currentTimeSec).nanos(); + } + + @Override + protected List availableShards() { + return new ArrayList<>(translogIds.keySet()); + } + + @Override + protected boolean shardAvailable(ShardId shardId) { + return translogIds.containsKey(shardId); + } + + @Override + protected void markShardAsInactive(ShardId shardId) { + indexingBuffers.put(shardId, INACTIVE); + translogBuffers.put(shardId, INACTIVE); + } + + @Override + protected ShardIndexingStatus getTranslogStatus(ShardId shardId) { + if (!shardAvailable(shardId)) { + return null; + } + ShardIndexingStatus status = new ShardIndexingStatus(); + status.translogId = translogIds.get(shardId); + status.translogNumberOfOperations = translogOps.get(shardId); + return status; + } + + @Override + protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + indexingBuffers.put(shardId, shardIndexingBufferSize); + translogBuffers.put(shardId, shardTranslogBufferSize); + } + + public void incrementTimeSec(int sec) { + currentTimeSec += sec; + } + + public void simulateFlush(ShardId shard) { + setTranslog(shard, translogIds.get(shard) + 1, 0); + } + } + + public void testShardAdditionAndRemoval() { + MockController controller = new MockController(ImmutableSettings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build()); + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + + // add another shard + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // remove first shard + controller.deleteShard(shard1); + controller.forceCheck(); + controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + + // remove second shard + controller.deleteShard(shard2); + controller.forceCheck(); + + // add a new one + final ShardId shard3 = new ShardId("test", 3); + controller.setTranslog(shard3, randomInt(10), randomInt(10)); + controller.forceCheck(); + controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K + } + + public void testActiveInactive() { + MockController controller = new MockController(ImmutableSettings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb") + .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s") + .build()); + + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, 0, 0); + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, 0, 0); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // index into both shards, move the clock and see that they are still active + controller.setTranslog(shard1, randomInt(2), randomInt(2) + 1); + controller.setTranslog(shard2, randomInt(2) + 1, randomInt(2)); + // the controller doesn't know when the ops happened, so even if this is more + // than the inactive time the shard is still marked as active + controller.incrementTimeSec(10); + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + // index into one shard only, see other shard is made inactive correctly + controller.incTranslog(shard1, randomInt(2), randomInt(2) + 1); + controller.forceCheck(); // register what happened with the controller (shard is still active) + controller.incrementTimeSec(3); // increment but not enough + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + + controller.incrementTimeSec(3); // increment some more + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); + controller.assertInActive(shard2); + + if (randomBoolean()) { + // once a shard gets inactive it will be synced flushed and a new translog generation will be made + controller.simulateFlush(shard2); + controller.forceCheck(); + controller.assertInActive(shard2); + } + + // index some and shard becomes immediately active + controller.incTranslog(shard2, randomInt(2), 1 + randomInt(2)); // we must make sure translog ops is never 0 + controller.forceCheck(); + controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB)); + } + + public void testMinShardBufferSizes() { + MockController controller = new MockController(ImmutableSettings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build()); + + assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB)); + } + + public void testMaxShardBufferSizes() { + MockController controller = new MockController(ImmutableSettings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb") + .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb") + .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build()); + + assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB)); + } + + public void testRelativeBufferSizes() { + MockController controller = new MockController(ImmutableSettings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%") + .build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + + public void testMinBufferSizes() { + MockController controller = new MockController(ImmutableSettings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%") + .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + public void testMaxBufferSizes() { + MockController controller = new MockController(ImmutableSettings.builder() + .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%") + .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb") + .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build()); + + assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB))); + assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB))); + } + + protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) { + final ShardId shard1 = new ShardId("test", 1); + controller.setTranslog(shard1, 0, 0); + final ShardId shard2 = new ShardId("test", 2); + controller.setTranslog(shard2, 0, 0); + controller.forceCheck(); + controller.assertBuffers(shard1, indexBufferSize, translogBufferSize); + controller.assertBuffers(shard2, indexBufferSize, translogBufferSize); + + } + +}