Skip to content

Commit

Permalink
[SPARK-33215][WEBUI] Speed up event log download by skipping UI rebuild
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This patch separates the view permission checks from the getAppUi in FsHistoryServerProvider, thus enabling SHS to do view permissions check of a given attempt for a given user without rebuilding the UI. This is achieved by adding a method "checkUIViewPermissions(appId: String, attemptId: Option[String], user: String): Boolean" to many layers of history server components. Currently, this feature is useful for event log download.

### Why are the changes needed?
Right now, when we want to download the event logs from the spark history server, SHS will need to parse entire the event log to rebuild UI, and this is just for view permission checks. UI rebuilding is a time-consuming and memory-intensive task, especially for large logs. However, this process is unnecessary for event log download. With this patch, UI rebuild can be skipped when downloading event logs from the history server. Thus the time of downloading a GB scale event log can be reduced from several minutes to several seconds, and the memory consumption of UI rebuilding can be avoided.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added test cases to confirm the view permission checks work properly and download event logs won't trigger UI loading. Also did some manual tests to verify the download speed can be drastically improved and the authentication works properly.

Closes #30126 from baohe-zhang/bypass_ui_rebuild_for_log_download.

Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
  • Loading branch information
Baohe Zhang authored and HeartSaVioR committed Oct 27, 2020
1 parent 9818f07 commit 4b0e23e
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 15 deletions.
Expand Up @@ -150,4 +150,11 @@ private[history] abstract class ApplicationHistoryProvider {
*/
def onUIDetached(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { }

/**
* Returns true if the given user has permission to view the UI of the given attempt.
*
* @throws NoSuchElementException if the given attempt doesn't exist
*/
def checkUIViewPermissions(appId: String, attemptId: Option[String], user: String): Boolean

}
Expand Up @@ -359,15 +359,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

val conf = this.conf.clone()
val secManager = new SecurityManager(conf)

secManager.setAcls(historyUiAclsEnable)
// make sure to set admin acls before view acls so they are properly picked up
secManager.setAdminAcls(historyUiAdminAcls ++ stringToSeq(attempt.adminAcls.getOrElse("")))
secManager.setViewAcls(attempt.info.sparkUser, stringToSeq(attempt.viewAcls.getOrElse("")))
secManager.setAdminAclsGroups(historyUiAdminAclsGroups ++
stringToSeq(attempt.adminAclsGroups.getOrElse("")))
secManager.setViewAclsGroups(stringToSeq(attempt.viewAclsGroups.getOrElse("")))
val secManager = createSecurityManager(conf, attempt)

val kvstore = try {
diskManager match {
Expand Down Expand Up @@ -461,6 +453,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

override def checkUIViewPermissions(appId: String, attemptId: Option[String],
user: String): Boolean = {
val app = load(appId)
val attempt = app.attempts.find(_.info.attemptId == attemptId).orNull
if (attempt == null) {
throw new NoSuchElementException()
}
val secManager = createSecurityManager(this.conf.clone(), attempt)
secManager.checkUIViewPermissions(user)
}

/**
* Builds the application list based on the current contents of the log directory.
* Tries to reuse as much of the data already in memory as possible, by not reading
Expand Down Expand Up @@ -1376,6 +1379,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
endProcessing(rootPath)
}
}

private def createSecurityManager(conf: SparkConf,
attempt: AttemptInfoWrapper): SecurityManager = {
val secManager = new SecurityManager(conf)
secManager.setAcls(historyUiAclsEnable)
// make sure to set admin acls before view acls so they are properly picked up
secManager.setAdminAcls(historyUiAdminAcls ++ stringToSeq(attempt.adminAcls.getOrElse("")))
secManager.setViewAcls(attempt.info.sparkUser, stringToSeq(attempt.viewAcls.getOrElse("")))
secManager.setAdminAclsGroups(historyUiAdminAclsGroups ++
stringToSeq(attempt.adminAclsGroups.getOrElse("")))
secManager.setViewAclsGroups(stringToSeq(attempt.viewAclsGroups.getOrElse("")))
secManager
}
}

private[history] object FsHistoryProvider {
Expand Down
Expand Up @@ -128,6 +128,11 @@ class HistoryServer(
appCache.withSparkUI(appId, attemptId)(fn)
}

override def checkUIViewPermissions(appId: String, attemptId: Option[String],
user: String): Boolean = {
provider.checkUIViewPermissions(appId, attemptId, user)
}

initialize()

/**
Expand Down
Expand Up @@ -95,6 +95,8 @@ private[spark] trait UIRoot {
.build()
}
def securityManager: SecurityManager

def checkUIViewPermissions(appId: String, attemptId: Option[String], user: String): Boolean
}

private[v1] object UIRootFromServletContext {
Expand Down Expand Up @@ -145,6 +147,19 @@ private[v1] trait BaseAppResource extends ApiRequestContext {
throw new NotFoundException(s"no such app: $appKey")
}
}

protected def checkUIViewPermissions(): Unit = {
try {
val user = httpRequest.getRemoteUser()
if (!uiRoot.checkUIViewPermissions(appId, Option(attemptId), user)) {
throw new ForbiddenException(raw"""user "$user" is not authorized""")
}
} catch {
case _: NoSuchElementException =>
val appKey = Option(attemptId).map(appId + "/" + _).getOrElse(appId)
throw new NotFoundException(s"no such app: $appKey")
}
}
}

private[v1] class ForbiddenException(msg: String) extends WebApplicationException(
Expand Down
Expand Up @@ -115,15 +115,14 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
@Path("logs")
@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
def getEventLogs(): Response = {
// Retrieve the UI for the application just to do access permission checks. For backwards
// compatibility, this code also tries with attemptId "1" if the UI without an attempt ID does
// not exist.
// For backwards compatibility, this code also tries with attemptId "1" if the UI
// without an attempt ID does not exist.
try {
withUI { _ => }
checkUIViewPermissions()
} catch {
case _: NotFoundException if attemptId == null =>
attemptId = "1"
withUI { _ => }
checkUIViewPermissions()
attemptId = null
}

Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Expand Up @@ -110,6 +110,11 @@ private[spark] class SparkUI private (
}
}

override def checkUIViewPermissions(appId: String, attemptId: Option[String],
user: String): Boolean = {
securityManager.checkUIViewPermissions(user)
}

def getApplicationInfoList: Iterator[ApplicationInfo] = {
Iterator(new ApplicationInfo(
id = appId,
Expand Down
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.deploy.history.EventLogTestHelper._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.UI.{ADMIN_ACLS, ADMIN_ACLS_GROUPS, USER_GROUPS_MAPPING}
import org.apache.spark.internal.config.UI.{ADMIN_ACLS, ADMIN_ACLS_GROUPS, UI_VIEW_ACLS, UI_VIEW_ACLS_GROUPS, USER_GROUPS_MAPPING}
import org.apache.spark.io._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
Expand Down Expand Up @@ -1524,6 +1524,58 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}

test("SPARK-33215: check ui view permissions without retrieving ui") {
val conf = createTestConf()
.set(HISTORY_SERVER_UI_ACLS_ENABLE, true)
.set(HISTORY_SERVER_UI_ADMIN_ACLS, Seq("user1", "user2"))
.set(HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS, Seq("group1"))
.set(USER_GROUPS_MAPPING, classOf[TestGroupsMappingProvider].getName)

val provider = new FsHistoryProvider(conf)
val log = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log, None,
SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
"test", Some("attempt1")),
SparkListenerEnvironmentUpdate(Map(
"Spark Properties" -> List((UI_VIEW_ACLS.key, "user"), (UI_VIEW_ACLS_GROUPS.key, "group")),
"Hadoop Properties" -> Seq.empty,
"JVM Information" -> Seq.empty,
"System Properties" -> Seq.empty,
"Classpath Entries" -> Seq.empty
)),
SparkListenerApplicationEnd(System.currentTimeMillis()))

provider.checkForLogs()

// attempt2 doesn't exist
intercept[NoSuchElementException] {
provider.checkUIViewPermissions("app1", Some("attempt2"), "user1")
}
// app2 doesn't exist
intercept[NoSuchElementException] {
provider.checkUIViewPermissions("app2", Some("attempt1"), "user1")
}

// user1 and user2 are admins
assert(provider.checkUIViewPermissions("app1", Some("attempt1"), "user1"))
assert(provider.checkUIViewPermissions("app1", Some("attempt1"), "user2"))
// user3 is a member of admin group "group1"
assert(provider.checkUIViewPermissions("app1", Some("attempt1"), "user3"))
// test is the app owner
assert(provider.checkUIViewPermissions("app1", Some("attempt1"), "test"))
// user is in the app's view acls
assert(provider.checkUIViewPermissions("app1", Some("attempt1"), "user"))
// user5 is a member of the app's view acls group "group"
assert(provider.checkUIViewPermissions("app1", Some("attempt1"), "user5"))

// abc, user6, user7 don't have permissions
assert(!provider.checkUIViewPermissions("app1", Some("attempt1"), "abc"))
assert(!provider.checkUIViewPermissions("app1", Some("attempt1"), "user6"))
assert(!provider.checkUIViewPermissions("app1", Some("attempt1"), "user7"))

provider.stop()
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
Expand Down
Expand Up @@ -584,6 +584,24 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
}
}

test("SPARK-33215: speed up event log download by skipping UI rebuild") {
val appId = "local-1430917381535"

stop()
init()

val port = server.boundPort
val testUrls = Seq(
s"http://localhost:$port/api/v1/applications/$appId/logs",
s"http://localhost:$port/api/v1/applications/$appId/1/logs",
s"http://localhost:$port/api/v1/applications/$appId/2/logs")

testUrls.foreach { url =>
TestUtils.httpResponseCode(new URL(url))
}
assert(server.cacheMetrics.loadCount.getCount === 0, "downloading event log shouldn't load ui")
}

test("access history application defaults to the last attempt id") {

def getRedirectUrl(url: URL): (Int, String) = {
Expand Down

0 comments on commit 4b0e23e

Please sign in to comment.