Skip to content

Commit

Permalink
Modify utils
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 committed Aug 10, 2015
1 parent fea8043 commit c7301a4
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
12 changes: 11 additions & 1 deletion common/src/main/java/tachyon/util/io/FileUtils.java
Expand Up @@ -170,7 +170,7 @@ public static void delete(File file) throws IOException {
} }


/** /**
* Creates a file and its intermediate directories if necessary. * Creates an empty file and its intermediate directories if necessary.
* *
* @param file the file to create * @param file the file to create
* @throws IOException if an I/O error occurred or file already exists * @throws IOException if an I/O error occurred or file already exists
Expand All @@ -181,4 +181,14 @@ public static void createFile(File file) throws IOException {
throw new IOException("File already exists " + file.getPath()); throw new IOException("File already exists " + file.getPath());
} }
} }

/**
* Creates an empty directory and its intermediate directories if necessary.
*
* @param file the file to create
* @throws IOException if an I/O error occurred or file already exists
*/
public static void createDir(File file) throws IOException {
file.mkdirs();
}
} }
Expand Up @@ -16,20 +16,20 @@
package tachyon.worker.block; package tachyon.worker.block;


import java.io.File; import java.io.File;
import java.io.IOException;


import org.apache.commons.io.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;


import tachyon.Constants; import tachyon.Constants;
import tachyon.Pair;
import tachyon.StorageLevelAlias;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.util.io.BufferUtils; import tachyon.util.io.BufferUtils;
import tachyon.util.io.PathUtils; import tachyon.util.io.PathUtils;
import tachyon.util.io.FileUtils;
import tachyon.worker.block.evictor.Evictor; import tachyon.worker.block.evictor.Evictor;
import tachyon.worker.block.io.BlockWriter; import tachyon.worker.block.io.BlockWriter;
import tachyon.worker.block.io.LocalFileBlockWriter; import tachyon.worker.block.io.LocalFileBlockWriter;
Expand All @@ -47,10 +47,11 @@ public class TieredBlockStoreTestUtils {
* 2000, 3000 bytes separately in the SSD tier. * 2000, 3000 bytes separately in the SSD tier.
*/ */
public static final int[] TIER_LEVEL = {0, 1}; public static final int[] TIER_LEVEL = {0, 1};
public static final String[] TIER_ALIAS = {"MEM", "SSD"}; public static final StorageLevelAlias[] TIER_ALIAS = {StorageLevelAlias.MEM,
StorageLevelAlias.SSD};
public static final String[][] TIER_PATH = public static final String[][] TIER_PATH =
{ {"/mem/0", "/mem/1"}, {"/ssd/0", "/ssd/1", "/ssd/2"}}; { {"/mem/0", "/mem/1"}, {"/ssd/0", "/ssd/1", "/ssd/2"}};
public static final long[][] TIER_CAPACITY = { {100, 200}, {1000, 2000, 3000}}; public static final long[][] TIER_CAPACITY = { {2000, 3000}, {10000, 20000, 30000}};
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);


/** /**
Expand All @@ -66,7 +67,7 @@ public class TieredBlockStoreTestUtils {
* each element is the capacity of the corresponding dir in tierPath * each element is the capacity of the corresponding dir in tierPath
* @return the created TachyonConf * @return the created TachyonConf
*/ */
public static TachyonConf newTachyonConf(int[] tierLevel, String[] tierAlias, public static TachyonConf newTachyonConf(int[] tierLevel, StorageLevelAlias[] tierAlias,
String[][] tierPath, long[][] tierCapacity) { String[][] tierPath, long[][] tierCapacity) {
// make sure dimensions are legal // make sure dimensions are legal
Preconditions.checkNotNull(tierLevel); Preconditions.checkNotNull(tierLevel);
Expand All @@ -92,7 +93,7 @@ public static TachyonConf newTachyonConf(int[] tierLevel, String[] tierAlias,
for (int i = 0; i < nTier; i ++) { for (int i = 0; i < nTier; i ++) {
int level = tierLevel[i]; int level = tierLevel[i];
tachyonConf.set(String.format(Constants.WORKER_TIERED_STORAGE_LEVEL_ALIAS_FORMAT, level), tachyonConf.set(String.format(Constants.WORKER_TIERED_STORAGE_LEVEL_ALIAS_FORMAT, level),
tierAlias[i]); tierAlias[i].toString());


StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (String path : tierPath[i]) { for (String path : tierPath[i]) {
Expand Down Expand Up @@ -143,7 +144,7 @@ public static TachyonConf defaultTachyonConf(String baseDir) throws Exception {
dirs[i] = new String[len]; dirs[i] = new String[len];
for (int j = 0; j < len; j ++) { for (int j = 0; j < len; j ++) {
dirs[i][j] = PathUtils.concatPath(baseDir, TIER_PATH[i][j]); dirs[i][j] = PathUtils.concatPath(baseDir, TIER_PATH[i][j]);
FileUtils.forceMkdir(new File(dirs[i][j])); FileUtils.createDir(new File(dirs[i][j]));
} }
} }
return newTachyonConf(TIER_LEVEL, TIER_ALIAS, dirs, TIER_CAPACITY); return newTachyonConf(TIER_LEVEL, TIER_ALIAS, dirs, TIER_CAPACITY);
Expand All @@ -162,22 +163,12 @@ public static TachyonConf defaultTachyonConf(String baseDir) throws Exception {
*/ */
public static void cache(long userId, long blockId, long bytes, StorageDir dir, public static void cache(long userId, long blockId, long bytes, StorageDir dir,
BlockMetadataManager meta, Evictor evictor) throws Exception { BlockMetadataManager meta, Evictor evictor) throws Exception {
// prepare temp block Pair<TempBlockMeta, File> result = makeTempBlock(userId, blockId, bytes, dir, meta);
TempBlockMeta block = new TempBlockMeta(userId, blockId, bytes, dir); TempBlockMeta block = result.getFirst();
meta.addTempBlockMeta(block); File tempFile = result.getSecond();

// write data
File tempFile = new File(block.getPath());
if (!tempFile.getParentFile().mkdir()) {
throw new IOException(String.format(
"Parent directory of %s can not be created for temp block", block.getPath()));
}
BlockWriter writer = new LocalFileBlockWriter(block);
writer.append(BufferUtils.getIncreasingByteBuffer(Ints.checkedCast(bytes)));
writer.close();


// commit block // commit block
Files.move(tempFile, new File(block.getCommitPath())); FileUtils.move(tempFile, new File(block.getCommitPath()));
meta.commitTempBlockMeta(block); meta.commitTempBlockMeta(block);


// update evictor // update evictor
Expand All @@ -187,4 +178,29 @@ public static void cache(long userId, long blockId, long bytes, StorageDir dir,
} }
} }


/**
* Make a temp block in StorageDir.
*
* @param userId user who caches the data
* @param blockId id of the cached block
* @param bytes size of the block in bytes
* @param dir the StorageDir the block resides in
* @param meta the metadata manager to update meta of the block
* @return a pair of temp block meta and the file handler
* @throws Exception when fail to create this block
*/
public static Pair<TempBlockMeta, File> makeTempBlock(long userId, long blockId, long bytes,
StorageDir dir, BlockMetadataManager meta) throws Exception {
// prepare temp block
TempBlockMeta block = new TempBlockMeta(userId, blockId, bytes, dir);
meta.addTempBlockMeta(block);

// write data
File tempFile = new File(block.getPath());
FileUtils.createFile(tempFile);
BlockWriter writer = new LocalFileBlockWriter(block);
writer.append(BufferUtils.getIncreasingByteBuffer(Ints.checkedCast(bytes)));
writer.close();
return new Pair<TempBlockMeta, File>(block, tempFile);
}
} }

0 comments on commit c7301a4

Please sign in to comment.