Skip to content

Commit

Permalink
Add journaling for async compelete file
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Sep 23, 2015
1 parent 2157bbe commit 39094da
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 2 deletions.
17 changes: 16 additions & 1 deletion servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -49,6 +49,7 @@
import tachyon.master.file.journal.InodeEntry;
import tachyon.master.file.journal.InodeLastModificationTimeEntry;
import tachyon.master.file.journal.RenameEntry;
import tachyon.master.file.journal.ResizeBlockEntry;
import tachyon.master.file.journal.SetPinnedEntry;
import tachyon.master.file.meta.Dependency;
import tachyon.master.file.meta.DependencyMap;
Expand Down Expand Up @@ -167,6 +168,8 @@ public void processJournalEntry(JournalEntry entry) throws IOException {
renameFromEntry((RenameEntry) entry);
} else if (entry instanceof InodeDirectoryIdGeneratorEntry) {
mDirectoryIdGenerator.fromJournalEntry((InodeDirectoryIdGeneratorEntry) entry);
} else if (entry instanceof ResizeBlockEntry) {
resetBlockSizeFromEntry((ResizeBlockEntry) entry);
} else {
throw new IOException(ExceptionMessage.UNEXPECETD_JOURNAL_ENTRY.getMessage(entry));
}
Expand Down Expand Up @@ -484,8 +487,20 @@ InodeTree.CreatePathResult createFileInternal(TachyonURI path, long blockSizeByt
*/
public long resetBlockSize(TachyonURI path, long blockSizeBytes) throws InvalidPathException {
// TODO(yupeng): add validation
return mInodeTree.resetBlockSize(path, blockSizeBytes);
long id = mInodeTree.resetBlockSize(path, blockSizeBytes);
writeJournalEntry(new ResizeBlockEntry(path.getPath(), blockSizeBytes));
flushJournal();
return id;
}

private void resetBlockSizeFromEntry(ResizeBlockEntry entry) {
try {
mInodeTree.resetBlockSize(new TachyonURI(entry.getPath()), entry.getBlockSizeBytes());
} catch (InvalidPathException e) {
throw new RuntimeException(e);
}
}

/**
* Returns the next block id for a given file id. Called via RPC.
*
Expand Down
@@ -0,0 +1,55 @@
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package tachyon.master.file.journal;

import java.util.Map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

import tachyon.master.journal.JournalEntry;
import tachyon.master.journal.JournalEntryType;

public class ResizeBlockEntry implements JournalEntry {
private final String mPath;
private final long mBlockSizeBytes;

public ResizeBlockEntry(String path, long blockSizeBytes) {
mPath = Preconditions.checkNotNull(path);
mBlockSizeBytes = blockSizeBytes;
}

public String getPath() {
return mPath;
}

public long getBlockSizeBytes() {
return mBlockSizeBytes;
}

@Override
public JournalEntryType getType() {
return JournalEntryType.RESIZE_BLOCK;
}

@Override
public Map<String, Object> getParameters() {
Map<String, Object> parameters = Maps.newHashMapWithExpectedSize(2);
parameters.put("path", mPath);
parameters.put("blockSizeBytes", mBlockSizeBytes);
return parameters;
}
}
Expand Up @@ -35,12 +35,14 @@ public enum JournalEntryType {
DELETE_FILE,
RENAME,
INODE_DIRECTORY_ID_GENERATOR,
RESIZE_BLOCK,

// Raw table master entries
RAW_TABLE,
UPDATE_METADATA,

// Lineage master entries
ASYNC_COMPLETE_FILE,
LINEAGE,
LINEAGE_FILE,
LINEAGE_ID_GENERATOR,
Expand Down
10 changes: 9 additions & 1 deletion servers/src/main/java/tachyon/master/lineage/LineageMaster.java
Expand Up @@ -42,6 +42,7 @@
import tachyon.master.journal.JournalOutputStream;
import tachyon.master.lineage.checkpoint.CheckpointManager;
import tachyon.master.lineage.checkpoint.CheckpointPlanningExecutor;
import tachyon.master.lineage.journal.AsyncCompleteFileEntry;
import tachyon.master.lineage.journal.LineageEntry;
import tachyon.master.lineage.journal.LineageIdGeneratorEntry;
import tachyon.master.lineage.meta.Lineage;
Expand Down Expand Up @@ -115,6 +116,8 @@ public void processJournalEntry(JournalEntry entry) throws IOException {
mLineageStore.addLineageFromJournal((LineageEntry) entry);
} else if (entry instanceof LineageIdGeneratorEntry) {
mLineageIdGenerator.fromJournalEntry((LineageIdGeneratorEntry) entry);
} else if (entry instanceof AsyncCompleteFileEntry) {
asyncCompleteFileFromEntry((AsyncCompleteFileEntry) entry);
} else {
throw new IOException(ExceptionMessage.UNEXPECETD_JOURNAL_ENTRY.getMessage(entry));
}
Expand Down Expand Up @@ -199,7 +202,6 @@ public boolean deleteLineage(long lineageId, boolean cascade) {
public long recreateFile(String path, long blockSizeBytes) throws InvalidPathException {
LOG.info("Recreate the file " + path + " with block size of " + blockSizeBytes + " bytes");
return mFileSystemMaster.resetBlockSize(new TachyonURI(path), blockSizeBytes);

}

public void asyncCompleteFile(long fileId, String underFsPath)
Expand All @@ -208,6 +210,12 @@ public void asyncCompleteFile(long fileId, String underFsPath)
mLineageStore.completeFileForAsyncWrite(fileId, underFsPath);
// complete file in Tachyon.
mFileSystemMaster.completeFile(fileId);
writeJournalEntry(new AsyncCompleteFileEntry(fileId, underFsPath));
flushJournal();
}

private void asyncCompleteFileFromEntry(AsyncCompleteFileEntry entry) {
mLineageStore.completeFileForAsyncWrite(entry.getFileId(), entry.getUnderFsPath());
}

/**
Expand Down
@@ -0,0 +1,56 @@
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package tachyon.master.lineage.journal;

import java.util.Map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

import tachyon.master.journal.JournalEntry;
import tachyon.master.journal.JournalEntryType;

public class AsyncCompleteFileEntry implements JournalEntry {
private long mFileId;
private String mUnderFsPath;

public AsyncCompleteFileEntry(long fileId, String underFsPath) {
mFileId = fileId;
mUnderFsPath = Preconditions.checkNotNull(underFsPath);
}

public long getFileId() {
return mFileId;
}

public String getUnderFsPath() {
return mUnderFsPath;
}

@Override
public JournalEntryType getType() {
return JournalEntryType.ASYNC_COMPLETE_FILE;
}

@Override
public Map<String, Object> getParameters() {
Map<String, Object> parameters = Maps.newHashMapWithExpectedSize(2);
parameters.put("fileId", mFileId);
parameters.put("underFsPath", mUnderFsPath);
return parameters;
}

}

0 comments on commit 39094da

Please sign in to comment.