Replace exact numDocs by soft-del count in SegmentInfo (#31086)
This PR adopts recent enhancements in Lucene 7.4:

- Replaces exactNumDocs with the soft-deletes count in SegmentCommitInfo.
  This allows us to back out the changes introduced in #30228.

- Always configures the soft-deletes field in the IndexWriterConfig (IWC).
dnhatn committed Jun 5, 2018
1 parent 91de0ba commit 755a25a
Showing 12 changed files with 33 additions and 107 deletions.
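For context, the counting change at the heart of this commit can be sketched as follows (a minimal illustration, assuming a Lucene 7.4+ SegmentInfos; the wrapper class name LiveDocCounts is hypothetical). Lucene 7.4 tracks the soft-delete count per segment in SegmentCommitInfo, so the live-doc count can be derived from segment metadata alone, instead of opening a SoftDeletesDirectoryReaderWrapper over the commit as the removed getExactNumDocs did:

import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;

public final class LiveDocCounts { // hypothetical helper, for illustration only
    public static int liveDocs(SegmentInfos infos) {
        int numDocs = 0;
        for (SegmentCommitInfo si : infos) {
            // maxDoc(): all docs ever flushed to this segment;
            // getDelCount(): hard-deleted docs;
            // getSoftDelCount(): soft-deleted docs (tracked since Lucene 7.4).
            numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
        }
        return numDocs;
    }
}

Because no DirectoryReader has to be opened, this count is cheap enough for CommitStats to compute on every stats call, which is what allows the expensive getExactNumDocs path below to be deleted.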
15 changes: 3 additions & 12 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -44,7 +44,6 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
- import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
@@ -145,21 +144,11 @@ public static Iterable<String> files(SegmentInfos infos) throws IOException {
public static int getNumDocs(SegmentInfos info) {
int numDocs = 0;
for (SegmentCommitInfo si : info) {
- numDocs += si.info.maxDoc() - si.getDelCount();
+ numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
}
return numDocs;
}

- /**
-  * Unlike {@link #getNumDocs(SegmentInfos)} this method returns a numDocs that always excludes soft-deleted docs.
-  * This method is expensive thus prefer using {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required.
-  */
- public static int getExactNumDocs(IndexCommit commit) throws IOException {
- try (DirectoryReader reader = DirectoryReader.open(commit)) {
- return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
- }
- }

/**
* Reads the segments infos from the given commit, failing if it fails to load
*/
@@ -212,6 +201,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc
}
final CommitPoint cp = new CommitPoint(si, directory);
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+ .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
.setIndexCommit(cp)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
@@ -235,6 +225,7 @@ public static void cleanLuceneIndex(Directory directory) throws IOException {
}
}
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+ .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
.setMergePolicy(NoMergePolicy.INSTANCE) // no merges
.setCommitOnClose(false) // no commits
.setOpenMode(IndexWriterConfig.OpenMode.CREATE))) // force creation - don't append...
@@ -39,13 +39,13 @@ public final class CommitStats implements Streamable, ToXContentFragment {
private String id; // lucene commit id in base 64;
private int numDocs;

- public CommitStats(SegmentInfos segmentInfos, int numDocs) {
+ public CommitStats(SegmentInfos segmentInfos) {
// clone the map to protect against concurrent changes
userData = MapBuilder.<String, String>newMapBuilder().putAll(segmentInfos.getUserData()).immutableMap();
// lucene calls the current generation, last generation.
generation = segmentInfos.getLastGeneration();
id = Base64.getEncoder().encodeToString(segmentInfos.getId());
- this.numDocs = numDocs;
+ numDocs = Lucene.getNumDocs(segmentInfos);
}

private CommitStats() {
@@ -632,9 +632,7 @@ protected final void ensureOpen() {

/** get commits stats for the last commit */
public CommitStats commitStats() {
- try (Engine.Searcher searcher = acquireSearcher("commit_stats", Engine.SearcherScope.INTERNAL)) {
- return new CommitStats(getLastCommittedSegmentInfos(), searcher.reader().numDocs());
- }
+ return new CommitStats(getLastCommittedSegmentInfos());
}

/**
@@ -156,6 +156,7 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);

IndexWriterConfig iwc = new IndexWriterConfig(null)
+ .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we started it up otherwise we would need to wait for it here
13 changes: 2 additions & 11 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -864,7 +864,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
- numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos));
+ numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
for (SegmentCommitInfo info : segmentCommitInfos) {
@@ -947,16 +947,6 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size)
assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len);
}

- private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException {
- List<IndexCommit> commits = DirectoryReader.listCommits(directory);
- for (IndexCommit commit : commits) {
- if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) {
- return commit;
- }
- }
- throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found");
- }

@Override
public Iterator<StoreFileMetaData> iterator() {
return metadata.values().iterator();
@@ -1604,6 +1594,7 @@ private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openM
throws IOException {
assert openMode == IndexWriterConfig.OpenMode.APPEND || commit == null : "can't specify create flag with a commit";
IndexWriterConfig iwc = new IndexWriterConfig(null)
+ .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
.setCommitOnClose(false)
.setIndexCommit(commit)
// we don't want merges to happen here - we call maybe merge on the engine
@@ -33,6 +33,7 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.OutputStreamDataOutput;
+ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cli.EnvironmentAwareCommand;
@@ -179,6 +180,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
terminal.println("Marking index with the new history uuid");
// commit the new history uuid
IndexWriterConfig iwc = new IndexWriterConfig(null)
+ .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we started it up otherwise we would need to wait for it here
@@ -19,7 +19,6 @@
package org.elasticsearch.indices.flush;

import org.apache.logging.log4j.message.ParameterizedMessage;
- import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@@ -42,13 +41,13 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
- import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
+ import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@@ -468,19 +467,15 @@ public String executor() {
}
}

- private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException {
+ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
logger.trace("{} performing pre sync flush", request.shardId());
indexShard.flush(flushRequest);
- try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
- final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit());
- final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit());
- final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId());
- final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
- logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs);
- return new PreSyncedFlushResponse(commitId, numDocs, syncId);
- }
+ final CommitStats commitStats = indexShard.commitStats();
+ final Engine.CommitId commitId = commitStats.getRawCommitId();
+ logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
+ return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
}

private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
@@ -289,7 +289,7 @@ public RecoveryResponse newInstance() {
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metadata
*/
- static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
+ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
try {
return recoveryTarget.indexShard().snapshotStoreMetadata();
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -312,7 +312,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

- final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
+ final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

final long startingSeqNo;
@@ -1425,6 +1425,7 @@ public void restore() throws IOException {
// empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
// shard anyway, we just create the empty shard here and then exit.
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null)
+ .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setCommitOnClose(true));
writer.close();
@@ -23,7 +23,6 @@
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.util.BytesRef;
- import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -108,7 +107,6 @@
import static org.hamcrest.Matchers.notNullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
- @LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/30228") // What if DV is corrupted?
public class CorruptedFileIT extends ESIntegTestCase {

@Override
@@ -24,18 +24,16 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
- import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.common.UUIDs;
+ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;

import java.util.HashMap;
- import java.util.HashSet;
import java.util.List;
import java.util.Map;
- import java.util.Set;

import static org.hamcrest.Matchers.equalTo;

@@ -94,6 +92,7 @@ public void testGetStartingSeqNo() throws Exception {
replica.close("test", false);
final List<IndexCommit> commits = DirectoryReader.listCommits(replica.store().directory());
IndexWriterConfig iwc = new IndexWriterConfig(null)
+ .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
@@ -111,33 +110,4 @@
closeShards(replica);
}
}

- public void testExactNumDocsInStoreMetadataSnapshot() throws Exception {
- final IndexShard replica = newShard(false);
- recoveryEmptyReplica(replica);
- long flushedDocs = 0;
- final int numDocs = scaledRandomIntBetween(1, 20);
- final Set<String> docIds = new HashSet<>();
- for (int i = 0; i < numDocs; i++) {
- String id = Integer.toString(i);
- docIds.add(id);
- indexDoc(replica, "_doc", id);
- if (randomBoolean()) {
- replica.flush(new FlushRequest());
- flushedDocs = docIds.size();
- }
- }
- for (String id : randomSubsetOf(docIds)) {
- deleteDoc(replica, "_doc", id);
- docIds.remove(id);
- if (randomBoolean()) {
- replica.flush(new FlushRequest());
- flushedDocs = docIds.size();
- }
- }
- final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
- assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs));
- recoveryTarget.decRef();
- closeShards(replica);
- }
}
@@ -26,8 +26,6 @@
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
- import org.elasticsearch.common.collect.Tuple;
- import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -77,10 +75,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
- import org.elasticsearch.index.engine.EngineTestCase;
- import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
- import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@@ -1105,7 +1100,7 @@ public void beforeIndexDeletion() throws Exception {
// ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
assertNoPendingIndexOperations();
//check that shards that have same sync id also contain same number of documents
-     assertSameSyncIdSameDocs();
+ assertSameSyncIdSameDocs();
assertOpenTranslogReferences();
}

@@ -1116,39 +1111,23 @@ private void assertSameSyncIdSameDocs() {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
- Tuple<String, Integer> commitStats = commitStats(indexShard);
- if (commitStats != null) {
- String syncId = commitStats.v1();
- long liveDocsOnShard = commitStats.v2();
- if (docsOnShards.get(syncId) != null) {
- assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name +
- ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId),
- equalTo(liveDocsOnShard));
- } else {
- docsOnShards.put(syncId, liveDocsOnShard);
+ CommitStats commitStats = indexShard.commitStats();
+ if (commitStats != null) { // null if the engine is closed or if the shard is recovering
+ String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
+ if (syncId != null) {
+ long liveDocsOnShard = commitStats.getNumDocs();
+ if (docsOnShards.get(syncId) != null) {
+ assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
+ } else {
+ docsOnShards.put(syncId, liveDocsOnShard);
+ }
+ }
}
}
}
}
}

- private Tuple<String, Integer> commitStats(IndexShard indexShard) {
- try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
- final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID);
- // Only read if sync_id exists
- if (Strings.hasText(syncId)) {
- return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit()));
- } else {
- return null;
- }
- } catch (IllegalIndexShardStateException ex) {
- return null; // Shard is closed or not started yet.
- } catch (IOException ex) {
- throw new AssertionError(ex);
- }
- }

private void assertNoPendingIndexOperations() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();