Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always configure soft-deletes field of IndexWriterConfig #36196

Merged
merged 3 commits into from Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2108,8 +2108,9 @@ private IndexWriterConfig getIndexWriterConfig() {
// Give us the opportunity to upgrade old segments while performing
// background merges
MergePolicy mergePolicy = config().getMergePolicy();
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
if (softDeleteEnabled) {
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, mergePolicy));
}
Expand Down
Expand Up @@ -5454,6 +5454,34 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
}
}

public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
try (Store store = createStore()) {
Path translogPath = createTempDir();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final IndexSettings softDeletesEnabled = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder().
put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)).build());
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(
config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
List<Engine.Operation> ops = generateReplicaHistory(between(1, 100), randomBoolean());
applyOperations(engine, ops);
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
engine.flush();
docs = getDocIds(engine, true);
}
final IndexSettings softDeletesDisabled = IndexSettingsModule.newIndexSettings(
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder()
.put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)).build());
EngineConfig config = config(softDeletesDisabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
trimUnsafeCommits(config);
try (InternalEngine engine = createEngine(config)) {
assertThat(getDocIds(engine, true), equalTo(docs));
}
}
}

static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -704,6 +705,32 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
return ops;
}

public List<Engine.Operation> generateReplicaHistory(int numOps, boolean allowGapInSeqNo) {
long seqNo = 0;
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(between(1, 100));
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
-1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
}
seqNo++;
if (allowGapInSeqNo && rarely()) {
seqNo++;
}
}
Randomness.shuffle(operations);
return operations;
}

public static void assertOpsOnReplica(
final List<Engine.Operation> ops,
final InternalEngine replicaEngine,
Expand Down Expand Up @@ -788,14 +815,7 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
int docOffset;
while ((docOffset = offset.incrementAndGet()) < ops.size()) {
try {
final Engine.Operation op = ops.get(docOffset);
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete){
engine.delete((Engine.Delete) op);
} else {
engine.noOp((Engine.NoOp) op);
}
applyOperation(engine, ops.get(docOffset));
if ((docOffset + 1) % 4 == 0) {
engine.refresh("test");
}
Expand All @@ -814,6 +834,36 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
}
}

public static void applyOperations(Engine engine, List<Engine.Operation> operations) throws IOException {
for (Engine.Operation operation : operations) {
applyOperation(engine, operation);
if (randomInt(100) < 10) {
engine.refresh("test");
}
if (rarely()) {
engine.flush();
}
}
}

public static Engine.Result applyOperation(Engine engine, Engine.Operation operation) throws IOException {
final Engine.Result result;
switch (operation.operationType()) {
case INDEX:
result = engine.index((Engine.Index) operation);
break;
case DELETE:
result = engine.delete((Engine.Delete) operation);
break;
case NO_OP:
result = engine.noOp((Engine.NoOp) operation);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
return result;
}

/**
* Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
*/
Expand Down