From 97fd5b79a5c72cb82592b7fef6653d4545ae9ad4 Mon Sep 17 00:00:00 2001 From: zml1206 Date: Fri, 3 Nov 2023 22:08:26 +0800 Subject: [PATCH] [KYUUBI #5565][AUTHZ] Support Delete/Insert/Update table command for Delta Lake ### _Why are the changes needed?_ To close #5565. - Support Delete from table command for Delta Lake in Authz. - Support Insert table command for Delta Lake in Authz. - Support Update table command for Delta Lake in Authz. - Reduce the fields of `createTableSql`. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5596 from zml1206/KYUUBI-5565-2. Closes #5565 452d7d8d4 [zml1206] ut add do as with admin 257200510 [zml1206] improve SubqueryAliasTableExtractor e2a3fe00f [zml1206] Support Delete/Insert/Update table command for Delta Lake Authored-by: zml1206 Signed-off-by: Kent Yao --- ...bi.plugin.spark.authz.serde.TableExtractor | 1 + .../main/resources/table_command_spec.json | 64 +++++++++ .../spark/authz/serde/tableExtractors.scala | 14 ++ .../spark/authz/gen/DeltaCommands.scala | 48 +++++++ .../authz/gen/JsonSpecFileGenerator.scala | 2 +- ...eltaCatalogRangerSparkExtensionSuite.scala | 122 +++++++++++------- 6 files changed, 205 insertions(+), 46 deletions(-) create mode 100644 extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor index 
dc35a8f5104..27775efd594 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor @@ -29,5 +29,6 @@ org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor +org.apache.kyuubi.plugin.spark.authz.serde.SubqueryAliasTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json index 5c2dcd09b66..9a1c0c34c39 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json @@ -1997,4 +1997,68 @@ "opType" : "QUERY", "queryDescs" : [ ], "uriDescs" : [ ] +}, { + "classname" : "org.apache.spark.sql.delta.commands.DeleteCommand", + "tableDescs" : [ { + "fieldName" : "catalogTable", + "fieldExtractor" : "CatalogTableOptionTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : { + "fieldName" : null, + "fieldExtractor" : null, + "actionType" : "UPDATE" + }, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + "setCurrentDatabaseIfMissing" : false + }, { + "fieldName" : "target", + "fieldExtractor" : "SubqueryAliasTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : { + "fieldName" : null, + "fieldExtractor" : null, + "actionType" : "UPDATE" + }, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + 
"setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
+  "tableDescs" : [ {
+    "fieldName" : "catalogTable",
+    "fieldExtractor" : "CatalogTableOptionTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  }, {
+    "fieldName" : "target",
+    "fieldExtractor" : "SubqueryAliasTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
 } ]
\ No newline at end of file
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index a54b58c33e0..7a5be4cac74 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -234,6 +234,20 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
   }
 }
 
+/**
+ * org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+ * Uses the inner alias's identifier when one alias directly wraps another, else its own.
+ */
+class SubqueryAliasTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    val identifier = v1.asInstanceOf[SubqueryAlias] match {
+      case SubqueryAlias(_, SubqueryAlias(inner, _)) => inner
+      case SubqueryAlias(outer, _) => outer
+    }
+    lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
+  }
+}
+
 /**
  * org.apache.spark.sql.connector.catalog.Table
  */
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
new file mode 100644
index 00000000000..e8cce67abcd
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.gen
+
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+/** Authz table command specs for the Delta Lake DeleteCommand and UpdateCommand. */
+object DeltaCommands extends CommandSpecs[TableCommandSpec] {
+
+  // Declares two UPDATE-typed table descriptors: fields `catalogTable` and `target`.
+  val DeleteCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.DeleteCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc = TableDesc(
+      "catalogTable",
+      classOf[CatalogTableOptionTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    val targetDesc = TableDesc(
+      "target",
+      classOf[SubqueryAliasTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(tableDesc, targetDesc))
+  }
+
+  // Same descriptors as DeleteCommand; only the class name differs.
+  val UpdateCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.UpdateCommand"
+    DeleteCommand.copy(classname = cmd)
+  }
+
+  override def specs: Seq[TableCommandSpec] = Seq(DeleteCommand, UpdateCommand)
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
index 07a8e285241..5fb4ace10da 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
@@ -44,7 +44,7 @@ class JsonSpecFileGenerator extends AnyFunSuite {
   // scalastyle:on
   test("check spec json files") {
     writeCommandSpecJson("database", Seq(DatabaseCommands))
-    writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands))
+    writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands))
     writeCommandSpecJson("function", Seq(FunctionCommands))
     writeCommandSpecJson("scan", Seq(Scans))
   }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala index 405c5512ded..4fc73adcee2 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala @@ -38,6 +38,18 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { val table1 = "table1_delta" val table2 = "table2_delta" + def createTableSql(namespace: String, table: String): String = + s""" + |CREATE TABLE IF NOT EXISTS $namespace.$table ( + | id INT, + | name STRING, + | gender STRING, + | birthDate TIMESTAMP + |) + |USING DELTA + |PARTITIONED BY (gender) + |""".stripMargin + override def withFixture(test: NoArgTest): Outcome = { test() } @@ -66,13 +78,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { s""" |CREATE TABLE IF NOT EXISTS $namespace1.$table1 ( | id INT, - | firstName STRING, - | middleName STRING, - | lastName STRING, + | name STRING, | gender STRING, - | birthDate TIMESTAMP, - | ssn STRING, - | salary INT + | birthDate TIMESTAMP |) USING DELTA |""".stripMargin interceptContains[AccessControlException] { @@ -80,21 +88,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { }(s"does not have [create] privilege on [$namespace1/$table1]") doAs(admin, sql(createNonPartitionTableSql)) - val createPartitionTableSql = - s""" - |CREATE TABLE IF NOT EXISTS $namespace1.$table2 ( - | id INT, - | firstName STRING, - | middleName STRING, - | lastName STRING, - | gender STRING, - | birthDate TIMESTAMP, - | ssn STRING, - | salary INT - |) - |USING DELTA - |PARTITIONED 
BY (gender) - |""".stripMargin + val createPartitionTableSql = createTableSql(namespace1, table2) interceptContains[AccessControlException] { doAs(someone, sql(createPartitionTableSql)) }(s"does not have [create] privilege on [$namespace1/$table2]") @@ -109,13 +103,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { s""" |CREATE OR REPLACE TABLE $namespace1.$table1 ( | id INT, - | firstName STRING, - | middleName STRING, - | lastName STRING, + | name STRING, | gender STRING, - | birthDate TIMESTAMP, - | ssn STRING, - | salary INT + | birthDate TIMESTAMP |) USING DELTA |""".stripMargin interceptContains[AccessControlException] { @@ -128,23 +118,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { test("alter table") { withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) { doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) - doAs( - admin, - sql( - s""" - |CREATE TABLE IF NOT EXISTS $namespace1.$table1 ( - | id INT, - | firstName STRING, - | middleName STRING, - | lastName STRING, - | gender STRING, - | birthDate TIMESTAMP, - | ssn STRING, - | salary INT - |) - |USING DELTA - |PARTITIONED BY (gender) - |""".stripMargin)) + doAs(admin, sql(createTableSql(namespace1, table1))) // add columns interceptContains[AccessControlException]( @@ -164,7 +138,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { doAs( someone, sql(s"ALTER TABLE $namespace1.$table1" + - s" REPLACE COLUMNS (id INT, firstName STRING)")))( + s" REPLACE COLUMNS (id INT, name STRING)")))( s"does not have [alter] privilege on [$namespace1/$table1]") // rename column @@ -189,6 +163,64 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { s"does not have [alter] privilege on [$namespace1/$table1]") } } + + test("delete from table") { + withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) { + 
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) + doAs(admin, sql(createTableSql(namespace1, table1))) + val deleteFromTableSql = s"DELETE FROM $namespace1.$table1 WHERE birthDate < '1955-01-01'" + interceptContains[AccessControlException]( + doAs(someone, sql(deleteFromTableSql)))( + s"does not have [update] privilege on [$namespace1/$table1]") + doAs(admin, sql(deleteFromTableSql)) + } + } + + test("insert table") { + withSingleCallEnabled { + withCleanTmpResources(Seq( + (s"$namespace1.$table1", "table"), + (s"$namespace1.$table2", "table"), + (s"$namespace1", "database"))) { + doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) + doAs(admin, sql(createTableSql(namespace1, table1))) + doAs(admin, sql(createTableSql(namespace1, table2))) + + // insert into + val insertIntoSql = s"INSERT INTO $namespace1.$table1" + + s" SELECT * FROM $namespace1.$table2" + interceptContains[AccessControlException]( + doAs(someone, sql(insertIntoSql)))( + s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," + + s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," + + s" [update] privilege on [$namespace1/$table1]") + doAs(admin, sql(insertIntoSql)) + + // insert overwrite + val insertOverwriteSql = s"INSERT OVERWRITE $namespace1.$table1" + + s" SELECT * FROM $namespace1.$table2" + interceptContains[AccessControlException]( + doAs(someone, sql(insertOverwriteSql)))( + s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," + + s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," + + s" [update] privilege on [$namespace1/$table1]") + doAs(admin, sql(insertOverwriteSql)) + } + } + } + + test("update table") { + withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) { + doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) + doAs(admin, sql(createTableSql(namespace1, table1))) + val updateTableSql = s"UPDATE $namespace1.$table1" 
+ + s" SET gender = 'Female' WHERE gender = 'F'" + interceptContains[AccessControlException]( + doAs(someone, sql(updateTableSql)))( + s"does not have [update] privilege on [$namespace1/$table1]") + doAs(admin, sql(updateTableSql)) + } + } } object DeltaCatalogRangerSparkExtensionSuite {