Reduce merging in PersistedClusterStateService (#79793)
When writing the cluster state index we flush a segment every 2000 docs
or so, which sometimes triggers merging in the middle of the write
process. This merging is often unnecessary since many of the segments
being merged would have ended up containing no live docs at the end of
the process and hence could have just been deleted.

With this commit we adjust the merge policy to be much more relaxed,
permitting up to 100 segments per tier, since we read this index only
rarely and never on a hot path. We also disable merging completely
during the write process, checking just before commit whether any
merging is needed.

Relates #77466
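
In outline, the change works as in the sketch below — a minimal, hypothetical condensation rather than the real Elasticsearch code (writeDocs is a placeholder, and directory / DEFAULT_MERGE_POLICY stand in for the values set up in the diff; imports as in the changed file):

    IndexWriterConfig config = new IndexWriterConfig();
    config.setCommitOnClose(false);                       // only commit when instructed
    config.setMergeScheduler(new SerialMergeScheduler()); // merges run on the write thread
    config.setMergePolicy(DEFAULT_MERGE_POLICY);          // the relaxed TieredMergePolicy below
    try (IndexWriter writer = new IndexWriter(directory, config)) {
        // step 1: suppress merging entirely while the state is being rewritten
        writer.getConfig().setMergePolicy(NoMergePolicy.INSTANCE);
        writeDocs(writer); // flushes a tiny segment roughly every 2000 docs
        // step 2: restore the relaxed policy and merge once, just before committing
        writer.getConfig().setMergePolicy(DEFAULT_MERGE_POLICY);
        writer.maybeMerge(); // SerialMergeScheduler runs merges synchronously, on this thread
        writer.commit();
    }
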
DaveCTurner committed Oct 26, 2021
1 parent 008daa1 commit 5ee9bde
Showing 2 changed files with 119 additions and 10 deletions.
@@ -21,8 +21,11 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@@ -53,20 +56,21 @@
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.NodeMetadata;
import org.elasticsearch.index.Index;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;

import java.io.Closeable;
import java.io.IOError;
@@ -124,6 +128,9 @@ public class PersistedClusterStateService {
private static final String INDEX_UUID_FIELD_NAME = "index_uuid";
private static final int COMMIT_DATA_SIZE = 4;

private static final MergePolicy NO_MERGE_POLICY = noMergePolicy();
private static final MergePolicy DEFAULT_MERGE_POLICY = defaultMergePolicy();

public static final String METADATA_DIRECTORY_NAME = MetadataStateFormat.STATE_DIR_NAME;

public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
@@ -193,10 +200,13 @@ private static IndexWriter createIndexWriter(Directory directory, boolean openEx
indexWriterConfig.setOpenMode(openExisting ? IndexWriterConfig.OpenMode.APPEND : IndexWriterConfig.OpenMode.CREATE);
// only commit when specifically instructed, we must not write any intermediate states
indexWriterConfig.setCommitOnClose(false);
// most of the data goes into stored fields which are not buffered, so we only really need a tiny buffer
// most of the data goes into stored fields which are not buffered, so each doc written accounts for ~500B of indexing buffer
// (see e.g. BufferedUpdates#BYTES_PER_DEL_TERM); a 1MB buffer therefore gets flushed every ~2000 docs.
indexWriterConfig.setRAMBufferSizeMB(1.0);
// merge on the write thread (e.g. while flushing)
indexWriterConfig.setMergeScheduler(new SerialMergeScheduler());
// apply the adjusted merge policy
indexWriterConfig.setMergePolicy(DEFAULT_MERGE_POLICY);

return new IndexWriter(directory, indexWriterConfig);
}
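
As a rough sanity check on that figure (my arithmetic, not part of the commit): at ~500 B of buffer accounting per document, a 1 MB buffer holds about 1,048,576 / 500 ≈ 2,100 documents before it fills, matching the "every ~2000 docs" flush cadence described in the commit message.
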
@@ -481,6 +491,28 @@ private static void consumeFromType(IndexSearcher indexSearcher, String type,
FORMAT_PARAMS = new ToXContent.MapParams(params);
}

@SuppressForbidden(reason = "merges are only temporarily suppressed, the merge scheduler does not need changing")
private static MergePolicy noMergePolicy() {
return NoMergePolicy.INSTANCE;
}

private static MergePolicy defaultMergePolicy() {
final TieredMergePolicy mergePolicy = new TieredMergePolicy();

// don't worry about cleaning up deletes too much, segments will often get completely deleted once they're old enough
mergePolicy.setDeletesPctAllowed(50.0);
// more/smaller segments means there's a better chance they just get deleted before needing a merge
mergePolicy.setSegmentsPerTier(100);
// ... but if we do end up merging them then do them all
mergePolicy.setMaxMergeAtOnce(100);
// always use compound segments to avoid fsync overhead
mergePolicy.setNoCFSRatio(1.0);
// segments are mostly tiny, so don't pretend they are bigger
mergePolicy.setFloorSegmentMB(0.001);

return mergePolicy;
}
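
Taken together these settings trade merge eagerness for a bounded file count: every segment is written as a compound segment contributing three files (.cfe, .cfs, .si), so a directory with S segments holds roughly 3 × S + 2 files including segments_N and write.lock — the bound that the new testLimitsFileCount test below asserts.
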

/**
* Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these
* for each data path.
@@ -522,7 +554,15 @@ void flush() throws IOException {
this.indexWriter.flush();
}

void startWrite() {
// Disable merges during indexing - many older segments will ultimately contain no live docs and simply get deleted.
indexWriter.getConfig().setMergePolicy(NO_MERGE_POLICY);
}

void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException {
indexWriter.getConfig().setMergePolicy(DEFAULT_MERGE_POLICY);
indexWriter.maybeMerge();

final Map<String, String> commitData = new HashMap<>(COMMIT_DATA_SIZE);
commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm));
commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion));
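
Note the interplay with the SerialMergeScheduler configured in createIndexWriter: maybeMerge() runs any merges selected by the restored policy on the calling thread, so the merging should be complete before the commit data is prepared a few lines later.
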
@@ -594,6 +634,11 @@ public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState)
ensureOpen();
try {
final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();

for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
metadataIndexWriter.startWrite();
}

final WriterStats stats = overwriteMetadata(clusterState.metadata());
commit(currentTerm, clusterState.version());
fullStateWritten = true;
@@ -623,6 +668,11 @@ void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClust

try {
final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();

for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
metadataIndexWriter.startWrite();
}

final WriterStats stats = updateMetadata(previousClusterState.metadata(), clusterState.metadata());
commit(currentTerm, clusterState.version());
final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
@@ -53,6 +53,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -421,9 +422,8 @@ public void sync(Collection<String> names) {
assertFalse(writer.isOpen());
}

// check if we can open writer again
// noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
try (Writer ignored = persistedClusterStateService.createWriter()) {

}
}
}
@@ -469,9 +469,8 @@ public void rename(String source, String dest) throws IOException {
assertFalse(writer.isOpen());
}

// check if we can open writer again
// noinspection EmptyTryBlock - we are just checking that opening the writer again doesn't throw any exceptions
try (Writer ignored = persistedClusterStateService.createWriter()) {

}
}
}
@@ -881,6 +880,66 @@ public void testFailsIfCorrupt() throws IOException {
}
}

public void testLimitsFileCount() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(nodeEnvironment);

try (Writer writer = persistedClusterStateService.createWriter()) {

ClusterState clusterState = ClusterState.EMPTY_STATE;
writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);

final int indexCount = between(2, usually() ? 20 : 1000);

final int maxSegmentCount = (indexCount / 100) + 100; // only expect to have two tiers, each with max 100 segments
final int filesPerSegment = 3; // .cfe, .cfs, .si
final int extraFiles = 2; // segments_*, write.lock
final int maxFileCount = (maxSegmentCount * filesPerSegment) + extraFiles;

logger.info("--> adding [{}] indices one-by-one, verifying file count does not exceed [{}]", indexCount, maxFileCount);
for (int i = 0; i < indexCount; i++) {
final ClusterState previousClusterState = clusterState;

clusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.metadata())
.version(i + 2)
.put(IndexMetadata.builder("index-" + i)
.settings(Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))))
.incrementVersion().build();

writer.writeIncrementalStateAndCommit(1, previousClusterState, clusterState);

for (Path dataPath : nodeEnvironment.nodeDataPaths()) {
try (DirectoryStream<Path> files
= Files.newDirectoryStream(dataPath.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME))) {

int fileCount = 0;
final List<String> fileNames = new ArrayList<>();
for (Path filePath : files) {
final String fileName = filePath.getFileName().toString();
if (ExtrasFS.isExtra(fileName) == false) {
fileNames.add(fileName);
fileCount += 1;
}
}

if (maxFileCount < fileCount) {
// don't bother preparing the description unless we are failing
fileNames.sort(Comparator.naturalOrder());
fail("after " + indexCount + " indices have " + fileCount + " files vs max of " + maxFileCount + ": " +
fileNames);
}
}
}
}
}
}
}
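
Working through the bound with the constants above (my arithmetic): at the upper end indexCount = 1000 gives maxSegmentCount = 1000 / 100 + 100 = 110 and maxFileCount = 110 × 3 + 2 = 332 files, while the common indexCount ≤ 20 case gives (by integer division) 0 + 100 = 100 segments and at most 302 files per data path.
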

private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
throws IllegalAccessException, IOException {