
Commit 719ecf2

fix.

gatorsmile committed Jun 14, 2017
1 parent dccc0aa commit 719ecf2

Showing 2 changed files with 52 additions and 5 deletions.
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
   }

+  // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition
+  // columns were not put at the end of the schema, so we need to reorder them when reading the
+  // schema back from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
+    val partitionFields = partColumnNames.map { partCol =>
+      schema.find(_.name == partCol).getOrElse {
+        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
+          s"partition column names from the schema. schema: ${schema.catalogString}. " +
+          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+      }
+    }
+    StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
     val hiveTable = table.copy(
       provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // schema from table properties.
     if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
       val schemaFromTableProps = getSchemaFromTableProperties(table)
-      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+      val partColumnNames = getPartitionColumnsFromTableProperties(table)
+      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
+      if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)) {
         hiveTable.copy(
-          schema = schemaFromTableProps,
-          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          schema = reorderedSchema,
+          partitionColumnNames = partColumnNames,
           bucketSpec = getBucketSpecFromTableProperties(table))
       } else {
         // Hive metastore may change the table schema, e.g. schema inference. If the table
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)

+    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val partColumnNames = getPartitionColumnsFromTableProperties(table)
+    val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
+
     table.copy(
       provider = Some(provider),
       storage = storageWithLocation,
-      schema = getSchemaFromTableProperties(table),
-      partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+      schema = reorderedSchema,
+      partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG))
   }
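
Note: below is a minimal standalone sketch of the new reorderSchema helper, mirroring the method added above. It throws IllegalArgumentException instead of AnalysisException (whose constructor is not accessible outside the org.apache.spark.sql package), so the snippet can be pasted into a spark-shell session as-is.

import org.apache.spark.sql.types._

// Move the named partition columns to the end of the schema, preserving
// the relative order of the remaining data columns.
def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
  val partitionFields = partColumnNames.map { partCol =>
    schema.find(_.name == partCol).getOrElse {
      throw new IllegalArgumentException(
        s"Partition column $partCol not found in ${schema.catalogString}")
    }
  }
  StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
}

// A schema written by Spark < 2.2 may interleave partition and data columns:
val stored = new StructType()
  .add("col1", "int")
  .add("partCol1", "int")
  .add("partCol2", "string")
  .add("col2", "string")

// Prints: struct<col1:int,col2:string,partCol1:int,partCol2:string>
println(reorderSchema(stored, Seq("partCol1", "partCol2")).catalogString)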
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -63,4 +63,30 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
     assert(DDLUtils.isHiveTable(externalCatalog.getTable("db1", "hive_tbl")))
   }
+
+  Seq("parquet", "hive").foreach { format =>
+    test(s"Partition columns should be put at the end of table schema for the format $format") {
+      val catalog = newBasicCatalog()
+      val newSchema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string")
+      val table = CatalogTable(
+        identifier = TableIdentifier("tbl", Some("db1")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType()
+          .add("col1", "int")
+          .add("partCol1", "int")
+          .add("partCol2", "string")
+          .add("col2", "string"),
+        provider = Some(format),
+        partitionColumnNames = Seq("partCol1", "partCol2"))
+      catalog.createTable(table, ignoreIfExists = false)
+
+      val restoredTable = externalCatalog.getTable("db1", "tbl")
+      assert(restoredTable.schema == newSchema)
+    }
+  }
 }
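
Note: the same behavior can be checked end-to-end from SQL. A hypothetical spark-shell snippet follows, assuming a Hive-enabled SparkSession named spark and that no table named tbl exists yet: with Hive CREATE TABLE syntax, PARTITIONED BY declares new columns, and the restored schema should list them after the data columns.

spark.sql(
  "CREATE TABLE tbl (col1 INT, col2 STRING) PARTITIONED BY (partCol1 INT, partCol2 STRING)")

// Partition columns are appended after the data columns in the reported schema.
assert(spark.table("tbl").schema.fieldNames
  .sameElements(Array("col1", "col2", "partCol1", "partCol2")))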
