Skip to content

Commit

Permalink
Internal: IndexService - synchronize close to prevent race condition …
Browse files Browse the repository at this point in the history
…with shard creation

During node shutdown we have a race condition between processing cluster state updates (creating shards) and closing down the index service. This may cause shards to leak and not be closed properly.

This commit removes the concurrent closing of shards within an index, because InternalIndexService.removeShard has been synchronized for a long time now.

On the other hand, this commit restores the parallel shutdown of indices that was lost in 7e1d8a6.

Closes #8557
  • Loading branch information
bleskes committed Nov 19, 2014
1 parent 69ac838 commit fb81a32
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -30,7 +29,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
Expand Down Expand Up @@ -84,16 +82,12 @@
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -109,8 +103,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde

private final Settings indexSettings;

private final ThreadPool threadPool;

private final PluginsService pluginsService;

private final InternalIndicesLifecycle indicesLifecycle;
Expand Down Expand Up @@ -148,14 +140,13 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final AtomicBoolean closed = new AtomicBoolean(false);

@Inject
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv,
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService,
SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine,
IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData,
BitsetFilterCache bitSetFilterCache ) {
super(index, indexSettings);
this.injector = injector;
this.threadPool = threadPool;
this.indexSettings = indexSettings;
this.analysisService = analysisService;
this.mapperService = mapperService;
Expand Down Expand Up @@ -279,37 +270,19 @@ public IndexEngine engine() {
return indexEngine;
}

public void close(final String reason, @Nullable Executor executor, final IndicesService.IndexCloseListener listener) {
if (closed.compareAndSet(false, true)) {
final Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size());
final IndicesService.IndexCloseListener innerListener = listener == null ? null :
new PerShardIndexCloseListener(shardIds, listener);
for (final int shardId : shardIds) {
executor = executor == null ? threadPool.generic() : executor;
executor.execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("failed to close shard", t);
}

@Override
public void doRun() {
try {
removeShard(shardId, reason, innerListener);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
logger.debug("Interrupted closing index [{}]", e, index().name());
Thread.currentThread().interrupt();
}
}
/**
 * Closes this index service by removing every shard it currently holds.
 * <p>
 * The method is {@code synchronized} and guarded by the {@code closed} flag, so the
 * shard removal loop runs at most once; any later call is a no-op.
 * Shards are removed one after another on the calling thread.
 *
 * @param reason   the reason for closing, passed through to {@code removeShard}
 * @param listener optional listener for shard close events; may be {@code null},
 *                 in which case no listener is passed to {@code removeShard}
 */
public synchronized void close(final String reason, final IndicesService.IndexCloseListener listener) {
// Only the first caller flips the flag from false to true and performs the close.
if (closed.compareAndSet(false, true)) {
final Set<Integer> shardIds = shardIds();
// Wrap the caller's listener together with the set of shard ids being closed.
// NOTE(review): presumably PerShardIndexCloseListener reports "all shards closed"
// to the outer listener once every id has been handled -- confirm in that class.
final IndicesService.IndexCloseListener innerListener = listener == null ? null :
new PerShardIndexCloseListener(shardIds, listener);
for (final int shardId : shardIds) {
try {
removeShard(shardId, reason, innerListener);
} catch (Throwable t) {
// A failure on one shard is logged and does not stop the remaining shards from being removed.
logger.warn("failed to close shard", t);
}
}
}
}

@Override
Expand Down
47 changes: 24 additions & 23 deletions src/main/java/org/elasticsearch/indices/InternalIndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreModule;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
Expand All @@ -75,8 +74,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
Expand Down Expand Up @@ -128,22 +129,25 @@ protected void doStop() throws ElasticsearchException {
final CountDownLatch latch = new CountDownLatch(indices.size());

final ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown"));
final ExecutorService shardsStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("shards_shutdown"));

for (final String index : indices) {
indicesStopExecutor.execute(new Runnable() {
@Override
public void run() {
try {
removeIndex(index, "shutdown", shardsStopExecutor, new IndexCloseListener() {
removeIndex(index, "shutdown", new IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
latch.countDown();
}

@Override
public void onShardClosed(ShardId shardId) {}
public void onShardClosed(ShardId shardId) {
}

@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {}
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
} catch (Throwable e) {
latch.countDown();
Expand All @@ -159,7 +163,6 @@ public void onShardCloseFailed(ShardId shardId, Throwable t) {}
} catch (InterruptedException e) {
// ignore
} finally {
shardsStopExecutor.shutdown();
indicesStopExecutor.shutdown();
}
}
Expand Down Expand Up @@ -325,34 +328,32 @@ public synchronized IndexService createIndex(String sIndexName, Settings setting

@Override
public void removeIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, null, null);
removeIndex(index, reason, null);
}

@Override
public void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException {
removeIndex(index, reason, null, listener);
}
final IndexService indexService;
final Injector indexInjector;
synchronized (this) {
indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
return;
}

private synchronized void removeIndex(String index, String reason, @Nullable Executor executor, @Nullable IndexCloseListener listener) throws ElasticsearchException {
IndexService indexService;
Injector indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
return;
logger.debug("[{}] closing ... (reason [{}])", index, reason);
Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
}

logger.debug("[{}] closing ... (reason [{}])", index, reason);
Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);

indicesLifecycle.beforeIndexClosed(indexService);

for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) {
indexInjector.getInstance(closeable).close();
}

logger.debug("[{}] closing index service", index, reason);
((InternalIndexService) indexService).close(reason, executor, listener);
((InternalIndexService) indexService).close(reason, listener);

logger.debug("[{}] closing index cache", index, reason);
indexInjector.getInstance(IndexCache.class).close();
Expand Down

0 comments on commit fb81a32

Please sign in to comment.