Remove IndexCloseListener & Store.OnCloseListener #9009

Closed
wants to merge 1 commit
21 changes: 11 additions & 10 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -22,11 +22,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -42,7 +38,8 @@
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -166,10 +163,11 @@ public void deleteShardDirectorySafe(ShardId shardId) throws IOException {
* none of the shards will be deleted
*
* @param index the index to delete
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
* @throws Exception if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index) throws IOException {
final List<ShardLock> locks = lockAllForIndex(index);
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS) throws IOException {
final List<ShardLock> locks = lockAllForIndex(index, lockTimeoutMS);
try {
final Path[] indexPaths = new Path[nodeIndicesPaths.length];
for (int i = 0; i < indexPaths.length; i++) {
@@ -188,16 +186,19 @@ public void deleteIndexDirectorySafe(Index index) throws IOException {
* an {@link LockObtainFailedException} is thrown and all previously acquired locks are released.
*
* @param index the index to lock shards for
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
* @return the {@link ShardLock} instances for this index.
* @throws IOException if an IOException occurs.
*/
public List<ShardLock> lockAllForIndex(Index index) throws IOException {
public List<ShardLock> lockAllForIndex(Index index, long lockTimeoutMS) throws IOException {
Set<ShardId> allShardIds = findAllShardIds(index);
List<ShardLock> allLocks = new ArrayList<>();
boolean success = false;
long startTime = System.currentTimeMillis();
try {
for (ShardId shardId : allShardIds) {
allLocks.add(shardLock(shardId));
long timeoutLeft = Math.max(0, lockTimeoutMS - (System.currentTimeMillis() - startTime));
allLocks.add(shardLock(shardId, timeoutLeft));
}
success = true;
} finally {
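For context: the new two-argument lockAllForIndex shares a single timeout budget across all per-shard lock acquisitions instead of granting each shard the full timeout. Below is a minimal, self-contained sketch of that budget pattern, using plain java.util.concurrent semaphores purely for illustration (the class and method names are hypothetical, not part of this change):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LockBudgetSketch {
    // Acquire every lock in order, sharing one overall timeout budget:
    // each acquisition only gets whatever is left of the original budget.
    static List<Semaphore> acquireAll(List<Semaphore> locks, long timeoutMS)
            throws InterruptedException, TimeoutException {
        List<Semaphore> acquired = new ArrayList<>();
        long start = System.currentTimeMillis();
        for (Semaphore lock : locks) {
            long timeLeft = Math.max(0, timeoutMS - (System.currentTimeMillis() - start));
            if (lock.tryAcquire(timeLeft, TimeUnit.MILLISECONDS)) {
                acquired.add(lock);
            } else {
                // give back everything we already hold before failing
                for (Semaphore held : acquired) {
                    held.release();
                }
                throw new TimeoutException("could not acquire all locks within " + timeoutMS + "ms");
            }
        }
        return acquired;
    }
}
```

This mirrors the timeoutLeft computation in the hunk above; with a budget of 0 the loop degenerates into a try-lock that fails immediately if any shard is still in use.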
28 changes: 20 additions & 8 deletions src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -43,7 +43,10 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
@@ -72,6 +75,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private static final String DEPRECATED_SETTING_ROUTING_HASH_FUNCTION = "cluster.routing.operation.hash.type";
private static final String DEPRECATED_SETTING_ROUTING_USE_TYPE = "cluster.routing.operation.use_type";
public static final String GATEWAY_DANGLING_TIMEOUT = "gateway.dangling_timeout";
public static final String GATEWAY_DELETE_TIMEOUT = "gateway.delete_timeout";
public static final String GATEWAY_AUTO_IMPORT_DANGLED = "gateway.auto_import_dangled";
// legacy - this used to be in a different package
private static final String GATEWAY_LOCAL_DANGLING_TIMEOUT = "gateway.local.dangling_timeout";
@@ -127,6 +131,7 @@ public static AutoImportDangledState fromString(String value) {

private final AutoImportDangledState autoImportDangled;
private final TimeValue danglingTimeout;
private final TimeValue deleteTimeout;
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
private final Object danglingMutex = new Object();

@@ -159,8 +164,12 @@ public GatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironmen

this.autoImportDangled = AutoImportDangledState.fromString(settings.get(GATEWAY_AUTO_IMPORT_DANGLED, settings.get(GATEWAY_LOCAL_AUTO_IMPORT_DANGLED, AutoImportDangledState.YES.toString())));
this.danglingTimeout = settings.getAsTime(GATEWAY_DANGLING_TIMEOUT, settings.getAsTime(GATEWAY_LOCAL_DANGLING_TIMEOUT, TimeValue.timeValueHours(2)));
this.deleteTimeout = settings.getAsTime(GATEWAY_DELETE_TIMEOUT, TimeValue.timeValueSeconds(30));
Member: How did this arrive at 30 seconds?

Contributor Author: That's a good question. The delete-index request has an ack timeout of 30s by default, and ideally you would wait longer than that so the delete is not acked (we could use a different mechanism to make sure it isn't acked; this change is meant to be small and temporary). On the other hand, we don't want to delay cluster state processing beyond the 30s the master waits, to avoid having multiple cluster states in flight. I agree this is not ideal at all, but I felt it was good enough as a temporary measure until a proper solution is in place.


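To make the discussion above concrete, here is a hedged sketch of how the new gateway.delete_timeout setting could be overridden and then resolved. It assumes the 1.x ImmutableSettings builder, and the 45s value is just an example, not a recommendation:

```java
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

class DeleteTimeoutExample {
    public static void main(String[] args) {
        // Hypothetical node settings overriding the 30s default introduced in this change.
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("gateway.delete_timeout", "45s")
                .build();

        // Mirrors how GatewayMetaState resolves the value in this diff: fall back to 30s.
        TimeValue deleteTimeout = settings.getAsTime("gateway.delete_timeout", TimeValue.timeValueSeconds(30));
        System.out.println("delete timeout = " + deleteTimeout); // parsed as 45 seconds
    }
}
```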
logger.debug("using gateway.local.auto_import_dangled [{}], with gateway.dangling_timeout [{}]", this.autoImportDangled, this.danglingTimeout);
logger.debug("using {} [{}], {} [{}], with {} [{}]",
GATEWAY_AUTO_IMPORT_DANGLED, this.autoImportDangled,
GATEWAY_DELETE_TIMEOUT, this.deleteTimeout,
GATEWAY_DANGLING_TIMEOUT, this.danglingTimeout);
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
nodeEnv.ensureAtomicMoveSupported();
}
@@ -258,7 +267,10 @@ public void clusterChanged(ClusterChangedEvent event) {
try {
final Index idx = new Index(current.index());
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(idx));
nodeEnv.deleteIndexDirectorySafe(idx);
// it may take a couple of seconds for outstanding shard references
// to release their refs (for example, ongoing recoveries)
// we are working on a better solution, see: https://github.com/elasticsearch/elasticsearch/pull/8608
nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis());
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index());
} catch (Exception ex) {
@@ -302,14 +314,14 @@ public void clusterChanged(ClusterChangedEvent event) {
try {
// the index deletion might not have worked due to shards still being locked
// we have three cases here:
// - we acquired all shards locks here --> we can import the dangeling index
// - we acquired all shards locks here --> we can import the dangling index
// - we failed to acquire the lock --> somebody else uses it - DON'T IMPORT
// - we acquired successfully but the lock list is empty --> no shards present - DON'T IMPORT
// in the last case we should in-fact try to delete the directory since it might be a leftover...
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index);
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, 0);
if (shardLocks.isEmpty()) {
// no shards - try to remove the directory
nodeEnv.deleteIndexDirectorySafe(index);
nodeEnv.deleteIndexDirectorySafe(index, 0);
continue;
}
IOUtils.closeWhileHandlingException(shardLocks);
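The three cases listed in the comments above reduce to a fail-fast try-lock decision. A rough sketch of that control flow, using the NodeEnvironment methods from this diff (the canImport helper and its signature are hypothetical, for illustration only):

```java
import java.io.IOException;
import java.util.List;

import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;

class DanglingIndexDecisionSketch {
    /** Returns true if it is safe to import the dangling index, false otherwise. */
    static boolean canImport(NodeEnvironment nodeEnv, Index index) throws IOException {
        final List<ShardLock> locks;
        try {
            // timeout 0 = fail fast: do not wait for shards that are still in use
            locks = nodeEnv.lockAllForIndex(index, 0);
        } catch (LockObtainFailedException e) {
            return false;          // somebody else holds a shard lock -> don't import
        }
        try {
            if (locks.isEmpty()) {
                // no shards on disk -> leftover directory, remove it and don't import
                nodeEnv.deleteIndexDirectorySafe(index, 0);
                return false;
            }
            return true;           // all shard locks acquired -> safe to import
        } finally {
            IOUtils.closeWhileHandlingException(locks);
        }
    }
}
```

Passing a timeout of 0 keeps the dangling-index path non-blocking: it either gets all shard locks immediately or leaves the index alone.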
@@ -323,7 +335,7 @@ public void clusterChanged(ClusterChangedEvent event) {
} else if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
try {
nodeEnv.deleteIndexDirectorySafe(index);
nodeEnv.deleteIndexDirectorySafe(index, 0);
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName);
} catch (Exception ex) {
@@ -558,7 +570,7 @@ public void run() {

try {
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
nodeEnv.deleteIndexDirectorySafe(index);
nodeEnv.deleteIndexDirectorySafe(index, 0);
} catch (Exception ex) {
logger.debug("failed to delete dangling index", ex);
}
70 changes: 3 additions & 67 deletions src/main/java/org/elasticsearch/index/IndexService.java
@@ -29,7 +29,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.aliases.IndexAliasesService;
@@ -74,17 +73,14 @@
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -241,14 +237,12 @@ public IndexAliasesService aliasesService() {
return aliasesService;
}

public synchronized void close(final String reason, final IndicesService.IndexCloseListener listener) {
public synchronized void close(final String reason) {
if (closed.compareAndSet(false, true)) {
final Set<Integer> shardIds = shardIds();
final IndicesService.IndexCloseListener innerListener = listener == null ? null :
new PerShardIndexCloseListener(shardIds, listener);
for (final int shardId : shardIds) {
try {
removeShard(shardId, reason, innerListener);
removeShard(shardId, reason);
} catch (Throwable t) {
logger.warn("failed to close shard", t);
}
@@ -350,12 +344,7 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticsearchExc
}
}

public void removeShard(int shardId, String reason) throws ElasticsearchException {
removeShard(shardId, reason, null);
}

public synchronized void removeShard(int shardId, String reason, @Nullable final IndicesService.IndexCloseListener listener) throws ElasticsearchException {
boolean listenerPassed = false;
public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException {
final ShardId sId = new ShardId(index, shardId);
try {
final Injector shardInjector;
@@ -441,69 +430,16 @@ public synchronized void removeShard(int shardId, String reason, @Nullable final
final Store store = shardInjector.getInstance(Store.class);
// and close it
try {
listenerPassed = true;
if (listener == null) {
store.close();
} else {
store.close(new Store.OnCloseListener() {
@Override
public void onClose(ShardId shardId) {
listener.onShardClosed(shardId);
}
});
}
} catch (Throwable e) {
logger.warn("[{}] failed to close store on shard deletion", e, shardId);
}
Injectors.close(injector);

logger.debug("[{}] closed (reason: [{}])", shardId, reason);
} catch (Throwable t) {
if (listenerPassed == false && listener != null) { // only notify if the listener wasn't passed to the store
listener.onShardCloseFailed(sId, t);
}
throw t;
}

}

private static final class PerShardIndexCloseListener implements IndicesService.IndexCloseListener {
final CountDown countDown;
final List<Throwable> failures;
private final Set<Integer> shardIds;
private final IndicesService.IndexCloseListener listener;

public PerShardIndexCloseListener(Set<Integer> shardIds, IndicesService.IndexCloseListener listener) {
this.shardIds = shardIds;
this.listener = listener;
countDown = new CountDown(shardIds.size());
failures = new CopyOnWriteArrayList<>();
}

@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
assert false : "nobody should call this";
}

@Override
public void onShardClosed(ShardId shardId) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unknown shard id";
listener.onShardClosed(shardId);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), failures);
}
}

@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
assert countDown.isCountedDown() == false;
assert shardIds.contains(shardId.getId()) : "Unknown shard id";
listener.onShardCloseFailed(shardId, t);
failures.add(t);
if (countDown.countDown()) {
listener.onAllShardsClosed(shardId.index(), failures);
}
}
}
}