Introduce shard level locks to prevent concurrent shard modifications #8436

Merged
merged 1 commit into from Nov 16, 2014

@s1monw
Contributor
s1monw commented Nov 11, 2014

Today it's possible for the data directory of a single shard to be used by more than one
IndexShard->Store instance. When one shard is already closed but still has a concurrent recovery
running while a new shard is creating its engine, files can conflict and data can potentially
be lost. We also remove shard data without checking whether there are still users of the files
or whether files are still open, which can cause pending writes / flushes or the delete operation
to fail. If the latter is the case, the index might be treated as a dangling index and brought
back to life at a later point in time.

This commit introduces a shard level lock that prevents modifications to the shard data
while it's still in use. Locks are created per shard and maintained in NodeEnvironment.java.
In contrast to most Java concurrency primitives, these locks are not reentrant.

This commit also adds infrastructure that checks if all shard locks are released after tests.
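
A minimal sketch of the idea described above, assuming an in-memory registry rather than the PR's actual NodeEnvironment code. Each shard gets a single-permit semaphore, which has no owning thread and therefore cannot be re-acquired by the current holder. `ShardLockRegistry`, `Handle`, and `assertAllReleased` are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical in-memory registry; not the PR's NodeEnvironment implementation.
final class ShardLockRegistry {

    /** Handle returned to the caller; closing it releases the shard lock. */
    static final class Handle implements AutoCloseable {
        private final Semaphore semaphore;
        private final AtomicBoolean released = new AtomicBoolean();

        private Handle(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void close() {
            // Guard against a double release, which would add a second permit.
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    // One single-permit semaphore per shard key.
    private final ConcurrentMap<String, Semaphore> locks = new ConcurrentHashMap<>();

    /** Locks the shard or throws; a timeout of 0 means "do not wait". */
    Handle lock(String shardKey, long timeoutMillis) {
        Semaphore semaphore = locks.computeIfAbsent(shardKey, key -> new Semaphore(1));
        boolean acquired;
        try {
            acquired = semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            acquired = false;
        }
        if (!acquired) {
            throw new IllegalStateException("shard [" + shardKey + "] is still locked");
        }
        return new Handle(semaphore);
    }

    boolean isLocked(String shardKey) {
        Semaphore semaphore = locks.get(shardKey);
        return semaphore != null && semaphore.availablePermits() == 0;
    }

    /** Test-teardown style check: fails if any shard lock is still held. */
    void assertAllReleased() {
        for (Map.Entry<String, Semaphore> entry : locks.entrySet()) {
            if (entry.getValue().availablePermits() == 0) {
                throw new AssertionError("shard lock leaked: " + entry.getKey());
            }
        }
    }
}
```

Because the handle is `AutoCloseable`, callers can hold the lock in a try-with-resources block, and a test harness could call `assertAllReleased()` after each test to catch leaked locks.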

@dakrone dakrone commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ final Path[] paths = shardPaths(shardId);
+ try (Closeable lock = shardLock(shardId, 0)) {
+ IOUtils.rm(paths);
+ }
+ }
+
+ /**
+ * Deletes an indexes data directory recursively iff all of the indexes
+ * shards locks were successfully acquired. If any of the indexes shard directories can't be locked
+ * or deleted the top level index directory will not be deleted either. Instead all other shards sub-directories
+ * are deleted ie. the index directory is partially deleted.
+ *
+ * @param index the index to delete
+ * @throws Exception if any of the shards data directories can't be locked or deleted
+ */
+ public void deleteIndexDirecotrySafe(Index index) throws Exception {
@dakrone
dakrone Nov 11, 2014 Member

"Directory" is misspelled as "Direcotry" here

@dakrone dakrone commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ try (Closeable lock = shardLock(shardId, 0)) {
+ IOUtils.rm(paths);
+ }
+ }
+
+ /**
+ * Deletes an indexes data directory recursively iff all of the indexes
+ * shards locks were successfully acquired. If any of the indexes shard directories can't be locked
+ * or deleted the top level index directory will not be deleted either. Instead all other shards sub-directories
+ * are deleted ie. the index directory is partially deleted.
+ *
+ * @param index the index to delete
+ * @throws Exception if any of the shards data directories can't be locked or deleted
+ */
+ public void deleteIndexDirecotrySafe(Index index) throws Exception {
+ List<Closeable> locks = new ArrayList<>();
@dakrone
dakrone Nov 11, 2014 Member

I think enforcing this as a List of ShardLocks would be better, type safety wise

@dakrone dakrone and 1 other commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ Closeable lock = shardLock(shardId, 0);
+ locks.add(lock);
+ IOUtils.rm(paths);
+ } catch (Exception ex) {
+ exceptions.add(ex);
+ }
+ }
+ ExceptionsHelper.rethrowAndSuppress(exceptions);
+ success = true;
+ } finally {
+ try {
+ if (success) {
+ IOUtils.rm(FileSystemUtils.toPaths(indexLocations(index)));
+ }
+ } finally {
+ IOUtils.closeWhileHandlingException(locks);
@dakrone
dakrone Nov 11, 2014 Member

Is it okay that the locks will still be cleared even if there was an exception removing the paths?

@s1monw
s1monw Nov 11, 2014 Contributor

yes

@dakrone dakrone commented on the diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ return allLocks;
+ }
+
+ /**
+ * Tries to lock the given shards ID. A shard lock is required to perform any kind of
+ * write operation on a shards data directory like deleting files, creating a new index writer
+ * or recover from a different shard instance into it. If the shard lock can not be acquired
+ * an {@link ElasticsearchIllegalStateException} is thrown.
+ *
+ * Note: this method will return immediately if the lock can't be acquired.
+ *
+ * @param id the shard ID to lock
+ * @return the shard lock. Call {@link ShardLock#close()} to release the lock
+ * @throws IOException if an IOException occurs.
+ */
+ public ShardLock shardLock(ShardId id) throws IOException {
@dakrone
dakrone Nov 11, 2014 Member

You defined this method as a helper, but are still calling shardLock(id, 0) manually throughout the code. It would be good to change all of those instances to shardLock(id)

@s1monw
s1monw Nov 11, 2014 Contributor

fixed

@dakrone dakrone commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ * @throws IOException if an IOException occurs.
+ */
+ public ShardLock shardLock(ShardId id, long lockTimeout) throws IOException {
+ final Lock[] locks = new Lock[nodePaths.length];
+ boolean success = false;
+ try {
+ final String lockName = "shard_" + id.id() + ".lock";
+ for (int i = 0; i < nodePaths.length; i++) {
+ final Path path = nodePaths[i].resolve(Paths.get(SHARD_LOCK_DIR, id.index().name()));
+ if (Files.exists(path) == false) {
+ Files.createDirectories(path);
+ }
+ NativeFSLockFactory lockFactory = new NativeFSLockFactory(path);
+ Lock lock = lockFactory.makeLock(lockName);
+ if (lock.obtain(lockTimeout) == false) {
+ throw new ElasticsearchIllegalStateException("Lock timed out after ["+ lockTimeout + "ms]");
@dakrone
dakrone Nov 11, 2014 Member

This exception doesn't make sense, because the actual timeout exception will be a LockObtainFailedException that is thrown and not caught. The boolean is only returned if the FileChannel couldn't be locked
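
A rough sketch of the semantics described in this comment, assuming a timed obtain that polls and throws on timeout instead of returning false. `PollingLock` and `LockTimeoutException` are hypothetical stand-ins written for this example, not Lucene classes.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical unchecked exception used only in this sketch.
final class LockTimeoutException extends RuntimeException {
    LockTimeoutException(String message) {
        super(message);
    }
}

final class PollingLock {
    private static final long POLL_INTERVAL_MILLIS = 10;

    // A single non-blocking attempt to take the lock.
    private final BooleanSupplier tryOnce;

    PollingLock(BooleanSupplier tryOnce) {
        this.tryOnce = tryOnce;
    }

    /** Returns true once the lock is obtained; throws on timeout and never returns false. */
    boolean obtain(long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!tryOnce.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new LockTimeoutException("lock obtain timed out after [" + timeoutMillis + "ms]");
            }
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new LockTimeoutException("interrupted while waiting for lock");
            }
        }
        return true;
    }
}
```

With this shape, a caller that checks `obtain(timeout) == false` has a dead branch: the timeout surfaces as the exception, so that is where the error message belongs.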

@dakrone dakrone commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -180,6 +366,19 @@ public boolean hasNodeFile() {
return shardLocations;
}
+ /**
+ * Returns all data paths for the given shards ID
+ */
+ public Path[] shardPaths(ShardId shardId) {
+ // TODO remove shardLocations in favor of the Path API
@dakrone
dakrone Nov 11, 2014 Member

Can you clarify this? shardLocations is already a Path[], is there a different Path API you want to use?

@dakrone dakrone commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -200,13 +399,13 @@ public boolean hasNodeFile() {
return indices;
}
- public Set<ShardId> findAllShardIds() throws Exception {
+ public Set<ShardId> findAllShardIds(final Index index) throws IOException {
@dakrone
dakrone Nov 11, 2014 Member

Can you add javadocs explaining that a null index means all indices and mark index as @Nullable for this method

@dakrone dakrone commented on an outdated diff Nov 11, 2014
...elasticsearch/index/service/InternalIndexService.java
@@ -138,7 +143,7 @@ public InternalIndexService(Injector injector, Index index, @IndexSettings Setti
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService,
SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine,
IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData,
- BitsetFilterCache bitSetFilterCache) {
+ BitsetFilterCache bitSetFilterCache, NodeEnvironment env) {
@dakrone
dakrone Nov 11, 2014 Member

NodeEnvironment is already injected in this constructor, as nodeEnv (it's in the top line)

@dakrone dakrone commented on an outdated diff Nov 11, 2014
...ava/org/elasticsearch/index/service/IndexService.java
/**
* Removes the shard, does not delete local data or the gateway.
*/
- void removeShard(int shardId, String reason) throws ElasticsearchException;
+ void removeShard(int shardId, String reason, Store.OnCloseListener listener) throws ElasticsearchException;
@dakrone
dakrone Nov 11, 2014 Member

Can you mark this as @Nullable (here and in the implementer) since it's called with null

@dakrone dakrone commented on the diff Nov 11, 2014
...a/org/elasticsearch/index/store/DirectoryService.java
- long throttleTimeInNanos();
+ public abstract Directory[] build() throws IOException;
+
+ public abstract long throttleTimeInNanos();
@dakrone
dakrone Nov 11, 2014 Member

Can you add javadocs for what this throttling corresponds to? (ie, merge throttling? recovery throttling?)

@s1monw
s1monw Nov 11, 2014 Contributor

I have no idea - out of scope

@dakrone dakrone commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/index/store/Store.java
} catch (NoSuchFileException | FileNotFoundException e) {
// ignore
} catch (IOException e) {
lastException = e;
}
- }
- if (lastException != null) {
- throw lastException;
+ if (lastException != null) {
+ throw lastException;
+ }
@dakrone
dakrone Nov 11, 2014 Member

By moving this inside the for loop it throws after the first IOException, so it would be better to throw inside the catch block (unless moving it inside was not intended)
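
For comparison, a small sketch of the "attempt everything, then throw" shape that the original code had, with the throw placed after the loop. `DeleteAllSketch` and `Deleter` are hypothetical names; the real method operates on the store's directory.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.List;

final class DeleteAllSketch {

    /** Hypothetical callback that deletes one file by name. */
    interface Deleter {
        void delete(String name) throws IOException;
    }

    static void deleteAll(List<String> names, Deleter deleter) throws IOException {
        IOException lastException = null;
        for (String name : names) {
            try {
                deleter.delete(name);
            } catch (NoSuchFileException | FileNotFoundException e) {
                // ignore: the file is already gone
            } catch (IOException e) {
                lastException = e; // remember it, but keep deleting the rest
            }
        }
        if (lastException != null) {
            throw lastException; // thrown only after every file was attempted
        }
    }
}
```

If the check moves inside the loop, the first IOException ends the iteration and the remaining files are never attempted, which is the behaviour change pointed out above.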

@dakrone dakrone and 1 other commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/index/store/Store.java
@@ -348,6 +358,15 @@ private void closeInternal() {
directory.innerClose(); // this closes the distributorDirectory as well
} catch (IOException e) {
logger.debug("failed to close directory", e);
+ } finally {
+ try {
+ if (onClose != null) { // we are still holding the shard lock while we execute this.
+ onClose.onClose(shardId);
@dakrone
dakrone Nov 11, 2014 Member

What should we do if an exception is thrown here? Is it safe to log it and continue?

@s1monw
s1monw Nov 11, 2014 Contributor

yeah sure, we need to unlock here. I don't care if there is an exception here to be honest, but logging is ok
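
A minimal sketch of the "log and continue, but always unlock" behaviour agreed on here. `StoreCloseSketch`, the `releaseShardLock` callback, and the use of java.util.logging are assumptions made for the example; the PR's Store uses its own logger and lock handle.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class StoreCloseSketch {

    /** Hypothetical listener, invoked while the shard lock is still held. */
    interface OnCloseListener {
        void onClose(String shardId);
    }

    private static final Logger logger = Logger.getLogger(StoreCloseSketch.class.getName());

    static void closeInternal(String shardId, OnCloseListener onClose, Runnable releaseShardLock) {
        try {
            if (onClose != null) {
                onClose.onClose(shardId); // we still hold the shard lock here
            }
        } catch (RuntimeException e) {
            // A failing listener must not prevent the unlock below.
            logger.log(Level.FINE, "onClose listener failed for shard " + shardId, e);
        } finally {
            releaseShardLock.run(); // always release, with or without a listener failure
        }
    }
}
```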

@dakrone dakrone commented on an outdated diff Nov 11, 2014
...earch/indices/cluster/IndicesClusterStateService.java
@@ -254,7 +260,26 @@ private void applyDeletedIndices(final ClusterChangedEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
}
- removeIndex(index, "index no longer part of the metadata");
+ removeIndex(index, "index no longer part of the metadata", new Store.OnCloseListener() {
+ @Override
+ public void onClose(ShardId shardId) {
+ // this is executed while we still hold the lock on that shard
+ try {
+ File[] files = nodeEnvironment.shardLocations(shardId);
@dakrone
dakrone Nov 11, 2014 Member

Can this use nodeEnvironment.shardPaths(shardId) instead? Then it doesn't need to use toPaths

@dakrone dakrone commented on an outdated diff Nov 11, 2014
...earch/indices/cluster/IndicesClusterStateService.java
+ removeIndex(index, "index no longer part of the metadata", new Store.OnCloseListener() {
+ @Override
+ public void onClose(ShardId shardId) {
+ // this is executed while we still hold the lock on that shard
+ try {
+ File[] files = nodeEnvironment.shardLocations(shardId);
+ IOUtils.rm(FileSystemUtils.toPaths(files));
+ logger.debug("deleted shard [{}] from filesystem", shardId);
+ } catch (IOException e) {
+ logger.warn("Can't delete shard: " + shardId, e);
+ }
+ try {
+ nodeEnvironment.deleteIndexDirecotrySafe(shardId.index());
+ logger.debug("deleted index [{}] from filesystem", shardId.index());
+ } catch (Exception e) {
+ // ignore - still some shards locked here
@dakrone
dakrone Nov 11, 2014 Member

Should there be debug or trace logging here for debugging purposes at least?

@dakrone dakrone commented on an outdated diff Nov 11, 2014
.../org/elasticsearch/index/store/CorruptedFileTest.java
@@ -69,6 +69,7 @@
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.junit.annotations.TestLogging;
@dakrone
dakrone Nov 11, 2014 Member

Unneeded import

@s1monw
Contributor
s1monw commented Nov 11, 2014

pushed more changes @dakrone

@kimchy
Member
kimchy commented Nov 11, 2014

@s1monw quick question regarding the path of the lock file: does it make sense to have it encapsulated in the shard location itself (similar to write.lock in Lucene)? I think it's nicer since then there is a single place that holds the shard data.

Update: I think I get why it's a different directory: the ability to delete while holding the lock. If that's the case, then it makes sense. On crappy internet, will try and complete the review later tonight....

@kimchy kimchy commented on the diff Nov 11, 2014
...h/gateway/local/state/meta/LocalGatewayMetaState.java
@@ -246,7 +245,7 @@ public void clusterChanged(ClusterChangedEvent event) {
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys());
if (nodeEnv.hasNodeFile()) {
try {
- IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(current.index()))));
+ nodeEnv.deleteIndexDirectorySafe(new Index(current.index()));
@kimchy
kimchy Nov 11, 2014 Member

should we warn on this failure now? because if we have a bug and we can't obtain the lock, we won't delete anything, which is a different semantics than deleting the files and letting the OS handle dangling open file handles? This in theory should not happen because a lock is there

@s1monw
s1monw Nov 12, 2014 Contributor

yeah I made it a warn... btw. I think it's a preexisting bug, i.e. on Windows you might have this behaviour already since somebody can hold a ref to the files, preventing the deletion. I wonder if we should actually schedule a deletion task or use a list of pending deletions that are executed before we apply new cluster states and before we import dangling indices?

@kimchy kimchy commented on the diff Nov 11, 2014
...h/gateway/local/state/meta/LocalGatewayMetaState.java
@@ -287,11 +286,17 @@ public void clusterChanged(ClusterChangedEvent event) {
if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
try {
- IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(indexName))));
+ nodeEnv.deleteIndexDirectorySafe(new Index(indexName));
@kimchy
kimchy Nov 11, 2014 Member

same here regarding the logging as above if applicable

@kimchy kimchy commented on an outdated diff Nov 11, 2014
...ava/org/elasticsearch/index/service/IndexService.java
@@ -38,6 +38,7 @@
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
+import org.elasticsearch.index.store.Store;
@kimchy
kimchy Nov 11, 2014 Member

do we need this import?

@kimchy kimchy commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/index/store/Store.java
if (isClosed.compareAndSet(false, true)) {
+ this.onClose = onClose;
@kimchy
kimchy Nov 11, 2014 Member

can we nullify the local onClose variable in the finally clause of closeInternal and assert its null here? the volatile variable is just a way to pass the data along because of the use of the ref counter, right?

@bleskes bleskes commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
*/
public class NodeEnvironment extends AbstractComponent {
+ @Deprecated // TODO cuto over to the PATH API
@bleskes
bleskes Nov 11, 2014 Member

A little typo in the comment

@bleskes bleskes commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
private final File[] nodeFiles;
+ private final Path[] nodePaths;
+ @Deprecated // TODO cuto over to the PATH API
@bleskes
bleskes Nov 11, 2014 Member

same typo - copy paste probably

@bleskes bleskes commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
private final File[] nodeIndicesLocations;
+ private final Path[] nodeIndicesPaths;
@bleskes
bleskes Nov 11, 2014 Member

this one is not used - can we remove it until we cut over to the paths API

@bleskes bleskes commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ final Path lockDir = path.resolve(Paths.get(SHARD_LOCK_DIR));
+ if (Files.exists(lockDir) && Files.isDirectory(lockDir)) {
+ Files.walkFileTree(lockDir, new FileVisitor<Path>() {
+ NativeFSLockFactory lockFactory = null;
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+ lockFactory = new NativeFSLockFactory(dir);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ assert lockFactory != null;
+ if (file.getFileName().toString().endsWith(".lock")) {
+ lockFactory.makeLock(file.getFileName().toString()).close();
@bleskes
bleskes Nov 11, 2014 Member

is this needed? as far as I can tell, the implementation does nothing?:

try {
  if (lock != null) { // <-- lock is null
    try {
      lock.release();
      lock = null;
    } finally {
      clearLockHeld(path);
    }
  }
} finally {
  IOUtils.close(channel); // <-- channel is null
  channel = null;
}
@bleskes bleskes commented on an outdated diff Nov 11, 2014
...earch/indices/cluster/IndicesClusterStateService.java
@@ -254,7 +261,26 @@ private void applyDeletedIndices(final ClusterChangedEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
}
- removeIndex(index, "index no longer part of the metadata");
+ removeIndex(index, "index no longer part of the metadata", new Store.OnCloseListener() {
+ @Override
+ public void onClose(ShardId shardId) {
+ // this is executed while we still hold the lock on that shard
+ try {
+ final Path[] files = nodeEnvironment.shardPaths(shardId);
+ IOUtils.rm(files);
+ logger.debug("deleted shard [{}] from filesystem", shardId);
+ } catch (IOException e) {
+ logger.warn("Can't delete shard: " + shardId, e);
+ }
+ try {
+ nodeEnvironment.deleteIndexDirectorySafe(shardId.index());
@bleskes
bleskes Nov 11, 2014 Member

I think we can better call this in the Store.OnCloseListener.afterIndexClosed(). Feels cleaner to call this once we think all shards have been removed.

@bleskes
bleskes Nov 11, 2014 Member

Ignore that. Got my listeners confused.

@bleskes bleskes and 1 other commented on an outdated diff Nov 11, 2014
...earch/indices/cluster/IndicesClusterStateService.java
@@ -254,7 +261,26 @@ private void applyDeletedIndices(final ClusterChangedEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
}
- removeIndex(index, "index no longer part of the metadata");
+ removeIndex(index, "index no longer part of the metadata", new Store.OnCloseListener() {
+ @Override
+ public void onClose(ShardId shardId) {
+ // this is executed while we still hold the lock on that shard
@bleskes
bleskes Nov 11, 2014 Member

Any way we can assert on this?

@s1monw
s1monw Nov 12, 2014 Contributor

not that I know of it's part of the contract

@bleskes bleskes commented on the diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ final Path[] paths = shardPaths(shardId);
+ try (Closeable lock = shardLock(shardId)) {
+ IOUtils.rm(paths);
+ }
+ }
+
+ /**
+ * Deletes an indexes data directory recursively iff all of the indexes
+ * shards locks were successfully acquired. If any of the indexes shard directories can't be locked
+ * or deleted the top level index directory will not be deleted either. Instead all other shards sub-directories
+ * are deleted ie. the index directory is partially deleted.
+ *
+ * @param index the index to delete
+ * @throws Exception if any of the shards data directories can't be locked or deleted
+ */
+ public void deleteIndexDirectorySafe(Index index) throws IOException {
@bleskes
bleskes Nov 11, 2014 Member

I think the name of the method is misleading. Maybe call it purgeIndexDirectory? as it doesn't really delete it but rather removes all unlocked shards and if all succeeds removes the index folder as well

@s1monw
s1monw Nov 12, 2014 Contributor

I rewrote the method to be transactional

@bleskes
bleskes Nov 13, 2014 Member

much cleaner. thx.

@dakrone
dakrone Nov 13, 2014 Member

It looks like if index is null here, we will end up locking all shards for all indices, then hit an NPE, then release all the locks. Would it be better to bail early if index is null without trying to acquire locks? It seems a little strange here since a null Index is used in some of the other methods to indicate "all indices".
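
One possible reading of "transactional" combined with the early null check suggested here, sketched against an in-memory stand-in for the shard directories. Everything in `IndexDirectorySketch` is hypothetical and single-threaded; it only illustrates the order of operations: validate, lock every shard, delete only if all locks were obtained, and release in a finally block.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Single-threaded, in-memory stand-in for shard directories and shard locks.
final class IndexDirectorySketch {
    private final Map<String, Set<Integer>> shardsOnDisk = new HashMap<>();
    private final Set<String> heldLocks = new HashSet<>();

    void addShard(String index, int shard) {
        shardsOnDisk.computeIfAbsent(index, key -> new TreeSet<>()).add(shard);
    }

    boolean tryLock(String index, int shard) {
        return heldLocks.add(index + "/" + shard); // false if the lock is already held
    }

    void unlock(String index, int shard) {
        heldLocks.remove(index + "/" + shard);
    }

    boolean exists(String index) {
        return shardsOnDisk.containsKey(index);
    }

    int heldLockCount() {
        return heldLocks.size();
    }

    void deleteIndexDirectorySafe(String index) {
        // Bail out before touching any lock if the argument is invalid.
        Objects.requireNonNull(index, "index must not be null");
        Set<Integer> shards = shardsOnDisk.get(index);
        if (shards == null) {
            return; // nothing on disk for this index
        }
        List<Integer> acquired = new ArrayList<>();
        try {
            for (int shard : shards) {
                if (!tryLock(index, shard)) {
                    throw new IllegalStateException("shard [" + index + "][" + shard + "] is locked");
                }
                acquired.add(shard);
            }
            // All shard locks are held: delete everything in one step.
            shardsOnDisk.remove(index);
        } finally {
            // Release only the locks this call acquired, on success and on failure.
            for (int shard : acquired) {
                unlock(index, shard);
            }
        }
    }
}
```

In this shape a single locked shard leaves the whole index untouched, and a null index fails fast without acquiring anything.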

@bleskes bleskes commented on the diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+
+ /**
+ * Deletes a shard data directory iff the shards locks were successfully acquired.
+ *
+ * @param shardId the id of the shard to delete to delete
+ * @throws IOException if an IOException occurs
+ */
+ public void cleanShardSafe(ShardId shardId) throws IOException {
+ final Path[] paths = shardPaths(shardId);
+ try (Closeable lock = shardLock(shardId)) {
+ IOUtils.rm(paths);
+ }
+ }
+
+ /**
+ * Deletes an indexes data directory recursively iff all of the indexes
@bleskes
bleskes Nov 11, 2014 Member

Imho this is a confusing description. Maybe "Deletes all shards directories of an index for which a lock can be acquired. If all shards dirs are removed, the top level index directory is removed as well."

@bleskes bleskes commented on an outdated diff Nov 11, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ ExceptionsHelper.rethrowAndSuppress(exceptions);
+ success = true;
+ } finally {
+ try {
+ if (success) {
+ IOUtils.rm(FileSystemUtils.toPaths(indexLocations(index)));
+ }
+ } finally {
+ IOUtils.closeWhileHandlingException(locks);
+ }
+
+ }
+ }
+
+ /**
+ * Tries to lock all local shards for the given index. If any of the indexes shards lock can not be acquired
@bleskes
bleskes Nov 11, 2014 Member

minor typo - "indexes shards lock" -> "shard locks"

@bleskes bleskes commented on an outdated diff Nov 11, 2014
...h/gateway/local/state/meta/LocalGatewayMetaState.java
} catch (Exception ex) {
logger.debug("[{}] failed to delete dangling index", ex, indexName);
}
} else {
+ try { // the index deletion might not have worked due to shards still being locked
+ IOUtils.closeWhileHandlingException(nodeEnv.lockAllForIndex(new Index(indexName)));
+ } catch (IOException ex) {
+ logger.debug("[{}] skipping locked dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
@bleskes
bleskes Nov 11, 2014 Member

I think this message is misleading - we don't actually schedule the delete of this index but rather ignore it. Maybe change to "failed to lock a dangling index [{}], probably in the process of being deleted, ignoring." I also think we should include the exception as it may not be a LockObtainFailedException but something else.

@s1monw
Contributor
s1monw commented Nov 11, 2014

Update: I think I get why it's a different directory: the ability to delete while holding the lock. If that's the case, then it makes sense. On crappy internet, will try and complete the review later tonight....

correct that is the only reason.

@bleskes bleskes commented on an outdated diff Nov 11, 2014
...h/gateway/local/state/meta/LocalGatewayMetaState.java
@@ -246,7 +245,7 @@ public void clusterChanged(ClusterChangedEvent event) {
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys());
if (nodeEnv.hasNodeFile()) {
try {
- IOUtils.rm(FileSystemUtils.toPaths(nodeEnv.indexLocations(new Index(current.index()))));
+ nodeEnv.deleteIndexDirectorySafe(new Index(current.index()));
} catch (Exception ex) {
logger.debug("[{}] failed to delete index", ex, current.index());
@bleskes
bleskes Nov 11, 2014 Member

if I understand correctly, this can happen when the store is not yet closed and the index will be removed once the store is closed. Can we add a comment about this? also, do we want a distinction between a LockObtainFailedException (ok) and other IOExceptions (not ok and thus log as error)?
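
A short sketch of the distinction asked about here, assuming the lock failure is a subclass of IOException so it can be caught first. `ShardStillLockedException` is a hypothetical stand-in for Lucene's LockObtainFailedException, and `IndexDeleteSketch` with its `Outcome` enum exists only for this example.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for a lock-specific IOException subclass.
final class ShardStillLockedException extends IOException {
    ShardStillLockedException(String message) {
        super(message);
    }
}

final class IndexDeleteSketch {

    enum Outcome { DELETED, STILL_LOCKED, FAILED }

    /** Hypothetical delete operation that may fail with an IOException. */
    interface DeleteAction {
        void run() throws IOException;
    }

    private static final Logger logger = Logger.getLogger(IndexDeleteSketch.class.getName());

    static Outcome tryDelete(String index, DeleteAction action) {
        try {
            action.run();
            return Outcome.DELETED;
        } catch (ShardStillLockedException e) {
            // Expected while a store is still open; the index is removed once it closes.
            logger.log(Level.FINE, "[" + index + "] shards still locked, skipping delete", e);
            return Outcome.STILL_LOCKED;
        } catch (IOException e) {
            // Anything else is unexpected and worth an error-level log.
            logger.log(Level.SEVERE, "[" + index + "] failed to delete index", e);
            return Outcome.FAILED;
        }
    }
}
```

The more specific catch has to come first; otherwise the general IOException branch would swallow the lock failure and the distinction would be lost.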

@bleskes bleskes commented on an outdated diff Nov 11, 2014
...ava/org/elasticsearch/index/service/IndexService.java
@@ -72,9 +73,6 @@
IndexShard createShard(int sShardId) throws ElasticsearchException;
- /**
- * Removes the shard, does not delete local data or the gateway.
- */
@bleskes
bleskes Nov 11, 2014 Member

I wonder why you removed this comment?

@bleskes bleskes commented on an outdated diff Nov 11, 2014
...elasticsearch/index/service/InternalIndexService.java
try {
- shardInjector = modules.createChildInjector(injector);
- } catch (CreationException e) {
- throw new IndexShardCreationException(shardId, Injectors.getFirstErrorFailure(e));
- } catch (Throwable e) {
- throw new IndexShardCreationException(shardId, e);
- }
-
- shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap();
+ lock = nodeEnv.shardLock(shardId);
@bleskes
bleskes Nov 11, 2014 Member

Why not be lenient here and use some short timeout to allow background things, like old recoveries, to wrap up?

@bleskes bleskes commented on an outdated diff Nov 11, 2014
...elasticsearch/index/service/InternalIndexService.java
}
@Override
- public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException {
+ public void removeShard(int shardId, String reason) throws ElasticsearchException {
+ removeShard(shardId, reason, null);
+ }
+
+ public synchronized void removeShard(int shardId, String reason, Store.OnCloseListener listener) throws ElasticsearchException {
@bleskes
bleskes Nov 11, 2014 Member

can we mark listener as @Nullable?

@bleskes bleskes and 1 other commented on an outdated diff Nov 11, 2014
...org/elasticsearch/index/shard/service/IndexShard.java
@@ -183,6 +184,8 @@
void readAllowed(Mode mode) throws IllegalIndexShardStateException;
+ Store store();
@bleskes
bleskes Nov 11, 2014 Member

Do we need this? The store is a tricky animal (refcounted and all). Code compiles fine without this so there is no need to expose it?

@s1monw
s1monw Nov 12, 2014 Contributor

leftover

@bleskes bleskes commented on the diff Nov 11, 2014
...est/java/org/elasticsearch/index/store/StoreTest.java
@@ -52,7 +52,7 @@
public void testRefCount() throws IOException {
@bleskes
bleskes Nov 11, 2014 Member

can we add the new listener to the test?

@bleskes bleskes commented on the diff Nov 11, 2014
.../elasticsearch/test/store/MockFSDirectoryService.java
@@ -168,4 +170,9 @@ public StoreRateLimiting rateLimiting() {
public long throttleTimeInNanos() {
return delegateService.throttleTimeInNanos();
}
+
+ @Override
+ public Directory newFromDistributor(Distributor distributor) throws IOException {
+ return helper.wrap(super.newFromDistributor(distributor));
@bleskes
bleskes Nov 11, 2014 Member

I'm not 100% sure, but this translates to double wrapping - both of the distributor and its underlying directories. Wouldn't that result in a much higher chance of exception throwing? maybe we should not wrap the underlying dirs anymore?

@s1monw
s1monw Nov 12, 2014 Contributor

yeah, so I am not sure whether we should or not. It just gives us a second layer, and we can finally run checkindex?

@bleskes
Member
bleskes commented Nov 11, 2014

I like the change. Left comments here and there.

I might have missed, but where do we clean up lock files of indices /shards that were deleted/relocated and everything is fine? I assume we can only delete a file if someone else doesn't have a lock. O.w. this doesn't work.

One more thing I was wondering is whether InternalNode.close should try and acquire all shard locks (and wait for that) to ensure all background tasks complete gracefully. Once acquired, I think we can also delete the lock files to clean up the directory? (ala the current code for startup)

@s1monw
Contributor
s1monw commented Nov 12, 2014

I might have missed, but where do we clean up lock files of indices /shards that were deleted/relocated and everything is fine? I assume we can only delete a file if someone else doesn't have a lock. O.w. this doesn't work.

we only do that when we create the NodeEnvironment, i.e. on node startup. Unfortunately there is a race, so we can't really delete these files while the node is running.

One more thing I was wondering is whether InternalNode.close should try and acquire all shard locks (and wait for that) to ensure all background tasks complete gracefully. Once acquired, I think we can also delete the lock files to clean up the directory? (ala the current code for startup)

can you explain the benefit of this?

@dakrone dakrone commented on an outdated diff Nov 12, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -200,13 +415,21 @@ public boolean hasNodeFile() {
return indices;
}
- public Set<ShardId> findAllShardIds() throws Exception {
+ /**
+ * Tries to find all allocated shards for the given index or for all indices iff the given index is <code>null</code>
+ * on the current node. NOTE: This methods is prone to race-conditions on the filesystem layer since it might now
@dakrone
dakrone Nov 12, 2014 Member

"now" should be "not"

@dakrone dakrone commented on an outdated diff Nov 12, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -234,6 +460,18 @@ public boolean hasNodeFile() {
return shardIds;
}
+ /**
+ * Tries to find all allocated shards for all indices iff the given index on the current node. NOTE: This methods
+ * is prone to race-conditions on the filesystem layer since it might now see directories created concurrently or
@dakrone
dakrone Nov 12, 2014 Member

Same "now" -> "not" typo here

@dakrone dakrone commented on an outdated diff Nov 12, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -269,19 +507,21 @@ private boolean assertEnvIsLocked() {
* This method cleans up all files even in the case of an error.
*/
public void ensureAtomicMoveSupported() throws IOException {
- for (File file : nodeFiles) {
- assert file.isDirectory();
- final Path src = new File(file, "__es__.tmp").toPath();
+ final Path[] nodePaths = nodeDataPaths();
+ for (Path file : nodePaths) {
@dakrone
dakrone Nov 12, 2014 Member

I think it would be clearer to rename file to dir or directory here

@dakrone dakrone commented on an outdated diff Nov 12, 2014
...elasticsearch/index/service/InternalIndexService.java
@@ -264,7 +270,7 @@ public IndexEngine engine() {
return indexEngine;
}
- public void close(final String reason, @Nullable Executor executor) {
+ public void close(final String reason, @Nullable Executor executor, final Store.OnCloseListener listener) {
@dakrone
dakrone Nov 12, 2014 Member

listener should be marked @Nullable here

@dakrone dakrone commented on the diff Nov 12, 2014
...g/elasticsearch/index/store/DistributorDirectory.java
- for (Directory d : all) {
- for (String file : d.listAll()) {
- final Directory directory = dir.nameDirMapping.get(file);
- if (directory == null) {
- consistent = false;
- builder.append("File ").append(file)
- .append(" was not mapped to a directory but exists in one of the distributors directories")
- .append(System.lineSeparator());
- } else if (directory != d) {
- consistent = false;
- builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
- .append(" but exists in another distributor directory").append(d)
- .append(System.lineSeparator());
- }
-
+ private synchronized boolean assertConsistency() throws IOException {
@dakrone
dakrone Nov 12, 2014 Member

It looks like this includes the change from #8383? (nothing that needs to change, just curious)

@s1monw
s1monw Nov 12, 2014 Contributor

Yes, some of them. The actual problems that I fix in this PR were never triggered, since we catch the assert failures in our tests. This PR now wraps the distributor dir in a MockDirWrapper, which makes those tests fail; that is why I added them.

@dakrone
Member
dakrone commented Nov 12, 2014

Is it possible to add unit testing of NodeEnvironment's ShardLock locking and unlocking methods, so threaded locking and unlocking can be tested for race conditions?
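
A rough sketch of what such a threaded test could look like, assuming Java 8+ and a semaphore-backed, non-reentrant lock. `SimpleShardLock` and `ShardLockStress` are hypothetical stand-ins written for illustration; they are not the `ShardLock` / `NodeEnvironment` API from this PR. The idea is to release all threads at once and record how many are ever inside the critical section at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a shard lock: a non-reentrant mutex backed by a Semaphore.
final class SimpleShardLock {
    private final Semaphore mutex = new Semaphore(1);

    boolean tryLock(long timeoutMs) throws InterruptedException {
        return mutex.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    }

    void unlock() {
        mutex.release();
    }
}

final class ShardLockStress {
    // Runs `threads` workers that each lock/unlock `iterations` times and returns
    // the largest number of workers ever observed holding the lock simultaneously.
    static int maxConcurrentHolders(int threads, final int iterations) {
        final SimpleShardLock lock = new SimpleShardLock();
        final AtomicInteger holders = new AtomicInteger();
        final AtomicInteger maxHolders = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all workers up to maximize contention
                    for (int j = 0; j < iterations; j++) {
                        if (lock.tryLock(1000)) {
                            try {
                                int now = holders.incrementAndGet();
                                maxHolders.accumulateAndGet(now, Math::max);
                            } finally {
                                holders.decrementAndGet();
                                lock.unlock();
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxHolders.get();
    }
}
```

If the lock is correct, the returned maximum is 1 no matter how many threads compete.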

@dakrone dakrone commented on an outdated diff Nov 12, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -234,6 +422,18 @@ public boolean hasNodeFile() {
return shardIds;
}
+ /**
+ * Tries to find all allocated shards for all indices iff the given index on the current node. NOTE: This methods
+ * is prone to race-conditions on the filesystem layer since it might now see directories created concurrently or
@dakrone
dakrone Nov 12, 2014 Member

"now" -> "not"

@dakrone
dakrone Nov 12, 2014 Member

Also, not sure why this javadoc got suddenly indented incorrectly past the function declaration

@dakrone dakrone commented on an outdated diff Nov 12, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ */
+ public ShardLock shardLock(ShardId id) throws IOException {
+ return shardLock(id, 0);
+ }
+
+ /**
+ * Tries to lock the given shards ID. A shard lock is required to perform any kind of
+ * write operation on a shards data directory like deleting files, creating a new index writer
+ * or recover from a different shard instance into it. If the shard lock can not be acquired
+ * an {@link LockObtainFailedException} is thrown
+ * @param id the shard ID to lock
+ * @param lockTimeout the lock timeout
+ * @return the shard lock. Call {@link ShardLock#close()} to release the lock
+ * @throws IOException if an IOException occurs.
+ */
+ public ShardLock shardLock(ShardId id, long lockTimeout) throws IOException {
@dakrone
dakrone Nov 12, 2014 Member

Can you add what unit lockTimeout is (milliseconds I think?) to the javadoc, or rename it to lockTimeoutMs

@dakrone dakrone commented on an outdated diff Nov 12, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
- }
- Integer shardId = Ints.tryParse(shardLocation.getName());
- if (shardId != null) {
- shardIds.add(new ShardId(indexName, shardId));
+ for (final Path location : locations) {
+ if (Files.exists(location) && Files.isDirectory(location)) {
+ try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(location)) {
+ for (Path indexPath : indexStream) {
+ if (index == null || index.equals(indexPath.getFileName().toString())) {
+ if (Files.exists(indexPath) && Files.isDirectory(indexPath)) {
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
+ for (Path shardPath : stream) {
+ if (Files.exists(shardPath) && Files.isDirectory(shardPath)) {
+ Integer shardId = Ints.tryParse(shardPath.getFileName().toString());
+ if (shardId != null) {
+ shardIds.add(new ShardId(index, shardId));
@dakrone
dakrone Nov 12, 2014 Member

index can be null here, which causes an NPE because the ShardId constructor constructs a new Index object which in turn interns the name and dereferences the null object.

@s1monw
Contributor
s1monw commented Nov 12, 2014

@dakrone @bleskes I pushed a lot of changes based on your comments. I haven't added the unit tests for the locks yet; will do tonight or tomorrow.

@s1monw
Contributor
s1monw commented Nov 12, 2014

@dakrone added some unit tests for NodeEnvironment too

@bleskes bleskes commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
}
}
}
return indices;
}
- public Set<ShardId> findAllShardIds() throws Exception {
- if (nodeFiles == null || locks == null) {
+ /**
+ * Tries to find all allocated shards for the given index or for all indices iff the given index is <code>null</code>
+ * on the current node. NOTE: This methods is prone to race-conditions on the filesystem layer since it might now
@bleskes
bleskes Nov 13, 2014 Member

typo : now -> not

@bleskes bleskes commented on an outdated diff Nov 13, 2014
...elasticsearch/index/service/InternalIndexService.java
+ } else {
+ store.close(new Store.OnCloseListener() {
+ @Override
+ public void onClose(ShardId shardId) {
+ listener.onShardClosed(shardId);
+ }
+ });
+ }
+ } catch (Throwable e) {
+ logger.warn("[{}] failed to close store on shard deletion", e, shardId);
+ }
+ Injectors.close(injector);
+
+ logger.debug("[{}] closed (reason: [{}])", shardId, reason);
+ } catch (Throwable t) {
+ if (listenerPassed == false) { // only notify if the listener wasn't passed to the store
@bleskes
bleskes Nov 13, 2014 Member

Listener can be null here.

@bleskes bleskes commented on an outdated diff Nov 13, 2014
...elasticsearch/index/service/InternalIndexService.java
}
- // call this before we close the store, so we can release resources for it
- indicesLifecycle.afterIndexShardClosed(sId, indexShard);
- // if we delete or have no gateway or the store is not persistent, clean the store...
- Store store = shardInjector.getInstance(Store.class);
- // and close it
- try {
- store.close();
- } catch (Throwable e) {
- logger.warn("[{}] failed to close store on shard deletion", e, shardId);
+ @Override
+ public void onShardClosed(ShardId shardId) {
+ assert countDown.isCountedDown() == false;
+ assert shardIds.contains(shardId.getId()) : "Unkown shard id";
@bleskes
bleskes Nov 13, 2014 Member

unkown -> unknown

@bleskes bleskes commented on the diff Nov 13, 2014
...elasticsearch/index/service/InternalIndexService.java
- logger.debug("[{}] closed (reason: [{}])", shardId, reason);
+ @Override
+ public void onShardCloseFailed(ShardId shardId, Throwable t) {
+ assert countDown.isCountedDown() == false;
+ assert shardIds.contains(shardId.getId()) : "Unkown shard id";
@bleskes
bleskes Nov 13, 2014 Member

unkown -> unknown

@bleskes bleskes commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/index/store/Store.java
try {
directory.innerClose(); // this closes the distributorDirectory as well
} catch (IOException e) {
logger.debug("failed to close directory", e);
+ } finally {
+ IOUtils.closeWhileHandlingException(shardLock);
+ try {
+ if (listener != null) { // we are still holding the shard lock while we execute this.
@bleskes
bleskes Nov 13, 2014 Member

the comment is no longer true

@bleskes bleskes commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/index/store/Store.java
@@ -1101,4 +1117,18 @@ public void markStoreCorrupted(IOException exception) throws IOException {
directory().sync(Collections.singleton(uuid));
}
}
+
+ /**
+ * A listener that is called once this store is closed and all references are released
+ */
+ public static interface OnCloseListener {
+ /**
+ * Called once the store is closed and all references are released. Note:
+ * This methods is called while the shards lock is still hold by the caller of this method. Operations
@bleskes
bleskes Nov 13, 2014 Member

we need to change these docs. The lock is not held while the method is called.

@bleskes bleskes commented on the diff Nov 13, 2014
...org/elasticsearch/indices/InternalIndicesService.java
latch.countDown();
+ logger.warn("failed to delete index on stop [" + index + "]", e);
}
}
});
@bleskes
bleskes Nov 13, 2014 Member

I can't place the comment on the line because it's out of this diff ([1]) but I think we should add a safety timeout to the latch.await() in this function. We now have a listener that is being passed through many layers and we run the risk of some exception being handled wrong (now or in the future).

[1] code referred to

try {
    latch.await();
} catch (InterruptedException e) {
    // ignore
} finally {
    shardsStopExecutor.shutdown();
    indicesStopExecutor.shutdown();
}
@s1monw
s1monw Nov 13, 2014 Contributor

I pushed a 30 sec timeout
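
For illustration only, a bounded wait on a latch might look like the sketch below. `BoundedLatchWait` is a hypothetical helper, not code from this PR; the 30 second value mentioned above would be passed in by the caller. `CountDownLatch.await(long, TimeUnit)` returns `false` when the timeout elapses before the count reaches zero, which lets the caller log the stuck listener instead of hanging forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class BoundedLatchWait {
    // Returns true if the latch reached zero within the timeout, false if the
    // wait timed out or the thread was interrupted (the interrupt flag is restored).
    static boolean awaitWithTimeout(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller would typically still run the executor shutdown in a `finally` block, as in the snippet quoted above, regardless of which value is returned.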

@bleskes bleskes commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class NodeEnvironmentTests extends ElasticsearchTestCase {
+
+ @Test
+ public void testNodeLockSingleEnvironment() {
+ File file = newTempDir();
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", file.getAbsolutePath())
@bleskes
bleskes Nov 13, 2014 Member

maybe randomly use multiple paths here?

@bleskes bleskes commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+ }
+ env.close();
+
+ // now can recreate and lock it
+ env = new NodeEnvironment(settings, new Environment(settings));
+ assertEquals(env.nodeDataPaths().length, 1);
+ assertEquals(paths.length, 1);
+ assertEquals(env.nodeDataPaths()[0], paths[0]);
+ env.close();
+ }
+
+ @Test
+ public void testNodeLockMultipleEnvironment() throws IOException {
+ File file = newTempDir();
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", file.getAbsolutePath())
@bleskes
bleskes Nov 13, 2014 Member

same comment here about multiple paths

@bleskes bleskes commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ synchronized (this) {
+ if (shardLocks.containsKey(id)) {
+ shardLock = shardLocks.get(id);
+ } else {
+ shardLock = new Semaphore(1);
+ try {
+ shardLock.acquire();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ shardLocks.put(id, shardLock);
+ return new InternalLock(id);
+ }
+ }
+
+ // this uses a countdown latch rather than a lock since locks are reentrant and
@bleskes
bleskes Nov 13, 2014 Member

left over reference to a countdown latch

@bleskes bleskes and 1 other commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ Thread.currentThread().interrupt();
+ }
+ shardLocks.put(id, shardLock);
+ return new InternalLock(id);
+ }
+ }
+
+ // this uses a countdown latch rather than a lock since locks are reentrant and
+ // we don't want the semantics here.
+ try {
+ // try the wait outside of the sync block
+ if (shardLock.tryAcquire(lockTimeoutMS, TimeUnit.MILLISECONDS)) {
+ synchronized (this) {
+ // put it back there it was release
+ assert shardLocks.get(id) == null || shardLocks.get(id) == shardLock;
+ shardLocks.put(id, shardLock);
@bleskes
bleskes Nov 13, 2014 Member

now that we use a semaphore, we don't need to put it back - we can remove the synchronised as well

@s1monw
s1monw Nov 13, 2014 Contributor

Well, we can't, since we remove it in the InternalLock; it's part of the design!

@bleskes bleskes and 1 other commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ public Set<ShardId> lockedShards() {
+ synchronized (this) {
+ ImmutableSet.Builder<ShardId> builder = ImmutableSet.builder();
+ return builder.addAll(shardLocks.keySet()).build();
+ }
+ }
+
+ private final class InternalLock extends ShardLock {
+
+ InternalLock(ShardId id) {
+ super(id);
+ }
+
+ @Override
+ protected void closeInternal() {
+ synchronized (NodeEnvironment.this) {
@bleskes
bleskes Nov 13, 2014 Member

I think this should just wrap the semaphore and call .release() on it. This will make it simpler and cleaner, no?

@s1monw
s1monw Nov 13, 2014 Contributor

How do we then clean up the map? I think this is pretty clean and should stay just like it is here; otherwise the map is never pruned.

@bleskes
bleskes Nov 13, 2014 Member

In our discussion about semaphores I understood a different model: we keep a semaphore per index/shard directory (like the on-disk locks, but in memory), and it would be pruned when the folders are pruned. I see where you were heading. I'm fine with either way.
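
To make the alternative model concrete, here is a minimal sketch of "one long-lived semaphore per shard directory, pruned together with the directory". It assumes Java 8+; `PerShardSemaphores`, `Handle`, `tryLock`, and `prune` are hypothetical names invented for this example and are not part of the PR. Unlike the design that was merged, releasing a lock here never touches the map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class PerShardSemaphores {
    // close() is narrowed so that it throws no checked exception.
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    // Returns a handle that releases the permit on close, or null on timeout/interrupt.
    Handle tryLock(String shardKey, long timeoutMs) {
        final Semaphore semaphore = semaphores.computeIfAbsent(shardKey, k -> new Semaphore(1));
        try {
            if (!semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        final AtomicBoolean released = new AtomicBoolean(false);
        // Guard against double close, which would otherwise add a second permit.
        return () -> {
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        };
    }

    // Meant to be called when the shard directory itself is deleted.
    // Pruning while the lock is still held would be a bug in the caller.
    void prune(String shardKey) {
        semaphores.remove(shardKey);
    }

    int size() {
        return semaphores.size();
    }
}
```

The trade-off discussed in this thread shows up directly: the release path is trivial, but the map only shrinks if something remembers to call `prune`.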

@bleskes bleskes commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class NodeEnvironmentTests extends ElasticsearchTestCase {
+
+ @Test
+ public void testNodeLockSingleEnvironment() {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
@bleskes
bleskes Nov 13, 2014 Member

these should be :

                .putArray("path.data", tmpPaths())
@bleskes bleskes commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+
+ }
+ env.close();
+
+ // now can recreate and lock it
+ env = new NodeEnvironment(settings, new Environment(settings));
+ assertEquals(env.nodeDataPaths().length, 1);
+ assertEquals(paths.length, 1);
+ assertEquals(env.nodeDataPaths()[0], paths[0]);
+ env.close();
+ }
+
+ @Test
+ public void testNodeLockMultipleEnvironment() throws IOException {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
@bleskes
bleskes Nov 13, 2014 Member

same as above

@bleskes bleskes commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+ public void testNodeLockMultipleEnvironment() throws IOException {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
+ .put("node.max_local_storage_nodes", 2).build();
+ NodeEnvironment first = new NodeEnvironment(settings, new Environment(settings));
+ NodeEnvironment second = new NodeEnvironment(settings, new Environment(settings));
+ assertEquals(first.nodeDataPaths().length, 1);
+ assertEquals(second.nodeDataPaths().length, 1);
+ assertEquals(first.nodeDataPaths()[0].getParent(), second.nodeDataPaths()[0].getParent());
+ IOUtils.close(first, second);
+ }
+
+ @Test
+ public void testShardLock() throws IOException {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
@bleskes
bleskes Nov 13, 2014 Member

same as above

@bleskes bleskes commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+ env.shardLock(new ShardId("foo", 1)).close();
+
+ List<ShardLock> locks = env.lockAllForIndex(new Index("foo"));
+ try {
+ env.shardLock(new ShardId("foo", randomBoolean() ? 1 : 2));
+ fail("shard is locked");
+ } catch (IOException ex) {
+ // expected
+ }
+ IOUtils.close(locks);
+ }
+
+ @Test
+ public void testGetAllIndices() throws Exception {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
@bleskes
bleskes Nov 13, 2014 Member

same as above

@bleskes bleskes commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+ for (int i = 0; i < numIndices; i++) {
+ for (Path path : env.indexPaths(new Index("foo" + i))) {
+ Files.createDirectories(path);
+ }
+ }
+ Set<String> indices = env.findAllIndices();
+ assertEquals(indices.size(), numIndices);
+ for (int i = 0; i < numIndices; i++) {
+ assertTrue(indices.contains("foo" + i));
+ }
+ }
+
+ @Test
+ public void testDeleteSafe() throws IOException {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
@bleskes
bleskes Nov 13, 2014 Member

same as above

@dakrone dakrone commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
private final File[] nodeFiles;
- private final File[] nodeIndicesLocations;
+ private final Path[] nodePaths;
+ private final Path[] nodeIndicesPaths;
@dakrone
dakrone Nov 13, 2014 Member

Can you add a comment line for each of these so someone can tell what the difference between nodePaths and nodeIndicesPaths is? I am guessing that nodePaths is the path.data setting and nodeIndicesPaths is the indices folder underneath that?

@bleskes
Member
bleskes commented Nov 13, 2014

I love the new listeners and the fact that we moved to in memory locks. left some comments about left over clean ups due to the many iterations.

@dakrone dakrone commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ }
+ }
+
+ // this uses a countdown latch rather than a lock since locks are reentrant and
+ // we don't want the semantics here.
+ try {
+ // try the wait outside of the sync block
+ if (shardLock.tryAcquire(lockTimeoutMS, TimeUnit.MILLISECONDS)) {
+ synchronized (this) {
+ // put it back there it was release
+ assert shardLocks.get(id) == null || shardLocks.get(id) == shardLock;
+ shardLocks.put(id, shardLock);
+ return new InternalLock(id);
+ }
+ } else {
+ throw new LockObtainFailedException("Can't lock shard " + id + ", already locked");
@dakrone
dakrone Nov 13, 2014 Member

Can you add that the lock acquiring timed out after <lockTimeoutMS> milliseconds to the exception message?

@dakrone dakrone commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ throw new LockObtainFailedException("Can't lock shard " + id + ", interrupted", ex);
+
+ }
+ }
+
+ /**
+ * Returns all currently lock shards
+ */
+ public Set<ShardId> lockedShards() {
+ synchronized (this) {
+ ImmutableSet.Builder<ShardId> builder = ImmutableSet.builder();
+ return builder.addAll(shardLocks.keySet()).build();
+ }
+ }
+
+ private final class InternalLock extends ShardLock {
@dakrone
dakrone Nov 13, 2014 Member

Can this be named InternalShardLock so its name doesn't conflict with InternalEngine.InternalLock?

@dakrone
dakrone Nov 13, 2014 Member

Also, would it be helpful for this to implement Releasable also?

@dakrone
dakrone Nov 13, 2014 Member

Oh, never mind on the second point, I see ShardLock implements Closeable already.

@dakrone dakrone commented on an outdated diff Nov 13, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
- Integer shardId = Ints.tryParse(shardLocation.getName());
- if (shardId != null) {
- shardIds.add(new ShardId(indexName, shardId));
+ for (final Path location : locations) {
+ if (Files.exists(location) && Files.isDirectory(location)) {
+ try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(location)) {
+ for (Path indexPath : indexStream) {
+ final String currentIndex = indexPath.getFileName().toString();
+ if (index == null || index.equals(currentIndex)) {
+ if (Files.exists(indexPath) && Files.isDirectory(indexPath)) {
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
+ for (Path shardPath : stream) {
+ if (Files.exists(shardPath) && Files.isDirectory(shardPath)) {
+ Integer shardId = Ints.tryParse(shardPath.getFileName().toString());
+ if (shardId != null) {
+ shardIds.add(new ShardId(currentIndex, shardId));
@dakrone
dakrone Nov 13, 2014 Member

This many levels of nesting hurts my head! How about refactoring the inner half into a private findShardIds(@Nullable String index, Path indexPath) method so it's easier to read? I'm worried about the potential for future typos for anyone else touching this code

@dakrone dakrone commented on an outdated diff Nov 13, 2014
...ava/org/elasticsearch/index/service/IndexService.java
@@ -35,6 +35,7 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettingsService;
+import org.elasticsearch.index.shard.ShardId;
@dakrone
dakrone Nov 13, 2014 Member

Unneeded import and extra line addition to this file

@dakrone dakrone commented on an outdated diff Nov 13, 2014
...elasticsearch/index/service/InternalIndexService.java
- try {
- latch.await();
- } catch (InterruptedException e) {
- logger.debug("Interrupted closing index [{}]", e, index().name());
- Thread.currentThread().interrupt();
- }
+ public void close(final String reason, @Nullable Executor executor, final IndicesService.IndexCloseListener listener) {
+ if (closed.compareAndSet(false, true)) {
+ final Set<Integer> shardIds = shardIds();
+ final CountDownLatch latch = new CountDownLatch(shardIds.size());
+ final IndicesService.IndexCloseListener innerListener = listener == null ? null :
+ new PerShardIndexCloseListener(shardIds, listener);
+ for (final int shardId : shardIds) {
+
+ executor = executor == null ? threadPool.generic() : executor;
+ executor.execute(new Runnable() {
@dakrone
dakrone Nov 13, 2014 Member

I think this should be an AbstractRunnable, so it can either set isForcedExecution to true, or handle rejections gracefully, in the event the executor is ever set to a threadpool other than the generic one, or if we limit the queue_size for the generic threadpool (which we have discussed doing)

@dakrone dakrone commented on an outdated diff Nov 13, 2014
...elasticsearch/index/service/InternalIndexService.java
- IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
+ indicesLifecycle.beforeIndexShardCreated(shardId);
+
+ logger.debug("creating shard_id [{}]", shardId.id());
@dakrone
dakrone Nov 13, 2014 Member

It would be better to change this to:

logger.debug("creating shard_id {}", shardId);

To log both the index and shard ID, since ShardId has a nice .toString() method

@dakrone dakrone commented on the diff Nov 13, 2014
...n/java/org/elasticsearch/index/store/StoreModule.java
@@ -53,6 +56,7 @@ public void setDistributor(Class<? extends Distributor> distributor) {
protected void configure() {
bind(DirectoryService.class).to(indexStore.shardDirectory()).asEagerSingleton();
bind(Store.class).asEagerSingleton();
+ bind(ShardLock.class).toInstance(lock);
@dakrone
dakrone Nov 13, 2014 Member

I'm not that familiar with Guice, so this is just a question, but does this need to have .asEagerSingleton() added to it?

@bleskes
bleskes Nov 13, 2014 Member

I chased this one up, wondering the same thing, and there is no asEagerSingleton for instances. We seem to do a similar thing in many other places.

@kimchy
kimchy Nov 15, 2014 Member

yea, there is no need, if you bind an actual instance, it is by definition a singleton (how would it not be, can't really create another instance of it..)

@dakrone dakrone commented on an outdated diff Nov 13, 2014
...earch/indices/cluster/IndicesClusterStateService.java
+ try {
+ nodeEnvironment.deleteIndexDirectorySafe(index);
+ logger.debug("deleted index [{}] from filesystem", index);
+ } catch (Exception e) {
+ logger.debug("failed to deleted index [{}] from filesystem", e, index);
+ // ignore - still some shards locked here
+ }
+ }
+
+ @Override
+ public void onShardClosed(ShardId shardId) {
+ try {
+ nodeEnvironment.deleteShardDirectorySafe(shardId);
+ logger.debug("deleted shard [{}] from filesystem", shardId);
+ } catch (IOException e) {
+ logger.warn("Can't delete shard: " + shardId, e);
@dakrone
dakrone Nov 13, 2014 Member

Minor comment to use logging formatting syntax instead of string concatenation

@dakrone dakrone commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+ for (Path path : env.indexPaths(new Index("foo" + i))) {
+ Files.createDirectories(path);
+ }
+ }
+ Set<String> indices = env.findAllIndices();
+ assertEquals(indices.size(), numIndices);
+ for (int i = 0; i < numIndices; i++) {
+ assertTrue(indices.contains("foo" + i));
+ }
+ }
+
+ @Test
+ public void testDeleteSafe() throws IOException {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
+ .put("node.max_local_storage_nodes", 2).build();
@dakrone
dakrone Nov 13, 2014 Member

It would make sense to refactor this into private Settings newNodeEnvSettings() since every test requires it, in case it ever has to be changed in the future.

@dakrone dakrone commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+ }
+
+ @Test
+ public void testShardLock() throws IOException {
+ Settings settings = ImmutableSettings.builder()
+ .put("path.home", tmpPaths())
+ .put("node.max_local_storage_nodes", 2).build();
+ NodeEnvironment env = new NodeEnvironment(settings, new Environment(settings));
+
+ ShardLock fooLock = env.shardLock(new ShardId("foo", 1));
+ assertEquals(new ShardId("foo", 1), fooLock.getShardId());
+
+ try {
+ env.shardLock(new ShardId("foo", 1));
+ fail("shard is locked");
+ } catch (IOException ex) {
@dakrone
dakrone Nov 13, 2014 Member

Can this be a more specific LockObtainFailedException exception?

@dakrone dakrone commented on an outdated diff Nov 13, 2014
.../java/org/elasticsearch/env/NodeEnvironmentTests.java
+
+ try {
+ env.shardLock(new ShardId("foo", 1));
+ fail("shard is locked");
+ } catch (IOException ex) {
+ // expected
+ }
+ for (Path path : env.indexPaths(new Index("foo"))) {
+ Files.createDirectories(path.resolve("1"));
+ Files.createDirectories(path.resolve("2"));
+ }
+
+ try {
+ env.lockAllForIndex(new Index("foo"));
+ fail("shard 1 is locked");
+ } catch (IOException ex) {
@dakrone
dakrone Nov 13, 2014 Member

Same here

@dakrone dakrone commented on an outdated diff Nov 13, 2014
...est/java/org/elasticsearch/index/store/StoreTest.java
@@ -69,7 +71,15 @@ public void testRefCount() throws IOException {
}
store.incRef();
- store.close();
+ final AtomicBoolean called = new AtomicBoolean(false);
+ Store.OnCloseListener listener = new Store.OnCloseListener() {
+ @Override
+ public void onClose(ShardId shardId) {
+ assertFalse(called.get());
+ called.set(true);
@dakrone
dakrone Nov 13, 2014 Member

This can be assertTrue(called.compareAndSet(false, true)) to avoid the race condition between the assert and the set.
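
A small sketch of the pattern, using a hypothetical `OnceGuard` class (not from the PR). With a separate `get()` followed by `set(true)`, two threads can both observe `false` before either writes; `compareAndSet(false, true)` does the check and the write as one atomic step, so exactly one caller ever sees `true`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class OnceGuard {
    private final AtomicBoolean called = new AtomicBoolean(false);

    // Returns true only for the first caller; every later call returns false.
    boolean markCalled() {
        return called.compareAndSet(false, true);
    }

    boolean wasCalled() {
        return called.get();
    }
}
```

In a listener, `assertTrue(guard.markCalled())` then fails on any second invocation, and the test can check `wasCalled()` afterwards.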

@dakrone dakrone commented on an outdated diff Nov 13, 2014
...est/java/org/elasticsearch/index/store/StoreTest.java
@@ -101,6 +113,28 @@ public void testRefCount() throws IOException {
}
@Test
+ public void testListenerCanThrowException() throws IOException {
+ final ShardId shardId = new ShardId(new Index("index"), 1);
+ DirectoryService directoryService = new LuceneManagedDirectoryService(random());
+ final ShardLock shardLock = new DummyShardLock(shardId);
+ Store store = new Store(shardId, ImmutableSettings.EMPTY, null, directoryService, randomDistributor(directoryService), shardLock);
+ final AtomicBoolean called = new AtomicBoolean(false);
+ Store.OnCloseListener listener = new Store.OnCloseListener() {
+ @Override
+ public void onClose(ShardId shardId) {
+ assertFalse(called.get());
+ called.set(true);
@dakrone
dakrone Nov 13, 2014 Member

Same here for compareAndSet

@dakrone dakrone commented on an outdated diff Nov 13, 2014
src/test/java/org/elasticsearch/test/EmptyShardLock.java
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test;
+
+import org.elasticsearch.env.ShardLock;
+import org.elasticsearch.index.shard.ShardId;
+
+/*
+ * A ShardLock that does nothing... for tests only
+ */
+public class EmptyShardLock extends ShardLock {
@dakrone
dakrone Nov 13, 2014 Member

This class is a duplicate of DummyShardLock and isn't used anywhere, it should be removed

@dakrone
Member
dakrone commented Nov 13, 2014

Left lots of comments, @s1monw does it make sense to have ShardLock implement java.util.concurrent.locks.Lock so it can use the .lock, .tryLock and .unlock naming conventions?

@s1monw
Contributor
s1monw commented Nov 13, 2014

Left lots of comments, @s1monw does it make sense to have ShardLock implement java.util.concurrent.locks.Lock so it can use the .lock, .tryLock and .unlock naming conventions?

Thanks for the comments, I think I addressed them all. Regarding Lock: I don't really want anybody to lock this outside of NodeEnvironment. It really has the semantics of a file-based lock, not of a monitor. I'm not sure what this would look like, to be honest, so no, I don't think we should make it more complicated.

@dakrone dakrone commented on an outdated diff Nov 14, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ decWaitCount();
+ }
+
+ void incWaitCount() {
+ synchronized (shardLocks) {
+ assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0";
+ waitCount++;
+ }
+
+ }
+
+ private void decWaitCount() {
+ synchronized (shardLocks) {
+ assert waitCount > 0 : "waitCount is " + waitCount + " but should be > 0";
+ if (--waitCount == 0) {
+ shardLocks.remove(shardId);
@dakrone
dakrone Nov 14, 2014 Member

Do we want to assert that .remove didn't return null here?

@dakrone dakrone commented on an outdated diff Nov 14, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ }
+ }
+
+ void acquire(long timeoutInMillis) throws LockObtainFailedException{
+ boolean success = false;
+ try {
+ if (mutex.tryAcquire(timeoutInMillis, TimeUnit.MILLISECONDS) == false) {
+ throw new LockObtainFailedException("Can't lock shard " + shardId + ", timed out after " + timeoutInMillis + "ms");
+ }
+ success = true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e);
+ } finally {
+ if (success == false) {
+ decWaitCount();
@dakrone
dakrone Nov 14, 2014 Member

Waiting on an acquire that fails looks like it can decrement the wait count without having incremented it (since the increment is outside of this function). Would it be better to move the decWaitCount up one level, to a finally block in the shardLock method, so the increment and decrement don't span two different locations/classes?
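
One way to keep the increment and the matching decrement in a single method is sketched below, assuming Java 8+. `CountedShardLocks`, `Entry`, and `Handle` are hypothetical names for this example, and returning `null` on failure is a simplification; the code under review throws `LockObtainFailedException` instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CountedShardLocks {
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    private static final class Entry {
        final Semaphore mutex = new Semaphore(1);
        int waitCount; // holder plus waiters; guarded by the `locks` monitor
    }

    private final Map<String, Entry> locks = new HashMap<>();

    // Returns a handle on success, or null on timeout/interrupt.
    Handle lock(final String shardKey, long timeoutMs) {
        final Entry entry;
        synchronized (locks) {
            entry = locks.computeIfAbsent(shardKey, k -> new Entry());
            entry.waitCount++; // the only place the count goes up
        }
        boolean success = false;
        try {
            // wait outside of the synchronized block
            success = entry.mutex.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (!success) {
                decWaitCount(shardKey, entry); // undo our own increment, in the same method
            }
        }
        if (!success) {
            return null;
        }
        final AtomicBoolean closed = new AtomicBoolean(false);
        return () -> {
            if (closed.compareAndSet(false, true)) {
                entry.mutex.release();
                decWaitCount(shardKey, entry);
            }
        };
    }

    private void decWaitCount(String shardKey, Entry entry) {
        synchronized (locks) {
            if (--entry.waitCount == 0) {
                // nobody holds or waits for this lock anymore, so prune the map
                Entry removed = locks.remove(shardKey);
                assert removed == entry : "unexpected entry for " + shardKey;
            }
        }
    }

    int size() {
        synchronized (locks) {
            return locks.size();
        }
    }
}
```

Because every path that increments also reaches exactly one decrement (the `finally` on failure, or `close()` on success), the map entry is removed precisely when the last holder or waiter goes away.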

@dakrone dakrone commented on an outdated diff Nov 14, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
+ success = true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new LockObtainFailedException("Can't lock shard " + shardId + ", interrupted", e);
+ } finally {
+ if (success == false) {
+ decWaitCount();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "InternalShardLock{" +
+ "waitCount=" + waitCount +
+ '}';
@dakrone
dakrone Nov 14, 2014 Member

Log the ShardId of the lock here too?

@dakrone dakrone commented on an outdated diff Nov 14, 2014
src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -234,6 +456,34 @@ public boolean hasNodeFile() {
return shardIds;
}
+ private static void findAllShardsForIndex(Set<ShardId> shardIds, Path indexPath) throws IOException {
@dakrone
dakrone Nov 14, 2014 Member

I would definitely prefer a pure function here that returns a Set<ShardId> instead of mutating the method parameter
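
A sketch of the non-mutating shape, using only `java.nio.file`. `ShardDirs`, `findShardIds`, and `demo` are hypothetical names for illustration, and the example returns plain integers rather than `ShardId` objects so that it stays self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

final class ShardDirs {

    // Returns the numeric sub-directory names of indexPath as a fresh set;
    // the caller's collections are never touched.
    static Set<Integer> findShardIds(Path indexPath) throws IOException {
        Set<Integer> ids = new HashSet<>();
        if (!Files.isDirectory(indexPath)) {
            return ids;
        }
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
            for (Path shardPath : stream) {
                Integer id = parseShardId(shardPath.getFileName().toString());
                if (id != null && Files.isDirectory(shardPath)) {
                    ids.add(id);
                }
            }
        }
        return ids;
    }

    private static Integer parseShardId(String name) {
        try {
            return name.matches("\\d+") ? Integer.valueOf(name) : null;
        } catch (NumberFormatException e) {
            return null; // digits only, but too large for an int
        }
    }

    // Builds a throwaway index directory and lists it; checked exceptions are wrapped.
    static Set<Integer> demo() {
        try {
            Path index = Files.createTempDirectory("index");
            Files.createDirectories(index.resolve("0"));
            Files.createDirectories(index.resolve("1"));
            Files.createDirectories(index.resolve("_state")); // not numeric, skipped
            Files.createFile(index.resolve("2"));             // numeric but a file, skipped
            return findShardIds(index);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller that scans several data paths can then combine the results with `addAll`, which keeps the mutation at the call site where it is visible.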

@dakrone dakrone commented on an outdated diff Nov 14, 2014
.../org/elasticsearch/index/store/CorruptedFileTest.java
@@ -92,7 +92,7 @@
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
// we really need local GW here since this also checks for corruption etc.
- // and we need to make sure primaries are not just trashed if we don'tmvn have replicas
+ // and we need to make sure primaries are not just trashed if we don't mvn have replicas
@dakrone
dakrone Nov 14, 2014 Member

"mvn"? :)

@s1monw
Contributor
s1monw commented Nov 15, 2014

I fixed all comments - I think it's ready.. refactorings can happen later this really goes out of date quickly...

@bleskes bleskes commented on an outdated diff Nov 15, 2014
...h/gateway/local/state/meta/LocalGatewayMetaState.java
} catch (Exception ex) {
- logger.debug("[{}] failed to delete index", ex, current.index());
+ logger.warn("[{}] failed to delete index", ex, current.index());
@bleskes
bleskes Nov 15, 2014 Member

I have second thoughts about this warn - it might be "normal" in the case that an ongoing recovery is holding a reference to the store while the index is deleted. With the new recovery refcount and locking, this is expected behavior. Should we log debug if LockObtainFailedException is caught (expected) and only do warn otherwise?
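
The suggestion amounts to picking the log level from the exception type. A minimal sketch with `java.util.logging` levels follows; `LockFailedException` is a hypothetical stand-in for Lucene's `LockObtainFailedException` (which likewise extends `IOException`), and `DeleteFailureLogging` is an invented helper, not Elasticsearch's logger API.

```java
import java.io.IOException;
import java.util.logging.Level;

// Hypothetical stand-in for Lucene's LockObtainFailedException.
final class LockFailedException extends IOException {
    LockFailedException(String message) {
        super(message);
    }
}

final class DeleteFailureLogging {
    // A shard that is still locked is expected while a recovery holds the store,
    // so it is logged quietly; anything else is surfaced as a warning.
    static Level levelFor(Exception e) {
        return (e instanceof LockFailedException) ? Level.FINE : Level.WARNING;
    }
}
```

The same effect can be had with two catch blocks, the more specific one first, which may read more naturally in the existing try/catch.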

@bleskes bleskes commented on an outdated diff Nov 15, 2014
...h/gateway/local/state/meta/LocalGatewayMetaState.java
} catch (Exception ex) {
- logger.debug("[{}] failed to delete dangling index", ex, indexName);
+ logger.warn("[{}] failed to delete dangling index", ex, indexName);
@bleskes
bleskes Nov 15, 2014 Member

Same comment about logging levels as above. If there is an ongoing recovery, we'll probably have both log lines logged (one for trying to delete the index, one for ignoring it as dangling). Imho locked indices are not dangling (they are in use). Should we just log at trace in the case of LockObtainFailedException?

@bleskes
Member
bleskes commented Nov 15, 2014

Left two comments about log levels. LGTM otherwise. Agreed we need to push it and continue from here.

@dakrone
Member
dakrone commented Nov 15, 2014

LGTM!

@kimchy
Member
kimchy commented Nov 15, 2014

LGTM, I love the move to in-memory locks.

@s1monw s1monw [CORE] Intorduce shards level locks to prevent concurrent shard modifications

Today it's possible that the data directory for a single shard is used by more than one
IndexShard->Store instance. While one shard is already closed but has a concurrent recovery
running and a new shard is creating its engine, files can conflict and data can potentially
be lost. We also remove shard data without checking if there are still users of the files
or if files are still open, which can cause pending writes / flushes or the delete operation
to fail. If the latter is the case the index might be treated as a dangling index and brought
back to life at a later point in time.

This commit introduces a shard level lock that prevents modifications to the shard data
while it's still in use. Locks are created per shard and maintained in NodeEnvironment.java.
In contrast to most Java concurrency primitives those locks are not reentrant.

This commit also adds infrastructure that checks if all shard locks are released after tests.
1c64a11
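
The idea of a per-shard, non-reentrant, in-memory lock can be sketched as follows. This is a minimal illustration rather than the implementation merged in NodeEnvironment.java: the class name `ShardLocks`, the string shard key and the use of `IllegalStateException` are assumptions made for the example, and unlike a real implementation the sketch never removes unused entries from its map.

```java
import java.io.Closeable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShardLocks {

    /** Handle returned to the caller; close() releases the lock and never throws. */
    interface ShardLock extends Closeable {
        @Override
        void close();
    }

    // One single-permit semaphore per shard. A semaphore has no owner thread,
    // so the holder cannot acquire it a second time: the lock is not reentrant.
    private final ConcurrentMap<String, Semaphore> locks = new ConcurrentHashMap<>();

    /** Tries to lock the shard, waiting at most timeoutMillis (0 means do not wait). */
    ShardLock lock(String shardKey, long timeoutMillis) {
        Semaphore mutex = locks.computeIfAbsent(shardKey, k -> new Semaphore(1));
        boolean acquired;
        try {
            acquired = mutex.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while locking shard " + shardKey, e);
        }
        if (!acquired) {
            throw new IllegalStateException("shard " + shardKey + " is still in use");
        }
        AtomicBoolean released = new AtomicBoolean();
        // Guard against double close so the permit is handed back exactly once.
        return () -> {
            if (released.compareAndSet(false, true)) {
                mutex.release();
            }
        };
    }
}
```

Used with try-with-resources, as in `try (ShardLocks.ShardLock lock = locks.lock("[index][0]", 0)) { ... }`, the shard directory would only be modified or deleted while the lock is held, and a second attempt from any thread, including the holder, fails until the first handle is closed.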
@s1monw s1monw merged commit 1c64a11 into elastic:master Nov 16, 2014
@s1monw s1monw removed the review label Nov 16, 2014
@s1monw s1monw deleted the s1monw:lock_store branch Nov 16, 2014
@s1monw
Contributor
s1monw commented Nov 16, 2014

Thanks everybody for the intense and time-consuming reviews. It's a pretty low-level change and lots of places are involved.

@bleskes bleskes added a commit to bleskes/elasticsearch that referenced this pull request Dec 19, 2014
@bleskes bleskes Internal: remove IndexCloseListener & Store.OnCloseListener
#8436 has introduced shard level locks in order to prevent directory reuse during fast deletion & creation of indices. As part of the change, close listeners were introduced to delete the folders once all outstanding references were released. The new change has created race conditions causing shard folders not to be deleted (causing test failures due to leftover corruption markers). This commit removes the listeners in favor of a simple timeout-based solution to be used until a better listener-based solution is ready ( #8608 ).
043605b
@bleskes bleskes added a commit to bleskes/elasticsearch that referenced this pull request Dec 19, 2014
@bleskes bleskes Internal: remove IndexCloseListener & Store.OnCloseListener
#8436 has introduced shard level locks in order to prevent directory reuse during fast deletion & creation of indices. As part of the change, close listeners were introduced to delete the folders once all outstanding references were released. The new change has created race conditions causing shard folders not to be deleted (causing test failures due to leftover corruption markers). This commit removes the listeners in favour of a simple timeout-based solution to be used until a better listener-based solution is ready ( #8608 ).

Closes #9009
a3980ca
@clintongormley clintongormley changed the title from [CORE] Intorduce shards level locks to prevent concurrent shard modifications to [CORE] Introduce shard level locks to prevent concurrent shard modifications Mar 19, 2015
@clintongormley clintongormley changed the title from [CORE] Introduce shard level locks to prevent concurrent shard modifications to Introduce shard level locks to prevent concurrent shard modifications Jun 6, 2015