Recovery: restart recovery upon mapping changes during translog replay
On rare occasions, the translog replay phase of recovery may require mapping changes on the target shard. This can happen when indexing on the primary introduces new mappings while the recovery is in phase 1. If the source node processes the new mapping from the master (allowing indexing to proceed) before the target node does, and the recovery also moves to phase 2 (translog replay) before the target has applied it, the translog operations arriving on the target node may miss the mapping changes. Since this is extremely rare, we opt for a simple fix and just restart the recovery. Note that in this case the file copy phase will likely be very short, as the files are already in sync.
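
In code terms, the fix boils down to the following shape on the recovery target (a condensed sketch of the IndexShard / TranslogRecoveryPerformer change in the diff below; the wiring around it is simplified):

    // Replay translog operations shipped from the source, but refuse to apply dynamic
    // mapping updates: during peer recovery those must arrive via the master, not be
    // derived from replayed documents.
    public int performBatchRecovery(Iterable<Translog.Operation> operations) {
        try {
            // allowMappingUpdates = false: an operation that would introduce new fields
            // makes the performer throw a MapperException instead of collecting the update
            return translogRecoveryPerformer.performBatchRecovery(engine(), operations, false);
        } catch (MapperException e) {
            // the mapping update from the master has not been processed on this node yet;
            // tell the source to back off, which ends up restarting the recovery
            throw new DelayRecoveryException("missing mapping changes", e);
        }
    }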

Restarting a recovery in such a late phase means we may need to copy segment_N files and/or files that were quickly merged away on the target again. This trips the write-once protection in our testing infra. To work around it, I have introduced a counter in the temporary file name prefix used by the recovery code.
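
For illustration, a minimal self-contained sketch of that naming scheme (hypothetical class and prefix value; the real change lives in RecoveryStatus further down): each recovery attempt gets its own prefix, so a file copied again after a restart never reuses a name from a previous attempt.

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical illustration of per-attempt temporary file prefixes.
    final class RecoveryTempFileNaming {
        private static final String RECOVERY_PREFIX = "recovery."; // illustrative value
        private final long recoveryStartTime;
        private final AtomicInteger attempt = new AtomicInteger();
        private volatile String tempFilePrefix;

        RecoveryTempFileNaming(long recoveryStartTime) {
            this.recoveryStartTime = recoveryStartTime;
            refreshTempFilesPrefix();
        }

        // called at construction and again whenever the recovery is restarted
        void refreshTempFilesPrefix() {
            tempFilePrefix = RECOVERY_PREFIX + recoveryStartTime + "." + attempt.getAndIncrement() + ".";
        }

        String tempFileName(String originalFileName) {
            return tempFilePrefix + originalFileName;
        }

        public static void main(String[] args) {
            RecoveryTempFileNaming naming = new RecoveryTempFileNaming(1432720000000L);
            System.out.println(naming.tempFileName("segments_5")); // recovery.1432720000000.0.segments_5
            naming.refreshTempFilesPrefix();                       // simulate a recovery restart
            System.out.println(naming.tempFileName("segments_5")); // recovery.1432720000000.1.segments_5
        }
    }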

*** THERE IS STILL AN ONGOING ISSUE ***: Lucene will try to write the same segment_N file (which was cleaned by the recovery code) twice, triggering test failures.

 Closes elastic#11281
bleskes committed May 27, 2015
1 parent 7451b47 commit 81a639a
Showing 4 changed files with 51 additions and 33 deletions.
@@ -20,15 +20,10 @@
package org.elasticsearch.index.engine;

import com.google.common.collect.Lists;

import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
@@ -219,7 +214,7 @@ protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogG
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
handler.performRecoveryOperation(this, operation);
handler.performRecoveryOperation(this, operation, true);
opsRecovered++;
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
16 changes: 12 additions & 4 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -21,7 +21,6 @@

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;

import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.search.Query;
@@ -107,6 +106,7 @@
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.recovery.DelayRecoveryException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
@@ -801,14 +801,22 @@ public void prepareForIndexRecovery() {

/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an opertion failed to apply.
* Note: This method is typically used in peer recovery to replay remote tansaction log entries.
* This operation will stop applying operations once an operation failed to apply.
* Note: This method is typically used in peer recovery to replay remote transaction log entries.
*/
public int performBatchRecovery(Iterable<Translog.Operation> operations) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(engine(), operations);
final TranslogRecoveryPerformer performer = engineConfig.getTranslogRecoveryPerformer();
try {
return performer.performBatchRecovery(engine(), operations, false);
} catch (MapperException e) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes. we want to wait until these mappings are processed.
logger.trace("delaying recovery due to missing mapping changes", e);
throw new DelayRecoveryException("missing mapping changes", e);
}
}

/**
@@ -24,12 +24,7 @@
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperAnalyzer;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperUtils;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.translog.Translog;

@@ -62,20 +57,30 @@ protected Tuple<DocumentMapper, Mapping> docMapper(String type) {
return mapperService.documentMapperWithAutoCreate(type); // protected for testing
}

/*
/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an opertion failed to apply.
* This operation will stop applying operations once an operation failed to apply.
*
* @param allowMappingUpdates true if mapping update should be accepted (but collected). Setting it to false will
* cause a {@link MapperException} to be thrown if an update
* is encountered.
*/
int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations, boolean allowMappingUpdates) {
int numOps = 0;
for (Translog.Operation operation : operations) {
performRecoveryOperation(engine, operation);
performRecoveryOperation(engine, operation, allowMappingUpdates);
numOps++;
}
return numOps;
}

private void addMappingUpdate(String type, Mapping update) {
private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
if (update == null) {
return;
}
if (allowMappingUpdates == false) {
throw new MapperException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])");
}
Mapping currentUpdate = recoveredTypes.get(type);
if (currentUpdate == null) {
recoveredTypes.put(type, update);
@@ -85,10 +90,13 @@ private void addMappingUpdate(String type, Mapping update) {
}

/**
* Performs a single recovery operation, and returns the indexing operation (or null if its not an indexing operation)
* that can then be used for mapping updates (for example) if needed.
* Performs a single recovery operation.
*
* @param allowMappingUpdates true if mapping update should be accepted (but collected). Setting it to false will
* cause a {@link MapperException} to be thrown if an update
* is encountered.
*/
public void performRecoveryOperation(Engine engine, Translog.Operation operation) {
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
try {
switch (operation.opType()) {
case CREATE:
@@ -98,21 +106,17 @@ public void performRecoveryOperation(Engine engine, Translog.Operation operation
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
mapperAnalyzer.setType(create.type()); // this is a PITA - once mappings are per index not per type this can go away an we can just simply move this to the engine eventually :)
maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
engine.create(engineCreate);
if (engineCreate.parsedDoc().dynamicMappingsUpdate() != null) {
addMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate());
}
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
mapperAnalyzer.setType(index.type());
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
engine.index(engineIndex);
if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
addMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate());
}
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
@@ -39,6 +39,7 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
@@ -58,7 +59,6 @@ public class RecoveryStatus extends AbstractRefCounted {
private final long recoveryId;
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final String tempFilePrefix;
private final Store store;
private final RecoveryTarget.RecoveryListener listener;

@@ -72,6 +72,11 @@ public class RecoveryStatus extends AbstractRefCounted {
// last time this status was accessed
private volatile long lastAccessTime = System.nanoTime();


// a counter which is incremented with each recovery attempt to make sure file names are unique
private final AtomicInteger attempt = new AtomicInteger();
private String tempFilePrefix;

public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTarget.RecoveryListener listener) {

super("recovery_status");
@@ -81,11 +86,15 @@ public RecoveryStatus(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryT
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.tempFilePrefix = RECOVERY_PREFIX + indexShard.recoveryState().getTimer().startTime() + ".";
this.store = indexShard.store();
// make sure the store is not released until we are done.
store.incRef();
indexShard.recoveryStats().incCurrentAsTarget();
refreshTempFilesPrefix();
}

protected void refreshTempFilesPrefix() {
this.tempFilePrefix = RECOVERY_PREFIX + this.indexShard.recoveryState().getTimer().startTime() + "." + attempt.getAndIncrement() + ".";
}

private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
@@ -236,6 +245,8 @@ public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData meta
public void resetRecovery() throws IOException {
cleanOpenFiles();
indexShard().performRecoveryRestart();
// ensure that all future temp files will have unique names
refreshTempFilesPrefix();
}

@Override
