Skip to content

Commit

Permalink
Account soft-deletes in FrozenEngine (elastic#51192)
Browse files Browse the repository at this point in the history
Currently, we do not exclude soft-deleted documents when opening index 
reader in the FrozenEngine.

Relates elastic#50775
  • Loading branch information
dnhatn authored and SivagurunathanV committed Jan 21, 2020
1 parent 3c1529a commit d804ff0
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 4 deletions.
Expand Up @@ -54,7 +54,7 @@ public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
this.stats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
try (DirectoryReader reader = openDirectory(directory)) {
for (LeafReaderContext ctx : reader.getContext().leaves()) {
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
fillSegmentStats(segmentReader, true, stats);
Expand Down
Expand Up @@ -68,7 +68,7 @@ public class ReadOnlyEngine extends Engine {
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
* ID field if we are reading form memory maps.
*/
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
private static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
BlockTreeTermsReader.FSTLoadMode.AUTO.name());
private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
Expand Down Expand Up @@ -528,4 +528,8 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() :
maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes();
}

protected static DirectoryReader openDirectory(Directory dir) throws IOException {
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(dir, OFF_HEAP_READER_ATTRIBUTES), Lucene.SOFT_DELETES_FIELD);
}
}
Expand Up @@ -77,7 +77,7 @@ public FrozenEngine(EngineConfig config) {

boolean success = false;
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
try (DirectoryReader reader = openDirectory(directory)) {
canMatchReader = ElasticsearchDirectoryReader.wrap(new RewriteCachingDirectoryReader(directory, reader.leaves()),
config.getShardId());
// we record the segment stats here - that's what the reader needs when it's open and it give the user
Expand Down Expand Up @@ -167,7 +167,7 @@ private synchronized ElasticsearchDirectoryReader getOrOpenReader() throws IOExc
for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) {
listeners.beforeRefresh();
}
final DirectoryReader dirReader = DirectoryReader.open(engineConfig.getStore().directory(), OFF_HEAP_READER_ATTRIBUTES);
final DirectoryReader dirReader = openDirectory(engineConfig.getStore().directory());
reader = lastOpenedReader = wrapReader(dirReader, Function.identity());
processReader(reader);
reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
Expand Down
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.equalTo;

public class FrozenEngineTests extends EngineTestCase {

public void testAcquireReleaseReset() throws IOException {
Expand Down Expand Up @@ -328,4 +330,30 @@ public void testCanMatch() throws IOException {
}
}
}

public void testSearchers() throws Exception {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null,
globalCheckpoint::get, new NoneCircuitBreakerService());
final int totalDocs;
try (InternalEngine engine = createEngine(config)) {
applyOperations(engine, generateHistoryOnReplica(between(10, 1000), false, randomBoolean(), randomBoolean()));
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
engine.syncTranslog();
engine.flush();
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length;
}
}
try (FrozenEngine frozenEngine = new FrozenEngine(config)) {
try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
assertThat(topDocs.scoreDocs.length, equalTo(totalDocs));
}
}
}
}
}
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.test.StreamsUtils;
Expand Down Expand Up @@ -595,4 +596,41 @@ private Map<String, Object> getJob(Map<String, Object> jobsMap, String targetJob
}
return null;
}

public void testFrozenIndexAfterRestarted() throws Exception {
final String index = "test_frozen_index";
if (isRunningAgainstOldCluster()) {
Settings.Builder settings = Settings.builder();
if (minimumNodeVersion().before(Version.V_8_0_0) && randomBoolean()) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
}
String mappings = randomBoolean() ? "\"_source\": { \"enabled\": false}" : null;
createIndex(index, settings.build(), mappings);
ensureGreen(index);
int numDocs = randomIntBetween(10, 500);
for (int i = 0; i < numDocs; i++) {
int id = randomIntBetween(0, 100);
final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
assertOK(client().performRequest(indexRequest));
if (rarely()) {
flush(index, randomBoolean());
}
}
} else {
ensureGreen(index);
final int totalHits = (int) XContentMapValues.extractValue("hits.total.value",
entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
assertOK(client().performRequest(new Request("POST", index + "/_freeze")));
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter("ignore_throttled", "false");
assertThat(XContentMapValues.extractValue("hits.total.value", entityAsMap(client().performRequest(request))),
equalTo(totalHits));
assertOK(client().performRequest(new Request("POST", index + "/_unfreeze")));
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
}
}
}

0 comments on commit d804ff0

Please sign in to comment.