Consolidate directory lock obtain code
The Directory#makeLock API is trappy and can easily lead to an unexpected
lock release when native locks are used. See LUCENE-6507 for details.
This commit consolidates the lock acquisition code into one place and only
returns the lock instance if it was actually acquired.
s1monw committed May 28, 2015
1 parent 91e9caa commit 6419102
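
For context on the trap: with Directory#makeLock the Lock object exists before it is acquired, so closing an un-acquired instance (for example via try-with-resources after a failed obtain) can, under NativeFSLockFactory, release a file lock that another instance in the same JVM legitimately holds. That is the LUCENE-6507 failure mode. A minimal sketch of the old pattern, illustrative only and not code from this commit:

    import java.io.IOException;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;
    import org.apache.lucene.store.LockObtainFailedException;

    static void trappyPattern(Directory directory, long timeout) throws IOException {
        try (Lock lock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) { // lock exists before acquisition
            if (lock.obtain(timeout) == false) {
                // Throwing exits the try-with-resources, which still closes the
                // un-acquired lock; with native locks that close can release a
                // lock actually held elsewhere in this JVM.
                throw new LockObtainFailedException("failed to obtain lock: " + lock);
            }
            // ... work while holding the lock ...
        }
    }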
Showing 4 changed files with 41 additions and 29 deletions.
32 changes: 24 additions & 8 deletions src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -173,6 +173,28 @@ private static SegmentInfos readSegmentInfos(String segmentsFileName, Directory
         return SegmentInfos.readCommit(directory, segmentsFileName);
     }
 
+    /**
+     * Tries to acquire the {@link IndexWriter#WRITE_LOCK_NAME} on the given directory. The returned lock must be
+     * closed in order to release it. If the lock can't be obtained a {@link LockObtainFailedException} is thrown.
+     * This method uses the {@link IndexWriterConfig#getDefaultWriteLockTimeout()} as the lock timeout.
+     */
+    public static Lock acquireWriteLock(Directory directory) throws IOException {
+        return acquireLock(directory, IndexWriter.WRITE_LOCK_NAME, IndexWriterConfig.getDefaultWriteLockTimeout());
+    }
+
+    /**
+     * Tries to acquire a lock on the given directory. The returned lock must be
+     * closed in order to release it. If the lock can't be obtained a {@link LockObtainFailedException} is thrown.
+     */
+    @SuppressForbidden(reason = "this method uses trappy Directory#makeLock API")
+    public static Lock acquireLock(Directory directory, String lockName, long timeout) throws IOException {
+        final Lock writeLock = directory.makeLock(lockName);
+        if (writeLock.obtain(timeout) == false) {
+            throw new LockObtainFailedException("failed to obtain lock: " + writeLock);
+        }
+        return writeLock;
+    }
+
     /**
      * This method removes all files from the given directory that are not referenced by the given segments file.
      * This method will open an IndexWriter and relies on index file deleter to remove all unreferenced files. Segment files
@@ -184,10 +206,7 @@ private static SegmentInfos readSegmentInfos(String segmentsFileName, Directory
      */
     public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory) throws IOException {
         final SegmentInfos si = readSegmentInfos(segmentsFileName, directory);
-        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
-            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
-                throw new LockObtainFailedException("Index locked for write: " + writeLock);
-            }
+        try (Lock writeLock = acquireWriteLock(directory)) {
             int foundSegmentFiles = 0;
             for (final String file : directory.listAll()) {
                 /**
@@ -226,10 +245,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory)
      * this operation fails.
      */
     public static void cleanLuceneIndex(Directory directory) throws IOException {
-        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
-            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
-                throw new LockObtainFailedException("Index locked for write: " + writeLock);
-            }
+        try (Lock writeLock = acquireWriteLock(directory)) {
             for (final String file : directory.listAll()) {
                 if (file.startsWith(IndexFileNames.SEGMENTS) || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
                     directory.deleteFile(file); // remove all segment_N files
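With the helper in place, callers never see an un-acquired Lock: acquireWriteLock either returns a lock that is already held or throws LockObtainFailedException. A minimal usage sketch; the wrapped IOException message is an illustration, not code from this commit:

    import java.io.IOException;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;
    import org.apache.lucene.store.LockObtainFailedException;
    import org.elasticsearch.common.lucene.Lucene;

    static void withWriteLock(Directory directory) throws IOException {
        // acquireWriteLock either returns an already-held lock or throws;
        // there is no window in which an un-acquired Lock could be closed.
        try (Lock writeLock = Lucene.acquireWriteLock(directory)) {
            // ... mutate the index while write.lock is held ...
        } catch (LockObtainFailedException ex) {
            // translate to a caller-appropriate failure if desired (illustrative)
            throw new IOException("index is locked for write", ex);
        }
    }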
@@ -25,13 +25,15 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.SimpleFSDirectory;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
@@ -84,13 +86,12 @@ public void upgrade(ShardId shard, ShardPath targetPath) throws IOException {
         ShardStateMetaData.FORMAT.write(loaded, loaded.version, targetPath.getShardStatePath());
         Files.createDirectories(targetPath.resolveIndex());
         try (SimpleFSDirectory directory = new SimpleFSDirectory(targetPath.resolveIndex())) {
-            try (final Lock lock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
-                if (lock.obtain(5000)) {
-                    upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths);
-                } else {
-                    throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex());
-                }
+            try (final Lock lock = Lucene.acquireWriteLock(directory)) {
+                upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths);
+            } catch (LockObtainFailedException ex) {
+                throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex(), ex);
             }
+
         }
 
 
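Two details of the upgrade() hunk above are easy to miss: the hard-coded 5000 ms obtain timeout is replaced by the IndexWriterConfig default inside acquireWriteLock, and the IllegalStateException now chains the LockObtainFailedException as its cause. The control flow in miniature, with doUpgradeWork and indexPath as hypothetical placeholders:

    import java.io.IOException;
    import java.nio.file.Path;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;
    import org.apache.lucene.store.LockObtainFailedException;
    import org.elasticsearch.common.lucene.Lucene;

    static void lockedUpgrade(Directory directory, Path indexPath, Runnable doUpgradeWork) throws IOException {
        try (Lock lock = Lucene.acquireWriteLock(directory)) {
            doUpgradeWork.run(); // reached only while write.lock is held
        } catch (LockObtainFailedException ex) {
            // thrown before 'lock' was ever assigned, so try-with-resources
            // never closes a lock this thread failed to acquire
            throw new IllegalStateException("Can't obtain lock on " + indexPath, ex);
        }
    }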
15 changes: 8 additions & 7 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
@@ -146,18 +147,17 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOException {
 
                 try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) {
                     logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath());
-                    Lock tmpLock = luceneDir.makeLock(NODE_LOCK_FILENAME);
-                    boolean obtained = tmpLock.obtain();
-                    if (obtained) {
+                    try {
+                        locks[dirIndex] = Lucene.acquireLock(luceneDir, NODE_LOCK_FILENAME, 0);
                         nodePaths[dirIndex] = new NodePath(dir, environment);
-                        locks[dirIndex] = tmpLock;
                         localNodeId = possibleLockId;
-                    } else {
+                    } catch (LockObtainFailedException ex) {
                         logger.trace("failed to obtain node lock on {}", dir.toAbsolutePath());
                         // release all the ones that were obtained up until now
                         releaseAndNullLocks(locks);
                         break;
                     }
 
                 } catch (IOException e) {
                     logger.trace("failed to obtain node lock on {}", e, dir.toAbsolutePath());
                     lastException = new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e);
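The node-lock call above passes a timeout of 0, which with Lucene's Lock#obtain(long) amounts to a single acquisition attempt with no polling; on failure the constructor releases the locks obtained so far and tries the next candidate node id. A self-contained sketch of the failure path, assuming an arbitrary lock name and temp directory:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.store.Lock;
    import org.apache.lucene.store.LockObtainFailedException;
    import org.apache.lucene.store.NativeFSLockFactory;
    import org.elasticsearch.common.lucene.Lucene;

    public class NodeLockDemo {
        public static void main(String[] args) throws Exception {
            Path dir = Files.createTempDirectory("node-lock-demo");
            try (Directory first = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE);
                 Directory second = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE);
                 Lock held = Lucene.acquireLock(first, "node.lock", 0)) {
                // a second acquisition of the same named lock fails immediately
                try (Lock blocked = Lucene.acquireLock(second, "node.lock", 0)) {
                    throw new AssertionError("should not be reachable");
                } catch (LockObtainFailedException expected) {
                    System.out.println("second obtain failed as expected: " + expected.getMessage());
                }
            }
        }
    }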
@@ -314,8 +314,9 @@ public static void acquireFSLockForPaths(@IndexSettings Settings indexSettings,
             // open a directory (will be immediately closed) on the shard's location
             dirs[i] = new SimpleFSDirectory(p, FsDirectoryService.buildLockFactory(indexSettings));
             // create a lock for the "write.lock" file
-            locks[i] = dirs[i].makeLock(IndexWriter.WRITE_LOCK_NAME);
-            if (locks[i].obtain() == false) {
+            try {
+                locks[i] = Lucene.acquireWriteLock(dirs[i]);
+            } catch (IOException ex) {
                 throw new ElasticsearchException("unable to acquire " +
                         IndexWriter.WRITE_LOCK_NAME + " for " + p);
             }
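acquireFSLockForPaths keeps its all-or-nothing contract: every shard path must yield write.lock or the method throws. The hunk does not show how partially acquired locks are released on failure, so the IOUtils cleanup below is an assumption based on the usual idiom, not this commit's code:

    import java.io.IOException;
    import java.nio.file.Path;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;
    import org.apache.lucene.store.LockFactory;
    import org.apache.lucene.store.SimpleFSDirectory;
    import org.apache.lucene.util.IOUtils;
    import org.elasticsearch.ElasticsearchException;
    import org.elasticsearch.common.lucene.Lucene;

    static void lockAllShardPaths(Path[] paths, LockFactory lockFactory) throws IOException {
        Directory[] dirs = new Directory[paths.length];
        Lock[] locks = new Lock[paths.length];
        boolean success = false;
        try {
            for (int i = 0; i < paths.length; i++) {
                dirs[i] = new SimpleFSDirectory(paths[i], lockFactory);
                try {
                    locks[i] = Lucene.acquireWriteLock(dirs[i]); // fails if a shard is still in use
                } catch (IOException ex) {
                    throw new ElasticsearchException("unable to acquire write.lock for " + paths[i], ex);
                }
            }
            success = true;
        } finally {
            if (success == false) {
                IOUtils.closeWhileHandlingException(locks); // release partial acquisitions (assumed idiom)
                IOUtils.closeWhileHandlingException(dirs);
            }
        }
    }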
10 changes: 2 additions & 8 deletions src/main/java/org/elasticsearch/index/store/Store.java
@@ -259,10 +259,7 @@ public int compare(Map.Entry<String, String> o1, Map.Entry<String, String> o2) {
         metadataLock.writeLock().lock();
         // we make sure that nobody fetches the metadata while we do this rename operation here to ensure we don't
         // get exceptions if files are still open.
-        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
-            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
-                throw new LockObtainFailedException("Index locked for write: " + writeLock);
-            }
+        try (Lock writeLock = Lucene.acquireWriteLock(directory())) {
             for (Map.Entry<String, String> entry : entries) {
                 String tempFile = entry.getKey();
                 String origFile = entry.getValue();
@@ -586,10 +583,7 @@ private static final void failIfCorrupted(Directory directory, ShardId shardId)
      */
     public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
         metadataLock.writeLock().lock();
-        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
-            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
-                throw new LockObtainFailedException("Index locked for write: " + writeLock);
-            }
+        try (Lock writeLock = Lucene.acquireWriteLock(directory)) {
             final StoreDirectory dir = directory;
             for (String existingFile : dir.listAll()) {
                 if (existingFile.equals(IndexWriter.WRITE_LOCK_NAME) || Store.isChecksum(existingFile) || sourceMetaData.contains(existingFile)) {
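Both Store hunks use write.lock purely as a guard: holding it guarantees no IndexWriter is open on the directory while files are renamed or deleted. A reduced sketch of the cleanup loop, with a plain Set standing in for MetadataSnapshot#contains and the Store.isChecksum check omitted:

    import java.io.IOException;
    import java.util.Set;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;
    import org.elasticsearch.common.lucene.Lucene;

    static void cleanupSketch(Directory directory, Set<String> sourceFiles) throws IOException {
        try (Lock writeLock = Lucene.acquireWriteLock(directory)) {
            for (String existingFile : directory.listAll()) {
                // keep the lock file itself and everything the snapshot references
                if (existingFile.equals(IndexWriter.WRITE_LOCK_NAME) || sourceFiles.contains(existingFile)) {
                    continue;
                }
                directory.deleteFile(existingFile); // safe: no IndexWriter can hold write.lock now
            }
        }
    }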
