Skip to content

Commit

Permalink
[SMALLFIX] Use RpcContext to organize contexts during RPCs (#7009)
Browse files Browse the repository at this point in the history
* Use RpcContext to organize contexts during RPCs

* Add license header

* Fix test and avoid whitebox usage

* Fix checkstyle

* Update comments and javadocs

* Add comments

* Use RpcContext consistenly across filesystem master rpcs

* Simplify getJournalContext.append

* Fix compilation
  • Loading branch information
aaudiber authored and apc999 committed Apr 3, 2018
1 parent 9d13ac0 commit 8a6d24b
Show file tree
Hide file tree
Showing 7 changed files with 486 additions and 282 deletions.

Large diffs are not rendered by default.

107 changes: 107 additions & 0 deletions core/server/master/src/main/java/alluxio/master/file/RpcContext.java
@@ -0,0 +1,107 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.file;

import alluxio.exception.status.UnavailableException;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.NoopJournalContext;
import alluxio.proto.journal.Journal.JournalEntry;

import com.google.common.base.Throwables;

import java.io.Closeable;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

/**
* Context passed through the span of a file system master RPC to manage actions that need to be
* taken when the body of the RPC is finished.
*
* Note that we cannot use Guava's {@link com.google.common.io.Closer} because it doesn't make
* guarantees about the order in which resources are closed.
*/
@NotThreadSafe
public final class RpcContext implements Closeable {
public static final RpcContext NOOP =
new RpcContext(NoopBlockDeletionContext.INSTANCE, NoopJournalContext.INSTANCE);

@Nullable
private final BlockDeletionContext mBlockDeletionContext;
private final JournalContext mJournalContext;

// Used during close to keep track of thrown exceptions.
private Throwable mThrown = null;

/**
* Creates an {@link RpcContext}. This class aggregates different contexts used over the course of
* an RPC, and makes sure they are closed in the right order when the RPC is finished.
*
* @param blockDeleter block deletion context
* @param journalContext journal context
*/
public RpcContext(BlockDeletionContext blockDeleter, JournalContext journalContext) {
mBlockDeletionContext = blockDeleter;
mJournalContext = journalContext;
}

/**
* @return the journal context
*/
public JournalContext getJournalContext() {
return mJournalContext;
}

/**
* Syntax sugar for getJournalContext().append(entry).
*
* @param entry the {@link JournalEntry} to append to the journal
*/
public void journal(JournalEntry entry) {
mJournalContext.append(entry);
}

/**
* @return the block deletion context
*/
public BlockDeletionContext getBlockDeletionContext() {
return mBlockDeletionContext;
}

@Override
public void close() throws UnavailableException {
// JournalContext is closed before block deletion context so that file system master changes
// get written before block master changes. If a failure occurs between deleting an inode and
// remove its blocks, it's better to have an orphaned block than an inode with a missing block.
closeQuietly(mJournalContext);
closeQuietly(mBlockDeletionContext);

if (mThrown != null) {
Throwables.propagateIfPossible(mThrown, UnavailableException.class);
throw new RuntimeException(mThrown);
}
}

private void closeQuietly(AutoCloseable c) {
if (c != null) {
try {
c.close();
} catch (Throwable t) {
if (mThrown != null) {
mThrown.addSuppressed(t);
} else {
mThrown = t;
}
}
}
}
}
Expand Up @@ -25,14 +25,12 @@
import alluxio.exception.PreconditionMessage; import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.UnavailableException; import alluxio.exception.status.UnavailableException;
import alluxio.master.block.ContainerIdGenerable; import alluxio.master.block.ContainerIdGenerable;
import alluxio.master.file.BlockDeletionContext; import alluxio.master.file.RpcContext;
import alluxio.master.file.options.CreateDirectoryOptions; import alluxio.master.file.options.CreateDirectoryOptions;
import alluxio.master.file.options.CreateFileOptions; import alluxio.master.file.options.CreateFileOptions;
import alluxio.master.file.options.CreatePathOptions; import alluxio.master.file.options.CreatePathOptions;
import alluxio.master.file.options.DeleteOptions; import alluxio.master.file.options.DeleteOptions;
import alluxio.master.journal.JournalContext;
import alluxio.master.journal.JournalEntryIterable; import alluxio.master.journal.JournalEntryIterable;
import alluxio.master.journal.NoopJournalContext;
import alluxio.proto.journal.File; import alluxio.proto.journal.File;
import alluxio.proto.journal.File.InodeDirectoryEntry; import alluxio.proto.journal.File.InodeDirectoryEntry;
import alluxio.proto.journal.File.InodeFileEntry; import alluxio.proto.journal.File.InodeFileEntry;
Expand Down Expand Up @@ -473,9 +471,9 @@ public InodeDirectory getRoot() {
/** /**
* Creates a file or directory at path. * Creates a file or directory at path.
* *
* @param rpcContext the rpc context
* @param inodePath the path * @param inodePath the path
* @param options method options * @param options method options
* @param journalContext the journal context
* @return a {@link CreatePathResult} representing the modified inodes and created inodes during * @return a {@link CreatePathResult} representing the modified inodes and created inodes during
* path creation * path creation
* @throws FileAlreadyExistsException when there is already a file at path if we want to create a * @throws FileAlreadyExistsException when there is already a file at path if we want to create a
Expand All @@ -487,10 +485,9 @@ public InodeDirectory getRoot() {
* @throws FileDoesNotExistException if the parent of the path does not exist and the recursive * @throws FileDoesNotExistException if the parent of the path does not exist and the recursive
* option is false * option is false
*/ */
public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<?> options, public CreatePathResult createPath(RpcContext rpcContext, LockedInodePath inodePath,
JournalContext journalContext) CreatePathOptions<?> options) throws FileAlreadyExistsException, BlockInfoException,
throws FileAlreadyExistsException, BlockInfoException, InvalidPathException, IOException, InvalidPathException, IOException, FileDoesNotExistException {
FileDoesNotExistException {
// TODO(gpang): consider splitting this into createFilePath and createDirectoryPath, with a // TODO(gpang): consider splitting this into createFilePath and createDirectoryPath, with a
// helper method for the shared logic. // helper method for the shared logic.
AlluxioURI path = inodePath.getUri(); AlluxioURI path = inodePath.getUri();
Expand Down Expand Up @@ -556,7 +553,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
// Synchronously persist directories. These inodes are already READ locked. // Synchronously persist directories. These inodes are already READ locked.
for (Inode inode : traversalResult.getNonPersisted()) { for (Inode inode : traversalResult.getNonPersisted()) {
// This cast is safe because we've already verified that the file inode doesn't exist. // This cast is safe because we've already verified that the file inode doesn't exist.
syncPersistDirectory((InodeDirectory) inode, journalContext); syncPersistDirectory(rpcContext, (InodeDirectory) inode);
} }
} }
if ((pathIndex < (pathComponents.length - 1) || currentInodeDirectory.getChild(name) == null) if ((pathIndex < (pathComponents.length - 1) || currentInodeDirectory.getChild(name) == null)
Expand All @@ -571,9 +568,8 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
File.InodeLastModificationTimeEntry inodeLastModificationTime = File.InodeLastModificationTimeEntry inodeLastModificationTime =
File.InodeLastModificationTimeEntry.newBuilder().setId(currentInodeDirectory.getId()) File.InodeLastModificationTimeEntry.newBuilder().setId(currentInodeDirectory.getId())
.setLastModificationTimeMs(options.getOperationTimeMs()).build(); .setLastModificationTimeMs(options.getOperationTimeMs()).build();
journalContext.append( rpcContext.journal(Journal.JournalEntry.newBuilder()
Journal.JournalEntry.newBuilder().setInodeLastModificationTime(inodeLastModificationTime) .setInodeLastModificationTime(inodeLastModificationTime).build());
.build());
} }


// Fill in the ancestor directories that were missing. // Fill in the ancestor directories that were missing.
Expand All @@ -589,7 +585,8 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
for (int k = pathIndex; k < (pathComponents.length - 1); k++) { for (int k = pathIndex; k < (pathComponents.length - 1); k++) {
InodeDirectory dir = null; InodeDirectory dir = null;
while (dir == null) { while (dir == null) {
dir = InodeDirectory.create(mDirectoryIdGenerator.getNewDirectoryId(journalContext), dir = InodeDirectory.create(
mDirectoryIdGenerator.getNewDirectoryId(rpcContext.getJournalContext()),
currentInodeDirectory.getId(), pathComponents[k], missingDirOptions); currentInodeDirectory.getId(), pathComponents[k], missingDirOptions);
// Lock the newly created inode before subsequent operations, and add it to the lock group. // Lock the newly created inode before subsequent operations, and add it to the lock group.
lockList.lockWriteAndCheckNameAndParent(dir, currentInodeDirectory, pathComponents[k]); lockList.lockWriteAndCheckNameAndParent(dir, currentInodeDirectory, pathComponents[k]);
Expand All @@ -610,15 +607,15 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
dir.setPinned(currentInodeDirectory.isPinned()); dir.setPinned(currentInodeDirectory.isPinned());
if (options.isPersisted()) { if (options.isPersisted()) {
// Do not journal the persist entry, since a creation entry will be journaled instead. // Do not journal the persist entry, since a creation entry will be journaled instead.
syncPersistDirectory(dir, NoopJournalContext.INSTANCE); syncPersistDirectory(RpcContext.NOOP, dir);
} }
} catch (Exception e) { } catch (Exception e) {
// Failed to persist the directory, so remove it from the parent. // Failed to persist the directory, so remove it from the parent.
currentInodeDirectory.removeChild(dir); currentInodeDirectory.removeChild(dir);
throw e; throw e;
} }
// Journal the new inode. // Journal the new inode.
journalContext.append(dir.toJournalEntry()); rpcContext.getJournalContext().append(dir.toJournalEntry());
mInodes.add(dir); mInodes.add(dir);


// After creation and journaling, downgrade to a read lock. // After creation and journaling, downgrade to a read lock.
Expand Down Expand Up @@ -656,7 +653,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
.isPersisted() && options.isPersisted()) { .isPersisted() && options.isPersisted()) {
// The final path component already exists and is not persisted, so it should be added // The final path component already exists and is not persisted, so it should be added
// to the non-persisted Inodes of traversalResult. // to the non-persisted Inodes of traversalResult.
syncPersistDirectory((InodeDirectory) lastInode, journalContext); syncPersistDirectory(rpcContext, (InodeDirectory) lastInode);
} else if (!lastInode.isDirectory() || !(options instanceof CreateDirectoryOptions } else if (!lastInode.isDirectory() || !(options instanceof CreateDirectoryOptions
&& ((CreateDirectoryOptions) options).isAllowExists())) { && ((CreateDirectoryOptions) options).isAllowExists())) {
String errorMessage = ExceptionMessage.FILE_ALREADY_EXISTS.getMessage(path); String errorMessage = ExceptionMessage.FILE_ALREADY_EXISTS.getMessage(path);
Expand All @@ -667,13 +664,14 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
// create the new inode, with a write lock // create the new inode, with a write lock
if (options instanceof CreateDirectoryOptions) { if (options instanceof CreateDirectoryOptions) {
CreateDirectoryOptions directoryOptions = (CreateDirectoryOptions) options; CreateDirectoryOptions directoryOptions = (CreateDirectoryOptions) options;
lastInode = InodeDirectory.create(mDirectoryIdGenerator.getNewDirectoryId(journalContext), lastInode = InodeDirectory.create(
mDirectoryIdGenerator.getNewDirectoryId(rpcContext.getJournalContext()),
currentInodeDirectory.getId(), name, directoryOptions); currentInodeDirectory.getId(), name, directoryOptions);
// Lock the created inode before subsequent operations, and add it to the lock group. // Lock the created inode before subsequent operations, and add it to the lock group.
lockList.lockWriteAndCheckNameAndParent(lastInode, currentInodeDirectory, name); lockList.lockWriteAndCheckNameAndParent(lastInode, currentInodeDirectory, name);
if (directoryOptions.isPersisted()) { if (directoryOptions.isPersisted()) {
// Do not journal the persist entry, since a creation entry will be journaled instead. // Do not journal the persist entry, since a creation entry will be journaled instead.
syncPersistDirectory((InodeDirectory) lastInode, NoopJournalContext.INSTANCE); syncPersistDirectory(RpcContext.NOOP, (InodeDirectory) lastInode);
} }
} else if (options instanceof CreateFileOptions) { } else if (options instanceof CreateFileOptions) {
CreateFileOptions fileOptions = (CreateFileOptions) options; CreateFileOptions fileOptions = (CreateFileOptions) options;
Expand Down Expand Up @@ -703,7 +701,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
} }


// Journal the new inode. // Journal the new inode.
journalContext.append(lastInode.toJournalEntry()); rpcContext.getJournalContext().append(lastInode.toJournalEntry());


// Update state while holding the write lock. // Update state while holding the write lock.
mInodes.add(lastInode); mInodes.add(lastInode);
Expand All @@ -729,8 +727,7 @@ public CreatePathResult createPath(LockedInodePath inodePath, CreatePathOptions<
* @throws FileDoesNotExistException if the path does not exist * @throws FileDoesNotExistException if the path does not exist
*/ */
public long reinitializeFile(LockedInodePath inodePath, long blockSizeBytes, long ttl, public long reinitializeFile(LockedInodePath inodePath, long blockSizeBytes, long ttl,
TtlAction ttlAction) TtlAction ttlAction) throws InvalidPathException, FileDoesNotExistException {
throws InvalidPathException, FileDoesNotExistException {
InodeFile file = inodePath.getInodeFile(); InodeFile file = inodePath.getInodeFile();
file.setBlockSizeBytes(blockSizeBytes); file.setBlockSizeBytes(blockSizeBytes);
file.setTtl(ttl); file.setTtl(ttl);
Expand Down Expand Up @@ -785,16 +782,14 @@ private InodeLockList lockDescendantsInternal(InodeDirectory inodeDirectory,
/** /**
* Deletes a single inode from the inode tree by removing it from the parent inode. * Deletes a single inode from the inode tree by removing it from the parent inode.
* *
* @param inodePath The {@link LockedInodePath} to delete * @param rpcContext the rpc context
* @param opTimeMs The operation time * @param inodePath the {@link LockedInodePath} to delete
* @param opTimeMs the operation time
* @param deleteOptions the delete options * @param deleteOptions the delete options
* @param journalContext the journal context
* @param blockDeletionContext the block deletion context
* @throws FileDoesNotExistException if the Inode cannot be retrieved * @throws FileDoesNotExistException if the Inode cannot be retrieved
*/ */
public void deleteInode(LockedInodePath inodePath, long opTimeMs, DeleteOptions deleteOptions, public void deleteInode(RpcContext rpcContext, LockedInodePath inodePath, long opTimeMs,
JournalContext journalContext, BlockDeletionContext blockDeletionContext) DeleteOptions deleteOptions) throws FileDoesNotExistException {
throws FileDoesNotExistException {
Inode<?> inode = inodePath.getInode(); Inode<?> inode = inodePath.getInode();
InodeDirectory parent = (InodeDirectory) mInodes.getFirst(inode.getParentId()); InodeDirectory parent = (InodeDirectory) mInodes.getFirst(inode.getParentId());
if (parent == null) { if (parent == null) {
Expand All @@ -808,10 +803,11 @@ public void deleteInode(LockedInodePath inodePath, long opTimeMs, DeleteOptions
.setAlluxioOnly(deleteOptions.isAlluxioOnly()) .setAlluxioOnly(deleteOptions.isAlluxioOnly())
.setRecursive(deleteOptions.isRecursive()) .setRecursive(deleteOptions.isRecursive())
.setOpTimeMs(opTimeMs).build(); .setOpTimeMs(opTimeMs).build();
journalContext.append(Journal.JournalEntry.newBuilder().setDeleteFile(deleteFile).build()); rpcContext.journal(Journal.JournalEntry.newBuilder().setDeleteFile(deleteFile).build());


if (inode.isFile()) { if (inode.isFile()) {
blockDeletionContext.registerBlocksForDeletion(((InodeFile) inode).getBlockIds()); rpcContext.getBlockDeletionContext()
.registerBlocksForDeletion(((InodeFile) inode).getBlockIds());
} }


parent.removeChild(inode); parent.removeChild(inode);
Expand Down Expand Up @@ -996,12 +992,12 @@ private void addInodeFromJournalInternal(Inode<?> inode) {
* Synchronously persists an {@link InodeDirectory} to the UFS. If concurrent calls are made, only * Synchronously persists an {@link InodeDirectory} to the UFS. If concurrent calls are made, only
* one thread will persist to UFS, and the others will wait until it is persisted. * one thread will persist to UFS, and the others will wait until it is persisted.
* *
* @param rpcContext the rpc context
* @param dir the {@link InodeDirectory} to persist * @param dir the {@link InodeDirectory} to persist
* @param journalContext the journal context
* @throws InvalidPathException if the path for the inode is invalid * @throws InvalidPathException if the path for the inode is invalid
* @throws FileDoesNotExistException if the path for the inode is invalid * @throws FileDoesNotExistException if the path for the inode is invalid
*/ */
public void syncPersistDirectory(InodeDirectory dir, JournalContext journalContext) public void syncPersistDirectory(RpcContext rpcContext, InodeDirectory dir)
throws IOException, InvalidPathException, FileDoesNotExistException { throws IOException, InvalidPathException, FileDoesNotExistException {
RetryPolicy retry = RetryPolicy retry =
new ExponentialBackoffRetry(PERSIST_WAIT_BASE_SLEEP_MS, PERSIST_WAIT_MAX_SLEEP_MS, new ExponentialBackoffRetry(PERSIST_WAIT_BASE_SLEEP_MS, PERSIST_WAIT_MAX_SLEEP_MS,
Expand Down Expand Up @@ -1049,7 +1045,7 @@ public void syncPersistDirectory(InodeDirectory dir, JournalContext journalConte
// Append the persist entry to the journal. // Append the persist entry to the journal.
File.PersistDirectoryEntry persistDirectory = File.PersistDirectoryEntry persistDirectory =
File.PersistDirectoryEntry.newBuilder().setId(dir.getId()).build(); File.PersistDirectoryEntry.newBuilder().setId(dir.getId()).build();
journalContext.append( rpcContext.journal(
Journal.JournalEntry.newBuilder().setPersistDirectory(persistDirectory).build()); Journal.JournalEntry.newBuilder().setPersistDirectory(persistDirectory).build());
success = true; success = true;
} finally { } finally {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import alluxio.master.file.meta.MountTable; import alluxio.master.file.meta.MountTable;
import alluxio.master.file.options.CreateFileOptions; import alluxio.master.file.options.CreateFileOptions;
import alluxio.master.journal.JournalSystem; import alluxio.master.journal.JournalSystem;
import alluxio.master.journal.NoopJournalContext;
import alluxio.master.journal.noop.NoopJournalSystem; import alluxio.master.journal.noop.NoopJournalSystem;
import alluxio.security.GroupMappingServiceTestUtils; import alluxio.security.GroupMappingServiceTestUtils;
import alluxio.security.authentication.AuthType; import alluxio.security.authentication.AuthType;
Expand Down Expand Up @@ -223,8 +222,7 @@ private static void createAndSetPermission(String path, CreateFileOptions option
try ( try (
LockedInodePath inodePath = sTree LockedInodePath inodePath = sTree
.lockInodePath(new AlluxioURI(path), InodeTree.LockMode.WRITE)) { .lockInodePath(new AlluxioURI(path), InodeTree.LockMode.WRITE)) {
InodeTree.CreatePathResult result = InodeTree.CreatePathResult result = sTree.createPath(RpcContext.NOOP, inodePath, option);
sTree.createPath(inodePath, option, new NoopJournalContext());
((InodeFile) result.getCreated().get(result.getCreated().size() - 1)) ((InodeFile) result.getCreated().get(result.getCreated().size() - 1))
.setOwner(option.getOwner()).setGroup(option.getGroup()) .setOwner(option.getOwner()).setGroup(option.getGroup())
.setMode(option.getMode().toShort()); .setMode(option.getMode().toShort());
Expand Down

0 comments on commit 8a6d24b

Please sign in to comment.