Skip to content

Commit

Permalink
Improve journal handling and code comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gpang committed Sep 5, 2015
1 parent 25539fd commit 9467a60
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 189 deletions.
73 changes: 46 additions & 27 deletions servers/src/main/java/tachyon/master/MasterBase.java
Expand Up @@ -27,23 +27,30 @@
import tachyon.master.journal.Journal; import tachyon.master.journal.Journal;
import tachyon.master.journal.JournalEntry; import tachyon.master.journal.JournalEntry;
import tachyon.master.journal.JournalInputStream; import tachyon.master.journal.JournalInputStream;
import tachyon.master.journal.JournalOutputStream;
import tachyon.master.journal.JournalSerializable; import tachyon.master.journal.JournalSerializable;
import tachyon.master.journal.JournalTailer; import tachyon.master.journal.JournalTailer;
import tachyon.master.journal.JournalTailerThread; import tachyon.master.journal.JournalTailerThread;
import tachyon.master.journal.JournalWriter; import tachyon.master.journal.JournalWriter;


/**
* This is the base class for all masters, and contains common functionality. Common functionality
* mostly consists of journal operations, like initializing it, tailing it when in standby mode, or
* writing to it when the master is the leader.
*/
public abstract class MasterBase implements Master { public abstract class MasterBase implements Master {
private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE); private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);


/** A handler to the journal for this master. */
private final Journal mJournal; private final Journal mJournal;
/** The executor used for running maintenance threads for the master. */
private final ExecutorService mExecutorService; private final ExecutorService mExecutorService;


/** true if this master is in leader mode, and not standby mode. */ /** true if this master is in leader mode, and not standby mode. */
private boolean mIsLeader = false; private boolean mIsLeader = false;

/** The thread that tails the journal when the master is in standby mode. */ /** The thread that tails the journal when the master is in standby mode. */
private JournalTailerThread mStandbyJournalTailer = null; private JournalTailerThread mStandbyJournalTailer = null;

/** The journal writer for when the master is the leader. */
private JournalWriter mJournalWriter = null; private JournalWriter mJournalWriter = null;


protected MasterBase(Journal journal, ExecutorService executorService) { protected MasterBase(Journal journal, ExecutorService executorService) {
Expand All @@ -62,39 +69,48 @@ public void processJournalCheckpoint(JournalInputStream inputStream) throws IOEx


@Override @Override
public void start(boolean isLeader) throws IOException { public void start(boolean isLeader) throws IOException {
LOG.info("Starting master. isLeader: " + isLeader); LOG.info(getProcessorName() + ": Starting master. isLeader: " + isLeader);
mIsLeader = isLeader; mIsLeader = isLeader;
if (mIsLeader) { if (mIsLeader) {
// Replay all the state of the checkpoint and the completed log files. mJournalWriter = mJournal.getNewWriter();

/**
* The sequence for dealing with the journal before starting as the leader:
*
* Phase 1. Mark all the logs as completed. Since this master is the leader, it is allowed to
* write the journal, so it can mark the current log as completed. After this step, the
* current log file will not exist, and all logs will be complete.
*
* Phase 2. Reconstruct the state from the journal. This uses the JournalTailer to process all
* of the checkpoint and the complete log files. Since all logs are complete, after this step,
* the master will reflect the state of all of the journal entries.
*
* Phase 3. Write out the checkpoint file. Since this master is completely up-to-date, it
* writes out the checkpoint file. When the checkpoint file is closed, it will then delete the
* complete log files.
*/

// Phase 1: Mark all logs as complete, including the current log. After this call, the current
// log should not exist, and all the log files will be complete.
mJournalWriter.completeAllLogs();

// Phase 2: Replay all the state of the checkpoint and the completed log files.
// TODO: only do this if this is a fresh start, not if this master had already been tailing // TODO: only do this if this is a fresh start, not if this master had already been tailing
// the journal. // the journal.
LOG.info(getProcessorName() + ": process completed logs before becoming master."); LOG.info(getProcessorName() + ": process completed logs before becoming master.");
JournalTailer catchupTailer = new JournalTailer(this, mJournal); JournalTailer catchupTailer = new JournalTailer(this, mJournal);
boolean checkpointExists = true; if (catchupTailer.checkpointExists()) {
try {
catchupTailer.getCheckpointLastModifiedTimeMs();
} catch (IOException ioe) {
// The checkpoint doesn't exist yet. This is probably the first execution ever, or this is a
// testing master.
checkpointExists = false;
}
if (checkpointExists) {
catchupTailer.processJournalCheckpoint(true); catchupTailer.processJournalCheckpoint(true);
catchupTailer.processNextJournalLogFiles(); catchupTailer.processNextJournalLogFiles();
} }

long latestSequenceNumber = catchupTailer.getLatestSequenceNumber();
// initialize the journal and write out the checkpoint file (the state of all completed logs).
mJournalWriter = mJournal.getNewWriter(); // Phase 3: initialize the journal and write out the checkpoint file (the state of all
writeToJournal(mJournalWriter.getCheckpointOutputStream()); // completed logs).
mJournalWriter.getCheckpointOutputStream().close(); JournalOutputStream checkpointStream =

mJournalWriter.getCheckpointOutputStream(latestSequenceNumber);
// Final catchup stage. The last in-progress file (if it existed) was marked as complete when writeToJournal(checkpointStream);
// the checkpoint file was closed. That last completed file must be processed to get to the checkpointStream.close();
// latest state. Read and process the completed file.
LOG.info(getProcessorName() + ": process the last completed log before becoming master.");
catchupTailer = new JournalTailer(this, mJournal);
catchupTailer.processJournalCheckpoint(false);
catchupTailer.processNextJournalLogFiles();
} else { } else {
// in standby mode. Start the journal tailer thread. // in standby mode. Start the journal tailer thread.
mStandbyJournalTailer = new JournalTailerThread(this, mJournal); mStandbyJournalTailer = new JournalTailerThread(this, mJournal);
Expand All @@ -104,7 +120,7 @@ public void start(boolean isLeader) throws IOException {


@Override @Override
public void stop() throws IOException { public void stop() throws IOException {
LOG.info("Stopping master. isLeader: " + isLeaderMode()); LOG.info(getProcessorName() + ":Stopping master. isLeader: " + isLeaderMode());
if (isStandbyMode()) { if (isStandbyMode()) {
if (mStandbyJournalTailer != null) { if (mStandbyJournalTailer != null) {
// stop and wait for the journal tailer thread. // stop and wait for the journal tailer thread.
Expand Down Expand Up @@ -160,6 +176,9 @@ protected void flushJournal() {
} }
} }


/**
* @return the executor service for this master.
*/
protected ExecutorService getExecutorService() { protected ExecutorService getExecutorService() {
return mExecutorService; return mExecutorService;
} }
Expand Down
44 changes: 13 additions & 31 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -42,7 +42,6 @@
import tachyon.master.file.journal.CompleteFileEntry; import tachyon.master.file.journal.CompleteFileEntry;
import tachyon.master.file.journal.DeleteFileEntry; import tachyon.master.file.journal.DeleteFileEntry;
import tachyon.master.file.journal.DependencyEntry; import tachyon.master.file.journal.DependencyEntry;
import tachyon.master.file.journal.FreeEntry;
import tachyon.master.file.journal.InodeDirectoryIdGeneratorEntry; import tachyon.master.file.journal.InodeDirectoryIdGeneratorEntry;
import tachyon.master.file.journal.InodeEntry; import tachyon.master.file.journal.InodeEntry;
import tachyon.master.file.journal.RenameEntry; import tachyon.master.file.journal.RenameEntry;
Expand Down Expand Up @@ -144,8 +143,6 @@ public void processJournalEntry(JournalEntry entry) throws IOException {
completeFileFromEntry((CompleteFileEntry) entry); completeFileFromEntry((CompleteFileEntry) entry);
} else if (entry instanceof AddCheckpointEntry) { } else if (entry instanceof AddCheckpointEntry) {
completeFileCheckpointFromEntry((AddCheckpointEntry) entry); completeFileCheckpointFromEntry((AddCheckpointEntry) entry);
} else if (entry instanceof FreeEntry) {
freeFromEntry((FreeEntry) entry);
} else if (entry instanceof SetPinnedEntry) { } else if (entry instanceof SetPinnedEntry) {
setPinnedFromEntry((SetPinnedEntry) entry); setPinnedFromEntry((SetPinnedEntry) entry);
} else if (entry instanceof DeleteFileEntry) { } else if (entry instanceof DeleteFileEntry) {
Expand Down Expand Up @@ -799,41 +796,26 @@ public boolean free(long fileId, boolean recursive)
// true // true
return false; return false;
} }
freeInternal(inode);
writeJournalEntry(new FreeEntry(fileId));
flushJournal();
}

return true;
}


private void freeInternal(Inode inode) { List<Inode> freeInodes = new ArrayList<Inode>();
List<Inode> freeInodes = new ArrayList<Inode>(); freeInodes.add(inode);
freeInodes.add(inode); if (inode.isDirectory()) {
if (inode.isDirectory()) { freeInodes.addAll(mInodeTree.getInodeChildrenRecursive((InodeDirectory) inode));
freeInodes.addAll(mInodeTree.getInodeChildrenRecursive((InodeDirectory) inode)); }
}


// We go through each inode. // We go through each inode.
for (int i = freeInodes.size() - 1; i >= 0; i--) { for (int i = freeInodes.size() - 1; i >= 0; i --) {
Inode freeInode = freeInodes.get(i); Inode freeInode = freeInodes.get(i);


if (freeInode.isFile()) { if (freeInode.isFile()) {
// Remove corresponding blocks from workers. // Remove corresponding blocks from workers.
mBlockMaster.removeBlocks(((InodeFile) freeInode).getBlockIds()); mBlockMaster.removeBlocks(((InodeFile) freeInode).getBlockIds());
}
} }
} }
return true;
} }


private void freeFromEntry(FreeEntry entry) {
try {
freeInternal(mInodeTree.getInodeById(entry.getId()));
} catch (FileDoesNotExistException fdnee) {
throw new RuntimeException(fdnee);
}
}


/** /**
* Gets the path of a file with the given id * Gets the path of a file with the given id
* *
Expand Down
47 changes: 0 additions & 47 deletions servers/src/main/java/tachyon/master/file/journal/FreeEntry.java

This file was deleted.

Expand Up @@ -34,6 +34,7 @@ public InodeDirectoryEntry(long creationTimeMs, long id, String name, long paren
public InodeDirectory toInodeDirectory() { public InodeDirectory toInodeDirectory() {
InodeDirectory inode = new InodeDirectory(mName, mId, mParentId, mCreationTimeMs); InodeDirectory inode = new InodeDirectory(mName, mId, mParentId, mCreationTimeMs);
inode.setPinned(mIsPinned); inode.setPinned(mIsPinned);
inode.setLastModificationTimeMs(mLastModificationTimeMs);
return inode; return inode;
} }


Expand Down
Expand Up @@ -50,8 +50,13 @@ public InodeFile toInodeFile() {
if (mIsComplete) { if (mIsComplete) {
inode.setComplete(mLength); inode.setComplete(mLength);
} }
if (mBlocks != null) {
inode.setBlockIds(mBlocks);
}
inode.setPinned(mIsPinned); inode.setPinned(mIsPinned);
inode.setCache(mIsCache); inode.setCache(mIsCache);
inode.setLastModificationTimeMs(mLastModificationTimeMs);
inode.setUfsPath(mUfsPath);


return inode; return inode;
} }
Expand Down
50 changes: 40 additions & 10 deletions servers/src/main/java/tachyon/master/journal/Journal.java
Expand Up @@ -28,45 +28,63 @@
* completed entry files are in the "completed/" sub-directory. * completed entry files are in the "completed/" sub-directory.
*/ */
public class Journal { public class Journal {
/** The log number for the first completed log file. */
public static final int FIRST_COMPLETED_LOG_NUMBER = 1; public static final int FIRST_COMPLETED_LOG_NUMBER = 1;
/** The directory for completed log files, relative to the base journal directory. */
private static final String COMPLETED_DIRECTORY = "completed/"; private static final String COMPLETED_DIRECTORY = "completed/";
/** The file extension for the current log file. */
private static final String CURRENT_LOG_EXTENSION = ".out"; private static final String CURRENT_LOG_EXTENSION = ".out";

// TODO: should this be a config parameter?
/** The filename of the checkpoint file. */ /** The filename of the checkpoint file. */
private final String mCheckpointFilename = "checkpoint.data"; private static final String CHECKPOINT_FILENAME = "checkpoint.data";
// TODO: should this be a config parameter?
/** The base of the entry log filenames, without the file extension. */ /** The base of the entry log filenames, without the file extension. */
private final String mEntryLogFilenameBase = "log"; private static final String ENTRY_LOG_FILENAME_BASE = "log";
private final String mDirectory;
private final TachyonConf mTachyonConf; private final TachyonConf mTachyonConf;
/** The directory where this journal is stored. */
private final String mDirectory;
/** The formatter for this journal. */
private final JournalFormatter mJournalFormatter; private final JournalFormatter mJournalFormatter;


/**
* @param directory the base directory for this journal
* @param tachyonConf the tachyon conf
*/
public Journal(String directory, TachyonConf tachyonConf) { public Journal(String directory, TachyonConf tachyonConf) {
if (!directory.endsWith(TachyonURI.SEPARATOR)) { if (!directory.endsWith(TachyonURI.SEPARATOR)) {
// Ensure directory format. // Ensure directory format.
directory += TachyonURI.SEPARATOR; directory += TachyonURI.SEPARATOR;
} }
mDirectory = directory; mDirectory = directory;
mTachyonConf = tachyonConf; mTachyonConf = tachyonConf;
// TODO: maybe this can be constructed, specified by a parameter in tachyonConf.
mJournalFormatter = new JsonJournalFormatter(); mJournalFormatter = new JsonJournalFormatter();
} }


/**
* @return the base directory for this journal
*/
public String getDirectory() { public String getDirectory() {
return mDirectory; return mDirectory;
} }


/**
* @return the directory for where the completed log files are stored
*/
public String getCompletedDirectory() { public String getCompletedDirectory() {
return mDirectory + COMPLETED_DIRECTORY; return mDirectory + COMPLETED_DIRECTORY;
} }


/**
* @return the absolute path for the journal checkpoint file
*/
public String getCheckpointFilePath() { public String getCheckpointFilePath() {
return mDirectory + mCheckpointFilename; return mDirectory + CHECKPOINT_FILENAME;
} }


/**
* @return the absolute path for the current log file.
*/
public String getCurrentLogFilePath() { public String getCurrentLogFilePath() {
return mDirectory + mEntryLogFilenameBase + CURRENT_LOG_EXTENSION; return mDirectory + ENTRY_LOG_FILENAME_BASE + CURRENT_LOG_EXTENSION;
} }


/** /**
Expand All @@ -76,21 +94,33 @@ public String getCurrentLogFilePath() {
* @return The absolute path of the completed log for a given log number. * @return The absolute path of the completed log for a given log number.
*/ */
public String getCompletedLogFilePath(int logNumber) { public String getCompletedLogFilePath(int logNumber) {
return getCompletedDirectory() + String.format("%s.%07d", mEntryLogFilenameBase, logNumber); return getCompletedDirectory() + String.format("%s.%07d", ENTRY_LOG_FILENAME_BASE, logNumber);
} }


/**
* @return the formatter for this journal
*/
public JournalFormatter getJournalFormatter() { public JournalFormatter getJournalFormatter() {
return mJournalFormatter; return mJournalFormatter;
} }


/**
* @return a readonly version of this journal
*/
public ReadOnlyJournal getReadOnlyJournal() { public ReadOnlyJournal getReadOnlyJournal() {
return new ReadOnlyJournal(mDirectory, mTachyonConf); return new ReadOnlyJournal(mDirectory, mTachyonConf);
} }


/**
* @return the writer for this journal
*/
public JournalWriter getNewWriter() { public JournalWriter getNewWriter() {
return new JournalWriter(this, mTachyonConf); return new JournalWriter(this, mTachyonConf);
} }


/**
* @return the reader for this journal
*/
public JournalReader getNewReader() { public JournalReader getNewReader() {
return new JournalReader(this, mTachyonConf); return new JournalReader(this, mTachyonConf);
} }
Expand Down
Expand Up @@ -20,10 +20,14 @@
// TODO: In the future, implementations of this interface can be represented as ProtoBuf // TODO: In the future, implementations of this interface can be represented as ProtoBuf
public interface JournalEntry { public interface JournalEntry {


/**
* @return the {@link JournalEntryType} of this entry.
*/
JournalEntryType getType(); JournalEntryType getType();


/** /**
* Gets parameters of this entry which is a map from parameter name to parameter value. *
* @return parameters of this entry which is a map from parameter name to parameter value.
*/ */
Map<String, Object> getParameters(); Map<String, Object> getParameters();
} }

0 comments on commit 9467a60

Please sign in to comment.