Support partition schema autofill for 'Convert To Delta' on catalog tables

Currently 'Convert To Delta' fails on partitioned Parquet tables if the partition schema is not provided. This PR enables 'Convert To Delta' to autofill the partition schema from the table's catalog entry when it is available.

Unit tests are added in this PR.
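
As a quick illustration of the user-facing change (a minimal sketch; the table name `db.events` and partition column `dt` are hypothetical, and an active SparkSession `spark` is assumed), the PARTITIONED BY clause becomes optional for tables defined in the catalog:

// Assumes a Parquet table `db.events`, partitioned by `dt`, registered in the catalog.
spark.sql("CONVERT TO DELTA db.events PARTITIONED BY (dt STRING)") // previously required
spark.sql("CONVERT TO DELTA db.events") // now works: partition schema autofilled from the catalog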

GitOrigin-RevId: 7b4bb330fdfb98e560d1b440898a621c2df8d97b
mingdai-db authored and scottsand-db committed Jul 25, 2022
1 parent d3eadc6 commit 18d4d12
Showing 5 changed files with 112 additions and 15 deletions.
11 changes: 11 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -1320,6 +1320,17 @@
],
"sqlState" : "22000"
},
"DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER" : {
"message" : [
"CONVERT TO DELTA was called with a partition schema different from the partition schema inferred from the catalog, please avoid providing the schema so that the partition schema can be chosen from the catalog.",
"",
"catalog partition schema:",
"<catalogPartitionSchema>",
"provided partition schema:",
"<userPartitionSchema>"
],
"sqlState" : "42000"
},
"DELTA_UNKNOWN_CONFIGURATION" : {
"message" : [
"Unknown configuration was specified: <config>",
@@ -1028,6 +1028,15 @@ trait DeltaErrorsBase
)
}

+ def unexpectedPartitionSchemaFromUserException(
+ catalogPartitionSchema: StructType, userPartitionSchema: StructType): Throwable = {
+ new DeltaAnalysisException(
+ errorClass = "DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER",
+ messageParameters = Array(
+ formatSchema(catalogPartitionSchema), formatSchema(userPartitionSchema))
+ )
+ }
+
def multipleSourceRowMatchingTargetRowInMergeException(spark: SparkSession): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE",
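A minimal sketch of how this helper surfaces, using illustrative schemas; judging from the DeltaErrorsSuite assertion further down, formatSchema renders each schema via StructType.treeString:

import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Illustrative mismatch: the catalog records key1 as LONG, the user passes STRING.
val fromCatalog = new StructType().add("key1", LongType)
val fromUser = new StructType().add("key1", StringType)

// Throws a DeltaAnalysisException with error class
// DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER, embedding both schemas in its message.
throw DeltaErrors.unexpectedPartitionSchemaFromUserException(fromCatalog, fromUser)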
@@ -293,9 +293,9 @@ abstract class ConvertToDeltaCommandBase(
target.provider match {
case Some(providerName) => providerName.toLowerCase(Locale.ROOT) match {
case _ if target.catalogTable.exists(isHiveStyleParquetTable) =>
- new ParquetTable(spark, qualifiedDir, partitionSchema)
+ new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema)
case checkProvider if checkProvider.equalsIgnoreCase("parquet") =>
- new ParquetTable(spark, qualifiedDir, partitionSchema)
+ new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema)
case checkProvider =>
throw DeltaErrors.convertNonParquetTablesException(tableIdentifier, checkProvider)
}
@@ -330,10 +330,7 @@
throw DeltaErrors.emptyDirectoryException(convertProperties.targetDir)
}

- val partitionFields = partitionSchema
- .orElse(targetTable.partitionSchema)
- .getOrElse(new StructType())
+ val partitionFields = targetTable.partitionSchema
val schema = targetTable.tableSchema
val metadata = Metadata(
schemaString = schema.json,
@@ -436,8 +433,8 @@ trait ConvertTargetTable {
/** The table properties of the target table */
def properties: Map[String, String] = Map.empty

- /** The partition schema of the target table, if known */
- def partitionSchema: Option[StructType] = None
+ /** The partition schema of the target table */
+ def partitionSchema: StructType

/** The file manifest of the target table */
def fileManifest: ConvertTargetFileManifest
@@ -453,18 +450,29 @@
class ParquetTable(
spark: SparkSession,
basePath: String,
- override val partitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging {
+ catalogTable: Option[CatalogTable],
+ userPartitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging {
+ // Validate user provided partition schema if catalogTable is available.
+ if (catalogTable.isDefined && userPartitionSchema.isDefined
+ && !catalogTable.get.partitionSchema.equals(userPartitionSchema.get)) {
+ throw DeltaErrors.unexpectedPartitionSchemaFromUserException(
+ catalogTable.get.partitionSchema, userPartitionSchema.get)
+ }

private var _numFiles: Option[Long] = None

- private var _tableSchema: Option[StructType] = None
+ private var _tableSchema: Option[StructType] = catalogTable.map(_.schema)

protected lazy val serializableConf = {
// scalastyle:off deltahadoopconfiguration
new SerializableConfiguration(spark.sessionState.newHadoopConf())
// scalastyle:on deltahadoopconfiguration
}

+ override val partitionSchema: StructType = {
+ userPartitionSchema.orElse(catalogTable.map(_.partitionSchema)).getOrElse(new StructType())
+ }

def numFiles: Long = {
if (_numFiles.isEmpty) {
inferSchema()
@@ -598,7 +606,7 @@ class ParquetTable(
}
}

- val partitionFields = partitionSchema.map(_.fields.toSeq).getOrElse(Nil)
+ val partitionFields = partitionSchema.fields.toSeq

_numFiles = Some(numFiles)
_tableSchema = Some(PartitioningUtils.mergeDataAndPartitionSchema(
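Taken together, ParquetTable now resolves its partition schema by precedence: a user-provided schema first, then the catalog's, then an empty (unpartitioned) schema. A self-contained sketch of that rule (the helper name is ours, not the PR's; the catalog-vs-user validation shown above is omitted):

import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical helper mirroring the resolution in ParquetTable.partitionSchema.
def resolvePartitionSchema(
    userSchema: Option[StructType],
    catalogSchema: Option[StructType]): StructType =
  userSchema.orElse(catalogSchema).getOrElse(new StructType())

val catalog = new StructType().add("key1", LongType).add("key2", StringType)
assert(resolvePartitionSchema(None, Some(catalog)) == catalog) // autofill from the catalog
assert(resolvePartitionSchema(None, None) == new StructType()) // treated as unpartitioned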
@@ -777,6 +777,31 @@ trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestU
}
}

testQuietly("negative case: unmatched partition schema") {
val tableName = "pqtable"
withTable(tableName) {
// Create a partitioned parquet table
simpleDF.write.partitionBy("key1", "key2").format("parquet").saveAsTable(tableName)

// Check the partition schema in the catalog, key1's data type is original Long.
assert(spark.sessionState.catalog.getTableMetadata(
TableIdentifier(tableName, Some("default"))).partitionSchema
.equals(
(new StructType)
.add(StructField("key1", LongType, true))
.add(StructField("key2", StringType, true))
))

// Convert to delta with partition schema mismatch on key1's data type, which is String.
val ae = intercept[AnalysisException] {
convertToDelta(tableName, Some("key1 string, key2 string"))
}

assert(ae.getMessage.contains("CONVERT TO DELTA was called with a partition schema " +
"different from the partition schema inferred from the catalog"))
}
}

testQuietly("convert two external tables pointing to same underlying files " +
"with differing table properties should error if conf enabled otherwise merge properties") {
val externalTblName = "extpqtbl"
@@ -894,12 +919,12 @@ trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestU

// Verify that table converted to delta
checkAnswer(
sql(s"select id from delta.`$path` where key1 = 1"),
simpleDF.filter("id % 2 == 1").select("id"))
sql(s"select key2 from delta.`$path` where key1 = 1"),
simpleDF.filter("id % 2 == 1").select("key2"))

checkAnswer(
sql(s"select id from $externalTblName where key1 = 1"),
simpleDF.filter("id % 2 == 1").select("id"))
sql(s"select key2 from $externalTblName where key1 = 1"),
simpleDF.filter("id % 2 == 1").select("key2"))
}
}

@@ -1035,6 +1060,34 @@ trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestU
}
}

testQuietly("Convert a partitioned parquet table with partition schema autofill") {
val tableName = "ppqtable"
withTable(tableName) {
// Create a partitioned parquet table
simpleDF.write.partitionBy("key1", "key2").format("parquet").saveAsTable(tableName)

// Convert to delta without partition schema, partition schema is autofill from catalog
convertToDelta(tableName)

// Verify that table is converted to delta
assert(spark.sessionState.catalog.getTableMetadata(
TableIdentifier(tableName, Some("default"))).provider.contains("delta"))

// Check the partition schema in the transaction log
assert(DeltaLog.forTable(spark, TableIdentifier(tableName, Some("default")))
.snapshot.metadata.partitionSchema.equals(
(new StructType())
.add(StructField("key1", LongType, true))
.add(StructField("key2", StringType, true))
))

// Check data in the converted delta table.
checkAnswer(
sql(s"SELECT id from default.$tableName where key2 = '2'"),
simpleDF.filter("id % 3 == 2").select("id"))
}
}

test("external tables use correct path scheme") {
withTempDir { dir =>
withTable("externalTable") {
@@ -1991,6 +1991,22 @@ trait DeltaErrorsSuiteBase
assert(e.getMessage ==
"Using column c0 of type IntegerType as a partition column is not supported.")
}
+ {
+ val catalogPartitionSchema = StructType(Seq(StructField("a", IntegerType)))
+ val userPartitionSchema = StructType(Seq(StructField("b", StringType)))
+ val e = intercept[DeltaAnalysisException] {
+ throw DeltaErrors.unexpectedPartitionSchemaFromUserException(catalogPartitionSchema,
+ userPartitionSchema)
+ }
+ assert(e.getErrorClass == "DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER")
+ assert(e.getSqlState == "42000")
+ assert(e.getMessage ==
+ "CONVERT TO DELTA was called with a partition schema different from the partition " +
+ "schema inferred from the catalog, please avoid providing the schema so that the " +
+ "partition schema can be chosen from the catalog.\n" +
+ s"\ncatalog partition schema:\n${catalogPartitionSchema.treeString}" +
+ s"\nprovided partition schema:\n${userPartitionSchema.treeString}")
+ }
{
val e = intercept[DeltaIllegalArgumentException] {
throw DeltaErrors.invalidInterval("interval1")