Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restoring from snapshot should force generation of a new history uuid #26694

Merged
merged 9 commits into from
Sep 19, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public final class EngineConfig {
private final List<ReferenceManager.RefreshListener> refreshListeners;
@Nullable
private final Sort indexSort;
private final boolean forceNewHistoryUUID;
private final TranslogRecoveryRunner translogRecoveryRunner;

/**
Expand Down Expand Up @@ -115,8 +116,9 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, List<ReferenceManager.RefreshListener> refreshListeners,
Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) {
boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> refreshListeners, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand All @@ -141,6 +143,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, String allocationId, Thr
this.translogConfig = translogConfig;
this.flushMergesAfter = flushMergesAfter;
this.openMode = openMode;
this.forceNewHistoryUUID = forceNewHistoryUUID;
this.refreshListeners = refreshListeners;
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
Expand Down Expand Up @@ -300,6 +303,15 @@ public OpenMode getOpenMode() {
return openMode;
}


/**
* Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing
* one is found.
*/
public boolean getForceNewHistoryUUID() {
return forceNewHistoryUUID;
}

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,23 +177,15 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
String existingHistoryUUID = loadHistoryUUIDFromCommit(writer);
if (existingHistoryUUID == null) {
historyUUID = UUIDs.randomBase64UUID();
} else {
historyUUID = existingHistoryUUID;
}
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
historyUUID = loadHistoryUUIDFromCommit(writer);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
historyUUID = UUIDs.randomBase64UUID();
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand All @@ -205,9 +197,13 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
assert translog.getGeneration() != null;
this.translog = translog;
updateWriterOnOpen();
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
} catch (AssertionError e) {
Expand All @@ -219,8 +215,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw e;
}
}

this.translog = translog;
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
Expand Down Expand Up @@ -375,24 +369,32 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Tra
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
if (translogUUID == null) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
boolean success = false;
try {
commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(translog);
}
}
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
}

/** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */
private void updateWriterOnOpen() throws IOException {
Objects.requireNonNull(historyUUID);
final Map<String, String> commitUserData = commitDataAsMap(indexWriter);
boolean needsCommit = false;
if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) {
needsCommit = true;
} else {
assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change";
assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid";
}
if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
needsCommit = true;
} else {
assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode;
}
if (needsCommit) {
commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commitIndexWriter still has a check if (historyUUID != null). Is that one obsolete now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. fixing.

? commitUserData.get(SYNC_COMMIT_ID) : null);
}
return translog;
}


@Override
public Translog getTranslog() {
ensureOpen();
Expand Down Expand Up @@ -424,14 +426,17 @@ private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException
}

/**
* Reads the current stored history ID from the IW commit data. If the id is not found, returns null.
* Reads the current stored history ID from the IW commit data. Generates a new UUID if not found or if generation is forced.
*/
@Nullable
private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException {
private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean forceNew) throws IOException {
String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
"index was created after 6_0_0_rc1 but has no history uuid";
if (uuid == null || forceNew) {
assert
forceNew || // recovery from a local store creates an index that doesn't have yet a history_uuid
openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG ||
config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
"existing index was created after 6_0_0_rc1 but has no history uuid";
uuid = UUIDs.randomBase64UUID();
}
return uuid;
}
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2080,10 +2080,24 @@ private DocumentMapperForType docMapper(String type) {

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
Sort indexSort = indexSortSupplier.get();
final boolean forceNewHistoryUUID;
switch (shardRouting.recoverySource().getType()) {
case EMPTY_STORE:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EMPTY_STORE should force a new history uuid (see also my comment here).
EMPTY_STORE is used for two situations:

  • creating a shard of a fresh index: Here, we trivially want to force a fresh UUID.
  • force allocating an empty shard: Here, we are creating a fresh history, so we want to force a fresh UUID too.

Note that this leaves the case of allocating a stale primary, which should also force a fresh history. Here, we have no way at the moment though to detect that based on the recovery source object. For this, if the recovery source is EXISTING_STORE, we can compare the on-disk allocation id (ShardStateMetaData.FORMAT.load(...)) with the expected allocation id in the shard routing object, and if they mismatch, force a new history id. Loading the ShardStateMetaData can happen in the IndexShard constructor.

I'm ok if you do this in a follow-up, but I still think EMPTY_STORE should be correctly set here to force a new history uuid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EMPTY_STORE will always result in a new history uuid as we create a new index writer and it will not have any user data. I'm fine with setting forceNewHistoryUUID to true to be explicit (and later on morph this into just "generateHistoryUUID` once all existing indices are known to have one and we can be explicit about the moments we generate new ones).

Re forcing a stale allocations - good catch. As you say, that's a more complicated one indeed. I think we should do it as a follow up and discuss possible approaches. I'm thinking about a new recovery source. I think this is an edge case we should be explicit about.

case EXISTING_STORE:
case PEER:
forceNewHistoryUUID = false;
break;
case SNAPSHOT:
case LOCAL_SHARDS:
forceNewHistoryUUID = true;
break;
default:
throw new AssertionError("unknown recovery type: [" + shardRouting.recoverySource().getType() + "]");
}
return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort,
this::runTranslogRecovery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -164,11 +162,10 @@ void addIndices(
* document-level semantics.
*/
writer.setLiveCommitData(() -> {
final HashMap<String, String> liveCommitData = new HashMap<>(4);
final HashMap<String, String> liveCommitData = new HashMap<>(3);
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
return liveCommitData.entrySet().iterator();
});
writer.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -82,7 +81,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -271,8 +269,8 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(),
config.getTranslogRecoveryRunner());
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
config.getIndexSort(), config.getTranslogRecoveryRunner());
}

@Override
Expand Down Expand Up @@ -453,7 +451,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);

return config;
Expand Down Expand Up @@ -2797,8 +2795,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(),
null, config.getTranslogRecoveryRunner());
IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5),
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());

try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
Expand All @@ -2810,7 +2808,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
assertVisibleCount(engine, numDocs, false);
}

public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException {
public void testHistoryUUIDIsSetIfMissing() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Expand Down Expand Up @@ -2843,11 +2841,56 @@ public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException {
.put(defaultSettings.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
.build());
engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null);
assertVisibleCount(engine, numDocs, false);

EngineConfig config = engine.config();

EngineConfig newConfig = new EngineConfig(
randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
shardId, allocationId.getId(),
threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
engine = new InternalEngine(newConfig);
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
} else {
assertVisibleCount(engine, 0, false);
}
assertThat(engine.getHistoryUUID(), notNullValue());
}

public void testHistoryUUIDCanBeForced() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
final String oldHistoryUUID = engine.getHistoryUUID();
engine.close();
EngineConfig config = engine.config();

EngineConfig newConfig = new EngineConfig(
randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG,
shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getRefreshListeners(), null, config.getTranslogRecoveryRunner());
engine = new InternalEngine(newConfig);
if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
} else {
assertVisibleCount(engine, 0, false);
}
assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID)));
}

public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
String operation = randomFrom("optimize", "refresh", "flush");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
};
EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool,
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null);
engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());
Expand Down
Loading