Skip to content

Commit

Permalink
Updated the worker heartbeat of file system master
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Dec 12, 2015
1 parent 5daf302 commit 5aa73f0
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 27 deletions.
48 changes: 21 additions & 27 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -51,7 +51,6 @@
import tachyon.exception.FileDoesNotExistException; import tachyon.exception.FileDoesNotExistException;
import tachyon.exception.InvalidFileSizeException; import tachyon.exception.InvalidFileSizeException;
import tachyon.exception.InvalidPathException; import tachyon.exception.InvalidPathException;
import tachyon.exception.LineageDoesNotExistException;
import tachyon.exception.PreconditionMessage; import tachyon.exception.PreconditionMessage;
import tachyon.heartbeat.HeartbeatContext; import tachyon.heartbeat.HeartbeatContext;
import tachyon.heartbeat.HeartbeatExecutor; import tachyon.heartbeat.HeartbeatExecutor;
Expand Down Expand Up @@ -94,14 +93,14 @@
import tachyon.security.authorization.PermissionStatus; import tachyon.security.authorization.PermissionStatus;
import tachyon.thrift.BlockInfo; import tachyon.thrift.BlockInfo;
import tachyon.thrift.BlockLocation; import tachyon.thrift.BlockLocation;
import tachyon.thrift.CheckpointFile;
import tachyon.thrift.CommandType; import tachyon.thrift.CommandType;
import tachyon.thrift.FileBlockInfo; import tachyon.thrift.FileBlockInfo;
import tachyon.thrift.FileInfo; import tachyon.thrift.FileInfo;
import tachyon.thrift.FileSystemMasterClientService; import tachyon.thrift.FileSystemMasterClientService;
import tachyon.thrift.FileSystemMasterWorkerService; import tachyon.thrift.FileSystemMasterWorkerService;
import tachyon.thrift.LineageCommand;
import tachyon.thrift.NetAddress; import tachyon.thrift.NetAddress;
import tachyon.thrift.PersistCommand;
import tachyon.thrift.PersistFile;
import tachyon.underfs.UnderFileSystem; import tachyon.underfs.UnderFileSystem;
import tachyon.util.IdUtils; import tachyon.util.IdUtils;
import tachyon.util.io.PathUtils; import tachyon.util.io.PathUtils;
Expand Down Expand Up @@ -1475,6 +1474,7 @@ public synchronized void scheduleAsyncPersistence(long fileId) throws FileDoesNo


/** /**
* Gets the worker where the given file is stored. * Gets the worker where the given file is stored.
*
* @param fileId the file id * @param fileId the file id
* @return the storing worker * @return the storing worker
* @throws FileDoesNotExistException when the file does not exist on any worker * @throws FileDoesNotExistException when the file does not exist on any worker
Expand All @@ -1497,39 +1497,38 @@ private long getWorkerStoringFile(long fileId) throws FileDoesNotExistException


if (workers.size() == 0) { if (workers.size() == 0) {
throw new FileDoesNotExistException("The file " + fileId + " does not exist on any worker"); throw new FileDoesNotExistException("The file " + fileId + " does not exist on any worker");
} else if(workers.size() > 1) { } else if (workers.size() > 1) {
LOG.info("the file is stored at more than one worker: " + workers); LOG.info("the file is stored at more than one worker: " + workers);
} }
return workers.get(0); return workers.get(0);
} }


/** /**
* Polls the files to send to the given worker for checkpoint * Polls the files to send to the given worker for persistence.
* *
* @param workerId the worker id * @param workerId the worker id
* @return the list of files * @return the list of files
* @throws FileDoesNotExistException if the file does not exist * @throws FileDoesNotExistException if the file does not exist
* @throws InvalidPathException if the path is invalid * @throws InvalidPathException if the path is invalid
* @throws LineageDoesNotExistException if the lineage does not exist
*/ */
private synchronized List<CheckpointFile> pollToCheckpoint(long workerId) private synchronized List<PersistFile> pollToCheckpoint(long workerId)
throws FileDoesNotExistException, InvalidPathException, LineageDoesNotExistException { throws FileDoesNotExistException, InvalidPathException {
List<CheckpointFile> files = Lists.newArrayList(); List<PersistFile> files = Lists.newArrayList();
if (!mWorkerToAsyncPersistFile.containsKey(workerId)) { if (!mWorkerToAsyncPersistFile.containsKey(workerId)) {
return files; return files;
} }


List<Long> toRequestFilePersistence = Lists.newArrayList(); List<Long> toRequestFilePersistence = Lists.newArrayList();
for (long fileId : mWorkerToAsyncPersistFile.get(workerId)) { for (long fileId : mWorkerToAsyncPersistFile.get(workerId)) {
InodeFile inode = (InodeFile)mInodeTree.getInodeById(fileId); InodeFile inode = (InodeFile) mInodeTree.getInodeById(fileId);
if (inode.isCompleted()) { if (inode.isCompleted()) {
toRequestFilePersistence.add(fileId); toRequestFilePersistence.add(fileId);
List<Long> blockIds = Lists.newArrayList(); List<Long> blockIds = Lists.newArrayList();
for (FileBlockInfo fileBlockInfo : getFileBlockInfoList(fileId)) { for (FileBlockInfo fileBlockInfo : getFileBlockInfoList(fileId)) {
blockIds.add(fileBlockInfo.blockInfo.blockId); blockIds.add(fileBlockInfo.blockInfo.blockId);
} }


CheckpointFile toCheckpoint = new CheckpointFile(fileId, blockIds); PersistFile toCheckpoint = new PersistFile(fileId, blockIds);
files.add(toCheckpoint); files.add(toCheckpoint);
// update the inode file persisence state // update the inode file persisence state
inode.setPersistenceState(FilePersistenceState.PERSISTING); inode.setPersistenceState(FilePersistenceState.PERSISTING);
Expand All @@ -1538,9 +1537,8 @@ private synchronized List<CheckpointFile> pollToCheckpoint(long workerId)


requestFilePersistence(toRequestFilePersistence); requestFilePersistence(toRequestFilePersistence);
// write to journal // write to journal
PersistFilesRequestEntry persistFilesRequest = PersistFilesRequestEntry.newBuilder() PersistFilesRequestEntry persistFilesRequest =
.addAllFileIds(toRequestFilePersistence) PersistFilesRequestEntry.newBuilder().addAllFileIds(toRequestFilePersistence).build();
.build();
writeJournalEntry( writeJournalEntry(
JournalEntry.newBuilder().setPersistFilesRequest(persistFilesRequest).build()); JournalEntry.newBuilder().setPersistFilesRequest(persistFilesRequest).build());
flushJournal(); flushJournal();
Expand All @@ -1551,44 +1549,40 @@ private synchronized List<CheckpointFile> pollToCheckpoint(long workerId)
* Request a list of files as being persisted. * Request a list of files as being persisted.
* *
* @param fileIds the id of the files * @param fileIds the id of the files
* @throws LineageDoesNotExistException if the lineage does not exist
* @throws FileDoesNotExistException when a file does not exist * @throws FileDoesNotExistException when a file does not exist
*/ */
private void requestFilePersistence(List<Long> fileIds) private void requestFilePersistence(List<Long> fileIds) throws FileDoesNotExistException {
throws LineageDoesNotExistException, FileDoesNotExistException {
if (!fileIds.isEmpty()) { if (!fileIds.isEmpty()) {
LOG.info("Request file persistency: {}", fileIds); LOG.info("Request file persistency: {}", fileIds);
} }
for (long fileId : fileIds) { for (long fileId : fileIds) {
InodeFile inode = (InodeFile)mInodeTree.getInodeById(fileId); InodeFile inode = (InodeFile) mInodeTree.getInodeById(fileId);
inode.setPersistenceState(FilePersistenceState.PERSISTING); inode.setPersistenceState(FilePersistenceState.PERSISTING);
} }
} }


/** /**
* Instructs a worker to persist the files for checkpoint. * Instructs a worker to persist the files.
* *
* @param workerId the id of the worker that heartbeats * @param workerId the id of the worker that heartbeats
* @return the command for checkpointing the blocks of a file * @return the command for persisting the blocks of a file
* @throws FileDoesNotExistException if the file does not exist * @throws FileDoesNotExistException if the file does not exist
* @throws InvalidPathException if the file path is invalid * @throws InvalidPathException if the file path is invalid
* @throws LineageDoesNotExistException if the lineage does not exist
*/ */
public synchronized LineageCommand lineageWorkerHeartbeat(long workerId, public synchronized PersistCommand workerHeartbeat(long workerId, List<Long> persistedFiles)
List<Long> persistedFiles) throws FileDoesNotExistException, InvalidPathException {
throws FileDoesNotExistException, InvalidPathException, LineageDoesNotExistException {
if (!persistedFiles.isEmpty()) { if (!persistedFiles.isEmpty()) {
// notify checkpoint manager the persisted files // notify checkpoint manager the persisted files
persistFiles(workerId, persistedFiles); persistFiles(workerId, persistedFiles);
} }


// get the files for the given worker to checkpoint // get the files for the given worker to checkpoint
List<CheckpointFile> filesToCheckpoint = null; List<PersistFile> filesToCheckpoint = null;
filesToCheckpoint = pollToCheckpoint(workerId); filesToCheckpoint = pollToCheckpoint(workerId);
if (!filesToCheckpoint.isEmpty()) { if (!filesToCheckpoint.isEmpty()) {
LOG.info("Sent files {} to worker {} to persist", filesToCheckpoint, workerId); LOG.info("Sent files {} to worker {} to persist", filesToCheckpoint, workerId);
} }
return new LineageCommand(CommandType.Persist, filesToCheckpoint); return new PersistCommand(CommandType.Persist, filesToCheckpoint);
} }


/** /**
Expand Down
Expand Up @@ -15,14 +15,20 @@


package tachyon.master.file; package tachyon.master.file;


import java.util.List;
import java.util.Set; import java.util.Set;


import org.apache.thrift.TException;

import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;


import tachyon.Constants; import tachyon.Constants;
import tachyon.exception.FileDoesNotExistException;
import tachyon.exception.InvalidPathException;
import tachyon.exception.TachyonException; import tachyon.exception.TachyonException;
import tachyon.thrift.FileInfo; import tachyon.thrift.FileInfo;
import tachyon.thrift.FileSystemMasterWorkerService; import tachyon.thrift.FileSystemMasterWorkerService;
import tachyon.thrift.PersistCommand;
import tachyon.thrift.TachyonTException; import tachyon.thrift.TachyonTException;


/** /**
Expand Down Expand Up @@ -55,4 +61,16 @@ public FileInfo getFileInfo(long fileId) throws TachyonTException {
public Set<Long> getPinIdList() { public Set<Long> getPinIdList() {
return mFileSystemMaster.getPinIdList(); return mFileSystemMaster.getPinIdList();
} }

@Override
public PersistCommand heartbeat(long workerId, List<Long> persistedFiles)
throws TachyonTException, TException {
try {
return mFileSystemMaster.workerHeartbeat(workerId, persistedFiles);
} catch (FileDoesNotExistException e) {
throw e.toTachyonTException();
} catch (InvalidPathException e) {
throw e.toTachyonTException();
}
}
} }

0 comments on commit 5aa73f0

Please sign in to comment.