Skip to content

Commit

Permalink
Mapping update task back references already closed index shard
Browse files Browse the repository at this point in the history
In the ShardRecoveryHandler we issue cluster update tasks to update the
mapping. The annonymous inner class backreferences the ShardRecoveryHandler
which holds a potentially large IndexShard object (which references buffers & caches etc)
If the queue of update tasks piles up and recoveries get cancled and/or shards are closed
the ShardRecoveryHandler can't be GCed. This commit moves the update task into a static
inner class to allos the GC to do its job.
  • Loading branch information
s1monw committed Feb 9, 2015
1 parent 1167bee commit b3b1a11
Showing 1 changed file with 70 additions and 43 deletions.
Expand Up @@ -482,50 +482,9 @@ private void updateMappingOnMaster() {
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();
// we use immediate as this is a very light weight check and we don't wait to delay recovery
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() {

@Override
public boolean runOnlyOnMaster() {
return false;
}

@Override
public TimeValue timeout() {
return recoverySettings.internalActionTimeout();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {

MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
}
}
return currentState;
}

@Override
public void onFailure(String source, Throwable t) {
mappingCheckException.set(t);
latch.countDown();
}
});
// we use immediate as this is a very light weight check and we don't wait to delay recovery
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new MappingUpdateTask(clusterService, indexService, recoverySettings, latch, documentMappersToUpdate, mappingCheckException, this.cancellableThreads));
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
Expand Down Expand Up @@ -659,4 +618,72 @@ public String toString() {
", targetNode=" + request.targetNode() +
'}';
}

// this is a static class since we are holding an instance to the IndexShard
// on ShardRecoveryHandler which can not be GCed if the recovery is canceled
// but this task is still stuck in the queue. This can be problematic if the
// queue piles up and recoveries fail and can lead to OOM or memory pressure if lots of shards
// are created and removed.
private static class MappingUpdateTask extends TimeoutClusterStateUpdateTask {
private final CountDownLatch latch;
private final BlockingQueue<DocumentMapper> documentMappersToUpdate;
private final AtomicReference<Throwable> mappingCheckException;
private final CancellableThreads cancellableThreads;
private ClusterService clusterService;
private IndexService indexService;
private RecoverySettings recoverySettings;

public MappingUpdateTask(ClusterService clusterService, IndexService indexService, RecoverySettings recoverySettings, CountDownLatch latch, BlockingQueue<DocumentMapper> documentMappersToUpdate, AtomicReference<Throwable> mappingCheckException, CancellableThreads cancellableThreads) {
this.latch = latch;
this.documentMappersToUpdate = documentMappersToUpdate;
this.mappingCheckException = mappingCheckException;
this.clusterService = clusterService;
this.indexService = indexService;
this.recoverySettings = recoverySettings;
this.cancellableThreads = cancellableThreads;
}

@Override
public boolean runOnlyOnMaster() {
return false;
}

@Override
public TimeValue timeout() {
return recoverySettings.internalActionTimeout();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (cancellableThreads.isCancelled() == false) { // no need to run this if recovery is canceled
IndexMetaData indexMetaData = clusterService.state().metaData().getIndices().get(indexService.index().getName());
ImmutableOpenMap<String, MappingMetaData> metaDataMappings = null;
if (indexMetaData != null) {
metaDataMappings = indexMetaData.getMappings();
}
// default mapping should not be sent back, it can only be updated by put mapping API, and its
// a full in place replace, we don't want to override a potential update coming into it
for (DocumentMapper documentMapper : indexService.mapperService().docMappers(false)) {

MappingMetaData mappingMetaData = metaDataMappings == null ? null : metaDataMappings.get(documentMapper.type());
if (mappingMetaData == null || !documentMapper.refreshSource().equals(mappingMetaData.source())) {
// not on master yet in the right form
documentMappersToUpdate.add(documentMapper);
}
}
}
return currentState;
}

@Override
public void onFailure(String source, Throwable t) {
mappingCheckException.set(t);
latch.countDown();
}
}
}

0 comments on commit b3b1a11

Please sign in to comment.