
Commit

resolve conflicts
yupeng9 committed Apr 9, 2016
1 parent 3461ee8 commit 6237dc6
Showing 2 changed files with 1 addition and 289 deletions.
240 changes: 0 additions & 240 deletions core/server/src/main/java/alluxio/master/file/FileSystemMaster.java
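This commit finishes a merge with upstream/master by deleting leftover three-way conflict markers. The markers follow Git's diff3 conflict style: the local (HEAD) version comes first, then the version from the merged common ancestor, then the incoming upstream/master version. As a generic illustration (the placeholder lines are not from this file):

    <<<<<<< HEAD
    ... this branch's version ...
    ||||||| merged common ancestors
    ... the version both branches started from ...
    =======
    ... the incoming upstream/master version ...
    >>>>>>> upstream/master

Resolving a conflict means keeping one side (or a hand-merged combination of both) and deleting the markers, which is what the deletions in both files below do.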
@@ -190,16 +190,9 @@ public FileSystemMaster(BlockMaster blockMaster, Journal journal) {
mWhitelist = new PrefixList(conf.getList(Constants.MASTER_WHITELIST, ","));

mWorkerToAsyncPersistFiles = Maps.newHashMap();
<<<<<<< HEAD
mGroupMappingService = GroupMappingService.Factory.getUserToGroupsMappingService(conf);

mAsyncPersistHandler =
AsyncPersistHandler.Factory.create(MasterContext.getConf(), new FileSystemMasterView(this));
||||||| merged common ancestors
mGroupMappingService = GroupMappingService.Factory.getUserToGroupsMappingService(conf);
=======
mPermissionChecker = new PermissionChecker(mInodeTree);
>>>>>>> upstream/master
}

@Override
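A minimal sketch of how the resolved constructor tail could read, assuming the hand-merge keeps both branches' new fields; every identifier here comes from the hunk above:

    mWorkerToAsyncPersistFiles = Maps.newHashMap();
    mGroupMappingService = GroupMappingService.Factory.getUserToGroupsMappingService(conf);
    // HEAD side: asynchronous persistence is delegated to a dedicated handler.
    mAsyncPersistHandler =
        AsyncPersistHandler.Factory.create(MasterContext.getConf(), new FileSystemMasterView(this));
    // upstream/master side: permission checks are centralized in PermissionChecker.
    mPermissionChecker = new PermissionChecker(mInodeTree);
  }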
@@ -282,15 +275,7 @@ public void processJournalEntry(JournalEntry entry) throws IOException {
try {
long fileId = ((AsyncPersistRequestEntry) innerEntry).getFileId();
scheduleAsyncPersistenceInternal(getPath(fileId));
<<<<<<< HEAD
} catch (AlluxioException e) {
||||||| merged common ancestors
} catch (FileDoesNotExistException e) {
throw new RuntimeException(e);
} catch (InvalidPathException e) {
=======
} catch (FileDoesNotExistException | InvalidPathException e) {
>>>>>>> upstream/master
throw new RuntimeException(e);
}
} else {
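A sketch of the resolved catch block in journal replay, assuming the HEAD side's broader catch is the one kept; FileDoesNotExistException and InvalidPathException both extend AlluxioException, so catching the superclass covers the same failures as upstream's multi-catch:

      try {
        long fileId = ((AsyncPersistRequestEntry) innerEntry).getFileId();
        scheduleAsyncPersistenceInternal(getPath(fileId));
      } catch (AlluxioException e) {
        // Also covers FileDoesNotExistException and InvalidPathException.
        throw new RuntimeException(e);
      }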
@@ -1919,235 +1904,10 @@ public void scheduleAsyncPersistence(AlluxioURI path) throws AlluxioException {
* @param path the path to schedule asynchronous persistence for
* @throws AlluxioException if scheduling fails
*/
<<<<<<< HEAD
private void scheduleAsyncPersistenceInternal(AlluxioURI path) throws AlluxioException {
Inode inode = mInodeTree.getInodeByPath(path);
||||||| merged common ancestors
private long scheduleAsyncPersistenceInternal(AlluxioURI path) throws
FileDoesNotExistException, InvalidPathException {
// find the worker
long workerId = getWorkerStoringFile(path);

if (workerId == IdUtils.INVALID_WORKER_ID) {
LOG.warn("No worker found to schedule async persistence for file {}", path);
// no worker found, do nothing
return workerId;
}

// update the state
Inode<?> inode = mInodeTree.getInodeByPath(path);
=======
@GuardedBy("mInodeTree")
private long scheduleAsyncPersistenceInternal(AlluxioURI path) throws
FileDoesNotExistException, InvalidPathException {
// find the worker
long workerId = getWorkerStoringFile(path);

if (workerId == IdUtils.INVALID_WORKER_ID) {
LOG.warn("No worker found to schedule async persistence for file {}", path);
// no worker found, do nothing
return workerId;
}

// update the state
Inode<?> inode = mInodeTree.getInodeByPath(path);
>>>>>>> upstream/master
inode.setPersistenceState(PersistenceState.IN_PROGRESS);
<<<<<<< HEAD
mAsyncPersistHandler.scheduleAsyncPersistence(path);
||||||| merged common ancestors
long fileId = inode.getId();

if (!mWorkerToAsyncPersistFiles.containsKey(workerId)) {
mWorkerToAsyncPersistFiles.put(workerId, Sets.<Long>newHashSet());
}
mWorkerToAsyncPersistFiles.get(workerId).add(fileId);

return workerId;
}

/**
* Gets a worker where the given file is stored.
*
* @param path the path to the file
* @return the id of the storing worker
* @throws FileDoesNotExistException when the file does not exist on any worker
*/
// TODO(calvin): Propagate the exceptions in certain cases
private long getWorkerStoringFile(AlluxioURI path) throws FileDoesNotExistException {
Map<Long, Integer> workerBlockCounts = Maps.newHashMap();
List<FileBlockInfo> blockInfoList;
try {
blockInfoList = getFileBlockInfoList(path);

for (FileBlockInfo fileBlockInfo : blockInfoList) {
for (BlockLocation blockLocation : fileBlockInfo.getBlockInfo().getLocations()) {
if (workerBlockCounts.containsKey(blockLocation.getWorkerId())) {
workerBlockCounts.put(blockLocation.getWorkerId(),
workerBlockCounts.get(blockLocation.getWorkerId()) + 1);
} else {
workerBlockCounts.put(blockLocation.getWorkerId(), 1);
}

// TODO(yupeng) remove the requirement that all the blocks of a file must be stored on the
// same worker, for now it returns the first worker that has all the blocks
if (workerBlockCounts.get(blockLocation.getWorkerId()) == blockInfoList.size()) {
return blockLocation.getWorkerId();
}
}
}
} catch (FileDoesNotExistException e) {
LOG.error("The file {} to persist does not exist", path);
return IdUtils.INVALID_WORKER_ID;
} catch (InvalidPathException e) {
LOG.error("The file {} to persist is invalid", path);
return IdUtils.INVALID_WORKER_ID;
}

if (workerBlockCounts.size() == 0) {
LOG.error("The file " + path + " does not exist on any worker");
return IdUtils.INVALID_WORKER_ID;
}

LOG.error("Not all the blocks of file {} stored on the same worker", path);
return IdUtils.INVALID_WORKER_ID;
}

/**
* Polls the files to send to the given worker for persistence. It also removes files from the
* worker entry in {@link #mWorkerToAsyncPersistFiles}.
*
* @param workerId the worker id
* @return the list of files
* @throws FileDoesNotExistException if the file does not exist
* @throws InvalidPathException if the path is invalid
*/
private List<PersistFile> pollFilesToCheckpoint(long workerId)
throws FileDoesNotExistException, InvalidPathException {
List<PersistFile> filesToPersist = Lists.newArrayList();
List<Long> fileIdsToPersist = Lists.newArrayList();

synchronized (mInodeTree) {
if (!mWorkerToAsyncPersistFiles.containsKey(workerId)) {
return filesToPersist;
}

Set<Long> scheduledFiles = mWorkerToAsyncPersistFiles.get(workerId);
for (long fileId : scheduledFiles) {
InodeFile inode = (InodeFile) mInodeTree.getInodeById(fileId);
if (inode.isCompleted()) {
fileIdsToPersist.add(fileId);
List<Long> blockIds = Lists.newArrayList();
for (FileBlockInfo fileBlockInfo : getFileBlockInfoList(mInodeTree.getPath(inode))) {
blockIds.add(fileBlockInfo.getBlockInfo().getBlockId());
}

filesToPersist.add(new PersistFile(fileId, blockIds));
// update the inode file persistence state
inode.setPersistenceState(PersistenceState.IN_PROGRESS);
}
}
mWorkerToAsyncPersistFiles.get(workerId).removeAll(fileIdsToPersist);
}
return filesToPersist;
=======
long fileId = inode.getId();

if (!mWorkerToAsyncPersistFiles.containsKey(workerId)) {
mWorkerToAsyncPersistFiles.put(workerId, Sets.<Long>newHashSet());
}
mWorkerToAsyncPersistFiles.get(workerId).add(fileId);

return workerId;
}

/**
* Gets a worker where the given file is stored.
*
* @param path the path to the file
* @return the id of the storing worker
* @throws FileDoesNotExistException when the file does not exist on any worker
*/
// TODO(calvin): Propagate the exceptions in certain cases
@GuardedBy("mInodeTree")
private long getWorkerStoringFile(AlluxioURI path) throws FileDoesNotExistException {
Map<Long, Integer> workerBlockCounts = Maps.newHashMap();
List<FileBlockInfo> blockInfoList;
try {
InodeFile inode = mInodeTree.getInodeFileByPath(path);
blockInfoList = getFileBlockInfoListInternal(inode);

for (FileBlockInfo fileBlockInfo : blockInfoList) {
for (BlockLocation blockLocation : fileBlockInfo.getBlockInfo().getLocations()) {
if (workerBlockCounts.containsKey(blockLocation.getWorkerId())) {
workerBlockCounts.put(blockLocation.getWorkerId(),
workerBlockCounts.get(blockLocation.getWorkerId()) + 1);
} else {
workerBlockCounts.put(blockLocation.getWorkerId(), 1);
}

// TODO(yupeng) remove the requirement that all the blocks of a file must be stored on the
// same worker, for now it returns the first worker that has all the blocks
if (workerBlockCounts.get(blockLocation.getWorkerId()) == blockInfoList.size()) {
return blockLocation.getWorkerId();
}
}
}
} catch (FileDoesNotExistException e) {
LOG.error("The file {} to persist does not exist", path);
return IdUtils.INVALID_WORKER_ID;
} catch (InvalidPathException e) {
LOG.error("The file {} to persist is invalid", path);
return IdUtils.INVALID_WORKER_ID;
}

if (workerBlockCounts.size() == 0) {
LOG.error("The file " + path + " does not exist on any worker");
return IdUtils.INVALID_WORKER_ID;
}

LOG.error("Not all the blocks of file {} stored on the same worker", path);
return IdUtils.INVALID_WORKER_ID;
}

/**
* Polls the files to send to the given worker for persistence. It also removes files from the
* worker entry in {@link #mWorkerToAsyncPersistFiles}.
*
* @param workerId the worker id
* @return the list of files
* @throws FileDoesNotExistException if the file does not exist
* @throws InvalidPathException if the path is invalid
*/
private List<PersistFile> pollFilesToCheckpoint(long workerId)
throws FileDoesNotExistException, InvalidPathException {
List<PersistFile> filesToPersist = Lists.newArrayList();
List<Long> fileIdsToPersist = Lists.newArrayList();

synchronized (mInodeTree) {
if (!mWorkerToAsyncPersistFiles.containsKey(workerId)) {
return filesToPersist;
}

Set<Long> scheduledFiles = mWorkerToAsyncPersistFiles.get(workerId);
for (long fileId : scheduledFiles) {
InodeFile inode = (InodeFile) mInodeTree.getInodeById(fileId);
if (inode.isCompleted()) {
fileIdsToPersist.add(fileId);
List<Long> blockIds = Lists.newArrayList();
for (FileBlockInfo fileBlockInfo : getFileBlockInfoListInternal(inode)) {
blockIds.add(fileBlockInfo.getBlockInfo().getBlockId());
}

filesToPersist.add(new PersistFile(fileId, blockIds));
// update the inode file persistence state
inode.setPersistenceState(PersistenceState.IN_PROGRESS);
}
}
mWorkerToAsyncPersistFiles.get(workerId).removeAll(fileIdsToPersist);
}
return filesToPersist;
>>>>>>> upstream/master
}

/**
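The deletions above remove the pre-merge implementation that selected a worker (getWorkerStoringFile) and tracked per-worker persist queues (pollFilesToCheckpoint) inside FileSystemMaster. A minimal sketch of the method as it reads if the HEAD side wins, with that logic delegated to the AsyncPersistHandler created in the constructor:

  private void scheduleAsyncPersistenceInternal(AlluxioURI path) throws AlluxioException {
    Inode<?> inode = mInodeTree.getInodeByPath(path);
    // Mark the file as being persisted; worker selection and the per-worker
    // persist queues are now owned by the handler rather than this master.
    inode.setPersistenceState(PersistenceState.IN_PROGRESS);
    mAsyncPersistHandler.scheduleAsyncPersistence(path);
  }

The remaining hunks below belong to the second changed file, a FileSystemMaster test class.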
@@ -395,13 +395,7 @@ public void setStateTest() throws Exception {
@Test
public void isFullyInMemoryTest() throws Exception {
// add nested file
<<<<<<< HEAD
mFileSystemMaster.create(NESTED_FILE_URI, sNestedFileOptions);
||||||| merged common ancestors
long fileId = mFileSystemMaster.create(NESTED_FILE_URI, sNestedFileOptions);
=======
long fileId = mFileSystemMaster.createFile(NESTED_FILE_URI, sNestedFileOptions);
>>>>>>> upstream/master
mFileSystemMaster.createFile(NESTED_FILE_URI, sNestedFileOptions);
// add in-memory block
long blockId = mFileSystemMaster.getNewBlockIdForFile(NESTED_FILE_URI);
mBlockMaster.commitBlock(mWorkerId1, Constants.KB, "MEM", blockId, Constants.KB);
@@ -574,48 +568,6 @@ public void workerHeartbeatTest() throws Exception {
}

/**
<<<<<<< HEAD
||||||| merged common ancestors
* Tests the persistence of file with block on multiple workers.
*
* @throws Exception if a {@link FileSystemMaster} operation fails
*/
@Test
public void persistenceFileWithBlocksOnMultipleWorkers() throws Exception {
long fileId = mFileSystemMaster.create(ROOT_FILE_URI, sNestedFileOptions);
long blockId1 = mFileSystemMaster.getNewBlockIdForFile(ROOT_FILE_URI);
mBlockMaster.commitBlock(mWorkerId1, Constants.KB, "MEM", blockId1, Constants.KB);
long blockId2 = mFileSystemMaster.getNewBlockIdForFile(ROOT_FILE_URI);
mBlockMaster.commitBlock(mWorkerId2, Constants.KB, "MEM", blockId2, Constants.KB);
CompleteFileOptions options = CompleteFileOptions.defaults().setUfsLength(2 * Constants.KB);
mFileSystemMaster.completeFile(ROOT_FILE_URI, options);

long workerId = mFileSystemMaster.scheduleAsyncPersistence(ROOT_FILE_URI);
Assert.assertEquals(IdUtils.INVALID_WORKER_ID, workerId);
}

/**
=======
* Tests the persistence of file with block on multiple workers.
*
* @throws Exception if a {@link FileSystemMaster} operation fails
*/
@Test
public void persistenceFileWithBlocksOnMultipleWorkers() throws Exception {
long fileId = mFileSystemMaster.createFile(ROOT_FILE_URI, sNestedFileOptions);
long blockId1 = mFileSystemMaster.getNewBlockIdForFile(ROOT_FILE_URI);
mBlockMaster.commitBlock(mWorkerId1, Constants.KB, "MEM", blockId1, Constants.KB);
long blockId2 = mFileSystemMaster.getNewBlockIdForFile(ROOT_FILE_URI);
mBlockMaster.commitBlock(mWorkerId2, Constants.KB, "MEM", blockId2, Constants.KB);
CompleteFileOptions options = CompleteFileOptions.defaults().setUfsLength(2 * Constants.KB);
mFileSystemMaster.completeFile(ROOT_FILE_URI, options);

long workerId = mFileSystemMaster.scheduleAsyncPersistence(ROOT_FILE_URI);
Assert.assertEquals(IdUtils.INVALID_WORKER_ID, workerId);
}

/**
>>>>>>> upstream/master
* Tests that lost files can successfully be detected.
*
* @throws Exception if a {@link FileSystemMaster} operation fails
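Both copies of the persistenceFileWithBlocksOnMultipleWorkers test are dropped rather than merged. That matches the HEAD-side signature visible in the FileSystemMaster.java hunk header above, where scheduleAsyncPersistence returns void, so the removed assertion has nothing to check:

    // Pre-merge API exercised by the deleted test:
    long workerId = mFileSystemMaster.scheduleAsyncPersistence(ROOT_FILE_URI);
    Assert.assertEquals(IdUtils.INVALID_WORKER_ID, workerId);
    // HEAD-side API: public void scheduleAsyncPersistence(AlluxioURI path) throws AlluxioException
    // returns nothing, so a test built around the returned worker id no longer applies.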
