From 755a25a41e508bc9e88a0572f5282307323b1077 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 5 Jun 2018 14:11:19 -0400
Subject: [PATCH] Replace exact numDocs by soft-del count in SegmentInfo
 (#31086)

This PR adapts/utilizes recent enhancements in Lucene-7.4:

- Replaces exactNumDocs by the soft-deletes count in SegmentCommitInfo.
  This enhancement allows us to back out changes introduced in #30228.
- Always configure the soft-deletes field in IWC

---
 .../elasticsearch/common/lucene/Lucene.java   | 15 ++-----
 .../index/engine/CommitStats.java             |  4 +-
 .../elasticsearch/index/engine/Engine.java    |  4 +-
 .../index/shard/StoreRecovery.java            |  1 +
 .../org/elasticsearch/index/store/Store.java  | 13 +-----
 .../translog/TruncateTranslogCommand.java     |  2 +
 .../indices/flush/SyncedFlushService.java     | 17 +++-----
 .../recovery/PeerRecoveryTargetService.java   |  4 +-
 .../blobstore/BlobStoreRepository.java        |  1 +
 .../index/store/CorruptedFileIT.java          |  2 -
 .../PeerRecoveryTargetServiceTests.java       | 34 +--------------
 .../test/InternalTestCluster.java             | 43 +++++--------------
 12 files changed, 33 insertions(+), 107 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
index 25138a2909606..5fe10d8fc684f 100644
--- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
+++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -44,7 +44,6 @@
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Explanation;
 import org.apache.lucene.search.FieldDoc;
@@ -145,21 +144,11 @@ public static Iterable<String> files(SegmentInfos infos) throws IOException {
     public static int getNumDocs(SegmentInfos info) {
         int numDocs = 0;
         for (SegmentCommitInfo si : info) {
-            numDocs += si.info.maxDoc() - si.getDelCount();
+            numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
         }
         return numDocs;
     }
 
-    /**
-     * Unlike {@link #getNumDocs(SegmentInfos)} this method returns a numDocs that always excludes soft-deleted docs.
-     * This method is expensive thus prefer using {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required.
-     */
-    public static int getExactNumDocs(IndexCommit commit) throws IOException {
-        try (DirectoryReader reader = DirectoryReader.open(commit)) {
-            return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
-        }
-    }
-
     /**
      * Reads the segments infos from the given commit, failing if it fails to load
      */
@@ -212,6 +201,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc
         }
         final CommitPoint cp = new CommitPoint(si, directory);
         try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                 .setIndexCommit(cp)
                 .setCommitOnClose(false)
                 .setMergePolicy(NoMergePolicy.INSTANCE)
@@ -235,6 +225,7 @@ public static void cleanLuceneIndex(Directory directory) throws IOException {
             }
         }
         try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                .setMergePolicy(NoMergePolicy.INSTANCE) // no merges
                .setCommitOnClose(false) // no commits
                .setOpenMode(IndexWriterConfig.OpenMode.CREATE))) // force creation - don't append...
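[Editor's note, not part of the patch: a minimal sketch of the counting change above. As of Lucene 7.4, SegmentCommitInfo tracks a per-segment soft-delete count, so the live-doc count can be derived from SegmentInfos alone instead of opening a reader the way the removed getExactNumDocs(IndexCommit) did. The class and method names below are illustrative only.

    import org.apache.lucene.index.SegmentCommitInfo;
    import org.apache.lucene.index.SegmentInfos;

    final class LiveDocCounts {
        // Live docs per segment = maxDoc - hard-deleted - soft-deleted.
        // SegmentCommitInfo.getSoftDelCount() is available as of Lucene 7.4.
        static int liveDocs(SegmentInfos infos) {
            int numDocs = 0;
            for (SegmentCommitInfo si : infos) {
                numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
            }
            return numDocs;
        }
    }

This cheap, reader-free computation is what lets CommitStats (next file) derive numDocs directly from the last committed SegmentInfos without acquiring a searcher.]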
diff --git a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
index 8fbbe3a9deaa9..21025046b8c57 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
@@ -39,13 +39,13 @@ public final class CommitStats implements Streamable, ToXContentFragment {
     private String id; // lucene commit id in base 64;
     private int numDocs;
 
-    public CommitStats(SegmentInfos segmentInfos, int numDocs) {
+    public CommitStats(SegmentInfos segmentInfos) {
         // clone the map to protect against concurrent changes
         userData = MapBuilder.<String, String>newMapBuilder().putAll(segmentInfos.getUserData()).immutableMap();
         // lucene calls the current generation, last generation.
         generation = segmentInfos.getLastGeneration();
         id = Base64.getEncoder().encodeToString(segmentInfos.getId());
-        this.numDocs = numDocs;
+        numDocs = Lucene.getNumDocs(segmentInfos);
     }
 
     private CommitStats() {
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index d0aaab1dbc5a0..874f03ae434de 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -632,9 +632,7 @@ protected final void ensureOpen() {
 
     /** get commits stats for the last commit */
     public CommitStats commitStats() {
-        try (Engine.Searcher searcher = acquireSearcher("commit_stats", Engine.SearcherScope.INTERNAL)) {
-            return new CommitStats(getLastCommittedSegmentInfos(), searcher.reader().numDocs());
-        }
+        return new CommitStats(getLastCommittedSegmentInfos());
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index 54718c545a44e..0a03e8601b42d 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -156,6 +156,7 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta
         final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
 
         IndexWriterConfig iwc = new IndexWriterConfig(null)
+            .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
             .setCommitOnClose(false)
             // we don't want merges to happen here - we call maybe merge on the engine
             // later once we stared it up otherwise we would need to wait for it here
diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java
index c76169f40b43e..68b672de1ee25 100644
--- a/server/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -864,7 +864,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
         Map<String, String> commitUserDataBuilder = new HashMap<>();
         try {
             final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
-            numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos));
+            numDocs = Lucene.getNumDocs(segmentCommitInfos);
             commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
             Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
             for (SegmentCommitInfo info : segmentCommitInfos) {
@@ -947,16 +947,6 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size)
         assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len);
     }
 
-    private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException {
-        List<IndexCommit> commits = DirectoryReader.listCommits(directory);
-        for (IndexCommit commit : commits) {
-            if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) {
-                return commit;
-            }
-        }
-        throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found");
-    }
-
     @Override
     public Iterator<StoreFileMetaData> iterator() {
         return metadata.values().iterator();
@@ -1604,6 +1594,7 @@ private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openM
             throws IOException {
         assert openMode == IndexWriterConfig.OpenMode.APPEND || commit == null : "can't specify create flag with a commit";
         IndexWriterConfig iwc = new IndexWriterConfig(null)
+            .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
             .setCommitOnClose(false)
             .setIndexCommit(commit)
             // we don't want merges to happen here - we call maybe merge on the engine
diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
index b8bd93e05a6f8..56a084196131f 100644
--- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
+++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
@@ -33,6 +33,7 @@
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.NativeFSLockFactory;
 import org.apache.lucene.store.OutputStreamDataOutput;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cli.EnvironmentAwareCommand;
@@ -179,6 +180,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
             terminal.println("Marking index with the new history uuid");
             // commit the new histroy id
             IndexWriterConfig iwc = new IndexWriterConfig(null)
+                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                 .setCommitOnClose(false)
                 // we don't want merges to happen here - we call maybe merge on the engine
                 // later once we stared it up otherwise we would need to wait for it here
diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
index 5b35660ac30e8..63210807deebb 100644
--- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
+++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java
@@ -19,7 +19,6 @@
 package org.elasticsearch.indices.flush;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
@@ -42,13 +41,13 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -468,19 +467,15 @@ public String executor() {
         }
     }
 
-    private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException {
+    private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
         logger.trace("{} performing pre sync flush", request.shardId());
         indexShard.flush(flushRequest);
-        try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
-            final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit());
-            final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit());
-            final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId());
-            final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
-            logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs);
-            return new PreSyncedFlushResponse(commitId, numDocs, syncId);
-        }
+        final CommitStats commitStats = indexShard.commitStats();
+        final Engine.CommitId commitId = commitStats.getRawCommitId();
+        logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
+        return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
     }
 
     private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 3e09312bec86f..cb49eed25f8fe 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -289,7 +289,7 @@ public RecoveryResponse newInstance() {
      * @param recoveryTarget the target of the recovery
      * @return a snapshot of the store metadata
      */
-    static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
+    private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
         try {
             return recoveryTarget.indexShard().snapshotStoreMetadata();
         } catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -312,7 +312,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
         final StartRecoveryRequest request;
         logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
 
-        final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
+        final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
         logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
 
         final long startingSeqNo;
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 618dd3b8bc3b9..b01a1363c1cf6 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1425,6 +1425,7 @@ public void restore() throws IOException {
                 // empty shard would cause exceptions to be thrown.  Since there is no data to restore from an empty
                 // shard anyway, we just create the empty shard here and then exit.
                 IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null)
+                    .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                     .setOpenMode(IndexWriterConfig.OpenMode.CREATE)
                     .setCommitOnClose(true));
                 writer.close();
diff --git a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
index c4292410d0ac5..d3119bbc0fdc4 100644
--- a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
+++ b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java
@@ -23,7 +23,6 @@
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -108,7 +107,6 @@
 import static org.hamcrest.Matchers.notNullValue;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/30228") // What if DV is corrupted?
 public class CorruptedFileIT extends ESIntegTestCase {
 
     @Override
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 9c4c1c1e736fd..7b1003a862481 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -24,18 +24,16 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoMergePolicy;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.translog.Translog;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
 
@@ -94,6 +92,7 @@ public void testGetStartingSeqNo() throws Exception {
             replica.close("test", false);
             final List<IndexCommit> commits = DirectoryReader.listCommits(replica.store().directory());
             IndexWriterConfig iwc = new IndexWriterConfig(null)
+                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                 .setCommitOnClose(false)
                 .setMergePolicy(NoMergePolicy.INSTANCE)
                 .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
@@ -111,33 +110,4 @@ public void testGetStartingSeqNo() throws Exception {
             closeShards(replica);
         }
     }
-
-    public void testExactNumDocsInStoreMetadataSnapshot() throws Exception {
-        final IndexShard replica = newShard(false);
-        recoveryEmptyReplica(replica);
-        long flushedDocs = 0;
-        final int numDocs = scaledRandomIntBetween(1, 20);
-        final Set<String> docIds = new HashSet<>();
-        for (int i = 0; i < numDocs; i++) {
-            String id = Integer.toString(i);
-            docIds.add(id);
-            indexDoc(replica, "_doc", id);
-            if (randomBoolean()) {
-                replica.flush(new FlushRequest());
-                flushedDocs = docIds.size();
-            }
-        }
-        for (String id : randomSubsetOf(docIds)) {
-            deleteDoc(replica, "_doc", id);
-            docIds.remove(id);
-            if (randomBoolean()) {
-                replica.flush(new FlushRequest());
-                flushedDocs = docIds.size();
-            }
-        }
-        final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-        assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs));
-        recoveryTarget.decRef();
-        closeShards(replica);
-    }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index b0eef695b7ff1..d39a0ad2aedaf 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -26,8 +26,6 @@
 import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -77,10 +75,7 @@
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineTestCase;
-import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -1105,7 +1100,7 @@ public void beforeIndexDeletion() throws Exception {
         // ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
         assertNoPendingIndexOperations();
         //check that shards that have same sync id also contain same number of documents
-        assertSameSyncIdSameDocs(); 
+        assertSameSyncIdSameDocs();
         assertOpenTranslogReferences();
     }
 
@@ -1116,16 +1111,16 @@ private void assertSameSyncIdSameDocs() {
             IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
             for (IndexService indexService : indexServices) {
                 for (IndexShard indexShard : indexService) {
-                    Tuple<String, Long> commitStats = commitStats(indexShard);
-                    if (commitStats != null) {
-                        String syncId = commitStats.v1();
-                        long liveDocsOnShard = commitStats.v2();
-                        if (docsOnShards.get(syncId) != null) {
-                            assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name +
-                                ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId),
-                                equalTo(liveDocsOnShard));
-                        } else {
-                            docsOnShards.put(syncId, liveDocsOnShard);
+                    CommitStats commitStats = indexShard.commitStats();
+                    if (commitStats != null) { // null if the engine is closed or if the shard is recovering
+                        String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
+                        if (syncId != null) {
+                            long liveDocsOnShard = commitStats.getNumDocs();
+                            if (docsOnShards.get(syncId) != null) {
+                                assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
+                            } else {
+                                docsOnShards.put(syncId, liveDocsOnShard);
+                            }
                         }
                     }
                 }
@@ -1133,22 +1128,6 @@ private void assertSameSyncIdSameDocs() {
         }
     }
 
-    private Tuple<String, Long> commitStats(IndexShard indexShard) {
-        try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
-            final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID);
-            // Only read if sync_id exists
-            if (Strings.hasText(syncId)) {
-                return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit()));
-            } else {
-                return null;
-            }
-        } catch (IllegalIndexShardStateException ex) {
-            return null; // Shard is closed or not started yet.
-        } catch (IOException ex) {
-            throw new AssertionError(ex);
-        }
-    }
-
     private void assertNoPendingIndexOperations() throws Exception {
         assertBusy(() -> {
             final Collection<NodeAndClient> nodesAndClients = nodes.values();
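[Editor's note, not part of the patch: the recurring pattern above is threading Lucene.SOFT_DELETE_FIELD through every IndexWriterConfig that touches shard data (recovery, restore, translog truncation, index pruning), so maintenance writers respect soft deletes rather than treating soft-deleted documents as live or merging the soft-delete doc values away. A minimal sketch of that configuration follows; it assumes the field name "__soft_deletes" backs Lucene.SOFT_DELETE_FIELD in this tree (the constant's value is an assumption here), and the class name and directory argument are illustrative.

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.NoMergePolicy;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import java.nio.file.Paths;

    final class SoftDeleteAwareMaintenanceWriter {
        public static void main(String[] args) throws Exception {
            try (Directory dir = FSDirectory.open(Paths.get(args[0]))) {
                IndexWriterConfig iwc = new IndexWriterConfig(null)   // null analyzer: no new docs are indexed
                    .setSoftDeletesField("__soft_deletes")            // assumed value of Lucene.SOFT_DELETE_FIELD
                    .setCommitOnClose(false)                          // don't commit implicitly on close
                    .setMergePolicy(NoMergePolicy.INSTANCE)           // maintenance only - no merges
                    .setOpenMode(IndexWriterConfig.OpenMode.APPEND);  // operate on the existing index
                try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                    // maintenance work would go here; the config mirrors the
                    // pattern applied at each writer-creation site in the patch
                }
            }
        }
    }

Forgetting the soft-deletes field on such a writer is exactly the class of bug the removed getExactNumDocs() workaround papered over: a reader opened on the resulting commit would count soft-deleted documents as live.]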