From b70bd670a373271d8bd14ea899e8f1a878d9d395 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 24 Sep 2015 09:26:12 +0200 Subject: [PATCH] Internal: pending operations in the translog prevent shard from being marked as inactive The IndexingMemoryController checks periodically if there is any indexing activity on the shard. If no activity is sean for 5m (default) the shard is marked as inactive allowing it's indexing buffer quota to given to other active shards. Sadly the current check is bad as it checks for 0 translog operation. This makes the inactive wait for a flush to happen - which used to take 30m and since #13707 doesn't happen at all (as we rely on the synced flush triggered by inactivity). This commit fixes the check so it will work with any translog size. --- .../memory/IndexingMemoryController.java | 11 ++++------- .../memory/IndexingMemoryControllerIT.java | 18 ++++++++++++++---- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java index 2bf6946bc32d7..f54020802b5b8 100644 --- a/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java @@ -38,12 +38,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ScheduledFuture; /** @@ -258,7 +253,7 @@ private int updateShardStatuses(EnumSet changes, List changes, List indexing some pending operations"); + indexRandom(false, client().prepareIndex("test1", "type", "0").setSource("f", "0")); + } + boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); if (!success) { fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + @@ -97,12 +104,15 @@ public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); if (!success) { - fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + + fail("failed to update shard indexing buffer size due to active state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + shard1.engine().config().getIndexingBufferSize().bytes() + "]" ); } - flush(); // clean translogs + if (randomBoolean()) { + logger.info("--> flushing translogs"); + flush(); // clean translogs + } success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes()); if (!success) {