Skip to content

Commit

Permalink
Merge pull request #2366 from yupeng9/faultTolerant
Browse files Browse the repository at this point in the history
[TACHYON-1456] Add fault-tolerance support to async persistence
  • Loading branch information
yupeng9 committed Jan 11, 2016
2 parents 0a6a6e0 + b292928 commit 799b08b
Show file tree
Hide file tree
Showing 16 changed files with 992 additions and 1,618 deletions.
2 changes: 2 additions & 0 deletions common/src/main/java/tachyon/heartbeat/HeartbeatContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public final class HeartbeatContext {
// Names of different heartbeat executors.
public static final String MASTER_CHECKPOINT_SCHEDULING = "Master Checkpoint Scheduling";
public static final String MASTER_FILE_RECOMPUTATION = "Master File Recomputation";
public static final String MASTER_LOST_FILES_DETECTION = "Master Lost Files Detection";
public static final String MASTER_LOST_WORKER_DETECTION = "Master Lost Worker Detection";
public static final String MASTER_TTL_CHECK = "Master TTL Check";
public static final String WORKER_BLOCK_SYNC = "Worker Block Sync";
Expand All @@ -42,6 +43,7 @@ public final class HeartbeatContext {
sTimerClasses = new HashMap<String, Class<? extends HeartbeatTimer>>();
sTimerClasses.put(MASTER_CHECKPOINT_SCHEDULING, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_FILE_RECOMPUTATION, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_LOST_FILES_DETECTION, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_LOST_WORKER_DETECTION, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_TTL_CHECK, SLEEPING_TIMER_CLASS);
sTimerClasses.put(WORKER_FILESYSTEM_MASTER_SYNC, SLEEPING_TIMER_CLASS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ public void lineageCompleteAndAsyncPersistTest() throws Exception {
fileInfo.getPersistenceState());

// sleep and wait for worker to persist the file
CommonUtils.sleepMs(5);
// TODO(yupeng) use a deterministic way of control the completion of the task, to avoid flaky
// tests
CommonUtils.sleepMs(200);

// worker notifies the master
HeartbeatScheduler.schedule(HeartbeatContext.WORKER_FILESYSTEM_MASTER_SYNC);
Expand Down
75 changes: 49 additions & 26 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import tachyon.master.journal.JournalOutputStream;
import tachyon.master.journal.JournalProtoUtils;
import tachyon.proto.journal.File.AddMountPointEntry;
import tachyon.proto.journal.File.AsyncPersistRequestEntry;
import tachyon.proto.journal.File.CompleteFileEntry;
import tachyon.proto.journal.File.DeleteFileEntry;
import tachyon.proto.journal.File.DeleteMountPointEntry;
Expand All @@ -89,7 +90,6 @@
import tachyon.proto.journal.File.RenameEntry;
import tachyon.proto.journal.File.SetStateEntry;
import tachyon.proto.journal.Journal.JournalEntry;
import tachyon.proto.journal.Lineage.PersistFilesRequestEntry;
import tachyon.security.authorization.PermissionStatus;
import tachyon.thrift.BlockInfo;
import tachyon.thrift.BlockLocation;
Expand Down Expand Up @@ -134,6 +134,12 @@ public final class FileSystemMaster extends MasterBase {
@SuppressFBWarnings("URF_UNREAD_FIELD")
private Future<?> mTtlCheckerService;

/**
* The service that detects lost files. We store it here so that it can be accessed from tests.
*/
@SuppressFBWarnings("URF_UNREAD_FIELD")
private Future<?> mLostFilesDetectionService;

private final TtlBucketList mTtlBuckets = new TtlBucketList();

/**
Expand Down Expand Up @@ -241,9 +247,9 @@ public void processJournalEntry(JournalEntry entry) throws IOException {
} catch (InvalidPathException e) {
throw new RuntimeException(e);
}
} else if (innerEntry instanceof PersistFilesRequestEntry) {
} else if (innerEntry instanceof AsyncPersistRequestEntry) {
try {
setPersistingState(((PersistFilesRequestEntry) innerEntry).getFileIdsList());
scheduleAsyncPersistenceInternal(((AsyncPersistRequestEntry) innerEntry).getFileId());
} catch (FileDoesNotExistException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -280,6 +286,9 @@ public void start(boolean isLeader) throws IOException {
mTtlCheckerService = getExecutorService().submit(
new HeartbeatThread(HeartbeatContext.MASTER_TTL_CHECK, new MasterInodeTtlCheckExecutor(),
MasterContext.getConf().getInt(Constants.MASTER_TTLCHECKER_INTERVAL_MS)));
mLostFilesDetectionService = getExecutorService().submit(new HeartbeatThread(
HeartbeatContext.MASTER_LOST_FILES_DETECTION, new LostFilesDetectionHeartbeatExecutor(),
MasterContext.getConf().getInt(Constants.MASTER_HEARTBEAT_INTERVAL_MS)));
}
}

Expand Down Expand Up @@ -1580,6 +1589,20 @@ public void setState(long fileId, SetStateOptions options) throws FileDoesNotExi
* @throws FileDoesNotExistException when the file does not exist
*/
public long scheduleAsyncPersistence(long fileId) throws FileDoesNotExistException {
long workerId = scheduleAsyncPersistenceInternal(fileId);

synchronized (mInodeTree) {
// write to journal
AsyncPersistRequestEntry asyncPersistRequestEntry =
AsyncPersistRequestEntry.newBuilder().setFileId(fileId).build();
writeJournalEntry(
JournalEntry.newBuilder().setAsyncPersistRequest(asyncPersistRequestEntry).build());
flushJournal();
return workerId;
}
}

private long scheduleAsyncPersistenceInternal(long fileId) throws FileDoesNotExistException {
// find the worker
long workerId = getWorkerStoringFile(fileId);

Expand All @@ -1601,9 +1624,6 @@ public long scheduleAsyncPersistence(long fileId) throws FileDoesNotExistExcepti
}
mWorkerToAsyncPersistFiles.get(workerId).add(fileId);
}

// TODO(yupeng) TACHYON-1456 add fault tolerance and flush journal

return workerId;
}

Expand Down Expand Up @@ -1692,29 +1712,9 @@ private List<PersistFile> pollFilesToCheckpoint(long workerId)
}
}
mWorkerToAsyncPersistFiles.get(workerId).removeAll(fileIdsToPersist);

// write to journal
PersistFilesRequestEntry persistFilesRequest =
PersistFilesRequestEntry.newBuilder().addAllFileIds(fileIdsToPersist).build();
writeJournalEntry(
JournalEntry.newBuilder().setPersistFilesRequest(persistFilesRequest).build());
flushJournal();
return filesToPersist;
}

/**
* Updates a list of files as being persisted.
*
* @param fileIds the id of the files
* @throws FileDoesNotExistException when a file does not exist
*/
private void setPersistingState(List<Long> fileIds) throws FileDoesNotExistException {
for (long fileId : fileIds) {
InodeFile inode = (InodeFile) mInodeTree.getInodeById(fileId);
inode.setPersistenceState(PersistenceState.IN_PROGRESS);
}
}

/**
* Instructs a worker to persist the files.
*
Expand Down Expand Up @@ -1817,4 +1817,27 @@ public void heartbeat() {
}
}
}

/**
* Lost files periodic check.
*/
private final class LostFilesDetectionHeartbeatExecutor implements HeartbeatExecutor {
@Override
public void heartbeat() {
for (long fileId : getLostFiles()) {
// update the state
synchronized (mInodeTree) {
Inode inode;
try {
inode = mInodeTree.getInodeById(fileId);
if (inode.getPersistenceState() != PersistenceState.PERSISTED) {
inode.setPersistenceState(PersistenceState.LOST);
}
} catch (FileDoesNotExistException e) {
LOG.error("Exception trying to get inode from inode tree: {}", e.toString());
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
public enum PersistenceState {
NOT_PERSISTED, // file not persisted in the under FS
IN_PROGRESS, // used for async persistence, the async persistence is scheduled and in progress
PERSISTED // the file is persisted in the under FS
PERSISTED, // the file is persisted in the under FS
LOST // the file is lost but not persisted in the under FS
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ public static Message unwrap(JournalEntry entry) {
return entry.getLineageIdGenerator();
case PERSIST_DIRECTORY:
return entry.getPersistDirectory();
case PERSIST_FILE:
return entry.getPersistFile();
case PERSIST_FILES_REQUEST:
return entry.getPersistFilesRequest();
case ASYNC_PERSIST_REQUEST:
return entry.getAsyncPersistRequest();
case RAW_TABLE:
return entry.getRawTable();
case REINITIALIZE_FILE:
Expand Down

0 comments on commit 799b08b

Please sign in to comment.