Skip to content

Commit

Permalink
Refactor FSM RPC cancellation support
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #11674
orig-commit: 0236618
orig-commit-author: Göktürk Gezer <gokturk.gezer@gmail.com>

pr-link: #11723
change-id: cid-8e2c8f358f623255a4993899b8949e3be020ca06
  • Loading branch information
alluxio-bot committed Jul 7, 2020
1 parent 69e99db commit f41e6a7
Show file tree
Hide file tree
Showing 27 changed files with 284 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,16 @@
import alluxio.master.file.activesync.ActiveSyncManager;
import alluxio.master.file.contexts.CheckConsistencyContext;
import alluxio.master.file.contexts.CompleteFileContext;
import alluxio.master.file.contexts.CompositeCallTracker;
import alluxio.master.file.contexts.CreateDirectoryContext;
import alluxio.master.file.contexts.CreateFileContext;
import alluxio.master.file.contexts.DeleteContext;
import alluxio.master.file.contexts.FreeContext;
import alluxio.master.file.contexts.GetStatusContext;
import alluxio.master.file.contexts.InternalOperationContext;
import alluxio.master.file.contexts.ListStatusContext;
import alluxio.master.file.contexts.LoadMetadataContext;
import alluxio.master.file.contexts.MountContext;
import alluxio.master.file.contexts.OperationContext;
import alluxio.master.file.contexts.RenameContext;
import alluxio.master.file.contexts.ScheduleAsyncPersistenceContext;
import alluxio.master.file.contexts.SetAclContext;
Expand Down Expand Up @@ -440,7 +441,17 @@ public DefaultFileSystemMaster(BlockMaster blockMaster, CoreMasterContext master
? ServerConfiguration.getList(PropertyKey.MASTER_PERSISTENCE_BLACKLIST, ",")
: Collections.emptyList();

mStateLockCallTracker = () -> masterContext.getStateLockManager().interruptCycleTicking();
mStateLockCallTracker = new CallTracker() {
@Override
public boolean isCancelled() {
return masterContext.getStateLockManager().interruptCycleTicking();
}

@Override
public Type getType() {
return Type.STATE_LOCK_TRACKER;
}
};
mPermissionChecker = new DefaultPermissionChecker(mInodeTree);
mJobMasterClientPool = new JobMasterClientPool(JobMasterClientContext
.newBuilder(ClientContext.create(ServerConfiguration.global())).build());
Expand Down Expand Up @@ -801,7 +812,7 @@ public FileInfo getFileInfo(AlluxioURI path, GetStatusContext context)
throws FileDoesNotExistException, InvalidPathException, AccessControlException, IOException {
Metrics.GET_FILE_INFO_OPS.inc();
long opTimeMs = System.currentTimeMillis();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("getFileInfo", path, null, null)) {

Expand Down Expand Up @@ -932,7 +943,7 @@ public void listStatus(AlluxioURI path, ListStatusContext context,
throws AccessControlException, FileDoesNotExistException, InvalidPathException, IOException {
Metrics.GET_FILE_INFO_OPS.inc();
LockingScheme lockingScheme = new LockingScheme(path, LockPattern.READ, false);
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("listStatus", path, null, null)) {

Expand Down Expand Up @@ -1046,10 +1057,7 @@ private void listStatusInternal(ListStatusContext context, RpcContext rpcContext
LockedInodePath currInodePath, AuditContext auditContext, DescendantType descendantType,
ResultStream<FileInfo> resultStream, int depth) throws FileDoesNotExistException,
UnavailableException, AccessControlException, InvalidPathException {
// Fail if the client has cancelled the rpc.
if (context.isCancelled()) {
throw new RuntimeException("Call cancelled.");
}
rpcContext.throwIfCancelled();
Inode inode = currInodePath.getInode();
if (inode.isDirectory() && descendantType != DescendantType.NONE) {
try {
Expand Down Expand Up @@ -1152,7 +1160,7 @@ public FileSystemMasterView getFileSystemMasterView() {
public List<AlluxioURI> checkConsistency(AlluxioURI path, CheckConsistencyContext context)
throws AccessControlException, FileDoesNotExistException, InvalidPathException, IOException {
List<AlluxioURI> inconsistentUris = new ArrayList<>();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("checkConsistency", path, null, null)) {

Expand Down Expand Up @@ -1261,7 +1269,7 @@ public void completeFile(AlluxioURI path, CompleteFileContext context)
UnavailableException {
Metrics.COMPLETE_FILE_OPS.inc();
// No need to syncMetadata before complete.
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
LockedInodePath inodePath = mInodeTree.lockFullInodePath(path, LockPattern.WRITE_INODE);
FileSystemMasterAuditContext auditContext =
createAuditContext("completeFile", path, null, inodePath.getInodeOrNull())) {
Expand Down Expand Up @@ -1427,7 +1435,7 @@ public FileInfo createFile(AlluxioURI path, CreateFileContext context)
throws AccessControlException, InvalidPathException, FileAlreadyExistsException,
BlockInfoException, IOException, FileDoesNotExistException {
Metrics.CREATE_FILES_OPS.inc();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("createFile", path, null, null)) {

Expand Down Expand Up @@ -1582,7 +1590,7 @@ public void delete(AlluxioURI path, DeleteContext context)
throws IOException, FileDoesNotExistException, DirectoryNotEmptyException,
InvalidPathException, AccessControlException {
Metrics.DELETE_PATHS_OPS.inc();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("delete", path, null, null)) {

Expand Down Expand Up @@ -1703,10 +1711,7 @@ public void deleteInternal(RpcContext rpcContext, LockedInodePath inodePath,
// We go through each inode, removing it from its parent set and from mDelInodes. If it's a
// file, we deal with the checkpoints and blocks as well.
for (int i = inodesToDelete.size() - 1; i >= 0; i--) {
// Fail if the client has cancelled the rpc.
if (deleteContext.isCancelled()) {
throw new RuntimeException("Call cancelled.");
}
rpcContext.throwIfCancelled();
Pair<AlluxioURI, LockedInodePath> inodePairToDelete = inodesToDelete.get(i);
AlluxioURI alluxioUriToDelete = inodePairToDelete.getFirst();
Inode inodeToDelete = inodePairToDelete.getSecond().getInode();
Expand Down Expand Up @@ -2037,7 +2042,7 @@ public long createDirectory(AlluxioURI path, CreateDirectoryContext context)
throws InvalidPathException, FileAlreadyExistsException, IOException, AccessControlException,
FileDoesNotExistException {
Metrics.CREATE_DIRECTORIES_OPS.inc();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("mkdir", path, null, null)) {

Expand Down Expand Up @@ -2134,7 +2139,7 @@ public void rename(AlluxioURI srcPath, AlluxioURI dstPath, RenameContext context
throws FileAlreadyExistsException, FileDoesNotExistException, InvalidPathException,
IOException, AccessControlException {
Metrics.RENAME_PATH_OPS.inc();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("rename", srcPath, dstPath, null)) {

Expand Down Expand Up @@ -2463,7 +2468,7 @@ public void free(AlluxioURI path, FreeContext context)
UnexpectedAlluxioException, IOException {
Metrics.FREE_FILE_OPS.inc();
// No need to syncMetadata before free.
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
LockedInodePath inodePath = mInodeTree.lockFullInodePath(path, LockPattern.WRITE_INODE);
FileSystemMasterAuditContext auditContext =
createAuditContext("free", path, null, inodePath.getInodeOrNull())) {
Expand Down Expand Up @@ -2669,7 +2674,7 @@ public void updateMount(AlluxioURI alluxioPath, MountContext context)
IOException, AccessControlException {
LockingScheme lockingScheme = createLockingScheme(alluxioPath,
context.getOptions().getCommonOptions(), LockPattern.WRITE_EDGE);
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
LockedInodePath inodePath = mInodeTree
.lockInodePath(lockingScheme.getPath(), lockingScheme.getPattern());
FileSystemMasterAuditContext auditContext = createAuditContext(
Expand All @@ -2695,7 +2700,7 @@ public void mount(AlluxioURI alluxioPath, AlluxioURI ufsPath, MountContext conte
throws FileAlreadyExistsException, FileDoesNotExistException, InvalidPathException,
IOException, AccessControlException {
Metrics.MOUNT_OPS.inc();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("mount", alluxioPath, null, null)) {

Expand Down Expand Up @@ -2874,7 +2879,7 @@ public void setAcl(AlluxioURI path, SetAclAction action, List<AclEntry> entries,
SetAclContext context)
throws FileDoesNotExistException, AccessControlException, InvalidPathException, IOException {
Metrics.SET_ACL_OPS.inc();
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext("setAcl", path, null, null)) {

Expand Down Expand Up @@ -3033,10 +3038,7 @@ private void setAclRecursive(RpcContext rpcContext, SetAclAction action,
if (context.getOptions().getRecursive()) {
try (LockedInodePathList descendants = mInodeTree.getDescendants(inodePath)) {
for (LockedInodePath childPath : descendants) {
// Fail if the client has cancelled the rpc.
if (context.isCancelled()) {
throw new RuntimeException("Call cancelled.");
}
rpcContext.throwIfCancelled();
setAclSingleInode(rpcContext, action, childPath, entries, replay, opTimeMs);
}
}
Expand Down Expand Up @@ -3076,7 +3078,7 @@ public void setAttribute(AlluxioURI path, SetAttributeContext context)
} else {
commandName = "setAttribute";
}
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
FileSystemMasterAuditContext auditContext =
createAuditContext(commandName, path, null, null)) {

Expand Down Expand Up @@ -3157,10 +3159,7 @@ private void setAttributeInternal(RpcContext rpcContext, LockedInodePath inodePa
if (context.getOptions().getRecursive() && targetInode.isDirectory()) {
try (LockedInodePathList descendants = mInodeTree.getDescendants(inodePath)) {
for (LockedInodePath childPath : descendants) {
// Fail if the client has cancelled the rpc.
if (context.isCancelled()) {
throw new RuntimeException("Call cancelled.");
}
rpcContext.throwIfCancelled();
setAttributeSingleFile(rpcContext, childPath, true, opTimeMs, context);
}
}
Expand All @@ -3171,7 +3170,7 @@ private void setAttributeInternal(RpcContext rpcContext, LockedInodePath inodePa
@Override
public void scheduleAsyncPersistence(AlluxioURI path, ScheduleAsyncPersistenceContext context)
throws AlluxioException, UnavailableException {
try (RpcContext rpcContext = createRpcContext();
try (RpcContext rpcContext = createRpcContext(context);
LockedInodePath inodePath = mInodeTree.lockFullInodePath(path, LockPattern.WRITE_INODE)) {
scheduleAsyncPersistenceInternal(inodePath, context, rpcContext);
}
Expand Down Expand Up @@ -4323,17 +4322,23 @@ private void removeBlocks(List<Long> blocks) throws IOException {
throw new IOException("Failed to remove deleted blocks from block master", lastThrown);
}

@Override
public CallTracker composeCallTracker(CallTracker transportTracker) {
return new CompositeCallTracker(transportTracker, mStateLockCallTracker);
/**
* @return a context for executing an RPC
*/
@VisibleForTesting
public RpcContext createRpcContext() throws UnavailableException {
return createRpcContext(new InternalOperationContext());
}

/**
* @param operationContext the operation context
* @return a context for executing an RPC
*/
@VisibleForTesting
public RpcContext createRpcContext() throws UnavailableException {
return new RpcContext(createBlockDeletionContext(), createJournalContext());
public RpcContext createRpcContext(OperationContext operationContext)
throws UnavailableException {
return new RpcContext(createBlockDeletionContext(), createJournalContext(),
operationContext.withTracker(mStateLockCallTracker));
}

private LockingScheme createLockingScheme(AlluxioURI path, FileSystemMasterCommonPOptions options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import alluxio.exception.status.InvalidArgumentException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.SetAclAction;
import alluxio.master.file.contexts.CallTracker;
import alluxio.master.Master;
import alluxio.master.file.contexts.CheckConsistencyContext;
import alluxio.master.file.contexts.CompleteFileContext;
Expand Down Expand Up @@ -585,12 +584,4 @@ void activeSyncMetadata(AlluxioURI path, Collection<AlluxioURI> changedFiles,
* @return the owner of the root inode, null if the inode tree is not initialized
*/
String getRootInodeOwner();

/**
* Composes an FSM call-tracker over given transport tracker.
*
* @param transportTracker the transport level call-tracker
* @return a composed call-tracker
*/
CallTracker composeCallTracker(CallTracker transportTracker);
}
Loading

0 comments on commit f41e6a7

Please sign in to comment.