Skip to content
Permalink
Browse files
[HUDI-4100] CTAS failed to clean up when given an illegal MANAGED table definition (#5588)
  • Loading branch information
jinxing64 committed May 21, 2022
1 parent 8ec625d commit 922f765ead3c471aa0d9aa44b1782d381bbfa9d9
Showing 3 changed files with 66 additions and 31 deletions.
@@ -763,4 +763,22 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
assertResult(true)(shown.contains("COMMENT 'This is a simple hudi table'"))
}
}

// Runs a CTAS statement that declares a 'cow' table with hoodie.compact.inline enabled,
// expects it to fail with the compaction error message, and then verifies that nothing
// is left behind at the table path under the "default" database location.
test("Test CTAS using an illegal definition -- a COW table with compaction enabled.") {
  val tableName = generateTableName
  val illegalCtas =
    s"""
       | create table $tableName using hudi
       | tblproperties(
       | primaryKey = 'id',
       | type = 'cow',
       | hoodie.compact.inline='true'
       | )
       | AS
       | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
       |""".stripMargin
  checkExceptionContain(illegalCtas)("Compaction is not supported on a CopyOnWrite table")

  // No explicit location is given, so the table path is derived from the default database.
  val defaultDb = spark.sessionState.catalog.getDatabaseMetadata("default")
  val warehouseDir = defaultDb.locationUri.getPath
  val expectedTablePath = s"$warehouseDir/$tableName"
  assertResult(false)(existsPath(expectedTablePath))
}
}
@@ -38,6 +38,7 @@ import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}

import java.net.URI
import java.util
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}

@@ -50,7 +51,9 @@ class HoodieCatalog extends DelegatingCatalogExtension

override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
properties, TableCreationMode.STAGE_CREATE)
} else {
BasicStagedTable(
ident,
@@ -61,7 +64,9 @@ class HoodieCatalog extends DelegatingCatalogExtension

override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
properties, TableCreationMode.STAGE_REPLACE)
} else {
super.dropTable(ident)
BasicStagedTable(
@@ -76,8 +81,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(
ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
properties, TableCreationMode.CREATE_OR_REPLACE)
} else {
try super.dropTable(ident) catch {
case _: NoSuchTableException => // ignore the exception
@@ -112,7 +118,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
createHoodieTable(ident, schema, locUriAndTableType, partitions, properties,
Map.empty, Option.empty, TableCreationMode.CREATE)
}

// Existence check is forwarded unchanged to the parent catalog implementation.
override def tableExists(ident: Identifier): Boolean = {
  super.tableExists(ident)
}
@@ -193,8 +201,30 @@ class HoodieCatalog extends DelegatingCatalogExtension
loadTable(ident)
}

/**
 * Works out where a table lives and whether it is EXTERNAL or MANAGED.
 *
 * An explicit location is taken from the identifier name when the identifier is a path
 * identifier, otherwise from the "location" entry of `properties`. When an explicit
 * location is present the table is EXTERNAL, otherwise it is MANAGED.
 *
 * The returned URI is, in order of preference: the explicit location, the storage
 * location of an already existing table, or the session catalog's default table path.
 *
 * @param ident      identifier of the table being created
 * @param properties table properties supplied with the create request
 * @return the resolved location URI paired with the deduced catalog table type
 */
private def deduceTableLocationURIAndTableType(
    ident: Identifier, properties: util.Map[String, String]): (URI, CatalogTableType) = {
  val explicitLocation =
    if (isPathIdentifier(ident)) Option(ident.name()) else Option(properties.get("location"))
  val catalogTableType =
    if (explicitLocation.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED

  val explicitUri = explicitLocation.map(CatalogUtils.stringToURI)
  val tableId = ident.asTableIdentifier
  // Looked up unconditionally, exactly as before, even when an explicit location exists.
  val existingTable = getExistingTableIfExists(tableId)
  val resolvedUri = explicitUri match {
    case Some(uri) => uri
    case None =>
      existingTable
        .flatMap(_.storage.locationUri)
        .getOrElse(spark.sessionState.catalog.defaultTablePath(tableId))
  }
  (resolvedUri, catalogTableType)
}

def createHoodieTable(ident: Identifier,
schema: StructType,
locUriAndTableType: (URI, CatalogTableType),
partitions: Array[Transform],
allTableProperties: util.Map[String, String],
writeOptions: Map[String, String],
@@ -206,29 +236,17 @@ class HoodieCatalog extends DelegatingCatalogExtension
val newPartitionColumns = partitionColumns
val newBucketSpec = maybeBucketSpec

val isByPath = isPathIdentifier(ident)

val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
val id = ident.asTableIdentifier

val locUriOpt = location.map(CatalogUtils.stringToURI)
val existingTableOpt = getExistingTableIfExists(id)
val loc = locUriOpt
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(spark.sessionState.catalog.defaultTablePath(id))
val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
.copy(locationUri = Option(loc))
val tableType =
if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
.copy(locationUri = Option(locUriAndTableType._1))
val commentOpt = Option(allTableProperties.get("comment"))

val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)
// put path to table properties.
tablePropertiesNew.put("path", loc.getPath)
tablePropertiesNew.put("path", locUriAndTableType._1.getPath)

val tableDesc = new CatalogTable(
identifier = id,
tableType = tableType,
identifier = ident.asTableIdentifier,
tableType = locUriAndTableType._2,
storage = storage,
schema = newSchema,
provider = Option("hudi"),
@@ -21,16 +21,18 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions.RECORDKEY_FIELD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.StructType

import java.net.URI
import java.util
import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter}

case class HoodieStagedTable(ident: Identifier,
locUriAndTableType: (URI, CatalogTableType),
catalog: HoodieCatalog,
override val schema: StructType,
partitions: Array[Transform],
@@ -59,13 +61,14 @@ case class HoodieStagedTable(ident: Identifier,
props.putAll(properties)
props.put("hoodie.table.name", ident.name())
props.put(RECORDKEY_FIELD.key, properties.get("primaryKey"))
catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
catalog.createHoodieTable(
ident, schema, locUriAndTableType, partitions, props, writeOptions, sourceQuery, mode)
}

// The staged table reports the name carried by its identifier.
override def name(): String = {
  ident.name()
}

// Invoked to abort the staged changes: calls clearTablePath with a table path and the
// Hadoop configuration taken from the catalog's Spark context.
// NOTE(review): this span comes from a commit diff rendered without +/- markers. The call
// using properties.get("location") looks like the removed line and the call using
// locUriAndTableType._1.getPath like its replacement -- confirm against the actual file.
// properties.get("location") returns null when no "location" property was supplied.
override def abortStagedChanges(): Unit = {
clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
clearTablePath(locUriAndTableType._1.getPath, catalog.spark.sparkContext.hadoopConfiguration)
}

private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
@@ -85,13 +88,9 @@ case class HoodieStagedTable(ident: Identifier,
* WriteBuilder for creating a Hoodie table.
*/
private class HoodieV1WriteBuilder extends WriteBuilder {
override def build(): V1Write = new V1Write {
override def toInsertableRelation(): InsertableRelation = {
new InsertableRelation {
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
sourceQuery = Option(data)
}
}
override def build(): V1Write = () => {
(data: DataFrame, overwrite: Boolean) => {
sourceQuery = Option(data)
}
}
}

0 comments on commit 922f765

Please sign in to comment.