Skip to content

Commit

Permalink
Merge pull request #612 from hsaputra/merge_to_master
Browse files Browse the repository at this point in the history
Merge to master from conf branch for TACHYON-8 work on new Tachyon configuration
  • Loading branch information
haoyuan committed Feb 20, 2015
2 parents 4d4f92a + d6dac60 commit a73823d
Show file tree
Hide file tree
Showing 117 changed files with 2,610 additions and 1,530 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -19,3 +19,4 @@ journal/
logs/
script/
target/
**/tachyon-site.properties
104 changes: 104 additions & 0 deletions core/src/main/java/tachyon/Constants.java
Expand Up @@ -64,4 +64,108 @@ public class Constants {
* Version 3 [0.6.0] Add lastModificationTimeMs to inode.
*/
public static final int JOURNAL_VERSION = 3;

// Configuration property constants.
// Please check and update the Configuration-Settings.md file whenever you change or add a
// Tachyon configuration property.

// This constant is used only in Hadoop MR job submissions, where the client needs to pass
// site-specific configuration properties. It is used as a key in the MR Configuration.
public static final String TACHYON_CONF_SITE = "tachyon.conf.site";

public static final String DEFAULT_HOME = "/mnt/tachyon_default_home";
public static final String DEFAULT_DATA_FOLDER = "/tachyon/data";
public static final String DEFAULT_JOURNAL_FOLDER = DEFAULT_HOME + "/journal/";
public static final String[] DEFAULT_STORAGE_TIER_DIR_QUOTA = "512MB,64GB,1TB".split(",");

public static final String TACHYON_HOME = "tachyon.home";
public static final String WEB_RESOURCES = "tachyon.web.resources";
public static final String UNDERFS_ADDRESS = "tachyon.underfs.address";
public static final String UNDERFS_DATA_FOLDER = "tachyon.data.folder";
public static final String UNDERFS_WORKERS_FOLDER = "tachyon.workers.folder";
public static final String UNDERFS_HDFS_IMPL = "tachyon.underfs.hdfs.impl";
public static final String ASYNC_ENABLED = "tachyon.async.enabled";
public static final String MAX_COLUMNS = "tachyon.max.columns";
public static final String IN_TEST_MODE = "tachyon.test.mode";
public static final String UNDERFS_GLUSTERFS_IMPL = "tachyon.underfs.glusterfs.impl";
public static final String UNDERFS_GLUSTERFS_VOLUMES = "tachyon.underfs.glusterfs.volumes";
public static final String UNDERFS_GLUSTERFS_MOUNTS = "tachyon.underfs.glusterfs.mounts";
public static final String UNDERFS_GLUSTERFS_MR_DIR =
"tachyon.underfs.glusterfs.mapred.system.dir";
public static final String USE_ZOOKEEPER = "tachyon.usezookeeper";
public static final String ZOOKEEPER_ADDRESS = "tachyon.zookeeper.address";
public static final String ZOOKEEPER_ELECTION_PATH = "tachyon.zookeeper.election.path";
public static final String ZOOKEEPER_LEADER_PATH = "tachyon.zookeeper.leader.path";
public static final String UNDERFS_HADOOP_PREFIXS = "tachyon.underfs.hadoop.prefixes";
public static final String MAX_TABLE_METADATA_BYTE = "tachyon.max.table.metadata.byte";
public static final String FORMAT_FILE_PREFIX = "_format_";

public static final String MASTER_FORMAT_FILE_PREFIX = "tachyon.master.format.file_prefix";
public static final String MASTER_HOSTNAME = "tachyon.master.hostname";
public static final String MASTER_JOURNAL_FOLDER = "tachyon.master.journal.folder";
public static final String MASTER_PORT = "tachyon.master.port";
public static final String MASTER_ADDRESS = "tachyon.master.address";
public static final String MASTER_WEB_PORT = "tachyon.master.web.port";
public static final String MASTER_WEB_THREAD_COUNT = "tachyon.master.web.threads";
public static final String MASTER_TEMPORARY_FOLDER = "tachyon.master.temporary.folder";
public static final String MASTER_HEARTBEAT_INTERVAL_MS = "tachyon.master.heartbeat.interval.ms";
public static final String MASTER_SELECTOR_THREADS = "tachyon.master.selector.threads";
public static final String MASTER_QUEUE_SIZE_PER_SELECTOR =
"tachyon.master.queue.size.per.selector";
public static final String MASTER_SERVER_THREADS = "tachyon.master.server.threads";
public static final String MASTER_WORKER_TIMEOUT_MS = "tachyon.master.worker.timeout.ms";
public static final String MASTER_WHITELIST = "tachyon.master.whitelist";
public static final String MASTER_KEYTAB_KEY = "tachyon.master.keytab.file";
public static final String MASTER_PRINCIPAL_KEY = "tachyon.master.principal";
public static final String MASTER_RETRY_COUNT = "tachyon.master.retry";

public static final String WORKER_MEMORY_SIZE = "tachyon.worker.memory.size";
public static final String WORKER_PORT = "tachyon.worker.port";
public static final String WORKER_DATA_PORT = "tachyon.worker.data.port";
public static final String WORKER_DATA_FOLDER = "tachyon.worker.data.folder";
public static final String WORKER_HEARTBEAT_TIMEOUT_MS = "tachyon.worker.heartbeat.timeout.ms";
public static final String WORKER_TO_MASTER_HEARTBEAT_INTERVAL_MS =
"tachyon.worker.to.master.heartbeat.interval.ms";
public static final String WORKER_SELECTOR_THREADS = "tachyon.worker.selector.threads";
public static final String WORKER_QUEUE_SIZE_PER_SELECTOR =
"tachyon.worker.queue.size.per.selector";
public static final String WORKER_SERVER_THREADS = "tachyon.worker.server.threads";
public static final String WORKER_USER_TIMEOUT_MS = "tachyon.worker.user.timeout.ms";

public static final String WORKER_CHECKPOINT_THREADS = "tachyon.worker.checkpoint.threads";
public static final String WORKER_PER_THREAD_CHECKPOINT_CAP_MB_SEC =
"tachyon.worker.per.thread.checkpoint.cap.mb.sec";
public static final String WORKER_NETWORK_TYPE = "tachyon.worker.network.type";
public static final String WORKER_NETTY_BOSS_THREADS =
"tachyon.worker.network.netty.boss.threads";
public static final String WORKER_NETTY_WORKER_THREADS =
"tachyon.worker.network.netty.worker.threads";
public static final String WORKER_NETWORK_NETTY_CHANNEL = "tachyon.worker.network.netty.channel";
public static final String WORKER_NETTY_FILE_TRANSFER_TYPE =
"tachyon.worker.network.netty.file.transfer";
public static final String WORKER_NETTY_WATERMARK_HIGH =
"tachyon.worker.network.netty.watermark.high";
public static final String WORKER_NETTY_WATERMARK_LOW =
"tachyon.worker.network.netty.watermark.low";
public static final String WORKER_NETTY_BACKLOG = "tachyon.worker.network.netty.backlog";
public static final String WORKER_NETTY_SEND_BUFFER = "tachyon.worker.network.netty.buffer.send";
public static final String WORKER_NETTY_RECEIVE_BUFFER =
"tachyon.worker.network.netty.buffer.receive";
public static final String WORKER_EVICT_STRATEGY_TYPE = "tachyon.worker.evict.strategy";
public static final String WORKER_ALLOCATE_STRATEGY_TYPE = "tachyon.worker.allocate.strategy";
public static final String WORKER_MAX_HIERARCHY_STORAGE_LEVEL =
"tachyon.worker.hierarchystore.level.max";
public static final String WORKER_KEYTAB_KEY = "tachyon.worker.keytab.file";
public static final String WORKER_PRINCIPAL_KEY = "tachyon.worker.principal";
public static final String WORKER_USER_TEMP_RELATIVE_FOLDER = "users";

public static final String USER_FAILED_SPACE_REQUEST_LIMITS =
"tachyon.user.failed.space.request.limits";
public static final String USER_QUOTA_UNIT_BYTES = "tachyon.user.quota.unit.bytes";
public static final String USER_FILE_BUFFER_BYTES = "tachyon.user.file.buffer.bytes";
public static final String USER_HEARTBEAT_INTERVAL_MS = "tachyon.user.heartbeat.interval.ms";
public static final String USER_DEFAULT_BLOCK_SIZE_BYTE = "tachyon.user.default.block.size.byte";
public static final String USER_REMOTE_READ_BUFFER_SIZE_BYTE =
"tachyon.user.remote.read.buffer.size.byte";
public static final String USER_DEFAULT_WRITE_TYPE = "tachyon.user.file.writetype.default";
}
45 changes: 28 additions & 17 deletions core/src/main/java/tachyon/Format.java
Expand Up @@ -20,9 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tachyon.conf.CommonConf;
import tachyon.conf.MasterConf;
import tachyon.conf.WorkerConf;
import tachyon.conf.TachyonConf;
import tachyon.util.CommonUtils;

/**
Expand All @@ -33,8 +31,9 @@ public class Format {
private static final String USAGE = "java -cp target/tachyon-" + Version.VERSION
+ "-jar-with-dependencies.jar tachyon.Format <MASTER/WORKER>";

private static boolean formatFolder(String name, String folder) throws IOException {
UnderFileSystem ufs = UnderFileSystem.get(folder);
private static boolean formatFolder(String name, String folder, TachyonConf tachyonConf)
throws IOException {
UnderFileSystem ufs = UnderFileSystem.get(folder, tachyonConf);
LOG.info("Formatting {}:{}", name, folder);
if (ufs.exists(folder) && !ufs.delete(folder, true)) {
LOG.info("Failed to remove {}:{}", name, folder);
Expand All @@ -53,28 +52,39 @@ public static void main(String[] args) throws IOException {
System.exit(-1);
}

TachyonConf tachyonConf = new TachyonConf();

if (args[0].toUpperCase().equals("MASTER")) {
MasterConf masterConf = MasterConf.get();

if (!formatFolder("JOURNAL_FOLDER", masterConf.JOURNAL_FOLDER)) {
String masterJournal = tachyonConf.get(Constants.MASTER_JOURNAL_FOLDER,
Constants.DEFAULT_JOURNAL_FOLDER);
if (!formatFolder("JOURNAL_FOLDER", masterJournal, tachyonConf)) {
System.exit(-1);
}

CommonConf commonConf = CommonConf.get();
if (!formatFolder("UNDERFS_DATA_FOLDER", commonConf.UNDERFS_DATA_FOLDER)
|| !formatFolder("UNDERFS_WORKERS_FOLDER", commonConf.UNDERFS_WORKERS_FOLDER)) {
String tachyonHome = tachyonConf.get(Constants.TACHYON_HOME, Constants.DEFAULT_HOME);
String ufsAddress = tachyonConf.get(Constants.UNDERFS_ADDRESS, tachyonHome + "/underfs");
String ufsDataFolder = tachyonConf.get(Constants.UNDERFS_DATA_FOLDER,
ufsAddress + "/tachyon/data");
String ufsWorkerFolder = tachyonConf.get(Constants.UNDERFS_WORKERS_FOLDER,
ufsAddress + "/tachyon/workers");
if (!formatFolder("UNDERFS_DATA_FOLDER", ufsDataFolder, tachyonConf)
|| !formatFolder("UNDERFS_WORKERS_FOLDER", ufsWorkerFolder, tachyonConf)) {
System.exit(-1);
}

CommonUtils.touch(masterConf.JOURNAL_FOLDER + masterConf.FORMAT_FILE_PREFIX
+ System.currentTimeMillis());
CommonUtils.touch(masterJournal + Constants.FORMAT_FILE_PREFIX + System.currentTimeMillis(),
tachyonConf);
} else if (args[0].toUpperCase().equals("WORKER")) {
WorkerConf workerConf = WorkerConf.get();
for (int level = 0; level < workerConf.STORAGE_LEVELS; level ++) {
String[] dirPaths = workerConf.STORAGE_TIER_DIRS[level].split(",");
int maxStorageLevels = tachyonConf.getInt(Constants.WORKER_MAX_HIERARCHY_STORAGE_LEVEL, 1);
String workerDataFolder =
tachyonConf.get(Constants.WORKER_DATA_FOLDER, Constants.DEFAULT_DATA_FOLDER);
for (int level = 0; level < maxStorageLevels; level ++) {
String tierLevelDirPath = "tachyon.worker.hierarchystore.level" + level + ".dirs.path";
String[] dirPaths = tachyonConf.get(tierLevelDirPath, "/mnt/ramdisk").split(",");
for (int i = 0; i < dirPaths.length; i ++) {
String dataPath = CommonUtils.concat(dirPaths[i].trim(), workerConf.DATA_FOLDER);
UnderFileSystem ufs = UnderFileSystem.get(dataPath);
String dataPath = CommonUtils.concat(dirPaths[i].trim(), workerDataFolder);
UnderFileSystem ufs = UnderFileSystem.get(dataPath, tachyonConf);
LOG.info("Removing data under folder: {}", dataPath);
if (ufs.exists(dataPath)) {
String[] files = ufs.list(dataPath);
Expand All @@ -84,6 +94,7 @@ public static void main(String[] args) throws IOException {
}
}
}

} else {
LOG.info(USAGE);
System.exit(-1);
Expand Down
31 changes: 20 additions & 11 deletions core/src/main/java/tachyon/UnderFileSystem.java
Expand Up @@ -22,13 +22,15 @@

import com.google.common.base.Preconditions;

import tachyon.conf.CommonConf;
import tachyon.conf.TachyonConf;

/**
* Tachyon stores data in an under-layer file system. Any file system extending this abstract
* class can be a valid under-layer file system.
*/
public abstract class UnderFileSystem {
protected final TachyonConf mTachyonConf;

public enum SpaceType {
SPACE_TOTAL(0), SPACE_FREE(1), SPACE_USED(2);

Expand All @@ -50,28 +52,30 @@ public int getValue() {
* Get the UnderFileSystem instance according to its scheme.
*
* @param path file path storing over the ufs.
* @param tachyonConf the {@link tachyon.conf.TachyonConf} instance.
* @throws IllegalArgumentException for unknown scheme
* @return instance of the under layer file system
*/
public static UnderFileSystem get(String path) {
return get(path, null);
public static UnderFileSystem get(String path, TachyonConf tachyonConf) {
return get(path, null, tachyonConf);
}

/**
* Get the UnderFileSystem instance according to its scheme and configuration.
*
* @param path file path storing over the ufs
* @param conf the configuration object for ufs only
* @param tachyonConf the {@link tachyon.conf.TachyonConf} instance.
* @throws IllegalArgumentException for unknown scheme
* @return instance of the under layer file system
*/
public static UnderFileSystem get(String path, Object conf) {
public static UnderFileSystem get(String path, Object conf, TachyonConf tachyonConf) {
Preconditions.checkArgument(path != null, "path may not be null");

if (isHadoopUnderFS(path)) {
return UnderFileSystemHdfs.getClient(path, conf);
if (isHadoopUnderFS(path, tachyonConf)) {
return UnderFileSystemHdfs.getClient(path, conf, tachyonConf);
} else if (path.startsWith(TachyonURI.SEPARATOR) || path.startsWith("file://")) {
return UnderFileSystemSingleLocal.getClient();
return UnderFileSystemSingleLocal.getClient(tachyonConf);
}
throw new IllegalArgumentException("Unknown under file system scheme " + path);
}
Expand All @@ -82,8 +86,9 @@ public static UnderFileSystem get(String path, Object conf) {
* A path uses the Hadoop implementation if it starts with one of the configured prefixes; this
* is checked with {@link String#startsWith(String)}.
*/
private static boolean isHadoopUnderFS(final String path) {
for (final String prefix : CommonConf.get().HADOOP_UFS_PREFIXES) {
private static boolean isHadoopUnderFS(final String path, TachyonConf tachyonConf) {

for (final String prefix : tachyonConf.getList(Constants.UNDERFS_HADOOP_PREFIXS, ",", null)) {
if (path.startsWith(prefix)) {
return true;
}
Expand All @@ -102,14 +107,14 @@ private static boolean isHadoopUnderFS(final String path) {
* relative to that address. For local FS (with prefixes file:// or /), the under FS
* address is "/" and the path starts with "/".
*/
public static Pair<String, String> parse(TachyonURI path) {
public static Pair<String, String> parse(TachyonURI path, TachyonConf tachyonConf) {
Preconditions.checkArgument(path != null, "path may not be null");

if (path.hasScheme()) {
String header = path.getScheme() + "://";
String authority = (path.hasAuthority()) ? path.getAuthority() : "";
if (header.equals(Constants.HEADER) || header.equals(Constants.HEADER_FT)
|| isHadoopUnderFS(header)) {
|| isHadoopUnderFS(header, tachyonConf)) {
if (path.getPath().isEmpty()) {
return new Pair<String, String>(header + authority, TachyonURI.SEPARATOR);
} else {
Expand All @@ -125,6 +130,10 @@ public static Pair<String, String> parse(TachyonURI path) {
return null;
}

/**
 * Creates the base under file system and stores the given configuration in
 * {@code mTachyonConf}.
 *
 * @param tachyonConf the {@link tachyon.conf.TachyonConf} instance kept by this instance
 */
protected UnderFileSystem(TachyonConf tachyonConf) {
mTachyonConf = tachyonConf;
}

public abstract void close() throws IOException;

public abstract OutputStream create(String path) throws IOException;
Expand Down
34 changes: 22 additions & 12 deletions core/src/main/java/tachyon/UnderFileSystemHdfs.java
Expand Up @@ -36,7 +36,7 @@

import com.google.common.base.Throwables;

import tachyon.conf.CommonConf;
import tachyon.conf.TachyonConf;
import tachyon.hadoop.Utils;

/**
Expand All @@ -52,15 +52,17 @@ public class UnderFileSystemHdfs extends UnderFileSystem {
private static final FsPermission PERMISSION = new FsPermission((short) 0777)
.applyUMask(FsPermission.createImmutable((short) 0000));

public static UnderFileSystemHdfs getClient(String path) {
return new UnderFileSystemHdfs(path, null);
public static UnderFileSystemHdfs getClient(String path, TachyonConf tachyonConf) {
return new UnderFileSystemHdfs(path, null, tachyonConf);
}

public static UnderFileSystemHdfs getClient(String path, Object conf) {
return new UnderFileSystemHdfs(path, conf);
public static UnderFileSystemHdfs getClient(String path, Object conf, TachyonConf tachyonConf) {
return new UnderFileSystemHdfs(path, conf, tachyonConf);
}

private UnderFileSystemHdfs(String fsDefaultName, Object conf) {
private UnderFileSystemHdfs(String fsDefaultName, Object conf, TachyonConf tachyonConf) {
super(tachyonConf);

mUfsPrefix = fsDefaultName;
Configuration tConf;
if (conf != null) {
Expand All @@ -71,19 +73,27 @@ private UnderFileSystemHdfs(String fsDefaultName, Object conf) {
String glusterfsPrefix = "glusterfs:///";
tConf.set("fs.defaultFS", fsDefaultName);
if (fsDefaultName.startsWith(glusterfsPrefix)) {
String gfsImpl = mTachyonConf.get(Constants.UNDERFS_GLUSTERFS_IMPL,
"org.apache.hadoop.fs.glusterfs.GlusterFileSystem");
String gfsMrDir = mTachyonConf.get(Constants.UNDERFS_GLUSTERFS_MR_DIR,
"glusterfs:///mapred/system");
String gfsVolumes = mTachyonConf.get(Constants.UNDERFS_GLUSTERFS_VOLUMES, null);
String gfsMounts = mTachyonConf.get(Constants.UNDERFS_GLUSTERFS_MOUNTS, null);

if (tConf.get("fs.glusterfs.impl") == null) {
tConf.set("fs.glusterfs.impl", CommonConf.get().UNDERFS_GLUSTERFS_IMPL);
tConf.set("fs.glusterfs.impl", gfsImpl);
}
if (tConf.get("mapred.system.dir") == null) {
tConf.set("mapred.system.dir", CommonConf.get().UNDERFS_GLUSTERFS_MR_DIR);
tConf.set("mapred.system.dir", gfsMrDir);
}
if (tConf.get("fs.glusterfs.volumes") == null) {
tConf.set("fs.glusterfs.volumes", CommonConf.get().UNDERFS_GLUSTERFS_VOLUMES);
tConf.set("fs.glusterfs.volume.fuse." + CommonConf.get().UNDERFS_GLUSTERFS_VOLUMES,
CommonConf.get().UNDERFS_GLUSTERFS_MOUNTS);
tConf.set("fs.glusterfs.volumes", gfsVolumes);
tConf.set("fs.glusterfs.volume.fuse." + gfsVolumes, gfsMounts);
}
} else {
tConf.set("fs.hdfs.impl", CommonConf.get().UNDERFS_HDFS_IMPL);
String ufsHdfsImpl = mTachyonConf.get(Constants.UNDERFS_HDFS_IMPL,
"org.apache.hadoop.hdfs.DistributedFileSystem");
tConf.set("fs.hdfs.impl", ufsHdfsImpl);

// To disable the instance cache for hdfs client, otherwise it causes the
// FileSystem closed exception. Being configurable for unit/integration
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/java/tachyon/UnderFileSystemSingleLocal.java
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;

import tachyon.conf.TachyonConf;
import tachyon.util.CommonUtils;
import tachyon.util.NetworkUtils;

Expand All @@ -35,8 +36,12 @@
*/
public class UnderFileSystemSingleLocal extends UnderFileSystem {

public static UnderFileSystem getClient() {
return new UnderFileSystemSingleLocal();
/**
 * Constructs the instance, forwarding the configuration to the {@link UnderFileSystem}
 * constructor.
 *
 * @param tachyonConf the {@link tachyon.conf.TachyonConf} instance passed to the superclass
 */
protected UnderFileSystemSingleLocal(TachyonConf tachyonConf) {
super(tachyonConf);
}

/**
 * Returns a new {@link UnderFileSystemSingleLocal} built with the given configuration.
 *
 * @param tachyonConf the {@link tachyon.conf.TachyonConf} instance passed to the constructor
 * @return a newly created instance (a new object is constructed on every call)
 */
public static UnderFileSystem getClient(TachyonConf tachyonConf) {
return new UnderFileSystemSingleLocal(tachyonConf);
}

@Override
Expand Down

0 comments on commit a73823d

Please sign in to comment.