From d7faab1143106d0b6cd035bd98fff7b2d0d47599 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 28 Sep 2015 11:56:59 +0200 Subject: [PATCH] Internal: an inactive shard is activated by triggered synced flush When a shard becomes inactive we trigger a sync flush in order to speed up future recoveries. The sync flush causes a new translog generation to be made, which in turn confuses the IndexingMemoryController making it think that the shard is active. If no documents come along in the next 5m, the shard is made inactive again, triggering a sync flush and so forth. To avoid this, the IndexingMemoryController is changed to ignore empty translogs when checking if a shard became active. This comes with the price of potentially missing indexing operations which are followed by a flush. This is acceptable as if no more index operations come in, it's OK to leave the shard inactive. A new unit test is introduced and comparable integration tests are removed. Relates #13802 Includes a backport of #13784 Closes #13824 --- .../common/unit/ByteSizeValue.java | 5 +- .../memory/IndexingMemoryController.java | 367 ++++++++++++------ .../IndexingMemoryControllerUnitTests.java | 286 ++++++++++++++ 3 files changed, 536 insertions(+), 122 deletions(-) create mode 100644 src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerUnitTests.java diff --git a/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java b/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java index 539b25de9b258..445da29995732 100644 --- a/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java +++ b/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java @@ -238,10 +238,7 @@ public boolean equals(Object o) { ByteSizeValue sizeValue = (ByteSizeValue) o; - if (size != sizeValue.size) return false; - if (sizeUnit != sizeValue.sizeUnit) return false; - - return true; + return bytes() == sizeValue.bytes(); } @Override diff --git 
a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index f1555c85e9197..e4962f0e9e3ec 100644 --- a/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -19,8 +19,8 @@ package org.elasticsearch.indices.memory; -import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -48,6 +48,44 @@ */ public class IndexingMemoryController extends AbstractLifecycleComponent { + + /** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */ + public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size"; + + /** Only applies when indices.memory.index_buffer_size is a %, to set a floor on the actual size in bytes (default: 48 MB). */ + public static final String MIN_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_index_buffer_size"; + + /** Only applies when indices.memory.index_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ + public static final String MAX_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_index_buffer_size"; + + /** Sets a floor on the per-shard index buffer size (default: 4 MB). */ + public static final String MIN_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.min_shard_index_buffer_size"; + + /** Sets a ceiling on the per-shard index buffer size (default: 512 MB). 
*/ + public static final String MAX_SHARD_INDEX_BUFFER_SIZE_SETTING = "indices.memory.max_shard_index_buffer_size"; + + /** How much heap (% or bytes) we will share across all actively indexing shards for the translog buffer (default: 1%). */ + public static final String TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.translog_buffer_size"; + + /** Only applies when indices.memory.translog_buffer_size is a %, to set a floor on the actual size in bytes (default: 256 KB). */ + public static final String MIN_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_translog_buffer_size"; + + /** Only applies when indices.memory.translog_buffer_size is a %, to set a ceiling on the actual size in bytes (default: not set). */ + public static final String MAX_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_translog_buffer_size"; + + /** Sets a floor on the per-shard translog buffer size (default: 2 KB). */ + public static final String MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.min_shard_translog_buffer_size"; + + /** Sets a ceiling on the per-shard translog buffer size (default: 64 KB). */ + public static final String MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING = "indices.memory.max_shard_translog_buffer_size"; + + /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */ + public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time"; + + /** How frequently we check shards to find inactive ones (default: 30 seconds). 
*/ + public static final String SHARD_INACTIVE_INTERVAL_TIME_SETTING = "indices.memory.interval"; + + private final ThreadPool threadPool; private final IndicesService indicesService; @@ -67,19 +105,26 @@ public class IndexingMemoryController extends AbstractLifecycleComponent CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of( IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); + private final ShardsIndicesStatusChecker statusChecker; + @Inject public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) { + this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes()); + } + + // for testing + protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) { super(settings); this.threadPool = threadPool; this.indicesService = indicesService; ByteSizeValue indexingBuffer; - String indexingBufferSetting = componentSettings.get("index_buffer_size", "10%"); + String indexingBufferSetting = settings.get(INDEX_BUFFER_SIZE_SETTING, "10%"); if (indexingBufferSetting.endsWith("%")) { double percent = Double.parseDouble(indexingBufferSetting.substring(0, indexingBufferSetting.length() - 1)); - indexingBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().mem().heapMax().bytes()) * (percent / 100))); - ByteSizeValue minIndexingBuffer = componentSettings.getAsBytesSize("min_index_buffer_size", new ByteSizeValue(48, ByteSizeUnit.MB)); - ByteSizeValue maxIndexingBuffer = componentSettings.getAsBytesSize("max_index_buffer_size", null); + indexingBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100))); + ByteSizeValue minIndexingBuffer = settings.getAsBytesSize(MIN_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(48, ByteSizeUnit.MB)); + ByteSizeValue maxIndexingBuffer = settings.getAsBytesSize(MAX_INDEX_BUFFER_SIZE_SETTING, null); if 
(indexingBuffer.bytes() < minIndexingBuffer.bytes()) { indexingBuffer = minIndexingBuffer; @@ -91,18 +136,17 @@ public IndexingMemoryController(Settings settings, ThreadPool threadPool, Indice indexingBuffer = ByteSizeValue.parseBytesSizeValue(indexingBufferSetting, null); } this.indexingBuffer = indexingBuffer; - this.minShardIndexBufferSize = componentSettings.getAsBytesSize("min_shard_index_buffer_size", new ByteSizeValue(4, ByteSizeUnit.MB)); + this.minShardIndexBufferSize = settings.getAsBytesSize(MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(4, ByteSizeUnit.MB)); // LUCENE MONITOR: Based on this thread, currently (based on Mike), having a large buffer does not make a lot of sense: https://issues.apache.org/jira/browse/LUCENE-2324?focusedCommentId=13005155&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13005155 - this.maxShardIndexBufferSize = componentSettings.getAsBytesSize("max_shard_index_buffer_size", new ByteSizeValue(512, ByteSizeUnit.MB)); + this.maxShardIndexBufferSize = settings.getAsBytesSize(MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(512, ByteSizeUnit.MB)); ByteSizeValue translogBuffer; - String translogBufferSetting = componentSettings.get("translog_buffer_size", "1%"); + String translogBufferSetting = settings.get(TRANSLOG_BUFFER_SIZE_SETTING, "1%"); if (translogBufferSetting.endsWith("%")) { double percent = Double.parseDouble(translogBufferSetting.substring(0, translogBufferSetting.length() - 1)); - translogBuffer = new ByteSizeValue((long) (((double) JvmInfo.jvmInfo().mem().heapMax().bytes()) * (percent / 100))); - ByteSizeValue minTranslogBuffer = componentSettings.getAsBytesSize("min_translog_buffer_size", new ByteSizeValue(256, ByteSizeUnit.KB)); - ByteSizeValue maxTranslogBuffer = componentSettings.getAsBytesSize("max_translog_buffer_size", null); - + translogBuffer = new ByteSizeValue((long) (((double) jvmMemoryInBytes) * (percent / 100))); + ByteSizeValue minTranslogBuffer = 
settings.getAsBytesSize(MIN_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.KB)); + ByteSizeValue maxTranslogBuffer = settings.getAsBytesSize(MAX_TRANSLOG_BUFFER_SIZE_SETTING, null); if (translogBuffer.bytes() < minTranslogBuffer.bytes()) { translogBuffer = minTranslogBuffer; } @@ -113,21 +157,26 @@ public IndexingMemoryController(Settings settings, ThreadPool threadPool, Indice translogBuffer = ByteSizeValue.parseBytesSizeValue(translogBufferSetting, null); } this.translogBuffer = translogBuffer; - this.minShardTranslogBufferSize = componentSettings.getAsBytesSize("min_shard_translog_buffer_size", new ByteSizeValue(2, ByteSizeUnit.KB)); - this.maxShardTranslogBufferSize = componentSettings.getAsBytesSize("max_shard_translog_buffer_size", new ByteSizeValue(64, ByteSizeUnit.KB)); + this.minShardTranslogBufferSize = settings.getAsBytesSize(MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(2, ByteSizeUnit.KB)); + this.maxShardTranslogBufferSize = settings.getAsBytesSize(MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, new ByteSizeValue(64, ByteSizeUnit.KB)); - this.inactiveTime = componentSettings.getAsTime("shard_inactive_time", TimeValue.timeValueMinutes(5)); + this.inactiveTime = settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5)); // we need to have this relatively small to move a shard from inactive to active fast (enough) - this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(30)); - - logger.debug("using index_buffer_size [{}], with min_shard_index_buffer_size [{}], max_shard_index_buffer_size [{}], shard_inactive_time [{}]", this.indexingBuffer, this.minShardIndexBufferSize, this.maxShardIndexBufferSize, this.inactiveTime); - + this.interval = settings.getAsTime(SHARD_INACTIVE_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(30)); + + this.statusChecker = new ShardsIndicesStatusChecker(); + logger.debug("using indexing buffer size [{}], with {} [{}], {} [{}], {} [{}], {} [{}]", + 
this.indexingBuffer, + MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize, + MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize, + SHARD_INACTIVE_TIME_SETTING, this.inactiveTime, + SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval); } @Override protected void doStart() throws ElasticsearchException { // its fine to run it on the scheduler thread, no busy work - this.scheduler = threadPool.scheduleWithFixedDelay(new ShardsIndicesStatusChecker(), interval); + this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval); } @Override @@ -148,6 +197,89 @@ public ByteSizeValue indexingBufferSize() { return indexingBuffer; } + /** + * returns the current budget for the total amount of translog buffers of + * active shards on this node + */ + public ByteSizeValue translogBufferSize() { + return translogBuffer; + } + + + protected List availableShards() { + ArrayList list = new ArrayList<>(); + + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (shardAvailable(indexShard)) { + list.add(indexShard.shardId()); + } + } + } + return list; + } + + /** returns true if shard exists and is available for updates */ + protected boolean shardAvailable(ShardId shardId) { + return shardAvailable(getShard(shardId)); + } + + /** returns true if shard exists and is available for updates */ + protected boolean shardAvailable(@Nullable IndexShard shard) { + return shard != null && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state()); + } + + /** gets an {@link IndexShard} instance for the given shard. 
returns null if the shard doesn't exist */ + protected IndexShard getShard(ShardId shardId) { + IndexService indexService = indicesService.indexService(shardId.index().name()); + if (indexService != null) { + IndexShard indexShard = indexService.shard(shardId.id()); + return indexShard; + } + return null; + } + + protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { + final IndexShard shard = getShard(shardId); + if (shard != null) { + try { + shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize); + } catch (EngineClosedException e) { + // ignore + } catch (FlushNotAllowedEngineException e) { + // ignore + } catch (Exception e) { + logger.warn("failed to set shard {} index buffer to [{}]", shardId, shardIndexingBufferSize); + } + } + } + + + /** returns the current translog status (generation id + ops) for the given shard id. Returns null if unavailable. */ + protected ShardIndexingStatus getTranslogStatus(ShardId shardId) { + final IndexShard indexShard = getShard(shardId); + if (indexShard == null) { + return null; + } + final Translog translog; + try { + translog = indexShard.translog(); + } catch (EngineClosedException e) { + // not ready yet to be checked for activity + return null; + } + + ShardIndexingStatus status = new ShardIndexingStatus(); + status.translogId = translog.currentId(); + status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); + return status; + } + + // used for tests + void forceCheck() { + statusChecker.run(); + } + class ShardsIndicesStatusChecker implements Runnable { private final Map shardsIndicesStatus = new HashMap<>(); @@ -159,17 +291,10 @@ public void run() { changes.addAll(purgeDeletedAndClosedShards()); - final List activeToInactiveIndexingShards = Lists.newArrayList(); + final List activeToInactiveIndexingShards = new ArrayList<>(); final int activeShards = updateShardStatuses(changes, 
activeToInactiveIndexingShards); - for (IndexShard indexShard : activeToInactiveIndexingShards) { - // update inactive indexing buffer size - try { - indexShard.markAsInactive(); - } catch (EngineClosedException e) { - // ignore - } catch (FlushNotAllowedEngineException e) { - // ignore - } + for (ShardId indexShard : activeToInactiveIndexingShards) { + markShardAsInactive(indexShard); } if (!changes.isEmpty()) { calcAndSetShardBuffers(activeShards, "[" + changes + "]"); @@ -181,57 +306,40 @@ public void run() { * * @return the current count of active shards */ - private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { + private int updateShardStatuses(EnumSet changes, List activeToInactiveIndexingShards) { int activeShards = 0; - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state())) { - // not ready to be updated yet. - continue; - } - - final long time = threadPool.estimatedTimeInMillis(); + for (ShardId shardId : availableShards()) { + final ShardIndexingStatus currentStatus = getTranslogStatus(shardId); - Translog translog = indexShard.translog(); - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null) { - status = new ShardIndexingStatus(); - shardsIndicesStatus.put(indexShard.shardId(), status); - changes.add(ShardStatusChangeType.ADDED); - } - // check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time) - if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == status.translogNumberOfOperations) { - if (status.time == -1) { // first time - status.time = time; - } - // inactive? - if (status.activeIndexing) { - // mark it as inactive only if enough time has passed and there are no ongoing merges going on... 
- if ((time - status.time) > inactiveTime.millis() && indexShard.mergeStats().getCurrent() == 0) { - // inactive for this amount of time, mark it - activeToInactiveIndexingShards.add(indexShard); - status.activeIndexing = false; - changes.add(ShardStatusChangeType.BECAME_INACTIVE); - logger.debug("marking shard [{}][{}] as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", indexShard.shardId().index().name(), indexShard.shardId().id(), inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); - } - } - } else { - if (!status.activeIndexing) { - status.activeIndexing = true; - changes.add(ShardStatusChangeType.BECAME_ACTIVE); - logger.debug("marking shard [{}][{}] as active indexing wise", indexShard.shardId().index().name(), indexShard.shardId().id()); - } - status.time = -1; - } - status.translogId = translog.currentId(); - status.translogNumberOfOperations = translog.estimatedNumberOfOperations(); + if (currentStatus == null) { + // shard was closed.. + continue; + } - if (status.activeIndexing) { - activeShards++; + ShardIndexingStatus status = shardsIndicesStatus.get(shardId); + if (status == null) { + status = currentStatus; + shardsIndicesStatus.put(shardId, status); + changes.add(ShardStatusChangeType.ADDED); + } else { + final boolean lastActiveIndexing = status.activeIndexing; + status.updateWith(currentTimeInNanos(), currentStatus, inactiveTime.nanos()); + if (lastActiveIndexing && (status.activeIndexing == false)) { + activeToInactiveIndexingShards.add(shardId); + changes.add(ShardStatusChangeType.BECAME_INACTIVE); + logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise, setting size to [{}]", + shardId, + inactiveTime, EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); + } else if ((lastActiveIndexing == false) && status.activeIndexing) { + changes.add(ShardStatusChangeType.BECAME_ACTIVE); + logger.debug("marking shard {} as active indexing wise", shardId); } } + if (status.activeIndexing) { + activeShards++; + } } + 
return activeShards; } @@ -245,26 +353,10 @@ private EnumSet purgeDeletedAndClosedShards() { Iterator statusShardIdIterator = shardsIndicesStatus.keySet().iterator(); while (statusShardIdIterator.hasNext()) { - ShardId statusShardId = statusShardIdIterator.next(); - IndexService indexService = indicesService.indexService(statusShardId.getIndex()); - boolean remove = false; - try { - if (indexService == null) { - remove = true; - continue; - } - IndexShard indexShard = indexService.shard(statusShardId.id()); - if (indexShard == null) { - remove = true; - continue; - } - remove = !CAN_UPDATE_INDEX_BUFFER_STATES.contains(indexShard.state()); - - } finally { - if (remove) { - changes.add(ShardStatusChangeType.DELETED); - statusShardIdIterator.remove(); - } + ShardId shardId = statusShardIdIterator.next(); + if (shardAvailable(shardId) == false) { + changes.add(ShardStatusChangeType.DELETED); + statusShardIdIterator.remove(); } } return changes; @@ -291,41 +383,80 @@ private void calcAndSetShardBuffers(int activeShards, String reason) { } logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShards, shardIndexingBufferSize, shardTranslogBufferSize); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - IndexShardState state = indexShard.state(); - if (!CAN_UPDATE_INDEX_BUFFER_STATES.contains(state)) { - logger.trace("shard [{}] is not yet ready for index buffer update. 
index shard state: [{}]", indexShard.shardId(), state); - continue; - } - ShardIndexingStatus status = shardsIndicesStatus.get(indexShard.shardId()); - if (status == null || status.activeIndexing) { - try { - indexShard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize); - } catch (EngineClosedException e) { - // ignore - continue; - } catch (FlushNotAllowedEngineException e) { - // ignore - continue; - } catch (Exception e) { - logger.warn("failed to set shard {} index buffer to [{}]", indexShard.shardId(), shardIndexingBufferSize); - } - } + for (ShardId shardId : availableShards()) { + ShardIndexingStatus status = shardsIndicesStatus.get(shardId); + if (status == null || status.activeIndexing) { + updateShardBuffers(shardId, shardIndexingBufferSize, shardTranslogBufferSize); } } } } + protected long currentTimeInNanos() { + return System.nanoTime(); + } + + // update inactive indexing buffer size + protected void markShardAsInactive(ShardId shardId) { + String ignoreReason = null; + final IndexShard shard = getShard(shardId); + if (shard != null) { + try { + shard.markAsInactive(); + } catch (EngineClosedException e) { + // ignore + ignoreReason = "EngineClosedException"; + } catch (FlushNotAllowedEngineException e) { + // ignore + ignoreReason = "FlushNotAllowedEngineException"; + } + } else { + ignoreReason = "shard not found"; + } + if (ignoreReason != null) { + logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId); + } + } + private static enum ShardStatusChangeType { ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE } - static class ShardIndexingStatus { long translogId = -1; int translogNumberOfOperations = -1; boolean activeIndexing = true; - long time = -1; // contains the first time we saw this shard with no operations done on it + long idleSinceNanoTime = -1; // contains the first time we saw this shard with no operations done on it + + + /** update status based on a new sample. 
updates all internal variables */ + public void updateWith(long currentNanoTime, ShardIndexingStatus current, long inactiveNanoInterval) { + final boolean idle = (translogId == current.translogId && translogNumberOfOperations == current.translogNumberOfOperations); + if (activeIndexing && idle) { + // no indexing activity detected. + if (idleSinceNanoTime < 0) { + // first time we see this, start the clock. + idleSinceNanoTime = currentNanoTime; + } else if ((currentNanoTime - idleSinceNanoTime) > inactiveNanoInterval) { + // shard is inactive. mark it as such. + activeIndexing = false; + } + } else if (activeIndexing == false // we weren't indexing before + && idle == false // but we do now + && current.translogNumberOfOperations > 0 // but only if we're really sure - see note below + ) { + // since we sync flush once a shard becomes inactive, the translog id can change, however that + // doesn't mean that an indexing operation has happened. Note that if we're really unlucky and a flush happens + // immediately after an indexing operation we may not become active immediately. The following + // indexing operation will mark the shard as active, so it's OK. If that one doesn't come, we might as well stay + // inactive + + activeIndexing = true; + idleSinceNanoTime = -1; + } + + translogId = current.translogId; + translogNumberOfOperations = current.translogNumberOfOperations; + } } } diff --git a/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerUnitTests.java b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerUnitTests.java new file mode 100644 index 0000000000000..4f79b745b93b0 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerUnitTests.java @@ -0,0 +1,286 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.indices.memory;
+
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ElasticsearchTestCase;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+
+/**
+ * Unit tests for the buffer allocation and the active/inactive shard tracking of
+ * {@link IndexingMemoryController}. All tests run against {@link MockController}, which backs the
+ * shard access hooks and the clock with in-memory state.
+ */
+public class IndexingMemoryControllerUnitTests extends ElasticsearchTestCase {
+
+    /**
+     * Controller whose shard-facing hooks read from and write to plain maps, and whose clock only
+     * moves when {@link #incrementTimeSec(int)} is called.
+     */
+    static class MockController extends IndexingMemoryController {
+
+        /** marker stored as both buffer sizes of a shard once the controller marked it inactive */
+        static final ByteSizeValue INACTIVE = new ByteSizeValue(-1);
+
+        // simulated translog state per shard. a shard exists for the controller while it has an entry in translogIds
+        final Map<ShardId, Long> translogIds = new HashMap<>();
+        final Map<ShardId, Integer> translogOps = new HashMap<>();
+
+        // the last buffer sizes the controller assigned to each shard (or INACTIVE)
+        final Map<ShardId, ByteSizeValue> indexingBuffers = new HashMap<>();
+        final Map<ShardId, ByteSizeValue> translogBuffers = new HashMap<>();
+
+        // fake clock, in seconds
+        long currentTimeSec = TimeValue.timeValueNanos(System.nanoTime()).seconds();
+
+        /**
+         * @param settings test specific settings, added to the builder after the defaults set here
+         */
+        public MockController(Settings settings) {
+            super(ImmutableSettings.builder()
+                            .put(SHARD_INACTIVE_INTERVAL_TIME_SETTING, "200h") // disable it
+                            .put(SHARD_INACTIVE_TIME_SETTING, "0s") // immediate
+                            .put(settings)
+                            .build(),
+                    null, null, 100 * 1024 * 1024); // fix jvm mem size to 100mb
+        }
+
+        /** adds the given deltas to the translog id and operation count of a shard that was registered before */
+        public void incTranslog(ShardId shard1, int id, int ops) {
+            setTranslog(shard1, translogIds.get(shard1) + id, translogOps.get(shard1) + ops);
+        }
+
+        /** registers the shard (if needed) and sets its simulated translog id and operation count */
+        public void setTranslog(ShardId id, long translogId, int ops) {
+            translogIds.put(id, translogId);
+            translogOps.put(id, ops);
+        }
+
+        /** removes every trace of the shard, so the controller no longer sees it as available */
+        public void deleteShard(ShardId id) {
+            translogIds.remove(id);
+            translogOps.remove(id);
+            indexingBuffers.remove(id);
+            translogBuffers.remove(id);
+        }
+
+        /** asserts that neither buffer of the shard carries the {@link #INACTIVE} marker */
+        public void assertActive(ShardId id) {
+            assertThat(indexingBuffers.get(id), not(equalTo(INACTIVE)));
+            assertThat(translogBuffers.get(id), not(equalTo(INACTIVE)));
+        }
+
+        /** asserts the exact buffer sizes last recorded for the shard */
+        public void assertBuffers(ShardId id, ByteSizeValue indexing, ByteSizeValue translog) {
+            assertThat(indexingBuffers.get(id), equalTo(indexing));
+            assertThat(translogBuffers.get(id), equalTo(translog));
+        }
+
+        /** asserts that both buffers of the shard carry the {@link #INACTIVE} marker */
+        public void assertInActive(ShardId id) {
+            assertThat(indexingBuffers.get(id), equalTo(INACTIVE));
+            assertThat(translogBuffers.get(id), equalTo(INACTIVE));
+        }
+
+        @Override
+        protected long currentTimeInNanos() {
+            return TimeValue.timeValueSeconds(currentTimeSec).nanos();
+        }
+
+        @Override
+        protected List<ShardId> availableShards() {
+            return new ArrayList<>(translogIds.keySet());
+        }
+
+        @Override
+        protected boolean shardAvailable(ShardId shardId) {
+            return translogIds.containsKey(shardId);
+        }
+
+        @Override
+        protected void markShardAsInactive(ShardId shardId) {
+            indexingBuffers.put(shardId, INACTIVE);
+            translogBuffers.put(shardId, INACTIVE);
+        }
+
+        @Override
+        protected ShardIndexingStatus getTranslogStatus(ShardId shardId) {
+            if (!shardAvailable(shardId)) {
+                return null;
+            }
+            ShardIndexingStatus status = new ShardIndexingStatus();
+            status.translogId = translogIds.get(shardId);
+            status.translogNumberOfOperations = translogOps.get(shardId);
+            return status;
+        }
+
+        @Override
+        protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
+            indexingBuffers.put(shardId, shardIndexingBufferSize);
+            translogBuffers.put(shardId, shardTranslogBufferSize);
+        }
+
+        /** moves the fake clock forward */
+        public void incrementTimeSec(int sec) {
+            currentTimeSec += sec;
+        }
+
+        /** simulates a flush: the translog id moves to the next one and the operation count drops to 0 */
+        public void simulateFlush(ShardId shard) {
+            setTranslog(shard, translogIds.get(shard) + 1, 0);
+        }
+    }
+
+    /** buffers are re-divided between the available shards whenever a shard is added or removed */
+    public void testShardAdditionAndRemoval() {
+        MockController controller = new MockController(ImmutableSettings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
+                .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb").build());
+        final ShardId shard1 = new ShardId("test", 1);
+        controller.setTranslog(shard1, randomInt(10), randomInt(10));
+        controller.forceCheck();
+        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
+
+        // add another shard
+        final ShardId shard2 = new ShardId("test", 2);
+        controller.setTranslog(shard2, randomInt(10), randomInt(10));
+        controller.forceCheck();
+        controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+        controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+
+        // remove first shard
+        controller.deleteShard(shard1);
+        controller.forceCheck();
+        controller.assertBuffers(shard2, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
+
+        // remove second shard
+        controller.deleteShard(shard2);
+        controller.forceCheck();
+
+        // add a new one
+        final ShardId shard3 = new ShardId("test", 3);
+        controller.setTranslog(shard3, randomInt(10), randomInt(10));
+        controller.forceCheck();
+        controller.assertBuffers(shard3, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB)); // translog is maxed at 64K
+    }
+
+    /**
+     * a shard turns inactive once no translog change was seen for longer than the inactive time, stays inactive
+     * across a flush, and turns active again as soon as operations show up in its translog
+     */
+    public void testActiveInactive() {
+        MockController controller = new MockController(ImmutableSettings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
+                .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "100kb")
+                .put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "5s")
+                .build());
+
+        final ShardId shard1 = new ShardId("test", 1);
+        controller.setTranslog(shard1, 0, 0);
+        final ShardId shard2 = new ShardId("test", 2);
+        controller.setTranslog(shard2, 0, 0);
+        controller.forceCheck();
+        controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+        controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+
+        // index into both shards, move the clock and see that they are still active
+        controller.setTranslog(shard1, randomInt(2), randomInt(2) + 1);
+        controller.setTranslog(shard2, randomInt(2) + 1, randomInt(2));
+        // the controller doesn't know when the ops happened, so even if this is more
+        // than the inactive time the shard is still marked as active
+        controller.incrementTimeSec(10);
+        controller.forceCheck();
+        controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+        controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+
+        // index into one shard only, see other shard is made inactive correctly
+        controller.incTranslog(shard1, randomInt(2), randomInt(2) + 1);
+        controller.forceCheck(); // register what happened with the controller (shard is still active)
+        controller.incrementTimeSec(3); // increment but not enough
+        controller.forceCheck();
+        controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+        controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+
+        controller.incrementTimeSec(3); // increment some more
+        controller.forceCheck();
+        controller.assertBuffers(shard1, new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(64, ByteSizeUnit.KB));
+        controller.assertInActive(shard2);
+
+        if (randomBoolean()) {
+            // once a shard gets inactive it will be synced flushed and a new translog generation will be made
+            controller.simulateFlush(shard2);
+            controller.forceCheck();
+            controller.assertInActive(shard2);
+        }
+
+        // index some and shard becomes immediately active
+        controller.incTranslog(shard2, randomInt(2), 1 + randomInt(2)); // we must make sure translog ops is never 0
+        controller.forceCheck();
+        controller.assertBuffers(shard1, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+        controller.assertBuffers(shard2, new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(50, ByteSizeUnit.KB));
+    }
+
+    /** the per-shard minimum wins when the even share of the total buffer is smaller than it */
+    public void testMinShardBufferSizes() {
+        MockController controller = new MockController(ImmutableSettings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
+                .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
+                .put(IndexingMemoryController.MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, "6mb")
+                .put(IndexingMemoryController.MIN_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "40kb").build());
+
+        assertTwoActiveShards(controller, new ByteSizeValue(6, ByteSizeUnit.MB), new ByteSizeValue(40, ByteSizeUnit.KB));
+    }
+
+    /** the per-shard maximum wins when the even share of the total buffer is larger than it */
+    public void testMaxShardBufferSizes() {
+        MockController controller = new MockController(ImmutableSettings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "10mb")
+                .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "50kb")
+                .put(IndexingMemoryController.MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, "3mb")
+                .put(IndexingMemoryController.MAX_SHARD_TRANSLOG_BUFFER_SIZE_SETTING, "10kb").build());
+
+        assertTwoActiveShards(controller, new ByteSizeValue(3, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.KB));
+    }
+
+    /** percentage settings are resolved against the 100mb jvm memory size fixed by the mock */
+    public void testRelativeBufferSizes() {
+        MockController controller = new MockController(ImmutableSettings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "50%")
+                .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.5%")
+                .build());
+
+        assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(50, ByteSizeUnit.MB)));
+        assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
+    }
+
+    /** a tiny percentage is raised to the configured minimum total buffer sizes */
+    public void testMinBufferSizes() {
+        MockController controller = new MockController(ImmutableSettings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "0.001%")
+                .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "0.001%")
+                .put(IndexingMemoryController.MIN_INDEX_BUFFER_SIZE_SETTING, "6mb")
+                .put(IndexingMemoryController.MIN_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
+
+        assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
+        assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
+    }
+
+    /** a large percentage is capped at the configured maximum total buffer sizes */
+    public void testMaxBufferSizes() {
+        MockController controller = new MockController(ImmutableSettings.builder()
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "90%")
+                .put(IndexingMemoryController.TRANSLOG_BUFFER_SIZE_SETTING, "90%")
+                .put(IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, "6mb")
+                .put(IndexingMemoryController.MAX_TRANSLOG_BUFFER_SIZE_SETTING, "512kb").build());
+
+        assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
+        assertThat(controller.translogBufferSize(), equalTo(new ByteSizeValue(512, ByteSizeUnit.KB)));
+    }
+
+    /** registers two shards with an empty translog, runs one check and asserts both got the given buffer sizes */
+    protected void assertTwoActiveShards(MockController controller, ByteSizeValue indexBufferSize, ByteSizeValue translogBufferSize) {
+        final ShardId shard1 = new ShardId("test", 1);
+        controller.setTranslog(shard1, 0, 0);
+        final ShardId shard2 = new ShardId("test", 2);
+        controller.setTranslog(shard2, 0, 0);
+        controller.forceCheck();
+        controller.assertBuffers(shard1, indexBufferSize, translogBufferSize);
+        controller.assertBuffers(shard2, indexBufferSize, translogBufferSize);
+
+    }
+
+}