[KYUUBI #5565][AUTHZ] Support Delete/Insert/Update table command for Delta Lake

### _Why are the changes needed?_
To close #5565.
- Support the DELETE FROM table command for Delta Lake in Authz.
- Support the INSERT INTO and INSERT OVERWRITE table commands for Delta Lake in Authz.
- Support the UPDATE table command for Delta Lake in Authz (the covered statements are sketched below).
- Reduce the fields of the `createTableSql` helper in the Delta test suite.
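
As a quick sketch (the SparkSession `spark` and all table names below are illustrative assumptions, not taken from this patch), these are the kinds of Delta statements that now go through Authz table-privilege checks, with the privileges the new tests expect:

```scala
// Hypothetical Delta tables; expected privileges follow the added test suite.
spark.sql("DELETE FROM default.tbl_delta WHERE birthDate < '1955-01-01'")       // [update] on default.tbl_delta
spark.sql("UPDATE default.tbl_delta SET gender = 'Female' WHERE gender = 'F'")  // [update] on default.tbl_delta
spark.sql("INSERT INTO default.tbl_delta SELECT * FROM default.src_delta")      // [select] on source columns, [update] on target
spark.sql("INSERT OVERWRITE default.tbl_delta SELECT * FROM default.src_delta") // same privileges as INSERT INTO
```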

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No.

Closes #5596 from zml1206/KYUUBI-5565-2.

Closes #5565

452d7d8 [zml1206] ut add do as with admin
2572005 [zml1206] improve SubqueryAliasTableExtractor
e2a3fe0 [zml1206] Support Delete/Insert/Update table command for Delta Lake

Authored-by: zml1206 <zhuml1206@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
zml1206 authored and yaooqinn committed Nov 3, 2023
1 parent ebc98fe commit 97fd5b7
Showing 6 changed files with 205 additions and 46 deletions.
@@ -29,5 +29,6 @@ org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.SubqueryAliasTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
@@ -1997,4 +1997,68 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.DeleteCommand",
"tableDescs" : [ {
"fieldName" : "catalogTable",
"fieldExtractor" : "CatalogTableOptionTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "target",
"fieldExtractor" : "SubqueryAliasTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
"tableDescs" : [ {
"fieldName" : "catalogTable",
"fieldExtractor" : "CatalogTableOptionTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "target",
"fieldExtractor" : "SubqueryAliasTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
} ]
@@ -234,6 +234,20 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
}
}

/**
* org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
*/
class SubqueryAliasTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
v1.asInstanceOf[SubqueryAlias] match {
case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
case SubqueryAlias(identifier, _) =>
lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
}
}
}
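// Illustration only (the plan shapes below are assumptions about how Delta builds
// DeleteCommand/UpdateCommand, not lines from this patch): the command's `target`
// field is a SubqueryAlias, either directly over the Delta relation or nested once
// more when the statement aliases the table, e.g.
//   SubqueryAlias(spark_catalog.default.tbl, relation)
//   SubqueryAlias(t, SubqueryAlias(spark_catalog.default.tbl, relation))
// In both shapes the qualified identifier string is handed to StringTableExtractor,
// which resolves it to the database and table names.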

/**
* org.apache.spark.sql.connector.catalog.Table
*/
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz.gen

import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._

object DeltaCommands extends CommandSpecs[TableCommandSpec] {

val DeleteCommand = {
val cmd = "org.apache.spark.sql.delta.commands.DeleteCommand"
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
val tableDesc = TableDesc(
"catalogTable",
classOf[CatalogTableOptionTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
val targetDesc = TableDesc(
"target",
classOf[SubqueryAliasTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
TableCommandSpec(cmd, Seq(tableDesc, targetDesc))
}

val UpdateCommand = {
val cmd = "org.apache.spark.sql.delta.commands.UpdateCommand"
DeleteCommand.copy(classname = cmd)
}

override def specs: Seq[TableCommandSpec] = Seq(
DeleteCommand,
UpdateCommand)
}
@@ -44,7 +44,7 @@ class JsonSpecFileGenerator extends AnyFunSuite {
// scalastyle:on
test("check spec json files") {
writeCommandSpecJson("database", Seq(DatabaseCommands))
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands))
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands))
writeCommandSpecJson("function", Seq(FunctionCommands))
writeCommandSpecJson("scan", Seq(Scans))
}
@@ -38,6 +38,18 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
val table1 = "table1_delta"
val table2 = "table2_delta"

def createTableSql(namespace: String, table: String): String =
s"""
|CREATE TABLE IF NOT EXISTS $namespace.$table (
| id INT,
| name STRING,
| gender STRING,
| birthDate TIMESTAMP
|)
|USING DELTA
|PARTITIONED BY (gender)
|""".stripMargin

override def withFixture(test: NoArgTest): Outcome = {
test()
}
@@ -66,35 +78,17 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
| id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
+ | name STRING,
| gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
+ | birthDate TIMESTAMP
|) USING DELTA
|""".stripMargin
interceptContains[AccessControlException] {
doAs(someone, sql(createNonPartitionTableSql))
}(s"does not have [create] privilege on [$namespace1/$table1]")
doAs(admin, sql(createNonPartitionTableSql))

- val createPartitionTableSql =
- s"""
- |CREATE TABLE IF NOT EXISTS $namespace1.$table2 (
- | id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
- | gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
- |)
- |USING DELTA
- |PARTITIONED BY (gender)
- |""".stripMargin
+ val createPartitionTableSql = createTableSql(namespace1, table2)
interceptContains[AccessControlException] {
doAs(someone, sql(createPartitionTableSql))
}(s"does not have [create] privilege on [$namespace1/$table2]")
@@ -109,13 +103,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
s"""
|CREATE OR REPLACE TABLE $namespace1.$table1 (
| id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
+ | name STRING,
| gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
+ | birthDate TIMESTAMP
|) USING DELTA
|""".stripMargin
interceptContains[AccessControlException] {
@@ -128,23 +118,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
test("alter table") {
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
- doAs(
- admin,
- sql(
- s"""
- |CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
- | id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
- | gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
- |)
- |USING DELTA
- |PARTITIONED BY (gender)
- |""".stripMargin))
+ doAs(admin, sql(createTableSql(namespace1, table1)))

// add columns
interceptContains[AccessControlException](
@@ -164,7 +138,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(
someone,
sql(s"ALTER TABLE $namespace1.$table1" +
s" REPLACE COLUMNS (id INT, firstName STRING)")))(
s" REPLACE COLUMNS (id INT, name STRING)")))(
s"does not have [alter] privilege on [$namespace1/$table1]")

// rename column
@@ -189,6 +163,64 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
s"does not have [alter] privilege on [$namespace1/$table1]")
}
}

test("delete from table") {
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
val deleteFromTableSql = s"DELETE FROM $namespace1.$table1 WHERE birthDate < '1955-01-01'"
interceptContains[AccessControlException](
doAs(someone, sql(deleteFromTableSql)))(
s"does not have [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(deleteFromTableSql))
}
}

test("insert table") {
withSingleCallEnabled {
withCleanTmpResources(Seq(
(s"$namespace1.$table1", "table"),
(s"$namespace1.$table2", "table"),
(s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
doAs(admin, sql(createTableSql(namespace1, table2)))

// insert into
val insertIntoSql = s"INSERT INTO $namespace1.$table1" +
s" SELECT * FROM $namespace1.$table2"
interceptContains[AccessControlException](
doAs(someone, sql(insertIntoSql)))(
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
s" [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(insertIntoSql))

// insert overwrite
val insertOverwriteSql = s"INSERT OVERWRITE $namespace1.$table1" +
s" SELECT * FROM $namespace1.$table2"
interceptContains[AccessControlException](
doAs(someone, sql(insertOverwriteSql)))(
s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," +
s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
s" [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(insertOverwriteSql))
}
}
}

test("update table") {
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
doAs(admin, sql(createTableSql(namespace1, table1)))
val updateTableSql = s"UPDATE $namespace1.$table1" +
s" SET gender = 'Female' WHERE gender = 'F'"
interceptContains[AccessControlException](
doAs(someone, sql(updateTableSql)))(
s"does not have [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(updateTableSql))
}
}
}

object DeltaCatalogRangerSparkExtensionSuite {
