diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java index a5efd0e911386..d8cad05f09d2f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/FileSystemUtils.java @@ -120,6 +120,14 @@ public static boolean hasExtensions(File root, String... extensions) { return false; } + public static boolean deleteRecursively(File[] roots) { + boolean deleted = true; + for (File root : roots) { + deleted &= deleteRecursively(root); + } + return deleted; + } + public static boolean deleteRecursively(File root) { return deleteRecursively(root, true); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java index d259c5dad61a7..e917cf8437b63 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/settings/ImmutableSettings.java @@ -492,6 +492,7 @@ public Builder put(String setting, long value, ByteSizeUnit sizeUnit) { * @return The builder */ public Builder putArray(String setting, String... values) { + remove(setting); int counter = 0; while (true) { String value = map.remove(setting + '.' + (counter++)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java index 953d3de29be09..ae1826bc12cb8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/Environment.java @@ -46,9 +46,9 @@ public class Environment { private final File workWithClusterFile; - private final File dataFile; + private final File[] dataFiles; - private final File dataWithClusterFile; + private final File[] dataWithClusterFiles; private final File configFile; @@ -86,12 +86,18 @@ public Environment(Settings settings) { } workWithClusterFile = new File(workFile, ClusterName.clusterNameFromSettings(settings).value()); - if (settings.get("path.data") != null) { - dataFile = new File(cleanPath(settings.get("path.data"))); + String[] dataPaths = settings.getAsArray("path.data"); + if (dataPaths.length > 0) { + dataFiles = new File[dataPaths.length]; + dataWithClusterFiles = new File[dataPaths.length]; + for (int i = 0; i < dataPaths.length; i++) { + dataFiles[i] = new File(dataPaths[i]); + dataWithClusterFiles[i] = new File(dataFiles[i], ClusterName.clusterNameFromSettings(settings).value()); + } } else { - dataFile = new File(homeFile, "data"); + dataFiles = new File[]{new File(homeFile, "data")}; + dataWithClusterFiles = new File[]{new File(new File(homeFile, "data"), ClusterName.clusterNameFromSettings(settings).value())}; } - dataWithClusterFile = new File(dataFile, ClusterName.clusterNameFromSettings(settings).value()); if (settings.get("path.logs") != null) { logsFile = new File(cleanPath(settings.get("path.logs"))); @@ -124,15 +130,15 @@ public File workWithClusterFile() { /** * The data location. */ - public File dataFile() { - return dataFile; + public File[] dataFiles() { + return dataFiles; } /** * The data location with the cluster name as a sub directory. */ - public File dataWithClusterFile() { - return dataWithClusterFile; + public File[] dataWithClusterFiles() { + return dataWithClusterFiles; } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java index f7bf85f32c59e..3a4c273f67385 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -38,9 +38,10 @@ */ public class NodeEnvironment extends AbstractComponent { - private final File nodeFile; + private final File[] nodeFiles; + private final File[] nodeIndicesLocations; - private final Lock lock; + private final Lock[] locks; private final int localNodeId; @@ -48,46 +49,83 @@ public class NodeEnvironment extends AbstractComponent { super(settings); if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) { - nodeFile = null; - lock = null; + nodeFiles = null; + nodeIndicesLocations = null; + locks = null; localNodeId = -1; return; } - Lock lock = null; - File dir = null; + File[] nodesFiles = new File[environment.dataWithClusterFiles().length]; + Lock[] locks = new Lock[environment.dataWithClusterFiles().length]; int localNodeId = -1; IOException lastException = null; - for (int i = 0; i < 50; i++) { - dir = new File(new File(environment.dataWithClusterFile(), "nodes"), Integer.toString(i)); - if (!dir.exists()) { - FileSystemUtils.mkdirs(dir); - } - logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath()); - try { - NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir); - Lock tmpLock = lockFactory.makeLock("node.lock"); - boolean obtained = tmpLock.obtain(); - if (obtained) { - lock = tmpLock; - localNodeId = i; + for (int possibleLockId = 0; possibleLockId < 50; possibleLockId++) { + for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) { + File dir = new File(new File(environment.dataWithClusterFiles()[dirIndex], "nodes"), Integer.toString(possibleLockId)); + if (!dir.exists()) { + FileSystemUtils.mkdirs(dir); + } + logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath()); + try { + NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir); + Lock tmpLock = lockFactory.makeLock("node.lock"); + boolean obtained = tmpLock.obtain(); + if (obtained) { + locks[dirIndex] = tmpLock; + nodesFiles[dirIndex] = dir; + localNodeId = possibleLockId; + } else { + logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath()); + // release all the ones that were obtained up until now + for (int i = 0; i < locks.length; i++) { + if (locks[i] != null) { + try { + locks[i].release(); + } catch (Exception e1) { + // ignore + } + } + locks[i] = null; + } + break; + } + } catch (IOException e) { + logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath()); + lastException = e; + // release all the ones that were obtained up until now + for (int i = 0; i < locks.length; i++) { + if (locks[i] != null) { + try { + locks[i].release(); + } catch (Exception e1) { + // ignore + } + } + locks[i] = null; + } break; - } else { - logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath()); } - } catch (IOException e) { - logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath()); - lastException = e; + } + if (locks[0] != null) { + // we found a lock, break + break; } } - if (lock == null) { + if (locks[0] == null) { throw new IOException("Failed to obtain node lock", lastException); } + this.localNodeId = localNodeId; - this.lock = lock; - this.nodeFile = dir; + this.locks = locks; + this.nodeFiles = nodesFiles; if (logger.isDebugEnabled()) { - logger.debug("using node location [{}], local_node_id [{}]", dir, localNodeId); + logger.debug("using node location [{}], local_node_id [{}]", nodesFiles, localNodeId); + } + + this.nodeIndicesLocations = new File[nodeFiles.length]; + for (int i = 0; i < nodeFiles.length; i++) { + nodeIndicesLocations[i] = new File(nodeFiles[i], "indices"); } } @@ -96,34 +134,44 @@ public int localNodeId() { } public boolean hasNodeFile() { - return nodeFile != null && lock != null; + return nodeFiles != null && locks != null; } - public File nodeDataLocation() { - if (nodeFile == null || lock == null) { + public File[] nodeDataLocations() { + if (nodeFiles == null || locks == null) { throw new ElasticSearchIllegalStateException("node is not configured to store local location"); } - return nodeFile; + return nodeFiles; } - public File indicesLocation() { - return new File(nodeDataLocation(), "indices"); + public File[] indicesLocations() { + return nodeIndicesLocations; } - public File indexLocation(Index index) { - return new File(indicesLocation(), index.name()); + public File[] indexLocations(Index index) { + File[] indexLocations = new File[nodeFiles.length]; + for (int i = 0; i < nodeFiles.length; i++) { + indexLocations[i] = new File(new File(nodeFiles[i], "indices"), index.name()); + } + return indexLocations; } - public File shardLocation(ShardId shardId) { - return new File(indexLocation(shardId.index()), Integer.toString(shardId.id())); + public File[] shardLocations(ShardId shardId) { + File[] shardLocations = new File[nodeFiles.length]; + for (int i = 0; i < nodeFiles.length; i++) { + shardLocations[i] = new File(new File(new File(nodeFiles[i], "indices"), shardId.index().name()), Integer.toString(shardId.id())); + } + return shardLocations; } public void close() { - if (lock != null) { - try { - lock.release(); - } catch (IOException e) { - // ignore + if (locks != null) { + for (Lock lock : locks) { + try { + lock.release(); + } catch (IOException e) { + // ignore + } } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java index e4ff0e3432c4b..102b0888bceda 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java @@ -53,7 +53,7 @@ public class FsGateway extends BlobStoreGateway { String location = componentSettings.get("location"); if (location == null) { logger.warn("using local fs location for gateway, should be changed to be a shared location across nodes"); - gatewayFile = new File(environment.dataFile(), "gateway"); + gatewayFile = new File(environment.dataFiles()[0], "gateway"); } else { gatewayFile = new File(location); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 758d9d77266c0..1a96c94949517 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -180,7 +180,7 @@ public LocalGatewayStartedShards currentStartedShards() { } @Override public void reset() throws Exception { - FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocation()); + FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocations()); } @Override public void clusterChanged(final ClusterChangedEvent event) { @@ -263,7 +263,8 @@ private synchronized void lazyInitialize() { location = null; } else { // create the location where the state will be stored - this.location = new File(nodeEnv.nodeDataLocation(), "_state"); + // TODO: we might want to persist states on all data locations + this.location = new File(nodeEnv.nodeDataLocations()[0], "_state"); FileSystemUtils.mkdirs(this.location); if (clusterService.localNode().masterNode()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index f008039b28fb0..6e42255ff6e31 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -128,19 +128,30 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen // move an existing translog, if exists, to "recovering" state, and start reading from it FsTranslog translog = (FsTranslog) indexShard.translog(); - File recoveringTranslogFile = new File(translog.location(), "translog-" + translogId + ".recovering"); - if (!recoveringTranslogFile.exists()) { - File translogFile = new File(translog.location(), "translog-" + translogId); - if (translogFile.exists()) { - for (int i = 0; i < 3; i++) { - if (translogFile.renameTo(recoveringTranslogFile)) { - break; + String translogName = "translog-" + translogId; + String recoverTranslogName = translogName + ".recovering"; + + + File recoveringTranslogFile = null; + for (File translogLocation : translog.locations()) { + File tmpRecoveringFile = new File(translogLocation, recoverTranslogName); + if (!tmpRecoveringFile.exists()) { + File tmpTranslogFile = new File(translogLocation, translogName); + if (tmpTranslogFile.exists()) { + for (int i = 0; i < 3; i++) { + if (tmpTranslogFile.renameTo(tmpRecoveringFile)) { + recoveringTranslogFile = tmpRecoveringFile; + break; + } } } + } else { + recoveringTranslogFile = tmpRecoveringFile; + break; } } - if (!recoveringTranslogFile.exists()) { + if (recoveringTranslogFile == null || !recoveringTranslogFile.exists()) { // no translog to recovery from, start and bail // no translog files, bail indexShard.start("post recovery from gateway, no translog"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index ce3882a50209c..12cf19c415cdb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -436,7 +436,7 @@ private void deleteShard(int shardId, boolean delete, boolean snapshotGateway, b // delete the shard location if needed if (delete || indexGateway.type().equals(NoneGateway.TYPE)) { - FileSystemUtils.deleteRecursively(nodeEnv.shardLocation(sId)); + FileSystemUtils.deleteRecursively(nodeEnv.shardLocations(sId)); } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java index 9bbd67e61a9a2..d890a03b36516 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/Store.java @@ -35,11 +35,13 @@ import org.elasticsearch.common.lucene.Directories; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.support.ForceSyncDirectory; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -161,7 +163,22 @@ public void renameFile(String from, String to) throws IOException { } } - public static Map readChecksums(Directory dir) throws IOException { + public static Map readChecksums(File[] locations) throws IOException { + for (File location : locations) { + FSDirectory directory = FSDirectory.open(location); + try { + Map checksums = readChecksums(directory, null); + if (checksums != null) { + return checksums; + } + } finally { + directory.close(); + } + } + return null; + } + + static Map readChecksums(Directory dir, Map defaultValue) throws IOException { long lastFound = -1; for (String name : dir.listAll()) { if (!isChecksum(name)) { @@ -173,7 +190,7 @@ public static Map readChecksums(Directory dir) throws IOExceptio } } if (lastFound == -1) { - return ImmutableMap.of(); + return defaultValue; } IndexInput indexInput = dir.openInput(CHECKSUMS_PREFIX + lastFound); try { @@ -181,7 +198,7 @@ public static Map readChecksums(Directory dir) throws IOExceptio return indexInput.readStringStringMap(); } catch (Exception e) { // failed to load checksums, ignore and return an empty map - return new HashMap(); + return defaultValue; } finally { indexInput.close(); } @@ -265,7 +282,7 @@ protected class StoreDirectory extends Directory implements ForceSyncDirectory { this.delegates = delegates; synchronized (mutex) { MapBuilder builder = MapBuilder.newMapBuilder(); - Map checksums = readChecksums(delegates[0]); + Map checksums = readChecksums(delegates[0], new HashMap()); for (Directory delegate : delegates) { for (String file : delegate.listAll()) { // BACKWARD CKS SUPPORT @@ -398,6 +415,8 @@ public IndexOutput createOutput(String name, boolean computeChecksum) throws IOE if (currentSize < size) { size = currentSize; directory = delegate; + } else if (currentSize == size && ThreadLocalRandom.current().nextBoolean()) { + directory = delegate; } } else { directory = delegate; // really, make sense to have multiple directories for FS diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java index 46135e702827f..c184135572a4a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java @@ -40,15 +40,15 @@ public abstract class FsIndexStore extends AbstractIndexStore { private final NodeEnvironment nodeEnv; - private final File location; + private final File[] locations; public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { super(index, indexSettings, indexService); this.nodeEnv = nodeEnv; if (nodeEnv.hasNodeFile()) { - this.location = nodeEnv.indexLocation(index); + this.locations = nodeEnv.indexLocations(index); } else { - this.location = null; + this.locations = null; } } @@ -57,58 +57,73 @@ public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexSer } @Override public ByteSizeValue backingStoreTotalSpace() { - if (location == null) { + if (locations == null) { return new ByteSizeValue(0); } - long totalSpace = location.getTotalSpace(); - if (totalSpace == 0) { - totalSpace = 0; + long totalSpace = 0; + for (File location : locations) { + totalSpace += location.getTotalSpace(); } return new ByteSizeValue(totalSpace); } @Override public ByteSizeValue backingStoreFreeSpace() { - if (location == null) { + if (locations == null) { return new ByteSizeValue(0); } - long usableSpace = location.getUsableSpace(); - if (usableSpace == 0) { - usableSpace = 0; + long usableSpace = 0; + for (File location : locations) { + usableSpace += location.getUsableSpace(); } return new ByteSizeValue(usableSpace); } @Override public boolean canDeleteUnallocated(ShardId shardId) { - if (location == null) { + if (locations == null) { return false; } if (indexService.hasShard(shardId.id())) { return false; } - return shardLocation(shardId).exists(); + for (File location : shardLocations(shardId)) { + if (location.exists()) { + return true; + } + } + return false; } @Override public void deleteUnallocated(ShardId shardId) throws IOException { - if (location == null) { + if (locations == null) { return; } if (indexService.hasShard(shardId.id())) { throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted"); } - FileSystemUtils.deleteRecursively(shardLocation(shardId)); + FileSystemUtils.deleteRecursively(shardLocations(shardId)); } - public File shardLocation(ShardId shardId) { - return nodeEnv.shardLocation(shardId); + public File[] shardLocations(ShardId shardId) { + return nodeEnv.shardLocations(shardId); } - public File shardIndexLocation(ShardId shardId) { - return new File(shardLocation(shardId), "index"); + public File[] shardIndexLocations(ShardId shardId) { + File[] shardLocations = shardLocations(shardId); + File[] shardIndexLocations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + shardIndexLocations[i] = new File(shardLocations[i], "index"); + } + return shardIndexLocations; } // not used currently, but here to state that this store also defined a file based translog location - public File shardTranslogLocation(ShardId shardId) { - return new File(shardLocation(shardId), "translog"); + public File[] shardTranslogLocations(ShardId shardId) { + File[] shardLocations = shardLocations(shardId); + File[] shardTranslogLocations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + shardTranslogLocations[i] = new File(shardLocations[i], "translog"); + } + return shardTranslogLocations; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java index c54066fc16a06..7a12ca5754ecd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java @@ -40,8 +40,12 @@ public class MmapFsDirectoryService extends FsDirectoryService { } @Override public Directory[] build() throws IOException { - File location = indexStore.shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - return new Directory[]{new MMapDirectory(location, buildLockFactory())}; + File[] locations = indexStore.shardIndexLocations(shardId); + Directory[] dirs = new Directory[locations.length]; + for (int i = 0; i < dirs.length; i++) { + FileSystemUtils.mkdirs(locations[i]); + dirs[i] = new MMapDirectory(locations[i], buildLockFactory()); + } + return dirs; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java index e020b9a349e8a..5f1efb09c223c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java @@ -40,8 +40,12 @@ public class NioFsDirectoryService extends FsDirectoryService { } @Override public Directory[] build() throws IOException { - File location = indexStore.shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - return new Directory[]{new NIOFSDirectory(location, buildLockFactory())}; + File[] locations = indexStore.shardIndexLocations(shardId); + Directory[] dirs = new Directory[locations.length]; + for (int i = 0; i < dirs.length; i++) { + FileSystemUtils.mkdirs(locations[i]); + dirs[i] = new NIOFSDirectory(locations[i], buildLockFactory()); + } + return dirs; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java index dee62bf263714..16514438bc4b0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java @@ -40,8 +40,12 @@ public class SimpleFsDirectoryService extends FsDirectoryService { } @Override public Directory[] build() throws IOException { - File location = indexStore.shardIndexLocation(shardId); - FileSystemUtils.mkdirs(location); - return new Directory[]{new SimpleFSDirectory(location, buildLockFactory())}; + File[] locations = indexStore.shardIndexLocations(shardId); + Directory[] dirs = new Directory[locations.length]; + for (int i = 0; i < dirs.length; i++) { + FileSystemUtils.mkdirs(locations[i]); + dirs[i] = new SimpleFSDirectory(locations[i], buildLockFactory()); + } + return dirs; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 324713782fdd8..5489ab6824f6a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.CachedStreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -44,7 +45,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog { private final ReadWriteLock rwl = new ReentrantReadWriteLock(); - private final File location; + private final File[] locations; private volatile FsTranslogFile current; private volatile FsTranslogFile trans; @@ -53,18 +54,22 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog @Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { super(shardId, indexSettings); - this.location = new File(nodeEnv.shardLocation(shardId), "translog"); - FileSystemUtils.mkdirs(this.location); + File[] shardLocations = nodeEnv.shardLocations(shardId); + this.locations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + locations[i] = new File(shardLocations[i], "translog"); + FileSystemUtils.mkdirs(locations[i]); + } } public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) { super(shardId, indexSettings); - this.location = location; - FileSystemUtils.mkdirs(this.location); + this.locations = new File[]{location}; + FileSystemUtils.mkdirs(location); } - public File location() { - return location; + public File[] locations() { + return locations; } @Override public long currentId() { @@ -98,19 +103,21 @@ public File location() { @Override public void clearUnreferenced() { rwl.writeLock().lock(); try { - File[] files = location.listFiles(); - if (files != null) { - for (File file : files) { - if (file.getName().equals("translog-" + current.id())) { - continue; - } - if (trans != null && file.getName().equals("translog-" + trans.id())) { - continue; - } - try { - file.delete(); - } catch (Exception e) { - // ignore + for (File location : locations) { + File[] files = location.listFiles(); + if (files != null) { + for (File file : files) { + if (file.getName().equals("translog-" + current.id())) { + continue; + } + if (trans != null && file.getName().equals("translog-" + trans.id())) { + continue; + } + try { + file.delete(); + } catch (Exception e) { + // ignore + } } } } @@ -123,6 +130,17 @@ public File location() { rwl.writeLock().lock(); try { FsTranslogFile newFile; + long size = Long.MAX_VALUE; + File location = null; + for (File file : locations) { + long currentFree = file.getFreeSpace(); + if (currentFree < size) { + size = currentFree; + location = file; + } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) { + location = file; + } + } try { newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); } catch (IOException e) { @@ -147,6 +165,17 @@ public File location() { rwl.writeLock().lock(); try { assert this.trans == null; + long size = Long.MAX_VALUE; + File location = null; + for (File file : locations) { + long currentFree = file.getFreeSpace(); + if (currentFree < size) { + size = currentFree; + location = file; + } else if (currentFree == size && ThreadLocalRandom.current().nextBoolean()) { + location = file; + } + } this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id))); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 1ac25b53553e5..44c1a66cd2b57 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -352,7 +352,7 @@ private void deleteIndex(String index, boolean delete, String reason, @Nullable indicesLifecycle.afterIndexClosed(indexService.index(), delete); if (delete) { - FileSystemUtils.deleteRecursively(nodeEnv.indexLocation(new Index(index))); + FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(index))); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index f140a2abed5a0..24dfa3456c9bc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -131,25 +131,28 @@ public void close() { } if (shardCanBeDeleted) { ShardId shardId = indexShardRoutingTable.shardId(); - File shardLocation = nodeEnv.shardLocation(shardId); - if (shardLocation.exists()) { - logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); - FileSystemUtils.deleteRecursively(shardLocation); + for (File shardLocation : nodeEnv.shardLocations(shardId)) { + if (shardLocation.exists()) { + logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); + FileSystemUtils.deleteRecursively(shardLocation); + } } } } } // delete indices that are no longer part of the metadata - File[] files = nodeEnv.indicesLocation().listFiles(); - if (files != null) { - for (File file : files) { - // if we have the index on the metadata, don't delete it - if (event.state().metaData().hasIndex(file.getName())) { - continue; + for (File indicesLocation : nodeEnv.indicesLocations()) { + File[] files = indicesLocation.listFiles(); + if (files != null) { + for (File file : files) { + // if we have the index on the metadata, don't delete it + if (event.state().metaData().hasIndex(file.getName())) { + continue; + } + logger.debug("[{}] deleting index that is no longer in the cluster meta_date from [{}]", file.getName(), file); + FileSystemUtils.deleteRecursively(file); } - logger.debug("[{}] deleting index that is no longer in the cluster meta_date", file.getName()); - FileSystemUtils.deleteRecursively(file); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index a25624cf18414..c36b31536ee19 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.store; -import org.apache.lucene.store.FSDirectory; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.FailedNodeException; @@ -33,12 +32,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Unicode; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -163,17 +160,34 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException if (!storeType.contains("fs")) { return new StoreFilesMetaData(false, shardId, ImmutableMap.of()); } - File indexFile = new File(nodeEnv.shardLocation(shardId), "index"); - if (!indexFile.exists()) { + File[] shardLocations = nodeEnv.shardLocations(shardId); + File[] shardIndexLocations = new File[shardLocations.length]; + for (int i = 0; i < shardLocations.length; i++) { + shardIndexLocations[i] = new File(shardLocations[i], "index"); + } + boolean exists = false; + for (File shardIndexLocation : shardIndexLocations) { + if (shardIndexLocation.exists()) { + exists = true; + break; + } + } + if (!exists) { return new StoreFilesMetaData(false, shardId, ImmutableMap.of()); } + + Map checksums = Store.readChecksums(shardIndexLocations); + if (checksums == null) { + checksums = ImmutableMap.of(); + } + Map files = Maps.newHashMap(); - // read the checksums file - FSDirectory directory = FSDirectory.open(indexFile); - Map checksums = null; - try { - checksums = Store.readChecksums(directory); - for (File file : indexFile.listFiles()) { + for (File shardIndexLocation : shardIndexLocations) { + File[] listedFiles = shardIndexLocation.listFiles(); + if (listedFiles == null) { + continue; + } + for (File file : listedFiles) { // BACKWARD CKS SUPPORT if (file.getName().endsWith(".cks")) { continue; @@ -183,28 +197,6 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException } files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksums.get(file.getName()))); } - } finally { - directory.close(); - } - - // BACKWARD CKS SUPPORT - for (File file : indexFile.listFiles()) { - if (file.getName().endsWith(".cks")) { - continue; - } - if (file.getName().startsWith("_checksums")) { - continue; - } - // try and load the checksum - String checksum = null; - File checksumFile = new File(file.getParentFile(), file.getName() + ".cks"); - if (checksumFile.exists() && (checksums == null || !checksums.containsKey(file.getName()))) { - byte[] checksumBytes = Streams.copyToByteArray(checksumFile); - if (checksumBytes.length > 0) { - checksum = Unicode.fromBytes(checksumBytes); - } - files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified(), checksum)); - } } return new StoreFilesMetaData(false, shardId, files); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java index 50194a4a91363..735a30db7ee68 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalSettingsPerparer.java @@ -108,9 +108,11 @@ public static Tuple prepareSettings(Settings pSettings, b settingsBuilder = settingsBuilder().put(v1); settingsBuilder.put("path.home", cleanPath(environment.homeFile().getAbsolutePath())); settingsBuilder.put("path.work", cleanPath(environment.workFile().getAbsolutePath())); - settingsBuilder.put("path.work_with_cluster", cleanPath(environment.workWithClusterFile().getAbsolutePath())); - settingsBuilder.put("path.data", cleanPath(environment.dataFile().getAbsolutePath())); - settingsBuilder.put("path.data_with_cluster", cleanPath(environment.dataWithClusterFile().getAbsolutePath())); + String[] paths = new String[environment.dataFiles().length]; + for (int i = 0; i < environment.dataFiles().length; i++) { + paths[i] = cleanPath(environment.dataFiles()[i].getAbsolutePath()); + } + settingsBuilder.putArray("path.data", paths); settingsBuilder.put("path.logs", cleanPath(environment.logsFile().getAbsolutePath())); v1 = settingsBuilder.build(); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java index 3cccbebe0b0c6..a75ce07e12d04 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java @@ -165,7 +165,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests logger.info("Closing the server"); closeNode("server1"); logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway"); - FileSystemUtils.deleteRecursively(environment.dataWithClusterFile()); + FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles()); logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir"); startNode("server1"); @@ -282,7 +282,7 @@ private void testLoad(boolean fullRecovery) { closeNode("server1"); if (fullRecovery) { logger.info("Clearing cluster data dir, so there will be a full recovery from the gateway"); - FileSystemUtils.deleteRecursively(environment.dataWithClusterFile()); + FileSystemUtils.deleteRecursively(environment.dataWithClusterFiles()); logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir"); } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java index 133ca1d831305..7a46104c97c63 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/store/IndicesStoreTests.java @@ -32,8 +32,7 @@ import java.io.File; import static org.elasticsearch.client.Requests.*; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; - +import static org.elasticsearch.common.settings.ImmutableSettings.*; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -118,7 +117,7 @@ public void shardsCleanup() { private File shardDirectory(String server, String index, int shard) { InternalNode node = ((InternalNode) node(server)); NodeEnvironment env = node.injector().getInstance(NodeEnvironment.class); - return env.shardLocation(new ShardId(index, shard)); + return env.shardLocations(new ShardId(index, shard))[0]; } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java index ac8e60afadaa6..41d1b9588fc5f 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/fullrestart/FullRestartStressTest.java @@ -197,10 +197,10 @@ public void run() throws Exception { client.close(); for (Node node : nodes) { - File nodeWork = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeDataLocation(); + File[] nodeDatas = ((InternalNode) node).injector().getInstance(NodeEnvironment.class).nodeDataLocations(); node.close(); if (clearNodeWork && !settings.get("gateway.type").equals("local")) { - FileSystemUtils.deleteRecursively(nodeWork); + FileSystemUtils.deleteRecursively(nodeDatas); } } @@ -221,6 +221,7 @@ public static void main(String[] args) throws Exception { .put("gateway.type", "local") .put("gateway.recover_after_nodes", numberOfNodes) .put("index.number_of_shards", 1) + .put("path.data", "data/data1,data/data2") .build(); FullRestartStressTest test = new FullRestartStressTest() diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java index ed730d00576fe..8e6fb3567a487 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java @@ -167,7 +167,7 @@ public void run() throws Exception { // start doing the rolling restart int nodeIndex = 0; while (true) { - File nodeData = ((InternalNode) nodes[nodeIndex]).injector().getInstance(NodeEnvironment.class).nodeDataLocation(); + File[] nodeData = ((InternalNode) nodes[nodeIndex]).injector().getInstance(NodeEnvironment.class).nodeDataLocations(); nodes[nodeIndex].close(); if (clearNodeData) { FileSystemUtils.deleteRecursively(nodeData); @@ -310,7 +310,7 @@ private class Indexer extends Thread { } private void indexDoc() throws Exception { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); XContentBuilder json = XContentFactory.jsonBuilder().startObject() .field("field", "value" + ThreadLocalRandom.current().nextInt()); @@ -341,6 +341,7 @@ public static void main(String[] args) throws Exception { Settings settings = settingsBuilder() .put("index.shard.check_index", true) .put("gateway.type", "none") + .put("path.data", "data/data1,data/data2") .build(); RollingRestartStressTest test = new RollingRestartStressTest()