Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Untangle Engine Constructor logic #28245

Merged
merged 39 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
2826b41
basic utils in place
bleskes Jan 13, 2018
922bdb4
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Jan 13, 2018
d9c236b
integrated
bleskes Jan 14, 2018
e1ed1dc
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Jan 14, 2018
5be23dc
lint
bleskes Jan 14, 2018
88bd738
fix index shard assertion
bleskes Jan 14, 2018
7816210
fix translog tests
bleskes Jan 15, 2018
e13f0f2
fix recovery tests
bleskes Jan 15, 2018
963e86e
remove assertion openEngineAndRecoverFromTranslog is now used by all …
bleskes Jan 15, 2018
aba67a4
fix RefreshListenersTests
bleskes Jan 15, 2018
8d1fe79
fix testShardActiveDuringPeerRecovery
bleskes Jan 15, 2018
4bc7f9b
fix testStressMaybeFlushOrRollTranslogGeneration
bleskes Jan 15, 2018
d6d3f6c
fix testMaybeFlush
bleskes Jan 15, 2018
0bbd50b
properly acquire store reference in RecoveryTarget#cleanFiles
bleskes Jan 15, 2018
152aca3
fix translog retention rest tests
bleskes Jan 15, 2018
f52ffd6
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Jan 15, 2018
2c36b88
fix flush docs
bleskes Jan 15, 2018
e78d93d
merge from master
bleskes Jan 20, 2018
5b9b8cb
fix static method and compilation
bleskes Jan 20, 2018
ac96ede
missing sync
bleskes Jan 21, 2018
3b11801
fix InternalEngineTests
bleskes Jan 21, 2018
0b942b0
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Jan 21, 2018
8fa3106
Merge branch 'master' into engine_simple_opening
bleskes Jan 22, 2018
3a81d8f
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Jan 22, 2018
7251980
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Feb 7, 2018
aad5774
fix tests
bleskes Feb 7, 2018
7086f92
remove invalid assertion that was merged
bleskes Feb 7, 2018
0e98b3c
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Feb 12, 2018
627c7d6
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Feb 12, 2018
5e58fb2
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Mar 2, 2018
d5f5a0c
tweak translog recovery
bleskes Mar 4, 2018
5a4e957
roll back changes to 20_translog.yml test
bleskes Mar 7, 2018
f1114bd
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Mar 8, 2018
1bc2a64
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Mar 11, 2018
b8ecd0e
java docs and dedicated uni tests
bleskes Mar 11, 2018
f8ff0aa
license FTW
bleskes Mar 11, 2018
37853c9
lint
bleskes Mar 11, 2018
72b9cb0
Merge remote-tracking branch 'upstream/master' into engine_simple_ope…
bleskes Mar 14, 2018
7139116
feedback
bleskes Mar 14, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/reference/indices/flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ which returns something similar to:
{
"commit" : {
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
"generation" : 2,
"generation" : 3,
"user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
"local_checkpoint" : "-1",
"translog_generation" : "2",
"translog_generation" : "3",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,14 @@
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
this.openMode = openMode;
this.logger = logger;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
Expand All @@ -65,25 +63,11 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {

@Override
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
break;
case OPEN_INDEX_CREATE_TRANSLOG:
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
// OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history,
// We therefore should not use that index commit to update the translog deletion policy.
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
updateTranslogDeletionPolicy();
}
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
}
assert commits.isEmpty() == false : "index is opened, but we have no commits";
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
updateTranslogDeletionPolicy();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public final class EngineConfig {
private final List<ReferenceManager.RefreshListener> internalRefreshListener;
@Nullable
private final Sort indexSort;
private final boolean forceNewHistoryUUID;
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
Expand Down Expand Up @@ -113,24 +112,20 @@ public final class EngineConfig {
Property.IndexScope, Property.Dynamic);

private final TranslogConfig translogConfig;
private final OpenMode openMode;

/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, ThreadPool threadPool,
public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand All @@ -151,8 +146,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
this.flushMergesAfter = flushMergesAfter;
this.openMode = openMode;
this.forceNewHistoryUUID = forceNewHistoryUUID;
this.externalRefreshListener = externalRefreshListener;
this.internalRefreshListener = internalRefreshListener;
this.indexSort = indexSort;
Expand Down Expand Up @@ -315,22 +308,6 @@ public TranslogConfig getTranslogConfig() {
*/
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }

/**
* Returns the {@link OpenMode} for this engine config.
*/
public OpenMode getOpenMode() {
return openMode;
}


/**
* Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing
* one is found.
*/
public boolean getForceNewHistoryUUID() {
return forceNewHistoryUUID;
}

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
Expand All @@ -343,20 +320,6 @@ public TranslogRecoveryRunner getTranslogRecoveryRunner() {
return translogRecoveryRunner;
}

/**
* Engine open mode defines how the engine should be opened or in other words what the engine should expect
* to recover from. We either create a brand new engine with a new index and translog or we recover from an existing index.
* If the index exists we also have the ability open only the index and create a new transaction log which happens
* during remote recovery since we have already transferred the index files but the translog is replayed from remote. The last
* and safest option opens the lucene index as well as it's referenced transaction log for a translog recovery.
* See also {@link Engine#recoverFromTranslog()}
*/
public enum OpenMode {
CREATE_INDEX_AND_TRANSLOG,
OPEN_INDEX_CREATE_TRANSLOG,
OPEN_INDEX_AND_TRANSLOG;
}

/**
* The refresh listeners to add to Lucene for externally visible refreshes
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs javadocs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add.

public final class EngineDiskUtils {

private EngineDiskUtils() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as an alternative you can mark it as final abstract then you don't need the private ctor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't care. I've seen requests to go both ways. I just want people to be happy. I'll go with abstract.

}

/**
 * Creates a brand-new empty Lucene index in {@code dir} together with a matching empty
 * translog at {@code translogPath}. Any data already present in the directory is deleted.
 */
public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException {
    try (IndexWriter writer = newIndexWriter(true, dir)) {
        final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
        // Seed the very first commit with a consistent "nothing has happened yet" state:
        // generation 1, no ops performed, and fresh translog/history UUIDs.
        final Map<String, String> commitData = new HashMap<>();
        commitData.put(Translog.TRANSLOG_GENERATION_KEY, "1");
        commitData.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
        commitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
        commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
        commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
        commitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1");
        updateCommitData(writer, commitData);
    }
}


/**
 * Takes an existing Lucene index and stamps it with a freshly generated history uuid,
 * also creating a new empty translog file for it. This guarantees that no existing shard
 * will recover from this index using operation-based recovery.
 */
public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId)
    throws IOException {
    try (IndexWriter writer = newIndexWriter(false, dir)) {
        // The local checkpoint is reset to the max seq# of the existing commit, since the
        // new history starts from whatever is already safely on disk.
        final long maxSeqNo = Long.parseLong(getUserData(writer).get(SequenceNumbers.MAX_SEQ_NO));
        final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId);
        final Map<String, String> commitData = new HashMap<>();
        commitData.put(Translog.TRANSLOG_GENERATION_KEY, "1");
        commitData.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
        commitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
        commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
        updateCommitData(writer, commitData);
    }
}

/**
 * Creates a new empty translog at {@code translogPath} and associates it with the existing
 * Lucene index in {@code dir}, using {@code initialGlobalCheckpoint} as the translog's
 * starting global checkpoint.
 *
 * @throws IOException if the index or translog cannot be accessed
 */
public static void createNewTranslog(final Directory dir, final Path translogPath, long initialGlobalCheckpoint, final ShardId shardId)
    throws IOException {
    if (Assertions.ENABLED) {
        final List<IndexCommit> existingCommits = DirectoryReader.listCommits(dir);
        // fixed assertion message: previously read "creating a translog translog should have one commit"
        assert existingCommits.size() == 1 : "creating a new translog requires an index with exactly one commit, commits["
            + existingCommits + "]";
        SequenceNumbers.CommitInfo commitInfo = Store.loadSeqNoInfo(existingCommits.get(0));
        // A shard whose local checkpoint is below the global checkpoint would lose acknowledged operations.
        assert commitInfo.localCheckpoint >= initialGlobalCheckpoint :
            "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
                + initialGlobalCheckpoint + "]";
    }

    try (IndexWriter writer = newIndexWriter(false, dir)) {
        final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId);
        final Map<String, String> map = new HashMap<>();
        map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
        map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
        updateCommitData(writer, map);
    }
}


/**
 * Verifies that the Lucene index in {@code dir} carries a history uuid marker in its commit
 * user data; when the marker is absent, generates a new uuid and commits it.
 */
public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException {
    try (IndexWriter writer = newIndexWriter(false, dir)) {
        final boolean hasHistoryUuid = getUserData(writer).containsKey(Engine.HISTORY_UUID_KEY);
        if (hasHistoryUuid == false) {
            // Only touch the commit when the marker is missing, to avoid a needless commit.
            updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
        }
    }
}

// Merges the given entries into the writer's current commit user data and commits the result.
// Existing keys not present in keysToUpdate are preserved.
private static void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
    final Map<String, String> merged = getUserData(writer);
    merged.putAll(keysToUpdate);
    writer.setLiveCommitData(merged.entrySet());
    writer.commit();
}

// Snapshots the writer's live commit user data into a mutable map.
private static Map<String, String> getUserData(IndexWriter writer) {
    final Map<String, String> userData = new HashMap<>();
    for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
        userData.put(entry.getKey(), entry.getValue());
    }
    return userData;
}

// Opens an IndexWriter over dir, either creating a fresh index (create == true) or
// appending to an existing one. The writer is configured for commit-metadata-only use.
private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException {
    IndexWriterConfig iwc = new IndexWriterConfig(null)
        .setCommitOnClose(false)
        // we don't want merges to happen here - we call maybeMerge on the engine
        // later once we have started it up, otherwise we would need to wait for it here.
        // We also don't specify a codec here; merges should use the engine's codec for this index.
        .setMergePolicy(NoMergePolicy.INSTANCE)
        .setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
    return new IndexWriter(dir, iwc);
}
}
Loading