[KYUUBI #5575][AUTHZ] Support InsertIntoDataSourceDir/InsertIntoHiveDirCommand path check

### _Why are the changes needed?_
To close #5575
Support path checks for InsertIntoDataSourceDirCommand and InsertIntoHiveDirCommand, so writes to a target directory are authorized like any other output.
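
For illustration, these are the two statement shapes that now trigger an output path check (a hedged sketch: `spark`, the `/tmp/...` directories, and `default.src` are placeholders, not part of this patch):

```scala
// Both statements write to a filesystem directory instead of a table, so
// AuthZ now extracts the target location and authorizes it as an output.
spark.sql(
  """INSERT OVERWRITE DIRECTORY '/tmp/hive_dir'
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |SELECT * FROM default.src""".stripMargin) // InsertIntoHiveDirCommand

spark.sql(
  """INSERT OVERWRITE DIRECTORY '/tmp/datasource_dir'
    |USING parquet
    |SELECT * FROM default.src""".stripMargin) // InsertIntoDataSourceDirCommand
```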

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No

Closes #5578 from AngersZhuuuu/KYUUBI-5575.

Closes #5575

4ac2d3a [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
5afe8ed [Angerszhuuuu] Merge branch 'master' into KYUUBI-5575
a18e1d3 [Angerszhuuuu] update
8e80d3d [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
2b1332d [Angerszhuuuu] follow comment
9e56267 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
7c95b4d [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
f47eb63 [Angerszhuuuu] Update uriExtractors.scala
1e0a6b5 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
c8fd045 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
4e1c927 [Angerszhuuuu] [KYUUBI #5575][AUTHZ] Support InsertIntoDataSourceDir/InsertIntoHiveDirCommand path check

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
AngersZhuuuu authored and pan3793 committed Nov 2, 2023
1 parent d7be197 commit ea9a78f
Showing 8 changed files with 116 additions and 20 deletions.
@@ -15,4 +15,5 @@
# limitations under the License.
#

org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringURIExtractor
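
The service file above registers the new `CatalogStorageFormatURIExtractor` next to the existing `StringURIExtractor`. A minimal sketch of how such a registry is typically consumed, assuming ServiceLoader-based discovery keyed by simple class name (the actual Kyuubi helper may differ):

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

// Load every registered URIExtractor and index it by simple class name,
// which is the value the "fieldExtractor" entries in the command specs hold.
def loadUriExtractors(): Map[String, URIExtractor] =
  ServiceLoader.load(classOf[URIExtractor]).asScala
    .map(e => e.getClass.getSimpleName -> e)
    .toMap
```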
@@ -1097,7 +1097,11 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ],
"uriDescs" : [ ]
"uriDescs" : [ {
"fieldName" : "storage",
"fieldExtractor" : "CatalogStorageFormatURIExtractor",
"isInput" : false
} ]
}, {
"classname" : "org.apache.spark.sql.execution.command.LoadDataCommand",
"tableDescs" : [ {
@@ -1370,7 +1374,11 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ],
"uriDescs" : [ ]
"uriDescs" : [ {
"fieldName" : "storage",
"fieldExtractor" : "CatalogStorageFormatURIExtractor",
"isInput" : false
} ]
}, {
"classname" : "org.apache.spark.sql.hive.execution.InsertIntoHiveTable",
"tableDescs" : [ {
@@ -195,15 +195,11 @@ object PrivilegesBuilder {
}
spec.uriDescs.foreach { ud =>
try {
val uri = ud.extract(plan)
uri match {
case Some(uri) =>
if (ud.isInput) {
inputObjs += PrivilegeObject(uri)
} else {
outputObjs += PrivilegeObject(uri)
}
case None =>
val uris = ud.extract(plan)
if (ud.isInput) {
inputObjs ++= uris.map(PrivilegeObject(_))
} else {
outputObjs ++= uris.map(PrivilegeObject(_))
}
} catch {
case e: Exception =>
@@ -318,7 +318,7 @@ case class UriDesc(
fieldName: String,
fieldExtractor: String,
isInput: Boolean = false) extends Descriptor {
override def extract(v: AnyRef): Option[Uri] = {
override def extract(v: AnyRef): Seq[Uri] = {
val uriVal = invokeAs[AnyRef](v, fieldName)
val uriExtractor = lookupExtractor[URIExtractor](fieldExtractor)
uriExtractor(uriVal)
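
With `extract` now returning `Seq[Uri]`, a descriptor can contribute zero or more paths. A hypothetical walk-through, assuming `FakeDirCommand` as a stand-in for a real command exposing a `storage` field:

```scala
import java.net.URI
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// Hypothetical stand-in for a command with a `storage` field; only here to
// demonstrate the reflective field lookup that UriDesc.extract performs.
case class FakeDirCommand(storage: CatalogStorageFormat)

val storage = CatalogStorageFormat.empty.copy(
  locationUri = Some(new URI("/tmp/authz_out")))
val desc = UriDesc("storage", "CatalogStorageFormatURIExtractor", isInput = false)

// extract reads the `storage` field reflectively, then delegates to the
// registered CatalogStorageFormatURIExtractor: Seq(Uri("/tmp/authz_out")).
// isInput = false means each Uri is authorized on the *output* side.
val uris: Seq[Uri] = desc.extract(FakeDirCommand(storage))
```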
@@ -17,7 +17,9 @@

package org.apache.kyuubi.plugin.spark.authz.serde

trait URIExtractor extends (AnyRef => Option[Uri]) with Extractor
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

trait URIExtractor extends (AnyRef => Seq[Uri]) with Extractor

object URIExtractor {
val uriExtractors: Map[String, URIExtractor] = {
@@ -29,7 +31,13 @@ object URIExtractor {
* String
*/
class StringURIExtractor extends URIExtractor {
override def apply(v1: AnyRef): Option[Uri] = {
Some(Uri(v1.asInstanceOf[String]))
override def apply(v1: AnyRef): Seq[Uri] = {
Seq(Uri(v1.asInstanceOf[String]))
}
}

class CatalogStorageFormatURIExtractor extends URIExtractor {
override def apply(v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[CatalogStorageFormat].locationUri.map(uri => Uri(uri.getPath)).toSeq
}
}
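
A quick usage sketch of the new extractor (the location value is a placeholder): the `Seq` return type lets the no-location case contribute nothing instead of forcing a dummy value:

```scala
import java.net.URI
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

val extractor = new CatalogStorageFormatURIExtractor

// With a location set, the extractor yields the URI's path component.
val withLocation = CatalogStorageFormat.empty.copy(
  locationUri = Some(new URI("/tmp/authz_out")))
assert(extractor(withLocation) == Seq(Uri("/tmp/authz_out")))

// Without a location there is nothing to check, so no privilege objects
// are produced downstream.
assert(extractor(CatalogStorageFormat.empty) == Seq.empty)
```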
@@ -1467,7 +1467,15 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)

assert(out.isEmpty)
assert(out.size == 1)
val po1 = out.head
assert(po1.actionType === PrivilegeObjectActionType.OTHER)
assert(po1.privilegeObjectType === PrivilegeObjectType.DFS_URL)
assert(po1.dbname === directory.path)
assert(po1.objectName === null)
assert(po1.columns === Seq.empty)
val accessType1 = ranger.AccessType(po1, operationType, isInput = true)
assert(accessType1 == AccessType.SELECT)
}

test("InsertIntoDataSourceCommand") {
@@ -1591,7 +1599,15 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)

assert(out.isEmpty)
assert(out.size == 1)
val po1 = out.head
assert(po1.actionType === PrivilegeObjectActionType.OTHER)
assert(po1.privilegeObjectType === PrivilegeObjectType.DFS_URL)
assert(po1.dbname === directory.path)
assert(po1.objectName === null)
assert(po1.columns === Seq.empty)
val accessType1 = ranger.AccessType(po1, operationType, isInput = true)
assert(accessType1 == AccessType.SELECT)
}

test("InsertIntoHiveDirCommand") {
@@ -1616,7 +1632,15 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)

assert(out.isEmpty)
assert(out.size == 1)
val po1 = out.head
assert(po1.actionType === PrivilegeObjectActionType.OTHER)
assert(po1.privilegeObjectType === PrivilegeObjectType.DFS_URL)
assert(po1.dbname === directory.path)
assert(po1.objectName === null)
assert(po1.columns === Seq.empty)
val accessType1 = ranger.AccessType(po1, operationType, isInput = true)
assert(accessType1 == AccessType.SELECT)
}

test("InsertIntoHiveTableCommand") {
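
The assertions above make a convention visible: for `DFS_URL` privilege objects the directory path travels in the `dbname` slot while `objectName` stays null. A pseudocode-style sketch of that mapping, inferred from the assertions rather than from the real `PrivilegeObject` factory:

```scala
// Inferred shape only -- the actual constructor may differ.
def uriToPrivilegeObject(uri: Uri): PrivilegeObject =
  PrivilegeObject(
    privilegeObjectType = PrivilegeObjectType.DFS_URL,
    actionType = PrivilegeObjectActionType.OTHER,
    dbname = uri.path, // the directory path rides in the dbname field
    objectName = null,
    columns = Seq.empty)
```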
@@ -552,6 +552,13 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
val InsertIntoDataSourceDir = {
val cmd = "org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand"
val queryDesc = queryQueryDesc
val uriDesc = UriDesc("storage", classOf[CatalogStorageFormatURIExtractor])
TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc), uriDescs = Seq(uriDesc))
}

val SaveIntoDataSourceCommand = {
val cmd = "org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand"
val queryDesc = queryQueryDesc
TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc))
}

@@ -659,8 +666,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
DropTableV2,
InsertIntoDataSource,
InsertIntoDataSourceDir,
InsertIntoDataSourceDir.copy(classname =
"org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand"),
SaveIntoDataSourceCommand,
InsertIntoHadoopFsRelationCommand,
InsertIntoDataSourceDir.copy(classname =
"org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand"),
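
The `SaveIntoDataSourceCommand` split above is worth spelling out: a spec `copy` carries every descriptor along, which is only safe when the target command exposes the same fields. A hedged sketch of the distinction (reasoning inferred from the diff):

```scala
// InsertIntoHiveDirCommand also has a `storage: CatalogStorageFormat` field,
// so reusing the copy -- uriDesc included -- keeps the path check working:
val insertIntoHiveDir = InsertIntoDataSourceDir.copy(classname =
  "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand")

// SaveIntoDataSourceCommand has no `storage` field; copying would make the
// extractor look up a field that does not exist, hence its own spec with
// no uriDescs.
```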
@@ -17,6 +17,8 @@

package org.apache.kyuubi.plugin.spark.authz.ranger

import java.nio.file.Path

import scala.util.Try

import org.apache.hadoop.security.UserGroupInformation
@@ -30,6 +32,7 @@ import org.scalatest.BeforeAndAfterAll
// scalastyle:off
import org.scalatest.funsuite.AnyFunSuite

import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.{AccessControlException, SparkSessionProvider}
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
@@ -90,6 +93,14 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
}
}

protected def withTempDir(f: Path => Unit): Unit = {
val dir = Utils.createTempDir()
try f(dir)
finally {
Utils.deleteDirectoryRecursively(dir.toFile)
}
}

/**
* Enables authorizing in single call mode,
* and disables authorizing in single call mode after calling `f`
@@ -1032,4 +1043,46 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}

test("InsertIntoHiveDirCommand") {
val db1 = defaultDb
val table1 = "table1"
withTempDir { path =>
withSingleCallEnabled {
withCleanTmpResources(Seq((s"$db1.$table1", "table"))) {
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
interceptContains[AccessControlException](doAs(
someone,
sql(
s"""
|INSERT OVERWRITE DIRECTORY '$path'
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|SELECT * FROM $db1.$table1""".stripMargin)))(
s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope," +
s"[$path, $path/]]")
}
}
}
}

test("InsertIntoDataSourceDirCommand") {
val db1 = defaultDb
val table1 = "table1"
withTempDir { path =>
withSingleCallEnabled {
withCleanTmpResources(Seq((s"$db1.$table1", "table"))) {
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
interceptContains[AccessControlException](doAs(
someone,
sql(
s"""
|INSERT OVERWRITE DIRECTORY '$path'
|USING parquet
|SELECT * FROM $db1.$table1""".stripMargin)))(
s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope," +
s"[$path, $path/]]")
}
}
}
}
}
