Skip to content

Commit

Permalink
Clean up block master sync loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed Jun 27, 2015
1 parent 0e917bc commit f5ce9c7
Showing 1 changed file with 29 additions and 21 deletions.
50 changes: 29 additions & 21 deletions servers/src/main/java/tachyon/worker/block/BlockMasterSync.java
Expand Up @@ -90,6 +90,7 @@ public class BlockMasterSync implements Runnable {


/** /**
* Gets the Tachyon master address from the configuration * Gets the Tachyon master address from the configuration
*
* @return the InetSocketAddress of the master * @return the InetSocketAddress of the master
*/ */
private InetSocketAddress getMasterAddress() { private InetSocketAddress getMasterAddress() {
Expand All @@ -102,6 +103,7 @@ private InetSocketAddress getMasterAddress() {
/** /**
* Handles a master command. The command is one of Unknown, Nothing, Register, Free, or Delete. * Handles a master command. The command is one of Unknown, Nothing, Register, Free, or Delete.
* This call will block until the command is complete. * This call will block until the command is complete.
*
* @param cmd the command to execute. * @param cmd the command to execute.
* @throws IOException if an error occurs when executing the command * @throws IOException if an error occurs when executing the command
*/ */
Expand All @@ -111,7 +113,7 @@ private void handleMasterCommand(Command cmd) throws IOException {
return; return;
} }
switch (cmd.mCommandType) { switch (cmd.mCommandType) {
// Currently unused // Currently unused
case Delete: case Delete:
break; break;
// Master requests blocks to be removed from Tachyon managed space. // Master requests blocks to be removed from Tachyon managed space.
Expand All @@ -137,8 +139,9 @@ private void handleMasterCommand(Command cmd) throws IOException {
} }


/** /**
* Registers with the Tachyon master. This should be called before the continuous heartbeat * Registers with the Tachyon master. This should be called before the continuous heartbeat thread
* thread begins. The workerId will be set after this method is successful. * begins. The workerId will be set after this method is successful.
*
* @throws IOException if the registration fails * @throws IOException if the registration fails
*/ */
public void registerWithMaster() throws IOException { public void registerWithMaster() throws IOException {
Expand All @@ -164,6 +167,7 @@ private void resetMasterClient() {


/** /**
* Gets the worker id, 0 if registerWithMaster has not been called successfully. * Gets the worker id, 0 if registerWithMaster has not been called successfully.
*
* @return the worker id * @return the worker id
*/ */
public long getWorkerId() { public long getWorkerId() {
Expand All @@ -177,35 +181,39 @@ public long getWorkerId() {
@Override @Override
public void run() { public void run() {
long lastHeartbeatMs = System.currentTimeMillis(); long lastHeartbeatMs = System.currentTimeMillis();
Command cmd = null;
while (mRunning) { while (mRunning) {
long diff = System.currentTimeMillis() - lastHeartbeatMs; // Check the time since last heartbeat, and wait until it is within heartbeat interval
if (diff < mHeartbeatIntervalMs) { long lastIntervalMs = System.currentTimeMillis() - lastHeartbeatMs;
LOG.debug("Heartbeat process takes {} ms.", diff); long toSleepMs = mHeartbeatIntervalMs - lastIntervalMs;
CommonUtils.sleepMs(LOG, mHeartbeatIntervalMs - diff); if (toSleepMs > 0) {
CommonUtils.sleepMs(LOG, toSleepMs);
} else { } else {
LOG.warn("Heartbeat took " + diff + " ms, expected " + mHeartbeatIntervalMs + " ms."); LOG.warn("Heartbeat took: " + lastIntervalMs + ", expected: " + mHeartbeatIntervalMs);
} }

// Prepare metadata for the next heartbeat
BlockHeartbeatReport blockReport = mBlockDataManager.getReport();
BlockStoreMeta storeMeta = mBlockDataManager.getStoreMeta();

// Send the heartbeat and execute the response
try { try {
BlockHeartbeatReport blockReport = mBlockDataManager.getReport(); Command cmdFromMaster =
BlockStoreMeta storeMeta = mBlockDataManager.getStoreMeta();
cmd =
mMasterClient.worker_heartbeat(mWorkerId, storeMeta.getUsedBytesOnTiers(), mMasterClient.worker_heartbeat(mWorkerId, storeMeta.getUsedBytesOnTiers(),
blockReport.getRemovedBlocks(), blockReport.getAddedBlocks()); blockReport.getRemovedBlocks(), blockReport.getAddedBlocks());
lastHeartbeatMs = System.currentTimeMillis(); lastHeartbeatMs = System.currentTimeMillis();
} catch (IOException e) { handleMasterCommand(cmdFromMaster);
LOG.error(e.getMessage(), e); } catch (IOException ioe) {
// An error occurred, retry after 1 second or error if heartbeat timeout is reached
LOG.error("Failed to receive or execute master heartbeat command.", ioe);
resetMasterClient(); resetMasterClient();
CommonUtils.sleepMs(LOG, Constants.SECOND_MS); CommonUtils.sleepMs(LOG, Constants.SECOND_MS);
cmd = null; if (System.currentTimeMillis() - lastHeartbeatMs >= mHeartbeatTimeoutMs) {
diff = System.currentTimeMillis() - lastHeartbeatMs; throw new RuntimeException("Master heartbeat timeout exceeded: " + mHeartbeatTimeoutMs);
if (diff >= mHeartbeatTimeoutMs) {
throw new RuntimeException("Heartbeat timeout " + diff + "ms");
} }
} }
// TODO: Is there a way to make this async? Could take much longer than heartbeat timeout.
handleMasterCommand(cmd); // Check if any users have become zombies, if so clean them up
// TODO: This should go in its own thread // TODO: Make this unrelated to master sync
mBlockDataManager.cleanupUsers(); mBlockDataManager.cleanupUsers();
} }
} }
Expand Down

0 comments on commit f5ce9c7

Please sign in to comment.