[KYUUBI #3515] [Authz] support checking rewritten Iceberg commands and skip applying Row-filter to output tables

### _Why are the changes needed?_

to close #3515.

By replacing `mapChildren` in `RuleApplyRowFilterAndDataMasking` with a method that skips the head child inserted by Iceberg in `IcebergSparkSessionExtensions`, so the row-filter marker is not applied to the output tables of rewritten commands.
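For illustration, a minimal hypothetical sketch (not the patch itself; the helper name is made up) of mapping a rule over a plan's children while leaving the Iceberg-inserted output-table child untouched:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch: apply `f` to every child of `plan` except those in `skipped`
// (e.g. the output-table relation that Iceberg's extension inserts as the head
// child), so row-filter markers are attached only to the query-side children.
def mapChildrenSkipping(
    plan: LogicalPlan,
    skipped: Seq[LogicalPlan])(f: LogicalPlan => LogicalPlan): LogicalPlan = {
  val newChildren = plan.children.map { child =>
    if (skipped.exists(_ eq child)) child else f(child)
  }
  plan.withNewChildren(newChildren)
}
```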

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3520 from bowenliang123/3515-authz-iceberg.

Closes #3515

a10fe43 [Bowen Liang] improve mapPlanChildren
5eb7f84 [Bowen Liang] fix problem after merging from master
bb1eefb [Bowen Liang] merge from master
6c6b071 [Bowen Liang] Merge commit 'ae2df990a32c695de6f0345ea7e73e51103e87e5' into 3515-authz-iceberg
dbcfe6d [Bowen Liang] restrict child skipping to Iceberg command privilege builders and add skipMappedChildren method to IcebergCommands to handle them
ae2df99 [Bowen Liang] nit
0c69179 [liangbowen] update mapPlanChildren and passSparkVersionCheck
161215d [liangbowen] nit
0006dee [liangbowen] generalize passSparkVersionCheck method to AuthZUtils
4416363 [liangbowen] refactor getFieldValOpt to getFieldValOption
5b8aa40 [Bowen Liang] improvements on skippedMapChildren of mapPlanChildren
11c2e63 [liangbowen] nit
5a97194 [liangbowen] unify a general way to ensure the skipped table is in the plan's children
b161d70 [Bowen Liang] nit
952c1e1 [Bowen Liang] reuse MergeIntoTable of v2Commands to MergeIntoIcebergTable of IcebergCommands
0b25bd1 [Bowen Liang] generalize mapPlanChildren for iceberg commands
04fb651 [Bowen Liang] nit
a1f33bc [Bowen Liang] update DELETE FROM TABLE ut
34d65e5 [Bowen Liang] introduce IcebergCommands for access checking Iceberg tables; skip head child of Iceberg commands in RuleApplyRowFilterAndDataMasking to prevent marking output tables.
085bfa3 [liangbowen] replace mapChildren in RuleApplyRowFilterAndDataMasking with mapPlanChildren method, to skip head child for Iceberg UpdateIcebergTable/MergeIntoIcebergTable/DeleteFromIcebergTable
6a24501 [liangbowen] init iceberg ut

Lead-authored-by: Bowen Liang <liangbowen@gf.com.cn>
Co-authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: Kent Yao <yao@apache.org>
bowenliang123 authored and yaooqinn committed Oct 11, 2022
1 parent e80a7db commit 0c2091c
Showing 8 changed files with 307 additions and 6 deletions.
7 changes: 7 additions & 0 deletions extensions/spark/kyuubi-spark-authz/pom.xml
@@ -283,6 +283,13 @@
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
IcebergCommands.scala (new file)
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType.PrivilegeObjectActionType
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.plugin.spark.authz.v2Commands.CommandType.CommandType

/**
* Privilege object builders
* for Iceberg commands rewritten by the Iceberg Spark extension
*/
object IcebergCommands extends Enumeration {

import scala.language.implicitConversions

implicit def valueToCmdPrivilegeBuilder(x: Value): CmdPrivilegeBuilder =
x.asInstanceOf[CmdPrivilegeBuilder]

/**
* check whether commandName has a supported privilege builder
* and passes the requirement checks (e.g. Spark version)
*
* @param commandName name of the command
* @return true if so, otherwise false
*/
def accept(commandName: String): Boolean = {
try {
val command = IcebergCommands.withName(commandName)

// check spark version requirements
passSparkVersionCheck(command.mostVer, command.leastVer)
} catch {
case _: NoSuchElementException => false
}
}

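/**
* Collects the children of the plan that are referenced by its `table`,
* `targetTable` or `sourceTable` fields (resolved reflectively), i.e. the
* table relations that should be left unmapped when rewriting children.
* Only values that actually appear among the plan's children are returned.
*/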
def skipMappedChildren(plan: LogicalPlan): Seq[LogicalPlan] = {
Seq(
getFieldValOpt[LogicalPlan](plan, "table"),
getFieldValOpt[LogicalPlan](plan, "targetTable"),
getFieldValOpt[LogicalPlan](plan, "sourceTable"))
.flatten intersect plan.children
}

/**
* Command privilege builder
*
* @param operationType OperationType for converting accessType
* @param leastVer minimum Spark version required
* @param mostVer maximum Spark version supported
* @param commandTypes Seq of [[CommandType]] hinting privilege building
* @param buildInput input [[PrivilegeObject]] for privilege check
* @param buildOutput output [[PrivilegeObject]] for privilege check
* @param outputActionType [[PrivilegeObjectActionType]] for output [[PrivilegeObject]]
*/
case class CmdPrivilegeBuilder(
operationType: OperationType = QUERY,
leastVer: Option[String] = None,
mostVer: Option[String] = None,
commandTypes: Seq[CommandType] = Seq.empty,
buildInput: (LogicalPlan, ArrayBuffer[PrivilegeObject], Seq[CommandType]) => Unit =
v2Commands.defaultBuildInput,
buildOutput: (
LogicalPlan,
ArrayBuffer[PrivilegeObject],
Seq[CommandType],
PrivilegeObjectActionType) => Unit = v2Commands.defaultBuildOutput,
outputActionType: PrivilegeObjectActionType = PrivilegeObjectActionType.OTHER)
extends super.Val {

def buildPrivileges(
plan: LogicalPlan,
inputObjs: ArrayBuffer[PrivilegeObject],
outputObjs: ArrayBuffer[PrivilegeObject]): Unit = {
this.buildInput(plan, inputObjs, commandTypes)
this.buildOutput(plan, outputObjs, commandTypes, outputActionType)
}
}

// dml commands

val DeleteFromIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = ALTERTABLE_DROPPARTS,
leastVer = Some("3.2"),
commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption),
outputActionType = PrivilegeObjectActionType.UPDATE)

val UpdateIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = ALTERTABLE_ADDPARTS,
leastVer = Some("3.2"),
commandTypes = Seq(v2Commands.CommandType.HasTableAsIdentifierOption),
outputActionType = PrivilegeObjectActionType.UPDATE)

val UnresolvedMergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder()

val MergeIntoIcebergTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
buildInput = v2Commands.MergeIntoTable.buildInput,
buildOutput = v2Commands.MergeIntoTable.buildOutput)
}
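A hypothetical usage sketch (illustration only, not part of this diff): supporting a further rewritten command would amount to declaring one more enumeration value inside the object above, for example:

```scala
// Hypothetical, for illustration: another Spark-3.2+-only command builder that
// marks its output table with the UPDATE action type.
val SomeRewrittenIcebergCommand: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
  leastVer = Some("3.2"),
  outputActionType = PrivilegeObjectActionType.UPDATE)
```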
@@ -212,6 +212,9 @@ object PrivilegesBuilder {
case v2Cmd if v2Commands.accept(v2Cmd) =>
v2Commands.withName(v2Cmd).buildPrivileges(plan, inputObjs, outputObjs)

case icebergCmd if IcebergCommands.accept(icebergCmd) =>
IcebergCommands.withName(icebergCmd).buildPrivileges(plan, inputObjs, outputObjs)

case "AlterDatabasePropertiesCommand" |
"AlterDatabaseSetLocationCommand" |
"CreateDatabaseCommand" |
RuleApplyRowFilterAndDataMasking.scala
@@ -24,16 +24,27 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Identifier

-import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
+import org.apache.kyuubi.plugin.spark.authz.{IcebergCommands, ObjectType, OperationType}
import org.apache.kyuubi.plugin.spark.authz.util.{PermanentViewMarker, RowFilterAndDataMaskingMarker}
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {

private def mapPlanChildren(plan: LogicalPlan)(f: LogicalPlan => LogicalPlan): LogicalPlan = {
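// Children returned by skipMappedChildren (the table relations of the rewritten
// Iceberg command) are kept as-is; the transformation f is applied only to the
// remaining query-side children.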
val newChildren = plan match {
case _ if IcebergCommands.accept(plan.nodeName) =>
val skipped = IcebergCommands.skipMappedChildren(plan)
skipped ++ (plan.children diff skipped).map(f)
case _ =>
plan.children.map(f)
}
plan.withNewChildren(newChildren)
}

override def apply(plan: LogicalPlan): LogicalPlan = {
// Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation/DataSourceV2Relation with
// RowFilterAndDataMaskingMarker if it is not wrapped yet.
-plan mapChildren {
+mapPlanChildren(plan) {
case p: RowFilterAndDataMaskingMarker => p
case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
val table = getHiveTable(hiveTableRelation)
@@ -44,6 +44,8 @@ private[authz] object AuthZUtils {
}
}

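// Like getFieldVal, but yields None instead of throwing when reflection fails
// (e.g. the field does not exist on the given object).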
def getFieldValOpt[T](o: Any, name: String): Option[T] = Try(getFieldVal[T](o, name)).toOption

def invoke(
obj: AnyRef,
methodName: String,
@@ -128,6 +130,19 @@ private[authz] object AuthZUtils {
SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
}

/**
* check whether the current Spark version satisfies the given bounds:
* the first param is an optional maximum supported Spark version,
* the second an optional minimum required Spark version.
* e.g. passSparkVersionCheck(Some("3.3"), Some("3.2")) holds only on Spark 3.2.x and 3.3.x
*
* @return true if both bounds (when present) are satisfied
*/
def passSparkVersionCheck: (Option[String], Option[String]) => Boolean =
(mostSparkVersion, leastSparkVersion) => {
mostSparkVersion.forall(isSparkVersionAtMost) &&
leastSparkVersion.forall(isSparkVersionAtLeast)
}

def quoteIfNeeded(part: String): String = {
if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
part
@@ -64,11 +64,8 @@ object v2Commands extends Enumeration {
val command = v2Commands.withName(commandName)

// check spark version requirements
-def passSparkVersionCheck: Boolean =
-(command.mostVer.isEmpty || isSparkVersionAtMost(command.mostVer.get)) &&
-(command.leastVer.isEmpty || isSparkVersionAtLeast(command.leastVer.get))
+passSparkVersionCheck(command.mostVer, command.leastVer)

-passSparkVersionCheck
} catch {
case _: NoSuchElementException => false
}
@@ -32,6 +32,8 @@ trait SparkSessionProvider {
protected val isSparkV33OrGreater: Boolean = isSparkVersionAtLeast("3.3")

protected val extension: SparkSessionExtensions => Unit = _ => Unit
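// Test suites may override sqlExtensions to register extra SQL extensions
// (e.g. Iceberg's) through the spark.sql.extensions config set below.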
protected val sqlExtensions: String = ""

protected lazy val spark: SparkSession = {
val metastore = {
val path = Files.createTempDirectory("hms")
@@ -46,6 +48,7 @@ trait SparkSessionProvider {
.config(
"spark.sql.warehouse.dir",
Files.createTempDirectory("spark-warehouse").toString)
.config("spark.sql.extensions", sqlExtensions)
.withExtensions(extension)
.getOrCreate()
}
IcebergCatalogRangerSparkExtensionSuite.scala (new file)
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.plugin.spark.authz.ranger

// scalastyle:off
import java.nio.file.Files

import org.apache.kyuubi.plugin.spark.authz.AccessControlException

/**
* Tests for RangerSparkExtensionSuite
* on an Iceberg catalog with the DataSource V2 API.
*/
class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
override protected val sqlExtensions: String =
if (isSparkV32OrGreater)
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
else ""

val catalogV2 = "local"
val namespace1 = "ns1"
val table1 = "table1"
val outputTable1 = "outputTable1"

override def beforeAll(): Unit = {
if (isSparkV32OrGreater) {
spark.conf.set(
s"spark.sql.catalog.$catalogV2",
"org.apache.iceberg.spark.SparkCatalog")
spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop")
spark.conf.set(
s"spark.sql.catalog.$catalogV2.warehouse",
Files.createTempDirectory("iceberg-hadoop").toString)

super.beforeAll()

doAs("admin", sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace1"))
doAs(
"admin",
sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table1" +
" (id int, name string, city string) USING iceberg"))

doAs(
"admin",
sql(s"INSERT INTO $catalogV2.$namespace1.$table1" +
" (id , name , city ) VALUES (1, 'liangbowen','Guangzhou')"))
doAs(
"admin",
sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$outputTable1" +
" (id int, name string, city string) USING iceberg"))
}
}

override def afterAll(): Unit = {
super.afterAll()
spark.sessionState.catalog.reset()
spark.sessionState.conf.clear()
}

test("[KYUUBI #3515] MERGE INTO") {
assume(isSparkV32OrGreater)

val mergeIntoSql =
s"""
|MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
|USING $catalogV2.$namespace1.$table1 AS source
|ON target.id = source.id
|WHEN MATCHED AND (target.name='delete') THEN DELETE
|WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
""".stripMargin

// MergeIntoTable: Using a MERGE INTO Statement
val e1 = intercept[AccessControlException](
doAs(
"someone",
sql(mergeIntoSql)))
assert(e1.getMessage.contains(s"does not have [select] privilege" +
s" on [$namespace1/$table1/id]"))

try {
SparkRangerAdminPlugin.getRangerConf.setBoolean(
s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call",
true)
val e2 = intercept[AccessControlException](
doAs(
"someone",
sql(mergeIntoSql)))
assert(e2.getMessage.contains(s"does not have" +
s" [select] privilege" +
s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," +
s" [update] privilege on [$namespace1/$outputTable1]"))
} finally {
SparkRangerAdminPlugin.getRangerConf.setBoolean(
s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call",
false)
}

doAs("admin", sql(mergeIntoSql))
}

test("[KYUUBI #3515] UPDATE TABLE") {
assume(isSparkV32OrGreater)

// UpdateTable
val e1 = intercept[AccessControlException](
doAs(
"someone",
sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
" WHERE id=1")))
assert(e1.getMessage.contains(s"does not have [update] privilege" +
s" on [$namespace1/$table1]"))

doAs(
"admin",
sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
" WHERE id=1"))
}

test("[KYUUBI #3515] DELETE FROM TABLE") {
assume(isSparkV32OrGreater)

// DeleteFromTable
val e6 = intercept[AccessControlException](
doAs("someone", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")))
assert(e6.getMessage.contains(s"does not have [update] privilege" +
s" on [$namespace1/$table1]"))

doAs("admin", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
}
}
