[SPARK-19611][SQL][FOLLOWUP] set dataSchema correctly in HiveMetastoreCatalog.convertToLogicalRelation

## What changes were proposed in this pull request?

We made a mistake in #16944. In `HiveMetastoreCatalog#inferIfNeeded` we infer the data schema, merge it with the full schema, and return the new full schema. At the caller side we then treat this full schema as the data schema and set it on `HadoopFsRelation`.

This doesn't cause any problems, because both Parquet and ORC can work with a wrong data schema that has extra columns, but it's better to fix this mistake. A sketch of the mix-up follows below.
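To make the mix-up concrete, here is a minimal sketch (the column names `value` and `part` are invented for illustration) of how a table's full schema relates to its data schema and partition schema, and why returning the merged full schema as the data schema carries extra columns:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// A hypothetical table with one data column and one partition column.
val dataSchema = StructType(Seq(StructField("value", StringType)))
val partitionSchema = StructType(Seq(StructField("part", IntegerType)))

// The catalog's full schema is the data columns followed by the partition columns,
// mirroring `StructType(dataSchema ++ relation.tableMeta.partitionSchema)` in this patch.
val fullSchema = StructType(dataSchema ++ partitionSchema)

// Passing `fullSchema` where only the data schema belongs gives the file scan an
// extra `part` column. Parquet and ORC tolerate the extra column, but it is wrong.
assert(fullSchema.fieldNames.toSeq == Seq("value", "part"))
assert(fullSchema != dataSchema)
```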

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19615 from cloud-fan/infer.
cloud-fan committed Oct 31, 2017
1 parent 59589bc commit 4d9ebf3
Showing 1 changed file with 11 additions and 11 deletions.
```diff
@@ -164,13 +164,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             }
           }
 
-          val (dataSchema, updatedTable) =
-            inferIfNeeded(relation, options, fileFormat, Option(fileIndex))
+          val updatedTable = inferIfNeeded(relation, options, fileFormat, Option(fileIndex))
 
           val fsRelation = HadoopFsRelation(
             location = fileIndex,
             partitionSchema = partitionSchema,
-            dataSchema = dataSchema,
+            dataSchema = updatedTable.dataSchema,
             bucketSpec = None,
             fileFormat = fileFormat,
             options = options)(sparkSession = sparkSession)
@@ -191,13 +190,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           fileFormatClass,
           None)
         val logicalRelation = cached.getOrElse {
-          val (dataSchema, updatedTable) = inferIfNeeded(relation, options, fileFormat)
+          val updatedTable = inferIfNeeded(relation, options, fileFormat)
           val created =
             LogicalRelation(
               DataSource(
                 sparkSession = sparkSession,
                 paths = rootPath.toString :: Nil,
-                userSpecifiedSchema = Option(dataSchema),
+                userSpecifiedSchema = Option(updatedTable.dataSchema),
                 bucketSpec = None,
                 options = options,
                 className = fileType).resolveRelation(),
@@ -224,7 +223,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       relation: HiveTableRelation,
       options: Map[String, String],
       fileFormat: FileFormat,
-      fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
+      fileIndexOpt: Option[FileIndex] = None): CatalogTable = {
     val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
     val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase
     val tableName = relation.tableMeta.identifier.unquotedString
@@ -241,21 +240,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           sparkSession,
           options,
           fileIndex.listFiles(Nil, Nil).flatMap(_.files))
-        .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))
+        .map(mergeWithMetastoreSchema(relation.tableMeta.dataSchema, _))
 
       inferredSchema match {
-        case Some(schema) =>
+        case Some(dataSchema) =>
+          val schema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
           if (inferenceMode == INFER_AND_SAVE) {
             updateCatalogSchema(relation.tableMeta.identifier, schema)
           }
-          (schema, relation.tableMeta.copy(schema = schema))
+          relation.tableMeta.copy(schema = schema)
         case None =>
           logWarning(s"Unable to infer schema for table $tableName from file format " +
             s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
-          (relation.tableMeta.schema, relation.tableMeta)
+          relation.tableMeta
       }
     } else {
-      (relation.tableMeta.schema, relation.tableMeta)
+      relation.tableMeta
     }
   }
 
```
