Skip to content

Commit

Permalink
Improve comments and code for the master
Browse files Browse the repository at this point in the history
  • Loading branch information
gpang committed Sep 6, 2015
1 parent 84c1aee commit 365d5ab
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 168 deletions.
16 changes: 4 additions & 12 deletions common/src/main/java/tachyon/master/block/BlockId.java
Expand Up @@ -32,12 +32,10 @@ public final class BlockId {
private static final long SEQUENCE_NUMBER_MASK = (1L << SEQUENCE_NUMBER_BITS) - 1; private static final long SEQUENCE_NUMBER_MASK = (1L << SEQUENCE_NUMBER_BITS) - 1;


private BlockId() { private BlockId() {
// util class // prevent instantiation of a util class
} }


/** /**
* Returns a block id with the specified container id and sequence number.
*
* @param containerId the container id to create the block id with * @param containerId the container id to create the block id with
* @param sequenceNumber the sequence number to create the block id with * @param sequenceNumber the sequence number to create the block id with
* @return the block id constructed with the container id and sequence number * @return the block id constructed with the container id and sequence number
Expand All @@ -49,29 +47,23 @@ public static long createBlockId(long containerId, long sequenceNumber) {
} }


/** /**
* Returns the container id of the specified block id.
*
* @param blockId the block id to get the container id for * @param blockId the block id to get the container id for
* @return the container id * @return the container id of a specified block id
*/ */
public static long getContainerId(long blockId) { public static long getContainerId(long blockId) {
return (blockId >> SEQUENCE_NUMBER_BITS) & CONTAINER_ID_MASK; return (blockId >> SEQUENCE_NUMBER_BITS) & CONTAINER_ID_MASK;
} }


/** /**
* Returns the sequence number of the specified block id.
*
* @param blockId the block id to get the sequence number for * @param blockId the block id to get the sequence number for
* @return the sequence number * @return the sequence number of the specified block id
*/ */
public static long getSequenceNumber(long blockId) { public static long getSequenceNumber(long blockId) {
return blockId & SEQUENCE_NUMBER_MASK; return blockId & SEQUENCE_NUMBER_MASK;
} }


/** /**
* Returns the maximum possible sequence number for block ids. * @return the maximum possible sequence number for block ids
*
* @return the maximum possible sequence number
*/ */
public static long getMaxSequenceNumber() { public static long getMaxSequenceNumber() {
return SEQUENCE_NUMBER_MASK; return SEQUENCE_NUMBER_MASK;
Expand Down
1 change: 0 additions & 1 deletion common/src/thrift/tachyon.thrift
Expand Up @@ -44,7 +44,6 @@ struct FileBlockInfo {
4: list<NetAddress> locations 4: list<NetAddress> locations
} }


// TODO: make fileId into i64
struct FileInfo { struct FileInfo {
1: i64 fileId 1: i64 fileId
2: string name 2: string name
Expand Down
2 changes: 1 addition & 1 deletion servers/src/main/java/tachyon/master/Master.java
Expand Up @@ -36,7 +36,7 @@ public interface Master extends JournalCheckpointStreamable {
* *
* @return a {@link String} representing this master service name. * @return a {@link String} representing this master service name.
*/ */
String getProcessorName(); String getServiceName();


/** /**
* Processes the journal checkpoint file. * Processes the journal checkpoint file.
Expand Down
24 changes: 12 additions & 12 deletions servers/src/main/java/tachyon/master/MasterBase.java
Expand Up @@ -60,15 +60,18 @@ protected MasterBase(Journal journal, ExecutorService executorService) {
@Override @Override
public void processJournalCheckpoint(JournalInputStream inputStream) throws IOException { public void processJournalCheckpoint(JournalInputStream inputStream) throws IOException {
JournalEntry entry; JournalEntry entry;
while ((entry = inputStream.getNextEntry()) != null) { try {
processJournalEntry(entry); while ((entry = inputStream.getNextEntry()) != null) {
processJournalEntry(entry);
}
} finally {
inputStream.close();
} }
inputStream.close();
} }


@Override @Override
public void start(boolean isLeader) throws IOException { public void start(boolean isLeader) throws IOException {
LOG.info(getProcessorName() + ": Starting master. isLeader: " + isLeader); LOG.info(getServiceName() + ": Starting master. isLeader: " + isLeader);
mIsLeader = isLeader; mIsLeader = isLeader;
if (mIsLeader) { if (mIsLeader) {
mJournalWriter = mJournal.getNewWriter(); mJournalWriter = mJournal.getNewWriter();
Expand Down Expand Up @@ -96,7 +99,7 @@ public void start(boolean isLeader) throws IOException {
// Phase 2: Replay all the state of the checkpoint and the completed log files. // Phase 2: Replay all the state of the checkpoint and the completed log files.
// TODO: only do this if this is a fresh start, not if this master had already been tailing // TODO: only do this if this is a fresh start, not if this master had already been tailing
// the journal. // the journal.
LOG.info(getProcessorName() + ": process completed logs before becoming master."); LOG.info(getServiceName() + ": process completed logs before becoming master.");
JournalTailer catchupTailer = new JournalTailer(this, mJournal); JournalTailer catchupTailer = new JournalTailer(this, mJournal);
if (catchupTailer.checkpointExists()) { if (catchupTailer.checkpointExists()) {
catchupTailer.processJournalCheckpoint(true); catchupTailer.processJournalCheckpoint(true);
Expand All @@ -119,11 +122,12 @@ public void start(boolean isLeader) throws IOException {


@Override @Override
public void stop() throws IOException { public void stop() throws IOException {
LOG.info(getProcessorName() + ":Stopping master. isLeader: " + isLeaderMode()); LOG.info(getServiceName() + ":Stopping master. isLeader: " + isLeaderMode());
if (isStandbyMode()) { if (isStandbyMode()) {
if (mStandbyJournalTailer != null) { if (mStandbyJournalTailer != null) {
// stop and wait for the journal tailer thread. // stop and wait for the journal tailer thread.
mStandbyJournalTailer.shutdownAndJoin(); mStandbyJournalTailer.shutdownAndJoin();
mStandbyJournalTailer = null;
} }
} else { } else {
// Stop this master. // Stop this master.
Expand All @@ -143,9 +147,7 @@ protected boolean isStandbyMode() {
} }


protected void writeJournalEntry(JournalEntry entry) { protected void writeJournalEntry(JournalEntry entry) {
if (mJournalWriter == null) { Preconditions.checkNotNull(mJournalWriter, "Cannot write entry: journal writer is null.");
throw new RuntimeException("Cannot write entry: journal writer is null.");
}
try { try {
mJournalWriter.getEntryOutputStream().writeEntry(entry); mJournalWriter.getEntryOutputStream().writeEntry(entry);
} catch (IOException ioe) { } catch (IOException ioe) {
Expand All @@ -154,9 +156,7 @@ protected void writeJournalEntry(JournalEntry entry) {
} }


protected void flushJournal() { protected void flushJournal() {
if (mJournalWriter == null) { Preconditions.checkNotNull(mJournalWriter, "Cannot write entry: journal writer is null.");
throw new RuntimeException("Cannot flush journal: Journal writer is null.");
}
try { try {
mJournalWriter.getEntryOutputStream().flush(); mJournalWriter.getEntryOutputStream().flush();
} catch (IOException ioe) { } catch (IOException ioe) {
Expand Down
1 change: 1 addition & 0 deletions servers/src/main/java/tachyon/master/MasterSource.java
Expand Up @@ -121,6 +121,7 @@ public Integer getValue() {
} }
}); });


// TODO: re-enable when metrics are fully implemented on the master.
/* /*
mMetricRegistry.register(MetricRegistry.name("FilesTotal"), new Gauge<Integer>() { mMetricRegistry.register(MetricRegistry.name("FilesTotal"), new Gauge<Integer>() {
@Override @Override
Expand Down
36 changes: 19 additions & 17 deletions servers/src/main/java/tachyon/master/TachyonMaster.java
Expand Up @@ -58,6 +58,7 @@ public static void main(String[] args) {
} }


try { try {
// TODO: create a master context with the tachyon conf.
TachyonConf conf = new TachyonConf(); TachyonConf conf = new TachyonConf();
TachyonMaster master; TachyonMaster master;
if (conf.getBoolean(Constants.USE_ZOOKEEPER)) { if (conf.getBoolean(Constants.USE_ZOOKEEPER)) {
Expand Down Expand Up @@ -150,80 +151,76 @@ public TachyonMaster(TachyonConf tachyonConf) {
} }


/** /**
* Returns the underlying {@link TachyonConf} instance for the Worker. * @return the underlying {@link TachyonConf} instance for the master.
*
* @return TachyonConf of the Master
*/ */
public TachyonConf getTachyonConf() { public TachyonConf getTachyonConf() {
return mTachyonConf; return mTachyonConf;
} }


/** /**
* Gets the externally resolvable address of this master. * @return the externally resolvable address of this master.
*/ */
public InetSocketAddress getMasterAddress() { public InetSocketAddress getMasterAddress() {
return mMasterAddress; return mMasterAddress;
} }


/** /**
* Gets the actual bind hostname on RPC service (used by unit test only). * @return the actual bind hostname on RPC service (used by unit test only).
*/ */
public String getRPCBindHost() { public String getRPCBindHost() {
return NetworkAddressUtils.getThriftSocket(mTServerSocket).getLocalSocketAddress().toString(); return NetworkAddressUtils.getThriftSocket(mTServerSocket).getLocalSocketAddress().toString();
} }


/** /**
* Gets the actual port that the RPC service is listening on (used by unit test only) * @return the actual port that the RPC service is listening on (used by unit test only)
*/ */
public int getRPCLocalPort() { public int getRPCLocalPort() {
return mPort; return mPort;
} }


/** /**
* Gets the actual bind hostname on web service (used by unit test only). * @return the actual bind hostname on web service (used by unit test only).
*/ */
public String getWebBindHost() { public String getWebBindHost() {
return mWebServer.getBindHost(); return mWebServer.getBindHost();
} }


/** /**
* Gets the actual port that the web service is listening on (used by unit test only) * @return the actual port that the web service is listening on (used by unit test only)
*/ */
public int getWebLocalPort() { public int getWebLocalPort() {
return mWebServer.getLocalPort(); return mWebServer.getLocalPort();
} }


/** /**
* Gets internal {@link FileSystemMaster}, for unit test only. * @return internal {@link FileSystemMaster}, for unit test only.
*/ */
public FileSystemMaster getFileSystemMaster() { public FileSystemMaster getFileSystemMaster() {
return mFileSystemMaster; return mFileSystemMaster;
} }


/** /**
* Gets internal {@link RawTableMaster}, for unit test only. * @return internal {@link RawTableMaster}, for unit test only.
*/ */
public RawTableMaster getRawTableMaster() { public RawTableMaster getRawTableMaster() {
return mRawTableMaster; return mRawTableMaster;
} }


/** /**
* Gets internal {@link BlockMaster}, for unit test only. * @return internal {@link BlockMaster}, for unit test only.
*/ */
public BlockMaster getBlockMaster() { public BlockMaster getBlockMaster() {
return mBlockMaster; return mBlockMaster;
} }


/** /**
* Gets the time in milliseconds at which the Tachyon Master started serving, or -1 if it has not started. * @return the time in milliseconds at which the Tachyon Master started serving, or -1 if it has not started.
*/ */
public long getStarttimeMs() { public long getStarttimeMs() {
return mStartTimeMs; return mStartTimeMs;
} }


/** /**
* Gets whether the system is serving the rpc server.
*
* @return true if the system is the leader (serving the rpc server), false otherwise. * @return true if the system is the leader (serving the rpc server), false otherwise.
*/ */
boolean isServing() { boolean isServing() {
Expand Down Expand Up @@ -293,10 +290,10 @@ protected void startServingWebServer() {
protected void startServingRPCServer() { protected void startServingRPCServer() {
// set up multiplexed thrift processors // set up multiplexed thrift processors
TMultiplexedProcessor processor = new TMultiplexedProcessor(); TMultiplexedProcessor processor = new TMultiplexedProcessor();
processor.registerProcessor(mBlockMaster.getProcessorName(), mBlockMaster.getProcessor()); processor.registerProcessor(mBlockMaster.getServiceName(), mBlockMaster.getProcessor());
processor.registerProcessor(mFileSystemMaster.getProcessorName(), processor.registerProcessor(mFileSystemMaster.getServiceName(),
mFileSystemMaster.getProcessor()); mFileSystemMaster.getProcessor());
processor.registerProcessor(mRawTableMaster.getProcessorName(), mRawTableMaster.getProcessor()); processor.registerProcessor(mRawTableMaster.getServiceName(), mRawTableMaster.getProcessor());


// create master thrift service with the multiplexed processor. // create master thrift service with the multiplexed processor.
mMasterServiceServer = new TThreadPoolServer(new TThreadPoolServer.Args(mTServerSocket) mMasterServiceServer = new TThreadPoolServer(new TThreadPoolServer.Args(mTServerSocket)
Expand All @@ -313,9 +310,11 @@ protected void startServingRPCServer() {
protected void stopServing() throws Exception { protected void stopServing() throws Exception {
if (mMasterServiceServer != null) { if (mMasterServiceServer != null) {
mMasterServiceServer.stop(); mMasterServiceServer.stop();
mMasterServiceServer = null;
} }
if (mWebServer != null) { if (mWebServer != null) {
mWebServer.shutdownWebServer(); mWebServer.shutdownWebServer();
mWebServer = null;
} }
mIsServing = false; mIsServing = false;
} }
Expand All @@ -330,6 +329,9 @@ protected void stopServing() throws Exception {
private boolean isJournalFormatted(String journalDirectory) throws IOException { private boolean isJournalFormatted(String journalDirectory) throws IOException {
UnderFileSystem ufs = UnderFileSystem.get(journalDirectory, mTachyonConf); UnderFileSystem ufs = UnderFileSystem.get(journalDirectory, mTachyonConf);
if (!ufs.providesStorage()) { if (!ufs.providesStorage()) {
// TODO: Should the journal really be allowed on a ufs without storage?
// This ufs doesn't provide storage. Allow the master to use this ufs for the journal.
LOG.info("Journal directory doesn't provide storage: " + journalDirectory);
return true; return true;
} }
String[] files = ufs.list(journalDirectory); String[] files = ufs.list(journalDirectory);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;


import tachyon.Constants; import tachyon.Constants;
Expand All @@ -43,6 +44,7 @@ public final class TachyonMasterFaultTolerant extends TachyonMaster {


public TachyonMasterFaultTolerant(TachyonConf tachyonConf) { public TachyonMasterFaultTolerant(TachyonConf tachyonConf) {
super(tachyonConf); super(tachyonConf);
Preconditions.checkArgument(tachyonConf.getBoolean(Constants.USE_ZOOKEEPER));


// Set up zookeeper specific functionality. // Set up zookeeper specific functionality.
try { try {
Expand Down Expand Up @@ -78,15 +80,14 @@ public void start() throws Exception {


while (true) { while (true) {
if (mLeaderSelectorClient.isLeader()) { if (mLeaderSelectorClient.isLeader()) {
if (started) { stopMasters();
stopMasters();
}
startMasters(true); startMasters(true);
started = true; started = true;
startServing(); startServing();
} else { } else {
// This master should be standby, and not the leader // This master should be standby, and not the leader
if (isServing() || !started) { if (isServing() || !started) {
// Need to transition this master to standby mode.
stopServing(); stopServing();
stopMasters(); stopMasters();


Expand All @@ -100,6 +101,7 @@ public void start() throws Exception {
startMasters(false); startMasters(false);
started = true; started = true;
} }
// This master is already in standby mode. No further actions needed.
} }


CommonUtils.sleepMs(LOG, 100); CommonUtils.sleepMs(LOG, 100);
Expand Down

0 comments on commit 365d5ab

Please sign in to comment.