Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart recovery upon mapping changes during translog replay #11363

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -62,9 +62,10 @@ public ClusterStateObserver(ClusterService clusterService, ESLogger logger) {
/**
* @param clusterService
* @param timeout a global timeout for this observer. After it has expired the observer
* will fail any existing or new #waitForNextChange calls.
* will fail any existing or new #waitForNextChange calls. Set to null
* to wait indefinitely
*/
public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ESLogger logger) {
public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger) {
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.timeOutValue = timeout;
Expand Down
Expand Up @@ -20,15 +20,10 @@
package org.elasticsearch.index.engine;

import com.google.common.collect.Lists;

import org.apache.lucene.index.*;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -219,7 +214,7 @@ protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogG
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
handler.performRecoveryOperation(this, operation);
handler.performRecoveryOperation(this, operation, true);
opsRecovered++;
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;

import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -801,8 +800,8 @@ public void prepareForIndexRecovery() {

/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an opertion failed to apply.
* Note: This method is typically used in peer recovery to replay remote tansaction log entries.
* This operation will stop applying operations once an operation failed to apply.
* Note: This method is typically used in peer recovery to replay remote transaction log entries.
*/
public int performBatchRecovery(Iterable<Translog.Operation> operations) {
if (state != IndexShardState.RECOVERING) {
Expand Down Expand Up @@ -1386,7 +1385,7 @@ public void sync(Translog.Location location) {
* Returns the current translog durability mode
*/
public Translog.Durabilty getTranslogDurability() {
return translogConfig.getDurabilty();
return translogConfig.getDurabilty();
}

private static Translog.Durabilty getFromSettings(ESLogger logger, Settings settings, Translog.Durabilty defaultValue) {
Expand Down
Expand Up @@ -24,12 +24,7 @@
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperAnalyzer;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperUtils;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.translog.Translog;

Expand Down Expand Up @@ -62,20 +57,28 @@ protected Tuple<DocumentMapper, Mapping> docMapper(String type) {
return mapperService.documentMapperWithAutoCreate(type); // protected for testing
}

/*
/**
* Applies all operations in the iterable to the current engine and returns the number of operations applied.
* This operation will stop applying operations once an opertion failed to apply.
* This operation will stop applying operations once an operation failed to apply.
*
* Throws a {@link MapperException} if a mapping update is encountered.
*/
int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations) {
int numOps = 0;
for (Translog.Operation operation : operations) {
performRecoveryOperation(engine, operation);
performRecoveryOperation(engine, operation, false);
numOps++;
}
return numOps;
}

private void addMappingUpdate(String type, Mapping update) {
private void maybeAddMappingUpdate(String type, Mapping update, String docId, boolean allowMappingUpdates) {
if (update == null) {
return;
}
if (allowMappingUpdates == false) {
throw new MapperException("mapping updates are not allowed (type: [" + type + "], id: [" + docId + "])");
}
Mapping currentUpdate = recoveredTypes.get(type);
if (currentUpdate == null) {
recoveredTypes.put(type, update);
Expand All @@ -85,10 +88,13 @@ private void addMappingUpdate(String type, Mapping update) {
}

/**
* Performs a single recovery operation, and returns the indexing operation (or null if its not an indexing operation)
* that can then be used for mapping updates (for example) if needed.
* Performs a single recovery operation.
*
* @param allowMappingUpdates true if mapping updates should be accepted (but collected). Setting it to false will
* cause a {@link MapperException} to be thrown if an update
* is encountered.
*/
public void performRecoveryOperation(Engine engine, Translog.Operation operation) {
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
try {
switch (operation.opType()) {
case CREATE:
Expand All @@ -98,21 +104,17 @@ public void performRecoveryOperation(Engine engine, Translog.Operation operation
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
mapperAnalyzer.setType(create.type()); // this is a PITA - once mappings are per index not per type this can go away an we can just simply move this to the engine eventually :)
maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
engine.create(engineCreate);
if (engineCreate.parsedDoc().dynamicMappingsUpdate() != null) {
addMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate());
}
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
mapperAnalyzer.setType(index.type());
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
engine.index(engineIndex);
if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
addMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate());
}
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
Expand Down
Expand Up @@ -26,8 +26,12 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -41,6 +45,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -294,13 +299,51 @@ public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportCh
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {

@Override
public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
translog.totalOperations(request.totalTranslogOps());
assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
recoveryStatus.indexShard().performBatchRecovery(request.operations());
try {
recoveryStatus.indexShard().performBatchRecovery(request.operations());
} catch (MapperException mapperException) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes. we want to wait until these mappings are processed.
logger.trace("delaying recovery due to missing mapping changes", mapperException);
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
messageReceived(request, channel);
} catch (Exception e) {
onFailure(e);
}
}

protected void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send error back to recovery source", e1);
}
}

@Override
public void onClusterServiceClose() {
onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
}

@Override
public void onTimeout(TimeValue timeout) {
// note that we do not use a timeout (see comment above)
onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout + "])"));
}
});
}
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);

Expand Down