Skip to content

Commit

Permalink
Addressing review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsimsa committed Oct 12, 2015
1 parent 9accfda commit 5e418c1
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 38 deletions.
59 changes: 29 additions & 30 deletions common/src/main/java/tachyon/heartbeat/HeartbeatContext.java
Expand Up @@ -26,15 +26,15 @@
/** /**
* This class is a singleton for storing and retrieving heartbeat related information. * This class is a singleton for storing and retrieving heartbeat related information.
*/ */
public class HeartbeatContext { public final class HeartbeatContext {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
private static Map<String, String> sExecutorTimerClasses; private static Map<String, Class<HeartbeatTimer>> sTimerClasses;


// Names of different heartbeat timer classes. // Names of different heartbeat timer classes.
public static final String SLEEPING_TIMER_CLASS = "tachyon.heartbeat.SleepingTimer"; public static final Class<HeartbeatTimer> SCHEDULED_TIMER_CLASS;
public static final String SCHEDULED_TIMER_CLASS = "tachyon.heartbeat.ScheduledTimer"; public static final Class<HeartbeatTimer> SLEEPING_TIMER_CLASS;


// Names of different hearbeat executors. // Names of different heartbeat executors.
public static final String MASTER_CHECKPOINT_SCHEDULING = "Master Checkpoint Scheduling"; public static final String MASTER_CHECKPOINT_SCHEDULING = "Master Checkpoint Scheduling";
public static final String MASTER_FILE_RECOMPUTATION = "Master File Recomputation"; public static final String MASTER_FILE_RECOMPUTATION = "Master File Recomputation";
public static final String MASTER_LOST_WORKER_DETECTION = "Master Lost Worker Detection"; public static final String MASTER_LOST_WORKER_DETECTION = "Master Lost Worker Detection";
Expand All @@ -43,39 +43,38 @@ public class HeartbeatContext {
public static final String WORKER_CLIENT = "Worker Client"; public static final String WORKER_CLIENT = "Worker Client";


static { static {
sExecutorTimerClasses = new HashMap<String, String>(); try {
sExecutorTimerClasses.put(MASTER_CHECKPOINT_SCHEDULING, SLEEPING_TIMER_CLASS); SCHEDULED_TIMER_CLASS =
sExecutorTimerClasses.put(MASTER_FILE_RECOMPUTATION, SLEEPING_TIMER_CLASS); (Class<HeartbeatTimer>) Class.forName("tachyon.heartbeat.ScheduledTimer");
sExecutorTimerClasses.put(MASTER_LOST_WORKER_DETECTION, SLEEPING_TIMER_CLASS); SLEEPING_TIMER_CLASS =
sExecutorTimerClasses.put(MASTER_TTL_CHECK, SLEEPING_TIMER_CLASS); (Class<HeartbeatTimer>) Class.forName("tachyon.heartbeat.SleepingTimer");
sExecutorTimerClasses.put(WORKER_LINEAGE_SYNC, SLEEPING_TIMER_CLASS); } catch (Exception e) {
sExecutorTimerClasses.put(WORKER_CLIENT, SLEEPING_TIMER_CLASS); throw new RuntimeException("requested class could not be loaded: " + e.getMessage());
}
sTimerClasses = new HashMap<String, Class<HeartbeatTimer>>();
sTimerClasses.put(MASTER_CHECKPOINT_SCHEDULING, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_FILE_RECOMPUTATION, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_LOST_WORKER_DETECTION, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_TTL_CHECK, SLEEPING_TIMER_CLASS);
sTimerClasses.put(WORKER_LINEAGE_SYNC, SLEEPING_TIMER_CLASS);
sTimerClasses.put(WORKER_CLIENT, SLEEPING_TIMER_CLASS);
} }


private HeartbeatContext() {} // to prevent initialization

/** /**
* @param name a name of a heartbeat executor * @param name a name of a heartbeat executor thread
* @return the timer class to use for the executor * @return the timer class to use for the executor thread
*/ */
public static synchronized Class<HeartbeatTimer> getTimerClass(String name) { public static synchronized Class<HeartbeatTimer> getTimerClass(String name) {
String className = sExecutorTimerClasses.get(name); return sTimerClasses.get(name);
if (name == null) {
LOG.error("timer class for executor " + name + " not found");
return null;
}
try {
return (Class<HeartbeatTimer>) Class.forName(className);
} catch (Exception e) {
String msg = "requested class could not be loaded";
LOG.error("{} : {} , {}", msg, className, e);
}
return null;
} }


/** /**
* @param name a name of a heartbeat executor * @param name a name of a heartbeat executor thread
* @param className the timer class to use for the executor * @param timerClass the timer class to use for the executor thread
*/ */
public static synchronized void setTimerClass(String name, String className) { public static synchronized void setTimerClass(String name, Class<HeartbeatTimer> timerClass) {
sExecutorTimerClasses.put(name, className); sTimerClasses.put(name, timerClass);
} }
} }
3 changes: 3 additions & 0 deletions common/src/main/java/tachyon/heartbeat/HeartbeatExecutor.java
Expand Up @@ -20,5 +20,8 @@
* {@link #heartbeat()} method. * {@link #heartbeat()} method.
*/ */
public interface HeartbeatExecutor { public interface HeartbeatExecutor {
/**
* Implements the heartbeat logic.
*/
void heartbeat(); void heartbeat();
} }
30 changes: 26 additions & 4 deletions common/src/main/java/tachyon/heartbeat/HeartbeatScheduler.java
Expand Up @@ -18,6 +18,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static synchronized Set<String> getThreadNames() {
/** /**
* Schedules execution of a heartbeat for the given thread. * Schedules execution of a heartbeat for the given thread.
* *
* @param threadName the thread for which heartbeat is to be executed * @param threadName a name of the thread for which heartbeat is to be executed
*/ */
public static void schedule(String threadName) { public static void schedule(String threadName) {
sLock.lock(); sLock.lock();
Expand All @@ -80,13 +81,34 @@ public static void schedule(String threadName) {
/** /**
* Waits until the given thread can be executed. * Waits until the given thread can be executed.
* *
* @param threadName the thread to wait for * @param name a name of the thread to wait for
*/ */
public static void await(String threadName) throws InterruptedException { public static void await(String name) throws InterruptedException {
sLock.lock(); sLock.lock();
while (!sTimers.containsKey(threadName)) { while (!sTimers.containsKey(name)) {
sCondition.await(); sCondition.await();
} }
sLock.unlock(); sLock.unlock();
} }

/**
* Waits until the given thread can be executed or the given timeout expires.
*
* @param name a name of the thread to wait for
* @param time the maximum time to wait
* @param unit the time unit of the {@code time} argument
* @return {@code false} if the waiting time detectably elapsed before return from the method,
* else {@code true}
*/
public static boolean await(String name, long time, TimeUnit unit) throws InterruptedException {
sLock.lock();
while (!sTimers.containsKey(name)) {
if (!sCondition.await(time, unit)) {
sLock.unlock();
return false;
}
}
sLock.unlock();
return true;
}
} }
5 changes: 5 additions & 0 deletions common/src/main/java/tachyon/heartbeat/HeartbeatTimer.java
Expand Up @@ -19,5 +19,10 @@
* An interface for heartbeat timers. The {@link HeartbeatThread} calls the {@link #tick()} method. * An interface for heartbeat timers. The {@link HeartbeatThread} calls the {@link #tick()} method.
*/ */
public interface HeartbeatTimer { public interface HeartbeatTimer {
/**
* Waits until next heartbeat should be executed.
*
* @throws InterruptedException if the thread is interrupted while waiting
*/
void tick() throws InterruptedException; void tick() throws InterruptedException;
} }
4 changes: 2 additions & 2 deletions common/src/main/java/tachyon/heartbeat/SleepingTimer.java
Expand Up @@ -50,8 +50,8 @@ public void tick() throws InterruptedException {
long currentTickMs = System.currentTimeMillis(); long currentTickMs = System.currentTimeMillis();
long executionTimeMs = currentTickMs - mPreviousTickMs; long executionTimeMs = currentTickMs - mPreviousTickMs;
if (executionTimeMs > mIntervalMs) { if (executionTimeMs > mIntervalMs) {
LOG.warn(mThreadName + " last execution took " + executionTimeMs + " ms. Longer than " LOG.warn(mThreadName + " last execution took " + executionTimeMs
+ " the mFixedExecutionIntervalMs " + mIntervalMs); + " ms. Longer than the interval " + mIntervalMs);
} else { } else {
Thread.sleep(mIntervalMs - executionTimeMs); Thread.sleep(mIntervalMs - executionTimeMs);
} }
Expand Down
Expand Up @@ -16,6 +16,7 @@
package tachyon.master.file; package tachyon.master.file;


import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;


import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
Expand Down Expand Up @@ -127,9 +128,11 @@ public void createFileWithTTLTest() throws Exception {
long fileId = mFileSystemMaster.create(NESTED_FILE_URI, Constants.KB, true, 0); long fileId = mFileSystemMaster.create(NESTED_FILE_URI, Constants.KB, true, 0);
FileInfo fileInfo = mFileSystemMaster.getFileInfo(fileId); FileInfo fileInfo = mFileSystemMaster.getFileInfo(fileId);
Assert.assertEquals(fileInfo.fileId, fileId); Assert.assertEquals(fileInfo.fileId, fileId);
HeartbeatScheduler.await(HeartbeatContext.MASTER_TTL_CHECK); Assert.assertTrue(HeartbeatScheduler.await(HeartbeatContext.MASTER_TTL_CHECK, 1,
TimeUnit.SECONDS));
HeartbeatScheduler.schedule(HeartbeatContext.MASTER_TTL_CHECK); HeartbeatScheduler.schedule(HeartbeatContext.MASTER_TTL_CHECK);
HeartbeatScheduler.await(HeartbeatContext.MASTER_TTL_CHECK); Assert.assertTrue(HeartbeatScheduler.await(HeartbeatContext.MASTER_TTL_CHECK, 1,
TimeUnit.SECONDS));
mThrown.expect(FileDoesNotExistException.class); mThrown.expect(FileDoesNotExistException.class);
mFileSystemMaster.getFileInfo(fileId); mFileSystemMaster.getFileInfo(fileId);
} }
Expand Down

0 comments on commit 5e418c1

Please sign in to comment.