Skip to content

Commit

Permalink
Address the comments
Browse files Browse the repository at this point in the history
Change-Id: Id11665cfac20c5112b52868ebbc67d40f676f528
  • Loading branch information
jerryshao committed Jan 5, 2017
1 parent c0c1933 commit f4357e8
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.map { d => Utils.resolveURI(d).toString }
.getOrElse(DEFAULT_LOG_DIR)

private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
"; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
private val fs = Utils.getHadoopFileSystem(logDir, hadoopConf)

Expand Down Expand Up @@ -250,14 +257,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)

if (appListener.appId.isDefined) {
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
// make sure to set admin acls before view acls so they are properly picked up
val adminAcls = conf.get("spark.history.ui.admin.acls", "") + "," +
appListener.adminAcls.getOrElse("")
val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("")
ui.getSecurityManager.setAdminAcls(adminAcls)
ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse(""))
val adminAclsGroups = conf.get("spark.history.ui.admin.acls.groups", "") + "," +
val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
appListener.adminAclsGroups.getOrElse("")
ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.io._
import org.apache.spark.scheduler._
Expand Down Expand Up @@ -475,47 +475,101 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}


test("support history server ui admin acls") {
val conf = createTestConf()
def createAndCheck(conf: SparkConf, properties: (String, String)*)
(checkFn: SecurityManager => Unit): Unit = {
// Empty the testDir for each test.
if (testDir.exists() && testDir.isDirectory) {
testDir.listFiles().foreach { f => if (f.isFile) f.delete() }
}

var provider: FsHistoryProvider = null
try {
provider = new FsHistoryProvider(conf)
val log = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log, true, None,
SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
"test", Some("attempt1")),
SparkListenerEnvironmentUpdate(Map(
"Spark Properties" -> properties.toSeq,
"JVM Information" -> Seq.empty,
"System Properties" -> Seq.empty,
"Classpath Entries" -> Seq.empty
)),
SparkListenerApplicationEnd(System.currentTimeMillis()))

provider.checkForLogs()
val appUi = provider.getAppUI("app1", Some("attempt1"))

assert(appUi.nonEmpty)
val securityManager = appUi.get.ui.securityManager
checkFn(securityManager)
} finally {
if (provider != null) {
provider.stop()
}
}
}

// Test both history ui admin acls and application acls are configured.
val conf1 = createTestConf()
.set("spark.history.ui.acls.enable", "true")
.set("spark.history.ui.admin.acls", "user1,user2")
.set("spark.history.ui.admin.acls.groups", "group1")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)

val provider = new FsHistoryProvider(conf)

val log = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log, true, None,
SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
"test", Some("attempt1")),
SparkListenerEnvironmentUpdate(Map(
"Spark Properties" -> Seq(
("spark.admin.acls", "user"),
("spark.admin.acls.groups", "group")),
"JVM Information" -> Seq.empty,
"System Properties" -> Seq.empty,
"Classpath Entries" -> Seq.empty
)),
SparkListenerApplicationEnd(System.currentTimeMillis()))

provider.checkForLogs()
val appUi = provider.getAppUI("app1", Some("attempt1"))

assert (appUi.nonEmpty)
val securityManager = appUi.get.ui.securityManager
createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
securityManager =>
// Test whether user has permission to access UI.
securityManager.checkUIViewPermissions("user1") should be (true)
securityManager.checkUIViewPermissions("user2") should be (true)
securityManager.checkUIViewPermissions("user") should be (true)
securityManager.checkUIViewPermissions("abc") should be (false)

// Test whether user with admin group has permission to access UI.
securityManager.checkUIViewPermissions("user3") should be (true)
securityManager.checkUIViewPermissions("user4") should be (true)
securityManager.checkUIViewPermissions("user5") should be (true)
securityManager.checkUIViewPermissions("user6") should be (false)
}

// Test whether user has permission to access UI.
securityManager.checkUIViewPermissions("user1") should be (true)
securityManager.checkUIViewPermissions("user2") should be (true)
securityManager.checkUIViewPermissions("user") should be (true)
securityManager.checkUIViewPermissions("abc") should be (false)
// Test only history ui admin acls are configured.
val conf2 = createTestConf()
.set("spark.history.ui.acls.enable", "true")
.set("spark.history.ui.admin.acls", "user1,user2")
.set("spark.history.ui.admin.acls.groups", "group1")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf2) { securityManager =>
// Test whether user has permission to access UI.
securityManager.checkUIViewPermissions("user1") should be (true)
securityManager.checkUIViewPermissions("user2") should be (true)
// Check the unknown "user" should return false
securityManager.checkUIViewPermissions("user") should be (false)

// Test whether user with admin group has permission to access UI.
securityManager.checkUIViewPermissions("user3") should be (true)
securityManager.checkUIViewPermissions("user4") should be (true)
// Check the "user5" without mapping relation should return false
securityManager.checkUIViewPermissions("user5") should be (false)
}

// Test whether user with admin group has permission to access UI.
securityManager.checkUIViewPermissions("user3") should be (true)
securityManager.checkUIViewPermissions("user4") should be (true)
securityManager.checkUIViewPermissions("user5") should be (false)
}
// Test neither history ui admin acls nor application acls are configured.
val conf3 = createTestConf()
.set("spark.history.ui.acls.enable", "true")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf3) { securityManager =>
// Test whether user has permission to access UI.
securityManager.checkUIViewPermissions("user1") should be (false)
securityManager.checkUIViewPermissions("user2") should be (false)
securityManager.checkUIViewPermissions("user") should be (false)

// Test whether user with admin group has permission to access UI.
// Check should be failed since we don't have acl group settings.
securityManager.checkUIViewPermissions("user3") should be (false)
securityManager.checkUIViewPermissions("user4") should be (false)
securityManager.checkUIViewPermissions("user5") should be (false)
}
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
Expand Down Expand Up @@ -579,7 +633,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
class TestGroupsMappingProvider extends GroupMappingServiceProvider {
private val mappings = Map(
"user3" -> "group1",
"user4" -> "group1")
"user4" -> "group1",
"user5" -> "group")

override def getGroups(username: String): Set[String] = {
mappings.get(username).map(Set(_)).getOrElse(Set.empty)
Expand Down

0 comments on commit f4357e8

Please sign in to comment.