Skip to content

Commit

Permalink
Allow to stripe the data location over multiple locations, closes #1356.
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Sep 22, 2011
1 parent c1ca21f commit 8d7aaa7
Show file tree
Hide file tree
Showing 22 changed files with 325 additions and 177 deletions.
Expand Up @@ -120,6 +120,14 @@ public static boolean hasExtensions(File root, String... extensions) {
return false;
}

public static boolean deleteRecursively(File[] roots) {
boolean deleted = true;
for (File root : roots) {
deleted &= deleteRecursively(root);
}
return deleted;
}

public static boolean deleteRecursively(File root) {
return deleteRecursively(root, true);
}
Expand Down
Expand Up @@ -492,6 +492,7 @@ public Builder put(String setting, long value, ByteSizeUnit sizeUnit) {
* @return The builder
*/
public Builder putArray(String setting, String... values) {
remove(setting);
int counter = 0;
while (true) {
String value = map.remove(setting + '.' + (counter++));
Expand Down
Expand Up @@ -46,9 +46,9 @@ public class Environment {

private final File workWithClusterFile;

private final File dataFile;
private final File[] dataFiles;

private final File dataWithClusterFile;
private final File[] dataWithClusterFiles;

private final File configFile;

Expand Down Expand Up @@ -86,12 +86,18 @@ public Environment(Settings settings) {
}
workWithClusterFile = new File(workFile, ClusterName.clusterNameFromSettings(settings).value());

if (settings.get("path.data") != null) {
dataFile = new File(cleanPath(settings.get("path.data")));
String[] dataPaths = settings.getAsArray("path.data");
if (dataPaths.length > 0) {
dataFiles = new File[dataPaths.length];
dataWithClusterFiles = new File[dataPaths.length];
for (int i = 0; i < dataPaths.length; i++) {
dataFiles[i] = new File(dataPaths[i]);
dataWithClusterFiles[i] = new File(dataFiles[i], ClusterName.clusterNameFromSettings(settings).value());
}
} else {
dataFile = new File(homeFile, "data");
dataFiles = new File[]{new File(homeFile, "data")};
dataWithClusterFiles = new File[]{new File(new File(homeFile, "data"), ClusterName.clusterNameFromSettings(settings).value())};
}
dataWithClusterFile = new File(dataFile, ClusterName.clusterNameFromSettings(settings).value());

if (settings.get("path.logs") != null) {
logsFile = new File(cleanPath(settings.get("path.logs")));
Expand Down Expand Up @@ -124,15 +130,15 @@ public File workWithClusterFile() {
/**
* The data location.
*/
public File dataFile() {
return dataFile;
public File[] dataFiles() {
return dataFiles;
}

/**
* The data location with the cluster name as a sub directory.
*/
public File dataWithClusterFile() {
return dataWithClusterFile;
public File[] dataWithClusterFiles() {
return dataWithClusterFiles;
}

/**
Expand Down
Expand Up @@ -38,56 +38,94 @@
*/
public class NodeEnvironment extends AbstractComponent {

private final File nodeFile;
private final File[] nodeFiles;
private final File[] nodeIndicesLocations;

private final Lock lock;
private final Lock[] locks;

private final int localNodeId;

@Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException {
super(settings);

if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
nodeFile = null;
lock = null;
nodeFiles = null;
nodeIndicesLocations = null;
locks = null;
localNodeId = -1;
return;
}

Lock lock = null;
File dir = null;
File[] nodesFiles = new File[environment.dataWithClusterFiles().length];
Lock[] locks = new Lock[environment.dataWithClusterFiles().length];
int localNodeId = -1;
IOException lastException = null;
for (int i = 0; i < 50; i++) {
dir = new File(new File(environment.dataWithClusterFile(), "nodes"), Integer.toString(i));
if (!dir.exists()) {
FileSystemUtils.mkdirs(dir);
}
logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath());
try {
NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir);
Lock tmpLock = lockFactory.makeLock("node.lock");
boolean obtained = tmpLock.obtain();
if (obtained) {
lock = tmpLock;
localNodeId = i;
for (int possibleLockId = 0; possibleLockId < 50; possibleLockId++) {
for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
File dir = new File(new File(environment.dataWithClusterFiles()[dirIndex], "nodes"), Integer.toString(possibleLockId));
if (!dir.exists()) {
FileSystemUtils.mkdirs(dir);
}
logger.trace("obtaining node lock on {} ...", dir.getAbsolutePath());
try {
NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir);
Lock tmpLock = lockFactory.makeLock("node.lock");
boolean obtained = tmpLock.obtain();
if (obtained) {
locks[dirIndex] = tmpLock;
nodesFiles[dirIndex] = dir;
localNodeId = possibleLockId;
} else {
logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath());
// release all the ones that were obtained up until now
for (int i = 0; i < locks.length; i++) {
if (locks[i] != null) {
try {
locks[i].release();
} catch (Exception e1) {
// ignore
}
}
locks[i] = null;
}
break;
}
} catch (IOException e) {
logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath());
lastException = e;
// release all the ones that were obtained up until now
for (int i = 0; i < locks.length; i++) {
if (locks[i] != null) {
try {
locks[i].release();
} catch (Exception e1) {
// ignore
}
}
locks[i] = null;
}
break;
} else {
logger.trace("failed to obtain node lock on {}", dir.getAbsolutePath());
}
} catch (IOException e) {
logger.trace("failed to obtain node lock on {}", e, dir.getAbsolutePath());
lastException = e;
}
if (locks[0] != null) {
// we found a lock, break
break;
}
}
if (lock == null) {
if (locks[0] == null) {
throw new IOException("Failed to obtain node lock", lastException);
}

this.localNodeId = localNodeId;
this.lock = lock;
this.nodeFile = dir;
this.locks = locks;
this.nodeFiles = nodesFiles;
if (logger.isDebugEnabled()) {
logger.debug("using node location [{}], local_node_id [{}]", dir, localNodeId);
logger.debug("using node location [{}], local_node_id [{}]", nodesFiles, localNodeId);
}

this.nodeIndicesLocations = new File[nodeFiles.length];
for (int i = 0; i < nodeFiles.length; i++) {
nodeIndicesLocations[i] = new File(nodeFiles[i], "indices");
}
}

Expand All @@ -96,34 +134,44 @@ public int localNodeId() {
}

public boolean hasNodeFile() {
return nodeFile != null && lock != null;
return nodeFiles != null && locks != null;
}

public File nodeDataLocation() {
if (nodeFile == null || lock == null) {
public File[] nodeDataLocations() {
if (nodeFiles == null || locks == null) {
throw new ElasticSearchIllegalStateException("node is not configured to store local location");
}
return nodeFile;
return nodeFiles;
}

public File indicesLocation() {
return new File(nodeDataLocation(), "indices");
public File[] indicesLocations() {
return nodeIndicesLocations;
}

public File indexLocation(Index index) {
return new File(indicesLocation(), index.name());
public File[] indexLocations(Index index) {
File[] indexLocations = new File[nodeFiles.length];
for (int i = 0; i < nodeFiles.length; i++) {
indexLocations[i] = new File(new File(nodeFiles[i], "indices"), index.name());
}
return indexLocations;
}

public File shardLocation(ShardId shardId) {
return new File(indexLocation(shardId.index()), Integer.toString(shardId.id()));
public File[] shardLocations(ShardId shardId) {
File[] shardLocations = new File[nodeFiles.length];
for (int i = 0; i < nodeFiles.length; i++) {
shardLocations[i] = new File(new File(new File(nodeFiles[i], "indices"), shardId.index().name()), Integer.toString(shardId.id()));
}
return shardLocations;
}

public void close() {
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
// ignore
if (locks != null) {
for (Lock lock : locks) {
try {
lock.release();
} catch (IOException e) {
// ignore
}
}
}
}
Expand Down
Expand Up @@ -53,7 +53,7 @@ public class FsGateway extends BlobStoreGateway {
String location = componentSettings.get("location");
if (location == null) {
logger.warn("using local fs location for gateway, should be changed to be a shared location across nodes");
gatewayFile = new File(environment.dataFile(), "gateway");
gatewayFile = new File(environment.dataFiles()[0], "gateway");
} else {
gatewayFile = new File(location);
}
Expand Down
Expand Up @@ -180,7 +180,7 @@ public LocalGatewayStartedShards currentStartedShards() {
}

@Override public void reset() throws Exception {
FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocation());
FileSystemUtils.deleteRecursively(nodeEnv.nodeDataLocations());
}

@Override public void clusterChanged(final ClusterChangedEvent event) {
Expand Down Expand Up @@ -263,7 +263,8 @@ private synchronized void lazyInitialize() {
location = null;
} else {
// create the location where the state will be stored
this.location = new File(nodeEnv.nodeDataLocation(), "_state");
// TODO: we might want to persist states on all data locations
this.location = new File(nodeEnv.nodeDataLocations()[0], "_state");
FileSystemUtils.mkdirs(this.location);

if (clusterService.localNode().masterNode()) {
Expand Down
Expand Up @@ -128,19 +128,30 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen

// move an existing translog, if exists, to "recovering" state, and start reading from it
FsTranslog translog = (FsTranslog) indexShard.translog();
File recoveringTranslogFile = new File(translog.location(), "translog-" + translogId + ".recovering");
if (!recoveringTranslogFile.exists()) {
File translogFile = new File(translog.location(), "translog-" + translogId);
if (translogFile.exists()) {
for (int i = 0; i < 3; i++) {
if (translogFile.renameTo(recoveringTranslogFile)) {
break;
String translogName = "translog-" + translogId;
String recoverTranslogName = translogName + ".recovering";


File recoveringTranslogFile = null;
for (File translogLocation : translog.locations()) {
File tmpRecoveringFile = new File(translogLocation, recoverTranslogName);
if (!tmpRecoveringFile.exists()) {
File tmpTranslogFile = new File(translogLocation, translogName);
if (tmpTranslogFile.exists()) {
for (int i = 0; i < 3; i++) {
if (tmpTranslogFile.renameTo(tmpRecoveringFile)) {
recoveringTranslogFile = tmpRecoveringFile;
break;
}
}
}
} else {
recoveringTranslogFile = tmpRecoveringFile;
break;
}
}

if (!recoveringTranslogFile.exists()) {
if (recoveringTranslogFile == null || !recoveringTranslogFile.exists()) {
            // no translog to recover from, start and bail
// no translog files, bail
indexShard.start("post recovery from gateway, no translog");
Expand Down
Expand Up @@ -436,7 +436,7 @@ private void deleteShard(int shardId, boolean delete, boolean snapshotGateway, b

// delete the shard location if needed
if (delete || indexGateway.type().equals(NoneGateway.TYPE)) {
FileSystemUtils.deleteRecursively(nodeEnv.shardLocation(sId));
FileSystemUtils.deleteRecursively(nodeEnv.shardLocations(sId));
}
}
}

0 comments on commit 8d7aaa7

Please sign in to comment.