Skip to content

Commit

Permalink
Raise default DeleteIndex Timeout
Browse files Browse the repository at this point in the history
Currently the timeout for a delete index operation is set to 10 seconds.
Yet, if a full flush is running while we delete an index, this can
easily exceed 10 seconds. Hitting the timeout is not dramatic, i.e. the index
will be deleted eventually, but the client request is not acked, which
can cause confusion. We should raise it to prevent unnecessary confusion,
especially in client tests where this can happen if the machine is pretty busy.

The new timeout is set to 60 seconds.

Closes #3498
  • Loading branch information
s1monw committed Aug 13, 2013
1 parent 94579fb commit 7749f3e
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 110 deletions.
Expand Up @@ -38,7 +38,7 @@ public class DeleteIndexRequest extends MasterNodeOperationRequest<DeleteIndexRe

private String[] indices;

private TimeValue timeout = timeValueSeconds(10);
private TimeValue timeout = timeValueSeconds(60);

DeleteIndexRequest() {
}
Expand Down
Expand Up @@ -36,7 +36,7 @@ public DeleteIndexRequestBuilder(IndicesAdminClient indicesClient, String... ind

/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
* to <tt>60s</tt>.
*/
public DeleteIndexRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
Expand Down
Expand Up @@ -102,7 +102,7 @@ public void run() {
}

private void deleteIndex(final Request request, final Listener userListener, Semaphore mdLock) {
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, request, userListener);
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, userListener);
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {

@Override
Expand Down Expand Up @@ -141,6 +141,7 @@ public ClusterState execute(final ClusterState currentState) {
// add the notifications that the store was deleted from *data* nodes
count += currentState.nodes().dataNodes().size();
final AtomicInteger counter = new AtomicInteger(count);
// this listener will be notified once we get back a notification based on the cluster state change below.
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
@Override
public void onNodeIndexDeleted(String index, String nodeId) {
Expand Down Expand Up @@ -185,13 +186,11 @@ class DeleteIndexListener implements Listener {

private final AtomicBoolean notified = new AtomicBoolean();
private final Semaphore mdLock;
private final Request request;
private final Listener listener;
volatile ScheduledFuture future;
volatile ScheduledFuture<?> future;

private DeleteIndexListener(Semaphore mdLock, Request request, Listener listener) {
private DeleteIndexListener(Semaphore mdLock, Listener listener) {
this.mdLock = mdLock;
this.request = request;
this.listener = listener;
}

Expand Down
Expand Up @@ -177,7 +177,7 @@ public void clusterChanged(ClusterChangedEvent event) {
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
try {
writeGlobalState("changed", newMetaData, currentMetaData);
} catch (Exception e) {
} catch (Throwable e) {
success = false;
}
}
Expand Down Expand Up @@ -205,7 +205,7 @@ public void clusterChanged(ClusterChangedEvent event) {

try {
writeIndex(writeReason, indexMetaData, currentIndexMetaData);
} catch (Exception e) {
} catch (Throwable e) {
success = false;
}
}
Expand All @@ -228,7 +228,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}
try {
nodeIndexDeletedAction.nodeIndexStoreDeleted(current.index(), event.state().nodes().masterNodeId());
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("[{}] failed to notify master on local index store deletion", e, current.index());
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to find dangling indices", e);
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ public void onFailure(Throwable e) {
logger.info("failed to send allocated dangled", e);
}
});
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to send allocate dangled", e);
}
}
Expand Down Expand Up @@ -337,7 +337,7 @@ private void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable In
builder.flush();

String stateFileName = "state-" + indexMetaData.version();
Exception lastFailure = null;
Throwable lastFailure = null;
boolean wroteAtLeastOnce = false;
for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) {
File stateLocation = new File(indexLocation, "_state");
Expand All @@ -352,7 +352,7 @@ private void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable In
fos.getChannel().force(true);
fos.close();
wroteAtLeastOnce = true;
} catch (Exception e) {
} catch (Throwable e) {
lastFailure = e;
} finally {
IOUtils.closeWhileHandlingException(fos);
Expand Down Expand Up @@ -396,7 +396,7 @@ private void writeGlobalState(String reason, MetaData metaData, @Nullable MetaDa
builder.flush();

String globalFileName = "global-" + globalMetaData.version();
Exception lastFailure = null;
Throwable lastFailure = null;
boolean wroteAtLeastOnce = false;
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
Expand All @@ -411,7 +411,7 @@ private void writeGlobalState(String reason, MetaData metaData, @Nullable MetaDa
fos.getChannel().force(true);
fos.close();
wroteAtLeastOnce = true;
} catch (Exception e) {
} catch (Throwable e) {
lastFailure = e;
} finally {
IOUtils.closeWhileHandlingException(fos);
Expand Down Expand Up @@ -498,7 +498,7 @@ private IndexMetaData loadIndex(String index) {
}
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("[{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, index);
}
}
Expand Down Expand Up @@ -543,7 +543,7 @@ private MetaData loadGlobalState() {
}
}
}
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to load global state from [{}]", e, stateFile.getAbsolutePath());
}
}
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
Expand Down Expand Up @@ -90,8 +89,6 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde

private final Settings indexSettings;

private final NodeEnvironment nodeEnv;

private final ThreadPool threadPool;

private final PluginsService pluginsService;
Expand Down Expand Up @@ -129,14 +126,13 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private volatile boolean closed = false;

@Inject
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool,
PercolatorService percolatorService, AnalysisService analysisService, MapperService mapperService,
IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService,
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService,
IndexFieldDataService indexFieldData) {
super(index, indexSettings);
this.injector = injector;
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.indexSettings = indexSettings;
this.percolatorService = percolatorService;
Expand Down Expand Up @@ -304,6 +300,11 @@ public Injector shardInjectorSafe(int shardId) throws IndexShardMissingException

@Override
public synchronized IndexShard createShard(int sShardId) throws ElasticSearchException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
* keep it synced.
*/
if (closed) {
throw new ElasticSearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed");
}
Expand Down Expand Up @@ -355,98 +356,85 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticSearchExc

@Override
public synchronized void removeShard(int shardId, String reason) throws ElasticSearchException {
Injector shardInjector;
IndexShard indexShard;
synchronized (this) {
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
return;
}
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);

Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
final Injector shardInjector;
final IndexShard indexShard;
final ShardId sId = new ShardId(index, shardId);
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
shardInjector = tmpShardInjectors.remove(shardId);
if (shardInjector == null) {
return;
}

ShardId sId = new ShardId(index, shardId);

shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
indexShard = tmpShardsMap.remove(shardId);
shards = ImmutableMap.copyOf(tmpShardsMap);
indicesLifecycle.beforeIndexShardClosed(sId, indexShard);

for (Class<? extends CloseableIndexComponent> closeable : pluginsService.shardServices()) {
try {
shardInjector.getInstance(closeable).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to clean plugin shard service [{}]", e, closeable);
}
}

try {
// now we can close the translog service, we need to close it before the we close the shard
shardInjector.getInstance(TranslogService.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close translog service", e);
// ignore
}

// this logic is tricky, we want to close the engine so we rollback the changes done to it
// and close the shard so no operations are allowed to it
if (indexShard != null) {
try {
((InternalIndexShard) indexShard).close(reason);
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close index shard", e);
// ignore
}
}
try {
shardInjector.getInstance(Engine.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close engine", e);
// ignore
}

try {
shardInjector.getInstance(MergePolicyProvider.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close merge policy provider", e);
// ignore
}

try {
shardInjector.getInstance(IndexShardGatewayService.class).snapshotOnClose();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to snapshot index shard gateway on close", e);
// ignore
}

try {
shardInjector.getInstance(IndexShardGatewayService.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close index shard gateway", e);
// ignore
}
try {
// now we can close the translog
shardInjector.getInstance(Translog.class).close();
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to close translog", e);
// ignore
}

// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(sId);

// if we delete or have no gateway or the store is not persistent, clean the store...
Store store = shardInjector.getInstance(Store.class);
// and close it
try {
store.close();
} catch (Exception e) {
} catch (Throwable e) {
logger.warn("failed to close store on shard deletion", e);
}

Injectors.close(injector);
}
}
Expand Up @@ -547,7 +547,9 @@ public void close(String reason) {
mergeScheduleFuture = null;
}
}
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
if (logger.isDebugEnabled()) {
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
}
state = IndexShardState.CLOSED;
}
}
Expand Down
31 changes: 10 additions & 21 deletions src/main/java/org/elasticsearch/indices/InternalIndicesService.java
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
Expand Down Expand Up @@ -78,7 +77,6 @@
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.plugins.IndexPluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -100,10 +98,6 @@
*/
public class InternalIndicesService extends AbstractLifecycleComponent<IndicesService> implements IndicesService {

private final NodeEnvironment nodeEnv;

private final ThreadPool threadPool;

private final InternalIndicesLifecycle indicesLifecycle;

private final IndicesAnalysisService indicesAnalysisService;
Expand All @@ -121,10 +115,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
private final OldShardsStats oldShardsStats = new OldShardsStats();

@Inject
public InternalIndicesService(Settings settings, NodeEnvironment nodeEnv, ThreadPool threadPool, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
public InternalIndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
super(settings);
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indicesAnalysisService = indicesAnalysisService;
this.indicesStore = indicesStore;
Expand Down Expand Up @@ -396,24 +388,21 @@ public synchronized IndexService createIndex(String sIndexName, Settings setting
}

@Override
public synchronized void removeIndex(String index, String reason) throws ElasticSearchException {
public void removeIndex(String index, String reason) throws ElasticSearchException {
removeIndex(index, reason, null);
}

private void removeIndex(String index, String reason, @Nullable Executor executor) throws ElasticSearchException {
Injector indexInjector;
private synchronized void removeIndex(String index, String reason, @Nullable Executor executor) throws ElasticSearchException {
IndexService indexService;
synchronized (this) {
indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
return;
}

Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
Injector indexInjector = indicesInjectors.remove(index);
if (indexInjector == null) {
return;
}

Map<String, IndexService> tmpMap = newHashMap(indices);
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);

indicesLifecycle.beforeIndexClosed(indexService);

for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) {
Expand Down

0 comments on commit 7749f3e

Please sign in to comment.