From 783ee1a9467372575349f33a4320ecc30bf6cfbe Mon Sep 17 00:00:00 2001 From: Mingfei Date: Tue, 22 Sep 2015 10:16:51 +0800 Subject: [PATCH] modify the code according to comments --- common/src/main/java/tachyon/Constants.java | 8 +++---- .../main/resources/tachyon-default.properties | 4 ++-- .../worker/block/BlockDataManager.java | 2 +- .../tachyon/worker/block/BlockWorker.java | 22 +++++++++---------- .../tachyon/worker/block/SessionCleaner.java | 1 - .../{AsyncEvictor.java => SpaceReserver.java} | 21 ++++++++++-------- 6 files changed, 30 insertions(+), 28 deletions(-) rename servers/src/main/java/tachyon/worker/block/{AsyncEvictor.java => SpaceReserver.java} (82%) diff --git a/common/src/main/java/tachyon/Constants.java b/common/src/main/java/tachyon/Constants.java index f744c83c6768..d46f5e6a9fcd 100644 --- a/common/src/main/java/tachyon/Constants.java +++ b/common/src/main/java/tachyon/Constants.java @@ -230,11 +230,11 @@ public final class Constants { public static final String WORKER_TIERED_STORAGE_LEVEL_RESERVED_RATIO_FORMAT = "tachyon.worker.tieredstore.level%d.reserved.ratio"; - public static final String WORKER_TIERED_STORAGE_EVICT_ASYNC_PERIOD_MS_FORMAT = - "tachyon.worker.tieredstore.evict.async.period.ms"; + public static final String WORKER_SPACE_RESERVER_PERIOD_MS = + "tachyon.worker.space.reserver.period.ms"; - public static final String WORKER_EVICT_ASYNC_ENABLE = - "tachyon.worker.tieredstore.evict.async.enable"; + public static final String WORKER_SPACE_RESERVER_ENABLE = + "tachyon.worker.space.reserver.enable"; public static final String WORKER_KEYTAB_KEY = "tachyon.worker.keytab.file"; public static final String WORKER_PRINCIPAL_KEY = "tachyon.worker.principal"; public static final String WORKER_USER_TEMP_RELATIVE_FOLDER = "users"; diff --git a/common/src/main/resources/tachyon-default.properties b/common/src/main/resources/tachyon-default.properties index a43c0dc4f8cf..78e014e4ca1b 100644 --- a/common/src/main/resources/tachyon-default.properties +++ b/common/src/main/resources/tachyon-default.properties @@ -93,8 +93,8 @@ tachyon.worker.tieredstore.level0.alias=MEM tachyon.worker.tieredstore.level0.dirs.quota=${tachyon.worker.memory.size} tachyon.worker.tieredstore.level0.dirs.path=/mnt/ramdisk tachyon.worker.tieredstore.level0.reserved.ratio=0.1 -tachyon.worker.tieredstore.evict.async.enable=false -tachyon.worker.tieredstore.evict.async.period.ms=1000 +tachyon.worker.space.reserver.enable=false +tachyon.worker.space.reserver.period.ms=1000 tachyon.worker.resource.cpu=1 tachyon.worker.resource.mem=1024MB diff --git a/servers/src/main/java/tachyon/worker/block/BlockDataManager.java b/servers/src/main/java/tachyon/worker/block/BlockDataManager.java index 257fd07ad68d..f40f67910994 100644 --- a/servers/src/main/java/tachyon/worker/block/BlockDataManager.java +++ b/servers/src/main/java/tachyon/worker/block/BlockDataManager.java @@ -325,7 +325,7 @@ public void createBlockRemote(long sessionId, long blockId, int tierAlias, long * @throws AlreadyExistsException if blocks to move already exists in destination location * @throws InvalidStateException if blocks to move/evict is uncommitted */ - void freeSpace(long sessionId, long availableBytes, int tierAlias) + public void freeSpace(long sessionId, long availableBytes, int tierAlias) throws OutOfSpaceException, NotFoundException, IOException, AlreadyExistsException, InvalidStateException { BlockStoreLocation location = BlockStoreLocation.anyDirInTier(tierAlias); diff --git a/servers/src/main/java/tachyon/worker/block/BlockWorker.java b/servers/src/main/java/tachyon/worker/block/BlockWorker.java index 7b1e11bb302a..4fabc2c1f8d7 100644 --- a/servers/src/main/java/tachyon/worker/block/BlockWorker.java +++ b/servers/src/main/java/tachyon/worker/block/BlockWorker.java @@ -96,8 +96,8 @@ public final class BlockWorker { private final UIWebServer mWebServer; /** Worker metrics system */ private MetricsSystem mWorkerMetricsSystem; - /** Asynchronous evictor for the block data manager */ - private AsyncEvictor mAsyncEvictor = null; + /** Space reserver for the block data manager */ + private SpaceReserver mSpaceReserver = null; /** * @return the worker service handler @@ -209,7 +209,7 @@ public BlockWorker() throws IOException { // Setup Worker to Master Syncer // We create four threads for two syncers, one cleaner and one asynchronous evictor: - // mBlockMasterSync, mPinListSync, mSessionCleanerThread, mAsyncEvictor + // mBlockMasterSync, mPinListSync, mSessionCleanerThread, mSpaceReserver mSyncExecutorService = Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("worker-heartbeat-%d", true)); @@ -225,9 +225,9 @@ public BlockWorker() throws IOException { // Setup session cleaner mSessionCleanerThread = new SessionCleaner(mBlockDataManager); - // Setup asynchronous evictor - if (mTachyonConf.getBoolean(Constants.WORKER_EVICT_ASYNC_ENABLE)) { - mAsyncEvictor = new AsyncEvictor(mBlockDataManager, mTachyonConf); + // Setup space reserver + if (mTachyonConf.getBoolean(Constants.WORKER_SPACE_RESERVER_ENABLE)) { + mSpaceReserver = new SpaceReserver(mBlockDataManager); } // Setup session metadata mapping @@ -270,9 +270,9 @@ public void process() { // Start the session cleanup checker to perform the periodical checking mSyncExecutorService.submit(mSessionCleanerThread); - // Start the asynchronous evictor - if (mAsyncEvictor != null) { - mSyncExecutorService.submit(mAsyncEvictor); + // Start the space reserver + if (mSpaceReserver != null) { + mSyncExecutorService.submit(mSpaceReserver); } mWebServer.startWebServer(); @@ -296,8 +296,8 @@ public void stop() throws IOException { mMasterClientExecutorService.shutdown(); mSyncExecutorService.shutdown(); mWorkerMetricsSystem.stop(); - if (mAsyncEvictor != null) { - mAsyncEvictor.stop(); + if (mSpaceReserver != null) { + mSpaceReserver.stop(); } try { mWebServer.shutdownWebServer(); diff --git a/servers/src/main/java/tachyon/worker/block/SessionCleaner.java b/servers/src/main/java/tachyon/worker/block/SessionCleaner.java index 83084729d244..1c655e51395a 100644 --- a/servers/src/main/java/tachyon/worker/block/SessionCleaner.java +++ b/servers/src/main/java/tachyon/worker/block/SessionCleaner.java @@ -19,7 +19,6 @@ import org.slf4j.LoggerFactory; import tachyon.Constants; -import tachyon.conf.TachyonConf; import tachyon.util.CommonUtils; import tachyon.worker.WorkerContext; diff --git a/servers/src/main/java/tachyon/worker/block/AsyncEvictor.java b/servers/src/main/java/tachyon/worker/block/SpaceReserver.java similarity index 82% rename from servers/src/main/java/tachyon/worker/block/AsyncEvictor.java rename to servers/src/main/java/tachyon/worker/block/SpaceReserver.java index 4dbe38ceae9d..a5fbc1a6ac23 100644 --- a/servers/src/main/java/tachyon/worker/block/AsyncEvictor.java +++ b/servers/src/main/java/tachyon/worker/block/SpaceReserver.java @@ -25,21 +25,21 @@ import tachyon.Constants; import tachyon.Pair; import tachyon.Sessions; -import tachyon.conf.TachyonConf; import tachyon.exception.AlreadyExistsException; import tachyon.exception.InvalidStateException; import tachyon.exception.NotFoundException; import tachyon.exception.OutOfSpaceException; import tachyon.util.CommonUtils; +import tachyon.worker.WorkerContext; /** - * AsyncEvictor periodically checks if there is enough space reserved on each storage tier, if + * SpaceReserver periodically checks if there is enough space reserved on each storage tier, if * there is no enough free space on some tier, free space from it. */ -public class AsyncEvictor implements Runnable { +public class SpaceReserver implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private final BlockDataManager mBlockManager; - /** Mapping from tier alias to space to be reserved on the tier */ + /** Mapping from tier alias to space size to be reserved on the tier */ private final List> mBytesToReserveOnTiers = new ArrayList>(); /** Milliseconds between each check */ @@ -47,7 +47,7 @@ public class AsyncEvictor implements Runnable { /** Flag to indicate if the checking should continue */ private volatile boolean mRunning; - public AsyncEvictor(BlockDataManager blockManager, TachyonConf tachyonConf) { + public SpaceReserver(BlockDataManager blockManager) { mBlockManager = blockManager; List capOnTiers = blockManager.getStoreMeta().getCapacityBytesOnTiers(); List aliasOnTiers = blockManager.getStoreMeta().getAliasOnTiers(); @@ -56,14 +56,16 @@ public AsyncEvictor(BlockDataManager blockManager, TachyonConf tachyonConf) { String tierReservedSpaceProp = String.format(Constants.WORKER_TIERED_STORAGE_LEVEL_RESERVED_RATIO_FORMAT, idx); int tierAlias = aliasOnTiers.get(idx); + /** Similar to {@link BlockStoreMeta}, the alias index is the value of alias - 1 */ long reservedSpaceBytes = - (long)(capOnTiers.get(tierAlias - 1) * tachyonConf.getDouble(tierReservedSpaceProp)); + (long)(capOnTiers.get(tierAlias - 1) * WorkerContext.getConf() + .getDouble(tierReservedSpaceProp)); mBytesToReserveOnTiers.add(new Pair(tierAlias, reservedSpaceBytes + lastTierReservedBytes)); lastTierReservedBytes += reservedSpaceBytes; } mCheckIntervalMs = - tachyonConf.getInt(Constants.WORKER_TIERED_STORAGE_EVICT_ASYNC_PERIOD_MS_FORMAT); + WorkerContext.getConf().getInt(Constants.WORKER_SPACE_RESERVER_PERIOD_MS); mRunning = true; } @@ -77,14 +79,14 @@ public void run() { if (toSleepMs > 0) { CommonUtils.sleepMs(LOG, toSleepMs); } else { - LOG.warn("Async eviction took: " + lastIntervalMs + ", expected: " + mCheckIntervalMs); + LOG.warn("Space reserver took: " + lastIntervalMs + ", expected: " + mCheckIntervalMs); } for (int tierIdx = mBytesToReserveOnTiers.size() - 1; tierIdx >= 0 ; tierIdx --) { Pair bytesReservedOnTier = mBytesToReserveOnTiers.get(tierIdx); int tierAlias = bytesReservedOnTier.getFirst(); long bytesReserved = bytesReservedOnTier.getSecond(); try { - mBlockManager.freeSpace(Sessions.DATASERVER_SESSION_ID, bytesReserved, tierAlias); + mBlockManager.freeSpace(Sessions.MIGRATE_DATA_SESSION_ID, bytesReserved, tierAlias); } catch (OutOfSpaceException e) { LOG.warn(e.getMessage()); } catch (NotFoundException e) { @@ -104,6 +106,7 @@ public void run() { * Stops the checking, once this method is called, the object should be discarded */ public void stop() { + LOG.info("Space reserver exits!"); mRunning = false; } }