Skip to content

Commit

Permalink
Computing the file length correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsimsa committed Oct 26, 2015
1 parent 1204d49 commit d90c8fa
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 59 deletions.
Expand Up @@ -125,6 +125,7 @@ public void close() throws IOException {
}

Boolean canComplete = false;
CompleteFileOptions.Builder builder = new CompleteFileOptions.Builder(ClientContext.getConf());
if (mUnderStorageType.isSyncPersist()) {
String tmpPath = PathUtils.temporaryFileName(mFileId, mNonce, mUfsPath);
UnderFileSystem ufs = UnderFileSystem.get(tmpPath, ClientContext.getConf());
Expand All @@ -148,6 +149,7 @@ public void close() throws IOException {
if (!ufs.rename(tmpPath, mUfsPath)) {
throw new IOException("Failed to rename " + tmpPath + " to " + mUfsPath);
}
builder.setLength(ufs.getFileSize(mUfsPath));
canComplete = true;
}
}
Expand All @@ -172,8 +174,7 @@ public void close() throws IOException {
if (canComplete) {
FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try {
CompleteFileOptions options = CompleteFileOptions.defaults();
masterClient.completeFile(mFileId, options);
masterClient.completeFile(mFileId, builder.build());
} catch (TachyonException e) {
throw new IOException(e);
} finally {
Expand Down
26 changes: 17 additions & 9 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -337,11 +337,11 @@ public List<FileInfo> getFileInfoList(long fileId) throws FileDoesNotExistExcept
}

/**
* Marks a file as completed. After a file is complete, it cannot be written to. Called via RPC.
* Complete a file. After a file is completed, it cannot be written to. Called via RPC.
*
* @param fileId the file id to complete.
* @throws FileDoesNotExistException
* @throws BlockInfoException
* @param fileId the file id to complete
* @throws FileDoesNotExistException if the file does not exist
* @throws BlockInfoException if a block information exception is encountered
*/
public void completeFile(long fileId, CompleteFileOptions options) throws BlockInfoException,
FileDoesNotExistException, InvalidPathException, SuspectedFileSizeException {
Expand All @@ -360,22 +360,30 @@ public void completeFile(long fileId, CompleteFileOptions options) throws BlockI
throw new BlockInfoException("Cannot complete file without all the blocks committed");
}

// Verify that all the blocks (except the last one) is the same size as the file block size.
long fileLength = 0;
// Iterate from all in-memory file blocks, computing the length and verify that all the blocks
// (except the last one) is the same size as the file block size.
long inMemoryLength = 0;
long fileBlockSize = fileInode.getBlockSizeBytes();
for (int i = 0; i < blockInfoList.size(); i ++) {
BlockInfo blockInfo = blockInfoList.get(i);
fileLength += blockInfo.getLength();
inMemoryLength += blockInfo.getLength();
if (i < blockInfoList.size() - 1 && blockInfo.getLength() != fileBlockSize) {
throw new BlockInfoException(
"Block index " + i + " has a block size smaller than the file block size ("
+ fileInode.getBlockSizeBytes() + ")");
}
}

completeFileInternal(fileInode.getBlockIds(), fileId, fileLength, opTimeMs);
if (fileInode.isPersisted() && inMemoryLength != 0 && options.getLength() != inMemoryLength) {
throw new SuspectedFileSizeException("Inconsistent file length: Tachyon " + inMemoryLength
+ " UFS " + options.getLength());
}

long length = fileInode.isPersisted() ? options.getLength() : inMemoryLength;

completeFileInternal(fileInode.getBlockIds(), fileId, length, opTimeMs);
writeJournalEntry(
new CompleteFileEntry(fileInode.getBlockIds(), fileId, fileLength, opTimeMs));
new CompleteFileEntry(fileInode.getBlockIds(), fileId, length, opTimeMs));
flushJournal();
}
}
Expand Down
41 changes: 0 additions & 41 deletions servers/src/main/java/tachyon/worker/block/BlockDataManager.java
Expand Up @@ -142,47 +142,6 @@ public void accessBlock(long sessionId, long blockId) throws BlockDoesNotExistEx
mBlockStore.accessBlock(sessionId, blockId);
}

/**
* Completes the process of persisting a file by renaming it to its final destination.
*
* This method is normally triggered from {@link tachyon.client.file.FileOutStream#close()} if and
* only if {@link UnderStorageType#isSyncPersist()} ()} is true. The current implementation of
* persistence is that through {@link tachyon.client.UnderStorageType} operations write to
* {@link tachyon.underfs.UnderFileSystem} on the client's write path, but under a temporary file.
*
* @param fileId a file id
* @param nonce a nonce used for temporary file creation
* @param ufsPath the UFS path of the file
* @throws TachyonTException if the file does not exist or cannot be renamed
* @throws IOException if the update to the master fails
*/
public void persistFile(long fileId, long nonce, String ufsPath)
throws TachyonException, IOException {
String tmpPath = PathUtils.temporaryFileName(fileId, nonce, ufsPath);
UnderFileSystem ufs = UnderFileSystem.get(tmpPath, WorkerContext.getConf());
try {
if (!ufs.exists(tmpPath)) {
// Location of the temporary file has changed, recompute it.
FileInfo fileInfo = mFileSystemMasterClient.getFileInfo(fileId);
ufsPath = fileInfo.getUfsPath();
tmpPath = PathUtils.temporaryFileName(fileId, nonce, ufsPath);
}
if (!ufs.rename(tmpPath, ufsPath)) {
throw new FailedToCheckpointException("Failed to rename " + tmpPath + " to " + ufsPath);
}
} catch (IOException ioe) {
throw new FailedToCheckpointException(
"Failed to rename " + tmpPath + " to " + ufsPath + ": " + ioe);
}
long fileSize;
try {
fileSize = ufs.getFileSize(ufsPath);
} catch (IOException ioe) {
throw new FailedToCheckpointException("Failed to getFileSize " + ufsPath);
}
// mFileSystemMasterClient.persistFile(fileId, fileSize);
}

/**
* Cleans up after sessions, to prevent zombie sessions. This method is called periodically by
* {@link SessionCleaner} thread.
Expand Down
Expand Up @@ -58,22 +58,22 @@ public void setLengthTest() throws Exception {
}

@Test
public void setNegativeLengthTest() throws Exception {
public void completeWithNegativeTest() throws Exception {
mThrown.expect(SuspectedFileSizeException.class);
mThrown.expectMessage("InodeFile new length " + -1 + " is negative.");
mThrown.expectMessage("File length " + -1 + " cannot be negative.");

InodeFile inodeFile = createInodeFile(1);
inodeFile.setLength(-1);
inodeFile.complete(-1);
}

@Test
public void setLengthAfterCompleteTest() throws Exception {
public void completeTwiceTest() throws Exception {
mThrown.expect(SuspectedFileSizeException.class);
mThrown.expectMessage("InodeFile has been completed.");
mThrown.expectMessage("File has already been completed.");

InodeFile inodeFile = createInodeFile(1);
inodeFile.setLength(LENGTH);
inodeFile.setLength(LENGTH);
inodeFile.complete(LENGTH);
inodeFile.complete(LENGTH);
}

@Test
Expand Down
1 change: 1 addition & 0 deletions shell/src/test/java/tachyon/shell/TfsShellTest.java
Expand Up @@ -381,6 +381,7 @@ public void locationNotExistTest() throws IOException {
* locationsList.iterator(); int i = 3; while (iter.hasNext()) { commandParameters[i ++] =
* iter.next(); } Assert.assertEquals(getCommandOutput(commandParameters), mOutput.toString()); }
*/

@Test
public void lsrTest() throws IOException, TachyonException {
FileInfo[] files = new FileInfo[4];
Expand Down

0 comments on commit d90c8fa

Please sign in to comment.