Skip to content

Commit

Permalink
Recovery: fix recovered translog ops stat counting when retrying a batch
Browse files Browse the repository at this point in the history
elastic#11363 introduced retry logic for the case where we have to wait on a mapping update during the translog replay phase of recovery. The retry throws our recovery stats off, as it may count ops twice.
  • Loading branch information
bleskes committed Jun 8, 2015
1 parent 8f2dc10 commit 10adb71
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 13 deletions.
Expand Up @@ -1327,7 +1327,7 @@ private Tuple<DocumentMapper, Mapping> docMapper(String type) {
}

private final EngineConfig newEngineConfig(TranslogConfig translogConfig) {
final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(shardId, mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
@Override
protected void operationProcessed() {
assert recoveryState != null;
Expand Down
Expand Up @@ -55,8 +55,10 @@ public class TranslogRecoveryPerformer {
private final IndexCache indexCache;
private final MapperAnalyzer mapperAnalyzer;
private final Map<String, Mapping> recoveredTypes = new HashMap<>();
private final ShardId shardId;

protected TranslogRecoveryPerformer(MapperService mapperService, MapperAnalyzer mapperAnalyzer, IndexQueryParserService queryParserService, IndexAliasesService indexAliasesService, IndexCache indexCache) {
protected TranslogRecoveryPerformer(ShardId shardId, MapperService mapperService, MapperAnalyzer mapperAnalyzer, IndexQueryParserService queryParserService, IndexAliasesService indexAliasesService, IndexCache indexCache) {
this.shardId = shardId;
this.mapperService = mapperService;
this.queryParserService = queryParserService;
this.indexAliasesService = indexAliasesService;
Expand All @@ -76,13 +78,33 @@ protected Tuple<DocumentMapper, Mapping> docMapper(String type) {
*/
int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
int numOps = 0;
for (Translog.Operation operation : operations) {
performRecoveryOperation(engine, operation, false);
numOps++;
try {
for (Translog.Operation operation : operations) {
performRecoveryOperation(engine, operation, false);
numOps++;
}
} catch (Throwable t) {
throw new BatchOperationException(shardId, "failed to apply batch translog operation [" + t.getMessage() + "]", numOps, t);
}
return numOps;
}

/**
 * Thrown when applying a batch of translog operations fails partway through.
 * Carries the number of operations that completed successfully before the
 * failure, so callers can roll back recovery stats before retrying the batch.
 */
public static class BatchOperationException extends IndexShardException {

    // number of operations applied successfully before the wrapped failure occurred
    private final int completedOperations;

    /**
     * @param shardId             shard the batch was being applied to
     * @param msg                 description of the failure
     * @param completedOperations operations successfully applied before the failure
     * @param cause               the underlying failure (preserved as the exception cause)
     */
    public BatchOperationException(ShardId shardId, String msg, int completedOperations, Throwable cause) {
        super(shardId, msg, cause);
        this.completedOperations = completedOperations;
    }


    /** the number of successful operations performed before the exception was thrown */
    public int completedOperations() {
        return completedOperations;
    }
}

private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
if (update == null) {
return;
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -506,6 +505,13 @@ public synchronized void incrementRecoveredOperations() {
assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
}

/**
 * Rolls back the recovered-operations counter by {@code ops}, used when a batch of
 * translog operations was partially applied and will be retried (the retry would
 * otherwise count the same operations twice).
 *
 * @param ops the number of previously-counted operations to subtract
 */
public synchronized void decrementRecoveredOperations(int ops) {
    recovered -= ops;
    // counters must stay consistent: never below zero, and never above the known total
    assert recovered >= 0 : "recovered operations must be non-negative. Because [" + recovered + "] after decrementing [" + ops + "]";
    assert total == UNKNOWN || total >= recovered : "total, if known, should be > recovered. total [" + total + "], recovered [" + recovered + "]";
}


/**
* returns the total number of translog operations recovered so far
*/
Expand Down
Expand Up @@ -47,10 +47,7 @@
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesLifecycle;
Expand Down Expand Up @@ -308,10 +305,14 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
try {
recoveryStatus.indexShard().performBatchRecovery(request.operations());
} catch (MapperException mapperException) {
} catch (TranslogRecoveryPerformer.BatchOperationException exception) {
if (ExceptionsHelper.unwrapCause(exception) instanceof MapperException == false) {
throw exception;
}
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes. we want to wait until these mappings are processed.
logger.trace("delaying recovery due to missing mapping changes", mapperException);
logger.trace("delaying recovery due to missing mapping changes (rolling back stats for [{}] ops)", exception, exception.completedOperations());
translog.decrementRecoveredOperations(exception.completedOperations());
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
Expand Down
Expand Up @@ -1820,7 +1820,7 @@ public static class TranslogHandler extends TranslogRecoveryPerformer {
public final AtomicInteger recoveredOps = new AtomicInteger(0);

public TranslogHandler(String indexName) {
super(null, new MapperAnalyzer(null), null, null, null);
super(new ShardId("test", 0), null, new MapperAnalyzer(null), null, null, null);
Settings settings = Settings.settingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder("test");
Index index = new Index(indexName);
Expand Down
Expand Up @@ -389,6 +389,10 @@ Translog createObj() {
for (int j = iterationOps; j > 0; j--) {
ops++;
translog.incrementRecoveredOperations();
if (randomBoolean()) {
translog.decrementRecoveredOperations(1);
translog.incrementRecoveredOperations();
}
}
assertThat(translog.recoveredOperations(), equalTo(ops));
assertThat(translog.totalOperations(), equalTo(totalOps));
Expand Down

0 comments on commit 10adb71

Please sign in to comment.