Skip to content

Commit

Permalink
[SPARK-44272][YARN] Path Inconsistency when Operating statCache withi…
Browse files Browse the repository at this point in the history
…n Yarn Client

### What changes were proposed in this pull request?

1. Change `statCache.getOrElse` to `statCache.getOrElseUpdate` so that the corresponding FileStatus can be cached into `statCache`
2. Change the `Path` parameter of `isPublic`, `checkPermissionOfOther`, and `ancestorsHaveExecutePermissions` to `URI`.
3. Add a `getParentURI` method for constructing the parent URI.

### Why are the changes needed?

We should not use `uri.getPath()` when constructing the `Path`, because the resulting `Path` does not retain information such as the scheme. As a result, `statCache` effectively has no effect.
For example, if uri is "file:/foo.invalid.com:8080/tmp/testing", then

```
uri.getPath -> /foo.invalid.com:8080/tmp/testing
uri.toString -> file:/foo.invalid.com:8080/tmp/testing
```
Please also see more details from JIRA [ticket](https://issues.apache.org/jira/browse/SPARK-44272).
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests to validate that the FileStatus is cached as expected.

Closes #41821 from shuwang21/fixcache.

Lead-authored-by: Shu Wang <swang7@linkedin.com>
Co-authored-by: Shu Wang <wangshu1990@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
2 people authored and Mridul Muralidharan committed Jul 19, 2023
1 parent 9b43a9f commit 0879a25
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
link: String,
statCache: Map[URI, FileStatus],
appMasterOnly: Boolean = false): Unit = {
val destStatus = statCache.getOrElse(destPath.toUri(), fs.getFileStatus(destPath))
val destStatus = getFileStatus(fs, destPath.toUri, statCache)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(resourceType)
val visibility = getVisibility(conf, destPath.toUri(), statCache)
Expand Down Expand Up @@ -119,46 +119,61 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
*/
private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
// the leaf level file should be readable by others
if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
if (!checkPermissionOfOther(fs, uri, FsAction.READ, statCache)) {
return false
}
ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
ancestorsHaveExecutePermissions(fs, getParentURI(uri), statCache)
}

/**
* Returns true if all ancestors of the specified path have the 'execute'
* Get the parent URI of the given URI. Note that the query & fragment of the original URI
* will not be inherited when obtaining the parent URI.
*
* @return the parent URI, null if the given uri is the root
*/
private[yarn] def getParentURI(uri: URI): URI = {
  // The URI is round-tripped through a Hadoop Path so that Path.getParent does the
  // parent computation. getParent returns null at the root; Option(...) covers that case
  // and orNull passes the null back, which callers use as their "no more ancestors" signal.
  Option(new Path(uri.toString).getParent()).map(_.toUri()).orNull
}

/**
* Returns true if all ancestors of the specified uri have the 'execute'
* permission set for all users (i.e. that other users can traverse
* the directory hierarchy to the given path)
* the directory hierarchy to the given uri)
* @return true if all ancestors have the 'execute' permission set for all users
*/
private def ancestorsHaveExecutePermissions(
fs: FileSystem,
path: Path,
uri: URI,
statCache: Map[URI, FileStatus]): Boolean = {
var current = path
var current = uri
while (current != null) {
// the subdirs in the path should have execute permissions for others
// the subdirs in the corresponding uri path should have execute permissions for others
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false
}
current = current.getParent()
current = getParentURI(current)
}
true
}

/**
* Checks for a given path whether the Other permissions on it
* Checks for a given URI whether the Other permissions on it
* imply the permission in the passed FsAction
* @return true if the path in the uri is visible to all, false otherwise
*/
private def checkPermissionOfOther(
fs: FileSystem,
path: Path,
uri: URI,
action: FsAction,
statCache: Map[URI, FileStatus]): Boolean = {
val status = getFileStatus(fs, path.toUri(), statCache)
val status = getFileStatus(fs, uri, statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
otherAction.implies(action)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

import java.net.URI

import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.collection.mutable.Map

Expand All @@ -44,6 +45,60 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
}
}

// Verifies that addResource stores the FileStatus entries it looks up in statCache, and that
// subsequent addResource calls reuse entries that are already cached.
test("SPARK-44272: test addResource added FileStatus to statCache and getVisibility can read" +
" from statCache") {
// Stub out getFileStatus: on a cache miss it stores a fresh, empty FileStatus under the
// requested URI instead of querying a file system.
val distMgr = new ClientDistributedCacheManager() {
override private[yarn] def getFileStatus(fs: FileSystem, uri: URI,
statCache: mutable.Map[URI, FileStatus]): FileStatus = {
statCache.getOrElseUpdate(uri, new FileStatus())
}
}
val fs = mock[FileSystem]
val conf = new Configuration()
val destPathA = new Path("file:///foo.invalid.com:8080/tmp/A")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
distMgr.addResource(fs, conf, destPathA, localResources, LocalResourceType.FILE, "link",
statCache, false)
// First resource: the cache should now hold A itself plus A's parent directory.
// NOTE(review): presumably the ancestor walk stops at the direct parent because a
// default-constructed FileStatus does not grant EXECUTE to others -- confirm against Hadoop.
assert(statCache.size === 2)
assert(statCache.contains(destPathA.toUri))
assert(statCache.contains(destPathA.getParent.toUri))

// B lives in the same directory as A, so only B's own URI is expected to be new.
val destPathB = new Path("file:///foo.invalid.com:8080/tmp/B")
distMgr.addResource(fs, conf, destPathB, localResources, LocalResourceType.FILE, "link",
statCache, false)
assert(statCache.size === 3)
assert(statCache.contains(destPathB.toUri))

// C lives in a different directory, so both C and its parent are expected to be added.
val destPathC = new Path("file:///foo.invalid.com:8080/root/C")
distMgr.addResource(fs, conf, destPathC, localResources, LocalResourceType.FILE, "link",
statCache, false)
assert(statCache.size === 5)
assert(statCache.contains(destPathC.toUri))
assert(statCache.contains(destPathC.getParent.toUri))
}

test("SPARK-44272: test getParentURI") {
  val distMgr = new ClientDistributedCacheManager()

  // Non-root URI: scheme, user info, host and port must carry over to the parent, and the
  // parent's path must equal the Hadoop Path parent of the original path.
  val child = new URI("file", "user", "foo.com", 8080, "/tmp/testing", null, null)
  val parent = distMgr.getParentURI(child)
  assert(child.getScheme === parent.getScheme)
  assert(child.getUserInfo === parent.getUserInfo)
  assert(child.getHost === parent.getHost)
  assert(child.getPort === parent.getPort)
  assert(new Path(child.getPath).getParent.toString === parent.getPath)

  // Root URI: there is no parent, so getParentURI is expected to return null.
  val root = new URI("file", "user", "foo.com", 8080, "/", null, null)
  val parentOfRoot = distMgr.getParentURI(root)
  assert(parentOfRoot === null)
}

test("test getFileStatus empty") {
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
Expand Down

0 comments on commit 0879a25

Please sign in to comment.