Skip to content

Commit

Permalink
Recovery: mapping check during phase2 should be done in cluster state…
Browse files Browse the repository at this point in the history
… update task

Before phase2 we check verify that the local mapping is in sync with the cluster state mapping (and send & wait on a master update mapping task if not). This check should be done under a cluster state update task to make sure an incoming cluster state update to do not change things while we check.

Closes #7744
  • Loading branch information
bleskes committed Oct 5, 2014
1 parent 02d5883 commit 18a1f5f
Showing 1 changed file with 49 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand All @@ -43,6 +46,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
Expand All @@ -60,10 +64,7 @@

import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -307,43 +308,69 @@ public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
}

private void updateMappingOnMaster() {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
List<DocumentMapper> documentMappersToUpdate = Lists.newArrayList();
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {

MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
// we test that the cluster state is in sync with our in memory mapping stored by the mapperService
// we have to do it under the "cluster state update" thread to make sure that one doesn't modify it
// while we're checking
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateUpdateTask() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {

MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
}
}
return currentState;
}

@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.error("unexpected error while checking for pending mapping changes", t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (documentMappersToUpdate.isEmpty()) {
return;
}
final CountDownLatch countDownLatch = new CountDownLatch(documentMappersToUpdate.size());
final CountDownLatch updatedOnMaster = new CountDownLatch(documentMappersToUpdate.size());
MappingUpdatedAction.MappingUpdateListener listener = new MappingUpdatedAction.MappingUpdateListener() {
@Override
public void onMappingUpdate() {
countDownLatch.countDown();
updatedOnMaster.countDown();
}

@Override
public void onFailure(Throwable t) {
logger.debug("{} recovery to {}: failed to update mapping on master", request.shardId(), request.targetNode(), t);
countDownLatch.countDown();
updatedOnMaster.countDown();
}
};
for (DocumentMapper documentMapper : documentMappersToUpdate) {
mappingUpdatedAction.updateMappingOnMaster(indexService.index().getName(), documentMapper, indexService.indexUUID(), listener);
}
try {
if (!countDownLatch.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
if (!updatedOnMaster.await(internalActionTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.debug("{} recovery [phase2] to {}: waiting on pending mapping update timed out. waited [{}]", request.shardId(), request.targetNode(), internalActionTimeout);
}
} catch (InterruptedException e) {
Expand Down

0 comments on commit 18a1f5f

Please sign in to comment.