Skip to content

Commit

Permalink
Internal: pending operations in the translog prevent shard from being…
Browse files Browse the repository at this point in the history
… marked as inactive

The IndexingMemoryController checks periodically if there is any indexing activity on the shard. If no activity is sean for 5m (default) the shard is marked as inactive allowing it's indexing buffer quota to given to other active shards.

Sadly the current check is bad as it checks for 0 translog operation. This makes the inactive wait for a flush to happen - which used to take 30m and since elastic#13707 doesn't happen at all (as we rely on the synced flush triggered by inactivity). This commit fixes the check so it will work with any translog size.
  • Loading branch information
bleskes committed Sep 24, 2015
1 parent 1fd8c64 commit b70bd67
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
Expand Up @@ -38,12 +38,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ScheduledFuture;

/**
Expand Down Expand Up @@ -258,7 +253,7 @@ private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<Ind
}

// consider shard inactive if it has same translogFileGeneration and no operations for a long time
if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == 0) {
if (status.translogId == translog.currentFileGeneration() && translog.totalOperations() == status.translogNumberOfOperations) {
if (status.timeMS == -1) {
// first time we noticed the shard become idle
status.timeMS = timeMS;
Expand All @@ -282,6 +277,7 @@ private int updateShardStatuses(EnumSet<ShardStatusChangeType> changes, List<Ind
status.timeMS = -1;
}
status.translogId = translog.currentFileGeneration();
status.translogNumberOfOperations = translog.totalOperations();

if (status.activeIndexing) {
activeShards++;
Expand Down Expand Up @@ -376,6 +372,7 @@ private static enum ShardStatusChangeType {

private static class ShardIndexingStatus {
long translogId = -1;
long translogNumberOfOperations = -1;
boolean activeIndexing = true;
long timeMS = -1; // contains the first time we saw this shard with no operations done on it
}
Expand Down
Expand Up @@ -24,13 +24,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;

import java.util.concurrent.ExecutionException;


@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndexingMemoryControllerIT extends ESIntegTestCase {
Expand Down Expand Up @@ -77,7 +78,7 @@ public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedEx
}

@Test
public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException {
public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException, ExecutionException {

createNode(Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, "100ms").build());

Expand All @@ -86,6 +87,12 @@ public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException
ensureGreen();

final IndexShard shard1 = internalCluster().getInstance(IndicesService.class).indexService("test1").shard(0);

if (randomBoolean()) {
logger.info("--> indexing some pending operations");
indexRandom(false, client().prepareIndex("test1", "type", "0").setSource("f", "0"));
}

boolean success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
Expand All @@ -97,12 +104,15 @@ public void testIndexBufferSizeUpdateInactiveShard() throws InterruptedException

success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
fail("failed to update shard indexing buffer size due to active state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}

flush(); // clean translogs
if (randomBoolean()) {
logger.info("--> flushing translogs");
flush(); // clean translogs
}

success = awaitBusy(() -> shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes());
if (!success) {
Expand Down

0 comments on commit b70bd67

Please sign in to comment.