Skip to content

Commit

Permalink
modify the code according to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Mingfei committed Sep 23, 2015
1 parent a8b8eea commit 783ee1a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 28 deletions.
8 changes: 4 additions & 4 deletions common/src/main/java/tachyon/Constants.java
Expand Up @@ -230,11 +230,11 @@ public final class Constants {
public static final String WORKER_TIERED_STORAGE_LEVEL_RESERVED_RATIO_FORMAT =
"tachyon.worker.tieredstore.level%d.reserved.ratio";

public static final String WORKER_TIERED_STORAGE_EVICT_ASYNC_PERIOD_MS_FORMAT =
"tachyon.worker.tieredstore.evict.async.period.ms";
public static final String WORKER_SPACE_RESERVER_PERIOD_MS =
"tachyon.worker.space.reserver.period.ms";

public static final String WORKER_EVICT_ASYNC_ENABLE =
"tachyon.worker.tieredstore.evict.async.enable";
public static final String WORKER_SPACE_RESERVER_ENABLE =
"tachyon.worker.space.reserver.enable";
public static final String WORKER_KEYTAB_KEY = "tachyon.worker.keytab.file";
public static final String WORKER_PRINCIPAL_KEY = "tachyon.worker.principal";
public static final String WORKER_USER_TEMP_RELATIVE_FOLDER = "users";
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/resources/tachyon-default.properties
Expand Up @@ -93,8 +93,8 @@ tachyon.worker.tieredstore.level0.alias=MEM
tachyon.worker.tieredstore.level0.dirs.quota=${tachyon.worker.memory.size}
tachyon.worker.tieredstore.level0.dirs.path=/mnt/ramdisk
tachyon.worker.tieredstore.level0.reserved.ratio=0.1
tachyon.worker.tieredstore.evict.async.enable=false
tachyon.worker.tieredstore.evict.async.period.ms=1000
tachyon.worker.space.reserver.enable=false
tachyon.worker.space.reserver.period.ms=1000

tachyon.worker.resource.cpu=1
tachyon.worker.resource.mem=1024MB
Expand Down
Expand Up @@ -325,7 +325,7 @@ public void createBlockRemote(long sessionId, long blockId, int tierAlias, long
* @throws AlreadyExistsException if blocks to move already exists in destination location
* @throws InvalidStateException if blocks to move/evict is uncommitted
*/
void freeSpace(long sessionId, long availableBytes, int tierAlias)
public void freeSpace(long sessionId, long availableBytes, int tierAlias)
throws OutOfSpaceException, NotFoundException, IOException, AlreadyExistsException,
InvalidStateException {
BlockStoreLocation location = BlockStoreLocation.anyDirInTier(tierAlias);
Expand Down
22 changes: 11 additions & 11 deletions servers/src/main/java/tachyon/worker/block/BlockWorker.java
Expand Up @@ -96,8 +96,8 @@ public final class BlockWorker {
private final UIWebServer mWebServer;
/** Worker metrics system */
private MetricsSystem mWorkerMetricsSystem;
/** Asynchronous evictor for the block data manager */
private AsyncEvictor mAsyncEvictor = null;
/** Space reserver for the block data manager */
private SpaceReserver mSpaceReserver = null;

/**
* @return the worker service handler
Expand Down Expand Up @@ -209,7 +209,7 @@ public BlockWorker() throws IOException {

// Setup Worker to Master Syncer
// We create four threads for two syncers, one cleaner and one asynchronous evictor:
// mBlockMasterSync, mPinListSync, mSessionCleanerThread, mAsyncEvictor
// mBlockMasterSync, mPinListSync, mSessionCleanerThread, mSpaceReserver
mSyncExecutorService =
Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("worker-heartbeat-%d", true));

Expand All @@ -225,9 +225,9 @@ public BlockWorker() throws IOException {
// Setup session cleaner
mSessionCleanerThread = new SessionCleaner(mBlockDataManager);

// Setup asynchronous evictor
if (mTachyonConf.getBoolean(Constants.WORKER_EVICT_ASYNC_ENABLE)) {
mAsyncEvictor = new AsyncEvictor(mBlockDataManager, mTachyonConf);
// Setup space reserver
if (mTachyonConf.getBoolean(Constants.WORKER_SPACE_RESERVER_ENABLE)) {
mSpaceReserver = new SpaceReserver(mBlockDataManager);
}

// Setup session metadata mapping
Expand Down Expand Up @@ -270,9 +270,9 @@ public void process() {
// Start the session cleanup checker to perform the periodical checking
mSyncExecutorService.submit(mSessionCleanerThread);

// Start the asynchronous evictor
if (mAsyncEvictor != null) {
mSyncExecutorService.submit(mAsyncEvictor);
// Start the space reserver
if (mSpaceReserver != null) {
mSyncExecutorService.submit(mSpaceReserver);
}

mWebServer.startWebServer();
Expand All @@ -296,8 +296,8 @@ public void stop() throws IOException {
mMasterClientExecutorService.shutdown();
mSyncExecutorService.shutdown();
mWorkerMetricsSystem.stop();
if (mAsyncEvictor != null) {
mAsyncEvictor.stop();
if (mSpaceReserver != null) {
mSpaceReserver.stop();
}
try {
mWebServer.shutdownWebServer();
Expand Down
Expand Up @@ -19,7 +19,6 @@
import org.slf4j.LoggerFactory;

import tachyon.Constants;
import tachyon.conf.TachyonConf;
import tachyon.util.CommonUtils;
import tachyon.worker.WorkerContext;

Expand Down
Expand Up @@ -25,29 +25,29 @@
import tachyon.Constants;
import tachyon.Pair;
import tachyon.Sessions;
import tachyon.conf.TachyonConf;
import tachyon.exception.AlreadyExistsException;
import tachyon.exception.InvalidStateException;
import tachyon.exception.NotFoundException;
import tachyon.exception.OutOfSpaceException;
import tachyon.util.CommonUtils;
import tachyon.worker.WorkerContext;

/**
* AsyncEvictor periodically checks if there is enough space reserved on each storage tier, if
* SpaceReserver periodically checks if there is enough space reserved on each storage tier, if
* there is no enough free space on some tier, free space from it.
*/
public class AsyncEvictor implements Runnable {
public class SpaceReserver implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private final BlockDataManager mBlockManager;
/** Mapping from tier alias to space to be reserved on the tier */
/** Mapping from tier alias to space size to be reserved on the tier */
private final List<Pair<Integer, Long>> mBytesToReserveOnTiers =
new ArrayList<Pair<Integer, Long>>();
/** Milliseconds between each check */
private final int mCheckIntervalMs;
/** Flag to indicate if the checking should continue */
private volatile boolean mRunning;

public AsyncEvictor(BlockDataManager blockManager, TachyonConf tachyonConf) {
public SpaceReserver(BlockDataManager blockManager) {
mBlockManager = blockManager;
List<Long> capOnTiers = blockManager.getStoreMeta().getCapacityBytesOnTiers();
List<Integer> aliasOnTiers = blockManager.getStoreMeta().getAliasOnTiers();
Expand All @@ -56,14 +56,16 @@ public AsyncEvictor(BlockDataManager blockManager, TachyonConf tachyonConf) {
String tierReservedSpaceProp =
String.format(Constants.WORKER_TIERED_STORAGE_LEVEL_RESERVED_RATIO_FORMAT, idx);
int tierAlias = aliasOnTiers.get(idx);
/** Similar to {@link BlockStoreMeta}, the alias index is the value of alias - 1 */
long reservedSpaceBytes =
(long)(capOnTiers.get(tierAlias - 1) * tachyonConf.getDouble(tierReservedSpaceProp));
(long)(capOnTiers.get(tierAlias - 1) * WorkerContext.getConf()
.getDouble(tierReservedSpaceProp));
mBytesToReserveOnTiers.add(new Pair<Integer, Long>(tierAlias,
reservedSpaceBytes + lastTierReservedBytes));
lastTierReservedBytes += reservedSpaceBytes;
}
mCheckIntervalMs =
tachyonConf.getInt(Constants.WORKER_TIERED_STORAGE_EVICT_ASYNC_PERIOD_MS_FORMAT);
WorkerContext.getConf().getInt(Constants.WORKER_SPACE_RESERVER_PERIOD_MS);
mRunning = true;
}

Expand All @@ -77,14 +79,14 @@ public void run() {
if (toSleepMs > 0) {
CommonUtils.sleepMs(LOG, toSleepMs);
} else {
LOG.warn("Async eviction took: " + lastIntervalMs + ", expected: " + mCheckIntervalMs);
LOG.warn("Space reserver took: " + lastIntervalMs + ", expected: " + mCheckIntervalMs);
}
for (int tierIdx = mBytesToReserveOnTiers.size() - 1; tierIdx >= 0 ; tierIdx --) {
Pair<Integer, Long> bytesReservedOnTier = mBytesToReserveOnTiers.get(tierIdx);
int tierAlias = bytesReservedOnTier.getFirst();
long bytesReserved = bytesReservedOnTier.getSecond();
try {
mBlockManager.freeSpace(Sessions.DATASERVER_SESSION_ID, bytesReserved, tierAlias);
mBlockManager.freeSpace(Sessions.MIGRATE_DATA_SESSION_ID, bytesReserved, tierAlias);
} catch (OutOfSpaceException e) {
LOG.warn(e.getMessage());
} catch (NotFoundException e) {
Expand All @@ -104,6 +106,7 @@ public void run() {
* Stops the checking, once this method is called, the object should be discarded
*/
public void stop() {
LOG.info("Space reserver exits!");
mRunning = false;
}
}

0 comments on commit 783ee1a

Please sign in to comment.