diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json
index d872e9c0b2..fc4471e27b 100644
--- a/core/src/main/resources/error/delta-error-classes.json
+++ b/core/src/main/resources/error/delta-error-classes.json
@@ -1320,6 +1320,17 @@
     ],
     "sqlState" : "22000"
   },
+  "DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER" : {
+    "message" : [
+      "CONVERT TO DELTA was called with a partition schema different from the partition schema inferred from the catalog, please avoid providing the schema so that the partition schema can be chosen from the catalog.",
+      "",
+      "catalog partition schema:",
+      "<catalogPartitionSchema>",
+      "provided partition schema:",
+      "<userPartitionSchema>"
+    ],
+    "sqlState" : "42000"
+  },
   "DELTA_UNKNOWN_CONFIGURATION" : {
     "message" : [
       "Unknown configuration was specified: <config>",
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 9cc0a990bb..216b9df8be 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -1028,6 +1028,15 @@ trait DeltaErrorsBase
     )
   }
 
+  def unexpectedPartitionSchemaFromUserException(
+      catalogPartitionSchema: StructType, userPartitionSchema: StructType): Throwable = {
+    new DeltaAnalysisException(
+      errorClass = "DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER",
+      messageParameters = Array(
+        formatSchema(catalogPartitionSchema), formatSchema(userPartitionSchema))
+    )
+  }
+
   def multipleSourceRowMatchingTargetRowInMergeException(spark: SparkSession): Throwable = {
     new DeltaUnsupportedOperationException(
       errorClass = "DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE",
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
index 2f809c00a2..6c0dfafacf 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
@@ -293,9 +293,9 @@ abstract class ConvertToDeltaCommandBase(
     target.provider match {
       case Some(providerName) => providerName.toLowerCase(Locale.ROOT) match {
         case _ if target.catalogTable.exists(isHiveStyleParquetTable) =>
-          new ParquetTable(spark, qualifiedDir, partitionSchema)
+          new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema)
         case checkProvider if checkProvider.equalsIgnoreCase("parquet") =>
-          new ParquetTable(spark, qualifiedDir, partitionSchema)
+          new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema)
         case checkProvider =>
           throw DeltaErrors.convertNonParquetTablesException(tableIdentifier, checkProvider)
       }
@@ -330,10 +330,7 @@ abstract class ConvertToDeltaCommandBase(
       throw DeltaErrors.emptyDirectoryException(convertProperties.targetDir)
     }
 
-    val partitionFields = partitionSchema
-      .orElse(targetTable.partitionSchema)
-      .getOrElse(new StructType())
-
+    val partitionFields = targetTable.partitionSchema
     val schema = targetTable.tableSchema
     val metadata = Metadata(
       schemaString = schema.json,
@@ -436,8 +433,8 @@ trait ConvertTargetTable {
   /** The table properties of the target table */
   def properties: Map[String, String] = Map.empty
 
-  /** The partition schema of the target table, if known */
-  def partitionSchema: Option[StructType] = None
+  /** The partition schema of the target table */
+  def partitionSchema: StructType
 
   /** The file manifest of the target table */
   def fileManifest: ConvertTargetFileManifest
@@ -453,11 +450,18 @@
 class ParquetTable(
     spark: SparkSession,
     basePath: String,
-    override val partitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging {
+    catalogTable: Option[CatalogTable],
+    userPartitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging {
+  // Validate the user-provided partition schema if catalogTable is available.
+  if (catalogTable.isDefined && userPartitionSchema.isDefined
+    && !catalogTable.get.partitionSchema.equals(userPartitionSchema.get)) {
+    throw DeltaErrors.unexpectedPartitionSchemaFromUserException(
+      catalogTable.get.partitionSchema, userPartitionSchema.get)
+  }
 
   private var _numFiles: Option[Long] = None
 
-  private var _tableSchema: Option[StructType] = None
+  private var _tableSchema: Option[StructType] = catalogTable.map(_.schema)
 
   protected lazy val serializableConf = {
     // scalastyle:off deltahadoopconfiguration
@@ -465,6 +469,10 @@ class ParquetTable(
     // scalastyle:on deltahadoopconfiguration
   }
 
+  override val partitionSchema: StructType = {
+    userPartitionSchema.orElse(catalogTable.map(_.partitionSchema)).getOrElse(new StructType())
+  }
+
   def numFiles: Long = {
     if (_numFiles.isEmpty) {
       inferSchema()
@@ -598,7 +606,7 @@ class ParquetTable(
       }
     }
 
-    val partitionFields = partitionSchema.map(_.fields.toSeq).getOrElse(Nil)
+    val partitionFields = partitionSchema.fields.toSeq
 
     _numFiles = Some(numFiles)
     _tableSchema = Some(PartitioningUtils.mergeDataAndPartitionSchema(
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala b/core/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala
index f9f6306d58..7f0d48bdfd 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala
@@ -777,6 +777,31 @@ trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestU
     }
   }
 
+  testQuietly("negative case: unmatched partition schema") {
+    val tableName = "pqtable"
+    withTable(tableName) {
+      // Create a partitioned parquet table
+      simpleDF.write.partitionBy("key1", "key2").format("parquet").saveAsTable(tableName)
+
+      // Check the partition schema in the catalog; key1's original data type is Long.
+      assert(spark.sessionState.catalog.getTableMetadata(
+        TableIdentifier(tableName, Some("default"))).partitionSchema
+        .equals(
+          (new StructType)
+            .add(StructField("key1", LongType, true))
+            .add(StructField("key2", StringType, true))
+        ))
+
+      // Convert to delta with a mismatched partition schema: key1's data type is given as String.
+      val ae = intercept[AnalysisException] {
+        convertToDelta(tableName, Some("key1 string, key2 string"))
+      }
+
+      assert(ae.getMessage.contains("CONVERT TO DELTA was called with a partition schema " +
+        "different from the partition schema inferred from the catalog"))
+    }
+  }
+
   testQuietly("convert two external tables pointing to same underlying files " +
     "with differing table properties should error if conf enabled otherwise merge properties") {
     val externalTblName = "extpqtbl"
@@ -894,12 +919,12 @@ trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestU
 
       // Verify that table converted to delta
       checkAnswer(
-        sql(s"select id from delta.`$path` where key1 = 1"),
-        simpleDF.filter("id % 2 == 1").select("id"))
+        sql(s"select key2 from delta.`$path` where key1 = 1"),
+        simpleDF.filter("id % 2 == 1").select("key2"))
 
       checkAnswer(
-        sql(s"select id from $externalTblName where key1 = 1"),
-        simpleDF.filter("id % 2 == 1").select("id"))
+        sql(s"select key2 from $externalTblName where key1 = 1"),
+        simpleDF.filter("id % 2 == 1").select("key2"))
     }
   }
 
@@ -1035,6 +1060,34 @@ trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestU
     }
   }
 
+  testQuietly("Convert a partitioned parquet table with partition schema autofill") {
+    val tableName = "ppqtable"
+    withTable(tableName) {
+      // Create a partitioned parquet table
+      simpleDF.write.partitionBy("key1", "key2").format("parquet").saveAsTable(tableName)
+
+      // Convert to delta without a partition schema; it is auto-filled from the catalog.
+      convertToDelta(tableName)
+
+      // Verify that the table is converted to delta
+      assert(spark.sessionState.catalog.getTableMetadata(
+        TableIdentifier(tableName, Some("default"))).provider.contains("delta"))
+
+      // Check the partition schema in the transaction log
+      assert(DeltaLog.forTable(spark, TableIdentifier(tableName, Some("default")))
+        .snapshot.metadata.partitionSchema.equals(
+          (new StructType())
+            .add(StructField("key1", LongType, true))
+            .add(StructField("key2", StringType, true))
+        ))
+
+      // Check data in the converted delta table.
+      checkAnswer(
+        sql(s"SELECT id from default.$tableName where key2 = '2'"),
+        simpleDF.filter("id % 3 == 2").select("id"))
+    }
+  }
+
   test("external tables use correct path scheme") {
     withTempDir { dir =>
       withTable("externalTable") {
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
index e274020b6d..af45bf30a5 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -1991,6 +1991,22 @@ trait DeltaErrorsSuiteBase
       assert(e.getMessage ==
         "Using column c0 of type IntegerType as a partition column is not supported.")
     }
+    {
+      val catalogPartitionSchema = StructType(Seq(StructField("a", IntegerType)))
+      val userPartitionSchema = StructType(Seq(StructField("b", StringType)))
+      val e = intercept[DeltaAnalysisException] {
+        throw DeltaErrors.unexpectedPartitionSchemaFromUserException(catalogPartitionSchema,
+          userPartitionSchema)
+      }
+      assert(e.getErrorClass == "DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER")
+      assert(e.getSqlState == "42000")
+      assert(e.getMessage ==
+        "CONVERT TO DELTA was called with a partition schema different from the partition " +
+          "schema inferred from the catalog, please avoid providing the schema so that the " +
+          "partition schema can be chosen from the catalog.\n" +
+          s"\ncatalog partition schema:\n${catalogPartitionSchema.treeString}" +
+          s"\nprovided partition schema:\n${userPartitionSchema.treeString}")
+    }
     {
       val e = intercept[DeltaIllegalArgumentException] {
         throw DeltaErrors.invalidInterval("interval1")
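
For context, here is a minimal usage sketch (not part of the patch) of the behaviour the new tests exercise, assuming a Spark session with the Delta Lake SQL extensions and Hive support enabled; the table and column names below are illustrative only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("convert-to-delta-partition-autofill")
  .enableHiveSupport()
  .getOrCreate()

// A parquet table partitioned by (key1, key2) and registered in the catalog;
// key1 is a LONG column and key2 is a STRING column, mirroring the test setup.
spark.range(100)
  .withColumn("key1", col("id") % 2)
  .withColumn("key2", (col("id") % 3).cast("string"))
  .write.partitionBy("key1", "key2").format("parquet").saveAsTable("pqtable")

// With this change the partition schema may be omitted: it is filled in from the catalog.
spark.sql("CONVERT TO DELTA default.pqtable")

// Providing a partition schema that disagrees with the catalog (key1 is LONG there)
// now fails with DELTA_UNEXPECTED_PARTITION_SCHEMA_FROM_USER instead of being used silently.
// spark.sql("CONVERT TO DELTA default.pqtable PARTITIONED BY (key1 STRING, key2 STRING)")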