Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexingMemoryController should not track shard index states #15251

merged 2 commits into from Dec 4, 2015
Changes from 1 commit
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.


Just for now


IndexingMemoryController should not track shard index states

This commit modifies IndexingMemoryController to be stateless. Rather
than statefully tracking the indexing status of shards,
IndexingMemoryController can grab all available shards, check their idle
state, and then resize the buffers based on the number of and which
shards are not idle.

The driver for this change is a performance regression that can arise in
some scenarios after #13918. One scenario under which this performance
regression can arise is if an index is deleted and then created
again. Because IndexingMemoryController was previously statefully
tracking the state of shards via a map of ShardIds, the new shards with
the same ShardIds as previously existing shards would not be detected
and therefore their version maps would never be resized from the
defaults. This led to an explosion in the number of merges causing a
degradation in performance.

Closes #15225
  • Loading branch information...
jasontedor committed Dec 4, 2015
commit 5341404f014fbfd0c0b67c61546df38625d9b4ad
@@ -33,7 +33,6 @@
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
@@ -200,23 +199,17 @@ public ByteSizeValue translogBufferSize() {
return translogBuffer;

protected List<ShardId> availableShards() {
ArrayList<ShardId> list = new ArrayList<>();
protected List<IndexShard> availableShards() {
List<IndexShard> activeShards = new ArrayList<>();

for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (shardAvailable(indexShard)) {
for (IndexShard shard : indexService) {
if (shardAvailable(shard)) {
return list;

/** returns true if shard exists and is availabe for updates */
protected boolean shardAvailable(ShardId shardId) {
return shardAvailable(getShard(shardId));
return activeShards;

/** returns true if shard exists and is availabe for updates */
@@ -225,19 +218,8 @@ protected boolean shardAvailable(@Nullable IndexShard shard) {
return shard != null && shard.canIndex() && CAN_UPDATE_INDEX_BUFFER_STATES.contains(shard.state());

/** gets an {@link IndexShard} instance for the given shard. returns null if the shard doesn't exist */
protected IndexShard getShard(ShardId shardId) {
IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService != null) {
IndexShard indexShard = indexService.getShardOrNull(;
return indexShard;
return null;

/** set new indexing and translog buffers on this shard. this may cause the shard to refresh to free up heap. */
protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
final IndexShard shard = getShard(shardId);
protected void updateShardBuffers(IndexShard shard, ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
if (shard != null) {
try {
shard.updateBufferSize(shardIndexingBufferSize, shardTranslogBufferSize);
@@ -246,113 +228,37 @@ protected void updateShardBuffers(ShardId shardId, ByteSizeValue shardIndexingBu
} catch (FlushNotAllowedEngineException e) {
// ignore
} catch (Exception e) {
logger.warn("failed to set shard {} index buffer to [{}]", e, shardId, shardIndexingBufferSize);
logger.warn("failed to set shard {} index buffer to [{}]", e, shard.shardId(), shardIndexingBufferSize);

/** returns {@link IndexShard#getActive} if the shard exists, else null */
protected Boolean getShardActive(ShardId shardId) {
final IndexShard indexShard = getShard(shardId);
if (indexShard == null) {
return null;
return indexShard.getActive();

/** check if any shards active status changed, now. */
public void forceCheck() {;

class ShardsIndicesStatusChecker implements Runnable {

// True if the shard was active last time we checked
private final Map<ShardId,Boolean> shardWasActive = new HashMap<>();

public synchronized void run() {
EnumSet<ShardStatusChangeType> changes = purgeDeletedAndClosedShards();


if (changes.isEmpty() == false) {
// Something changed: recompute indexing buffers:
calcAndSetShardBuffers("[" + changes + "]");

* goes through all existing shards and check whether there are changes in their active status
private void updateShardStatuses(EnumSet<ShardStatusChangeType> changes) {
for (ShardId shardId : availableShards()) {

// Is the shard active now?
Boolean isActive = getShardActive(shardId);

if (isActive == null) {
// shard was closed..

// Was the shard active last time we checked?
Boolean wasActive = shardWasActive.get(shardId);
if (wasActive == null) {
// First time we are seeing this shard
shardWasActive.put(shardId, isActive);
} else if (isActive) {
// Shard is active now
if (wasActive == false) {
// Shard became active itself, since we last checked (due to new indexing op arriving)
logger.debug("marking shard {} as active indexing wise", shardId);
shardWasActive.put(shardId, true);
} else if (checkIdle(shardId) == Boolean.TRUE) {
// Make shard inactive now

shardWasActive.put(shardId, false);

* purge any existing statuses that are no longer updated
* @return the changes applied
private EnumSet<ShardStatusChangeType> purgeDeletedAndClosedShards() {
EnumSet<ShardStatusChangeType> changes = EnumSet.noneOf(ShardStatusChangeType.class);

Iterator<ShardId> statusShardIdIterator = shardWasActive.keySet().iterator();
while (statusShardIdIterator.hasNext()) {
ShardId shardId =;
if (shardAvailable(shardId) == false) {
return changes;

private void calcAndSetShardBuffers(String reason) {

// Count how many shards are now active:
int activeShardCount = 0;
for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
if (ent.getValue()) {
private void calcAndSetShardBuffers() {

This comment has been minimized.

Copy link

mikemccand Dec 4, 2015


Can we just absorb this method directly into run()?

This comment has been minimized.

Copy link

jasontedor Dec 4, 2015

Author Member

Done in 5341404.

List<IndexShard> availableShards = availableShards();
List<IndexShard> activeShards = new ArrayList<>();
for (IndexShard shard : availableShards) {
if (!checkIdle(shard)) {
int activeShardCount = activeShards.size();

// TODO: we could be smarter here by taking into account how RAM the IndexWriter on each shard
// is actually using (using IW.ramBytesUsed), so that small indices (e.g. Marvel) would not
// get the same indexing buffer as large indices. But it quickly gets tricky...
if (activeShardCount == 0) {
logger.debug("no active shards (reason={})", reason);
logger.debug("no active shards");

@@ -372,13 +278,10 @@ private void calcAndSetShardBuffers(String reason) {
shardTranslogBufferSize = maxShardTranslogBufferSize;

logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", reason, indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);
logger.debug("recalculating shard indexing buffer, total is [{}] with [{}] active shards, each shard set to indexing=[{}], translog=[{}]", indexingBuffer, activeShardCount, shardIndexingBufferSize, shardTranslogBufferSize);

for (Map.Entry<ShardId,Boolean> ent : shardWasActive.entrySet()) {
if (ent.getValue()) {
// This shard is active
updateShardBuffers(ent.getKey(), shardIndexingBufferSize, shardTranslogBufferSize);
for (IndexShard shard : activeShards) {
updateShardBuffers(shard, shardIndexingBufferSize, shardTranslogBufferSize);
@@ -389,14 +292,13 @@ protected long currentTimeInNanos() {

/** ask this shard to check now whether it is inactive, and reduces its indexing and translog buffers if so. returns Boolean.TRUE if
* it did deactive, Boolean.FALSE if it did not, and null if the shard is unknown */
protected Boolean checkIdle(ShardId shardId) {
String ignoreReason; // eclipse compiler does not know it is really final
final IndexShard shard = getShard(shardId);
protected Boolean checkIdle(IndexShard shard) {
String ignoreReason = null; // eclipse compiler does not know it is really final
if (shard != null) {
try {
if (shard.checkIdle()) {
logger.debug("marking shard {} as inactive (inactive_time[{}]) indexing wise",
return Boolean.TRUE;
@@ -412,15 +314,11 @@ protected Boolean checkIdle(ShardId shardId) {
ignoreReason = "shard not found";
if (ignoreReason != null) {
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shardId);
logger.trace("ignore [{}] while marking shard {} as inactive", ignoreReason, shard.shardId());
return null;

private static enum ShardStatusChangeType {

public void onShardActive(IndexShard indexShard) {
// At least one shard used to be inactive ie. a new write operation just showed up.
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.