Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush instead of synced-flush inactive shards #51365

Merged
merged 2 commits into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
Expand All @@ -51,6 +52,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
Expand Down Expand Up @@ -111,8 +113,13 @@ public SyncedFlushService(IndicesService indicesService,

@Override
public void onShardInactive(final IndexShard indexShard) {
// we only want to call sync flush once, so only trigger it when we are on a primary
if (indexShard.routingEntry().primary()) {
// A normal flush has the same effect as a synced flush if all nodes are on 7.6 or later.
final boolean preferNormalFlush = StreamSupport.stream(clusterService.state().nodes().spliterator(), false)
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
.allMatch(node -> node.getVersion().onOrAfter(Version.V_7_6_0));
if (preferNormalFlush) {
performNormalFlushOnInactive(indexShard);
} else if (indexShard.routingEntry().primary()) {
// we only want to call sync flush once, so only trigger it when we are on a primary
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
@Override
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
Expand All @@ -128,6 +135,23 @@ public void onFailure(Exception e) {
}
}

private void performNormalFlushOnInactive(IndexShard shard) {
logger.debug("flushing shard {} on inactive", shard.routingEntry());
shard.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (shard.state() != IndexShardState.CLOSED) {
logger.warn(new ParameterizedMessage("failed to flush shard {} on inactive", shard.routingEntry()), e);
}
}

@Override
protected void doRun() {
shard.flush(new FlushRequest().force(false).waitIfOngoing(false));
}
});
}

/**
* a utility method to perform a synced flush for all shards of multiple indices.
* see {@link #attemptSyncedFlush(ShardId, ActionListener)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,11 @@ public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
assertBusy(() -> {
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().setTranslog(true).get().getIndex("test");
assertThat(indexStats.getTotal().translog.getUncommittedOperations(), equalTo(0));
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
}
);
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}

public void testDurableFlagHasEffect() throws Exception {
Expand Down
38 changes: 38 additions & 0 deletions server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
Expand All @@ -47,15 +48,22 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand All @@ -71,6 +79,11 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class FlushIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(InternalSettingsPlugin.class);
}

public void testWaitIfOngoing() throws InterruptedException {
createIndex("test");
ensureGreen("test");
Expand Down Expand Up @@ -369,4 +382,29 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}

public void testFlushOnInactive() throws Exception {
final String indexName = "flush_on_inactive";
List<String> dataNodes = internalCluster().startDataOnlyNodes(2, Settings.builder()
.put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), randomTimeValue(10, 1000, "ms")).build());
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(50, 200, "ms"))
.put("index.routing.allocation.include._name", String.join(",", dataNodes))
.build()));
ensureGreen(indexName);
int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(indexName, "_doc").setSource("f", "v").get();
}
if (randomBoolean()) {
internalCluster().restartNode(randomFrom(dataNodes), new InternalTestCluster.RestartCallback());
ensureGreen(indexName);
}
assertBusy(() -> {
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) {
assertThat(shardStats.getStats().getTranslog().getUncommittedOperations(), equalTo(0));
}
}, 30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2142,9 +2142,13 @@ public List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
}

public List<String> startDataOnlyNodes(int numNodes) {
return startDataOnlyNodes(numNodes, Settings.EMPTY);
}

public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(
numNodes,
Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false)
Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), true).build());
}

Expand Down