From 1678247d3b83688ad92590aa2626d153475c0509 Mon Sep 17 00:00:00 2001 From: jinxing64 Date: Sun, 15 May 2022 11:30:03 +0800 Subject: [PATCH] Set hoodie.query.as.ro.table in serde properties --- .../hudi/command/DropHoodieTableCommand.scala | 6 +-- .../apache/spark/sql/hudi/TestDropTable.scala | 42 +++++++++++++------ 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala index ecc8588e64dd..68582fc2795d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala @@ -95,10 +95,10 @@ extends HoodieLeafRunnableCommand { var rtTableOpt: Option[CatalogTable] = None var roTableOpt: Option[CatalogTable] = None - if (catalog.tableExists(roIdt)) { + if (catalog.tableExists(rtIdt)) { val rtTable = catalog.getTableMetadata(rtIdt) if (rtTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) { - rtTable.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match { + rtTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match { case Some(v) if v.equalsIgnoreCase("false") => rtTableOpt = Some(rtTable) case _ => // do-nothing } @@ -107,7 +107,7 @@ extends HoodieLeafRunnableCommand { if (catalog.tableExists(roIdt)) { val roTable = catalog.getTableMetadata(roIdt) if (roTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) { - roTable.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match { + roTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match { case Some(v) if v.equalsIgnoreCase("true") => roTableOpt = Some(roTable) case _ => // do-nothing } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala index 4198a93e6e9b..174835cbac0b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.hudi +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.SessionCatalog + class TestDropTable extends HoodieSparkSqlTestBase { test("Test Drop Table") { @@ -98,10 +101,11 @@ class TestDropTable extends HoodieSparkSqlTestBase { | tblproperties ( | type = 'mor', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.query.as.ro.table='true' + | preCombineField = 'ts' | ) """.stripMargin) + alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"), + Map("hoodie.query.as.ro.table" -> "true")) spark.sql( s""" @@ -110,10 +114,11 @@ class TestDropTable extends HoodieSparkSqlTestBase { | tblproperties ( | type = 'mor', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.query.as.ro.table='false' + | preCombineField = 'ts' | ) """.stripMargin) + alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"), + Map("hoodie.query.as.ro.table" -> "false")) spark.sql(s"drop table ${tableName} purge") checkAnswer("show tables")() @@ -145,10 +150,11 @@ class TestDropTable extends HoodieSparkSqlTestBase { | tblproperties ( | type = 'mor', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.query.as.ro.table='true' + | preCombineField = 'ts' | ) """.stripMargin) + alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"), + Map("hoodie.query.as.ro.table" -> "true")) spark.sql( s""" @@ -157,10 +163,11 @@ class TestDropTable extends HoodieSparkSqlTestBase { | tblproperties ( | type = 'mor', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.query.as.ro.table='false' + | preCombineField = 'ts' | ) """.stripMargin) + alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"), + Map("hoodie.query.as.ro.table" -> "false")) spark.sql(s"drop table ${tableName}_ro") checkAnswer("show tables")( @@ -199,10 +206,11 @@ class TestDropTable extends HoodieSparkSqlTestBase { | tblproperties ( | type = 'mor', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.query.as.ro.table='true' + | preCombineField = 'ts' | ) """.stripMargin) + alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"), + Map("hoodie.query.as.ro.table" -> "true")) spark.sql( s""" @@ -211,15 +219,23 @@ class TestDropTable extends HoodieSparkSqlTestBase { | tblproperties ( | type = 'mor', | primaryKey = 'id', - | preCombineField = 'ts', - | hoodie.query.as.ro.table='false' + | preCombineField = 'ts' | ) """.stripMargin) + alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"), + Map("hoodie.query.as.ro.table" -> "false")) spark.sql(s"drop table ${tableName}_ro purge") checkAnswer("show tables")() } } - + private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier, + newProperties: Map[String, String]): Unit = { + val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt) + val storage = catalogTable.storage + val storageProperties = storage.properties ++ newProperties + val newCatalogTable = catalogTable.copy(storage = storage.copy(properties = storageProperties)) + sessionCatalog.alterTable(newCatalogTable) + } }