[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

### What changes were proposed in this pull request?
Before this PR, Spark was unable to read a partitioned table created by Spark 2.1 when the table schema does not put the partitioning columns at the end of the schema; restoring the table metadata then trips this assertion:
[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When reading the table metadata back from the metastore, we therefore need to reorder the columns so that the partition columns come last.
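
For context, a plausible reproduction (not taken from the PR; the table and column names are hypothetical, and it assumes a Spark 2.1 creation path that persists the declared column order):

```scala
// On Spark 2.1: declare a partition column in the middle of the schema.
spark.sql(
  """CREATE TABLE t (col1 INT, partCol INT, col2 STRING)
    |USING parquet
    |PARTITIONED BY (partCol)""".stripMargin)

// On Spark 2.2 before this patch: restoring the table metadata trips the
// assert above, because partCol is not the last column in the stored schema.
spark.table("t").show()
```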

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18295 from gatorsmile/reorderReadSchema.

(cherry picked from commit 0c88e8d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
gatorsmile authored and cloud-fan committed Jun 14, 2017
1 parent 42cc830 commit 9bdc835
Showing 2 changed files with 52 additions and 5 deletions.
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -717,6 +717,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -726,10 +740,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -759,11 +776,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
   }
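For illustration, the reordering that `reorderSchema` performs can be exercised standalone. A minimal sketch mirroring the logic above (the schema and column names are hypothetical):

```scala
import org.apache.spark.sql.types.StructType

// Schema as Spark 2.1 may have persisted it: the partition column sits in the middle.
val schema = new StructType()
  .add("col1", "int")
  .add("partCol", "string")
  .add("col2", "int")
val partColumnNames = Seq("partCol")

// Same two steps as reorderSchema: look up the partition fields by name,
// then append them after the remaining data columns.
val partitionFields = partColumnNames.map(name => schema.find(_.name == name).get)
val reordered = StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)

// reordered.fieldNames is now Array("col1", "col2", "partCol")
```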
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = externalCatalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }
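
The new `AnalysisException` in `reorderSchema` guards the corrupted-metadata case: if a declared partition column is missing from the persisted schema, the lookup fails fast rather than restoring a mangled table. A minimal sketch of that branch (names hypothetical; Spark's code throws `AnalysisException`, whose constructor is not public API, so this sketch substitutes `IllegalArgumentException`):

```scala
import org.apache.spark.sql.types.StructType

val schema = new StructType().add("col1", "int")
val partColumnNames = Seq("partCol") // declared as a partition column, but absent from the schema

// Mirrors the getOrElse branch in reorderSchema: the lookup of "partCol" fails,
// so we raise an error naming the schema and the expected partition columns.
val partitionFields = partColumnNames.map { partCol =>
  schema.find(_.name == partCol).getOrElse {
    throw new IllegalArgumentException(
      "The metadata is corrupted. Unable to find the " +
        s"partition column names from the schema. schema: ${schema.catalogString}. " +
        s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
  }
}
```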
