Skip to content

Commit

Permalink
Rename fetching->syncing, synchronize pinInodes updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ooq committed Jul 10, 2015
1 parent 8ce1dc7 commit fd88ae0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
42 changes: 21 additions & 21 deletions servers/src/main/java/tachyon/worker/block/PinListSync.java
Expand Up @@ -30,9 +30,9 @@
import tachyon.util.ThreadFactoryUtils;

/**
* PinListSync periodically fetches the set of pinned inodes from master,
* PinListSync periodically syncs the set of pinned inodes from master,
* and save the new pinned inodes to the BlockDataManager.
* The fetching parameters (intervals, timeouts) adopt directly from worker-to-master heartbeat
* The syncing parameters (intervals, timeouts) adopt directly from worker-to-master heartbeat
* configurations.
*
*/
Expand All @@ -45,14 +45,14 @@ public class PinListSync implements Runnable {
private final ExecutorService mMasterClientExecutorService;
/** The configuration values */
private final TachyonConf mTachyonConf;
/** Milliseconds between each fetch */
private final int mFetchIntervalMs;
/** Milliseconds between fetches before a timeout */
private final int mFetchTimeoutMs;
/** Milliseconds between each sync */
private final int mSyncIntervalMs;
/** Milliseconds between syncs before a timeout */
private final int mSyncTimeoutMs;

/** Client for all master communication */
private MasterClient mMasterClient;
/** Flag to indicate if the fetching should continue */
/** Flag to indicate if the syncing should continue */
private volatile boolean mRunning;

/**
Expand All @@ -71,50 +71,50 @@ public PinListSync(BlockDataManager blockDataManager, TachyonConf tachyonConf) {
mMasterClient =
new MasterClient(BlockWorkerUtils.getMasterAddress(mTachyonConf),
mMasterClientExecutorService, mTachyonConf);
mFetchIntervalMs =
mSyncIntervalMs =
mTachyonConf.getInt(Constants.WORKER_TO_MASTER_HEARTBEAT_INTERVAL_MS, Constants.SECOND_MS);
mFetchTimeoutMs =
mSyncTimeoutMs =
mTachyonConf.getInt(Constants.WORKER_HEARTBEAT_TIMEOUT_MS, 10 * Constants.SECOND_MS);

mRunning = true;
}


/**
* Main loop for the sync, continuously fetch pinlist from master
* Main loop for the sync, continuously sync pinlist from master
*/
@Override
public void run() {
long lastFetchMs = System.currentTimeMillis();
long lastSyncMs = System.currentTimeMillis();
while (mRunning) {
// Check the time since last fetch, and wait until it is within fetch interval
long lastIntervalMs = System.currentTimeMillis() - lastFetchMs;
long toSleepMs = mFetchIntervalMs - lastIntervalMs;
// Check the time since last sync, and wait until it is within sync interval
long lastIntervalMs = System.currentTimeMillis() - lastSyncMs;
long toSleepMs = mSyncIntervalMs - lastIntervalMs;
if (toSleepMs > 0) {
CommonUtils.sleepMs(LOG, toSleepMs);
} else {
LOG.warn("Fetch took: " + lastIntervalMs + ", expected: " + mFetchIntervalMs);
LOG.warn("Sync took: " + lastIntervalMs + ", expected: " + mSyncIntervalMs);
}

// Send the fetch
// Send the sync
try {
Set<Integer> pinList = mMasterClient.worker_getPinIdList();
mBlockDataManager.updatePinList(pinList);
lastFetchMs = System.currentTimeMillis();
lastSyncMs = System.currentTimeMillis();
} catch (IOException ioe) {
// An error occurred, retry after 1 second or error if fetch timeout is reached
// An error occurred, retry after 1 second or error if sync timeout is reached
LOG.error("Failed to receive pinlist.", ioe);
resetMasterClient();
CommonUtils.sleepMs(LOG, Constants.SECOND_MS);
if (System.currentTimeMillis() - lastFetchMs >= mFetchTimeoutMs) {
throw new RuntimeException("Master fetch timeout exceeded: " + mFetchTimeoutMs);
if (System.currentTimeMillis() - lastSyncMs >= mSyncTimeoutMs) {
throw new RuntimeException("Master sync timeout exceeded: " + mSyncTimeoutMs);
}
}
}
}

/**
* Stops the fetching, once this method is called, the object should be discarded
* Stops the syncing, once this method is called, the object should be discarded
*/
public void stop() {
mRunning = false;
Expand Down
Expand Up @@ -498,7 +498,9 @@ private void freeSpaceInternal(long userId, long availableBytes, BlockStoreLocat
*/
@Override
public void updatePinnedInodes(Set<Integer> inodes) {
mPinnedInodes.clear();
mPinnedInodes.addAll(Preconditions.checkNotNull(inodes));
synchronized (mPinnedInodes) {
mPinnedInodes.clear();
mPinnedInodes.addAll(Preconditions.checkNotNull(inodes));
}
}
}

0 comments on commit fd88ae0

Please sign in to comment.