[SPARK-7763] [SPARK-7616] [SQL] Persists partition columns into metastore #6285

Closed · wants to merge 7 commits
@@ -49,8 +49,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
new ParquetRelation2(paths, schema, None, partitionColumns, parameters)(sqlContext)
}
}

@@ -118,12 +117,28 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
private[sql] class ParquetRelation2(
override val paths: Array[String],
private val maybeDataSchema: Option[StructType],
// This is for metastore conversion.
private val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {

private[sql] def this(
paths: Array[String],
maybeDataSchema: Option[StructType],
maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])(
sqlContext: SQLContext) = {
this(
paths,
maybeDataSchema,
maybePartitionSpec,
maybePartitionSpec.map(_.partitionColumns),
parameters)(sqlContext)
}

// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
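A usage note, as a minimal sketch of how a reader controls this flag from the public API (assuming the key behind `ParquetRelation2.MERGE_SCHEMA` is the documented `mergeSchema` option and that a `SQLContext` named `sqlContext` is already available; the path is hypothetical):

```scala
// Disable schema merging when all part-files are known to share one schema;
// the relation then derives its schema from a single footer instead of all of them.
val df = sqlContext.read
  .format("parquet")
  .option("mergeSchema", "false")
  .load("/tmp/partitioned-parquet")
```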
@@ -161,15 +176,15 @@ private[sql] class ParquetRelation2(
Boolean.box(shouldMergeSchemas),
paths.toSet,
maybeDataSchema,
maybePartitionSpec)
partitionColumns)
} else {
Objects.hashCode(
Boolean.box(shouldMergeSchemas),
paths.toSet,
dataSchema,
schema,
maybeDataSchema,
maybePartitionSpec)
partitionColumns)
}
}

@@ -185,9 +200,6 @@

override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum

override def userDefinedPartitionColumns: Option[StructType] =
maybePartitionSpec.map(_.partitionColumns)

override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)

@@ -93,6 +93,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
job.setOutputValueClass(classOf[Row])
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)

// We create a DataFrame by applying the schema of the relation to the data to make sure
// that we are writing data based on the expected schema.
val df = sqlContext.createDataFrame(
DataFrame(sqlContext, query).queryExecution.toRdd,
relation.schema,
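A small sketch of the idea in that comment, using the public `createDataFrame` API (column names and data are hypothetical; assumes an existing `SparkContext` `sc` and `SQLContext` `sqlContext`):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schema the relation expects for the written data.
val expectedSchema = StructType(Seq(
  StructField("key", IntegerType, nullable = true),
  StructField("value", StringType, nullable = true)))

// Rows assumed to already follow the expected column order.
val rows = sc.parallelize(Seq(Row(1, "a"), Row(2, "b")))

// Applying the schema explicitly pins the written data to the expected layout.
val df = sqlContext.createDataFrame(rows, expectedSchema)
```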
19 changes: 15 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.RunnableCommand
@@ -245,12 +245,13 @@ private[sql] object ResolvedDataSource {
SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
}

val dataSchema = StructType(schema.filterNot(f => partitionColumns.contains(f.name)))
val dataSchema =
StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable

dataSource.createRelation(
sqlContext,
paths,
Some(schema),
Some(dataSchema),
maybePartitionsSchema,
caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
@@ -320,10 +321,20 @@ private[sql] object ResolvedDataSource {
Some(dataSchema.asNullable),
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)

// For a partitioned relation r, the column ordering of r.schema differs from the column
// ordering of data.logicalPlan, so we add a Project to adjust the ordering. This way,
// InsertIntoHadoopFsRelation can safely apply r.schema to the data.
val project =
Project(
r.schema.map(field => new UnresolvedAttribute(Seq(field.name))),
data.logicalPlan)

sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
data.logicalPlan,
project,
partitionColumns.toArray,
mode)).toRdd
r
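The effect of that Project can be pictured with the public DataFrame API: a partitioned relation's schema lists data columns first and partition columns last, so the input is re-projected into that order before the insert (hypothetical column names; assumes an existing DataFrame named `data`):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schema of the partitioned relation: data column first,
// partition column ("year") last.
val relationSchema = StructType(Seq(
  StructField("value", StringType),
  StructField("year", IntegerType)))

// `data` may carry the same columns in a different order, e.g. (year, value).
// Selecting by the relation's field names adjusts the ordering so the schema
// can then be applied positionally inside InsertIntoHadoopFsRelation.
val reordered = data.select(relationSchema.fieldNames.map(col): _*)
```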
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.SerializableWritable
import org.apache.spark.sql._
import org.apache.spark.sql.{Row, _}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.{StructField, StructType}
@@ -120,11 +120,13 @@ trait HadoopFsRelationProvider {
* Returns a new base relation with the given parameters, a user defined schema, and a list of
* partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
* is enforced by the Map that is passed to the function.
*
* @param dataSchema Schema of data columns (i.e., columns that are not partition columns).
*/
def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation
}
@@ -416,8 +418,29 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
final private[sql] def partitionSpec: PartitionSpec = {
if (_partitionSpec == null) {
_partitionSpec = maybePartitionSpec
.map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
.orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
.flatMap {
case spec if spec.partitions.nonEmpty =>
Some(spec.copy(partitionColumns = spec.partitionColumns.asNullable))
case _ =>
None
}
.orElse {
// We only know the partition columns and their data types. We need to discover
// partition values.
userDefinedPartitionColumns.map { partitionSchema =>
val spec = discoverPartitions()
val castedPartitions = spec.partitions.map { case p @ Partition(values, path) =>
val literals = values.toSeq.zip(spec.partitionColumns.map(_.dataType)).map {
case (value, dataType) => Literal.create(value, dataType)
}
val castedValues = partitionSchema.zip(literals).map { case (field, literal) =>
Cast(literal, field.dataType).eval()
}
p.copy(values = Row.fromSeq(castedValues))
}
PartitionSpec(partitionSchema, castedPartitions)
}
}
.getOrElse {
if (sqlContext.conf.partitionDiscoveryEnabled()) {
discoverPartitions()
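A minimal sketch of the cast step above: a value produced by partition discovery is coerced into the user-declared column type through Catalyst's `Literal` and `Cast` (values are hypothetical; `eval()` with no input is assumed to behave as in Spark 1.4-era Catalyst):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

// Discovery inferred 2015 as an integer, but the user declared the partition
// column as a string in the table definition.
val discoveredValue: Any = 2015
val discoveredType: DataType = IntegerType
val declaredType: DataType = StringType

val literal = Literal.create(discoveredValue, discoveredType)
// Evaluating the Cast yields the same value represented in the declared type.
val castedValue = Cast(literal, declaredType).eval()
```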
@@ -78,4 +78,11 @@ trait SQLTestUtils {
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
}

/**
* Drops table `tableName` after calling `f`.
*/
protected def withTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
}
}
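A usage sketch for the new helper (table name and DDL are hypothetical; assumes the `sqlContext` provided by `SQLTestUtils` is backed by a HiveContext so that `DROP TABLE` is supported):

```scala
// The DROP TABLE IF EXISTS cleanup runs even if the body throws,
// so one failing test does not leave tables behind for the next one.
withTable("test_table") {
  sqlContext.sql("CREATE TABLE test_table (key INT, value STRING)")
  assert(sqlContext.table("test_table").schema.fieldNames.contains("key"))
}
```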
@@ -66,11 +66,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
def schemaStringFromParts: Option[String] = {
table.properties.get("spark.sql.sources.schema.numParts").map { numParts =>
val parts = (0 until numParts.toInt).map { index =>
val part = table.properties.get(s"spark.sql.sources.schema.part.${index}").orNull
val part = table.properties.get(s"spark.sql.sources.schema.part.$index").orNull
if (part == null) {
throw new AnalysisException(
s"Could not read schema from the metastore because it is corrupted " +
s"(missing part ${index} of the schema).")
"Could not read schema from the metastore because it is corrupted " +
s"(missing part $index of the schema, $numParts parts are expected).")
}

part
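For illustration, a round-trip sketch of the schema persistence scheme used here and in createDataSourceTable below: the JSON form of the schema is split into fixed-size chunks on write and concatenated back before parsing on read (the threshold value is hypothetical; the real one comes from conf.schemaStringLengthThreshold):

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType)))

// Write side: split the JSON string into chunks and store each chunk as its
// own table property, plus the number of parts.
val threshold = 32
val parts = schema.json.grouped(threshold).toSeq
val props = Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
  parts.zipWithIndex.map { case (part, index) =>
    s"spark.sql.sources.schema.part.$index" -> part
  }

// Read side: concatenate the parts in index order and parse the JSON back.
val numParts = props("spark.sql.sources.schema.numParts").toInt
val json = (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
val restored = DataType.fromJson(json).asInstanceOf[StructType]
assert(restored == schema)
```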
@@ -89,6 +89,11 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val userSpecifiedSchema =
schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])

// We only need the column names here, since the userSpecifiedSchema loaded from the
// metastore already contains the partition columns; their data types can always be
// retrieved from userSpecifiedSchema.
val partitionColumns = table.partitionColumns.map(_.name)

// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val options = table.serdeProperties
@@ -97,7 +102,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
ResolvedDataSource(
hive,
userSpecifiedSchema,
Array.empty[String],
partitionColumns.toArray,
table.properties("spark.sql.sources.provider"),
options)

@@ -111,8 +116,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
override def refreshTable(databaseName: String, tableName: String): Unit = {
// refreshTable does not eagerly reload the cache. It just invalidates the cache.
// Next time when we use the table, it will be populated in the cache.
// Since we also cache ParquetRealtions converted from Hive Parquet tables and
// adding converted ParquetRealtions into the cache is not defined in the load function
// Since we also cache ParquetRelations converted from Hive Parquet tables and
// adding converted ParquetRelations into the cache is not defined in the load function
// of the cache (instead, we add the cache entry in convertToParquetRelation),
// it is better to invalidate the cache here to avoid confusing warning logs from the
// cache loader (e.g. cannot find data source provider, which is only defined for
@@ -133,21 +138,47 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
def createDataSourceTable(
tableName: String,
userSpecifiedSchema: Option[StructType],
partitionColumns: Array[String],
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)

// Saves the optional user-specified schema. The serialized JSON schema string may be too
// long to be stored in a single metastore SerDe property. In this case, we split the JSON
// string and store each part as a separate SerDe property.
if (userSpecifiedSchema.isDefined) {
val threshold = conf.schemaStringLengthThreshold
val schemaJsonString = userSpecifiedSchema.get.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"spark.sql.sources.schema.part.${index}", part)
tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
}
}

val metastorePartitionColumns = userSpecifiedSchema.map { schema =>
val fields = partitionColumns.map(col => schema(col))
fields.map { field =>
HiveColumn(
name = field.name,
hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
comment = "")
}.toSeq
}.getOrElse {
if (partitionColumns.length > 0) {
// The table does not have a user-specified schema, so the schema will be inferred when
// we load the table. In that case we do not expect partition columns here and will
// discover partitions when the table is loaded. If partition columns were specified
// anyway, we simply ignore them and log a warning message.
logWarning(
s"The schema and partitions of table $tableName will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
}
Seq.empty[HiveColumn]
}

val tableType = if (isExternal) {
@@ -163,7 +194,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
specifiedDatabase = Option(dbName),
name = tblName,
schema = Seq.empty,
partitionColumns = Seq.empty,
partitionColumns = metastorePartitionColumns,
tableType = tableType,
properties = tableProperties.toMap,
serdeProperties = options))
@@ -199,7 +230,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val dataSourceTable =
cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Othersie, wrap the table with a Subquery using the table name.
// Otherwise, wrap the table with a Subquery using the table name.
val withAlias =
alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
Subquery(tableIdent.last, dataSourceTable))
@@ -146,6 +146,7 @@ case class CreateMetastoreDataSource(
hiveContext.catalog.createDataSourceTable(
tableName,
userSpecifiedSchema,
Array.empty[String],
provider,
optionsWithPath,
isExternal)
@@ -244,6 +245,7 @@ case class CreateMetastoreDataSourceAsSelect(
hiveContext.catalog.createDataSourceTable(
tableName,
Some(resolved.relation.schema),
partitionColumns,
provider,
optionsWithPath,
isExternal)
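The user-visible effect of threading partitionColumns through to createDataSourceTable can be sketched with the DataFrame writer API (data and table name are hypothetical; assumes a HiveContext named `hiveContext` and Spark 1.4-style APIs):

```scala
import hiveContext.implicits._

// A tiny, hypothetical partitioned data set.
val events = Seq((1, "a", 2015), (2, "b", 2016)).toDF("id", "value", "year")

// With this change the partition column "year" is persisted into the metastore
// along with the rest of the schema, instead of being rediscovered from
// directory names every time the table is loaded.
events.write
  .format("parquet")
  .partitionBy("year")
  .saveAsTable("events")

// A later session still sees "year" as part of the table's schema.
hiveContext.table("events").printSchema()
```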
@@ -48,15 +48,14 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
assert(
sqlContext.isInstanceOf[HiveContext],
"The ORC data source can only be used with HiveContext.")

val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty[Partition]))
OrcRelation(paths, parameters, schema, partitionSpec)(sqlContext)
new OrcRelation(paths, dataSchema, None, partitionColumns, parameters)(sqlContext)
}
}

@@ -136,23 +135,35 @@ private[orc] class OrcOutputWriter(
}

@DeveloperApi
private[sql] case class OrcRelation(
private[sql] class OrcRelation(
override val paths: Array[String],
parameters: Map[String, String],
maybeSchema: Option[StructType] = None,
maybePartitionSpec: Option[PartitionSpec] = None)(
maybeDataSchema: Option[StructType],
maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {

override val dataSchema: StructType = maybeSchema.getOrElse {
private[sql] def this(
paths: Array[String],
maybeDataSchema: Option[StructType],
maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])(
sqlContext: SQLContext) = {
this(
paths,
maybeDataSchema,
maybePartitionSpec,
maybePartitionSpec.map(_.partitionColumns),
parameters)(sqlContext)
}

override val dataSchema: StructType = maybeDataSchema.getOrElse {
OrcFileOperator.readSchema(
paths.head, Some(sqlContext.sparkContext.hadoopConfiguration))
}

override def userDefinedPartitionColumns: Option[StructType] =
maybePartitionSpec.map(_.partitionColumns)

override def needConversion: Boolean = false

override def equals(other: Any): Boolean = other match {
@@ -169,7 +180,7 @@ private[sql] case class OrcRelation(
paths.toSet,
dataSchema,
schema,
maybePartitionSpec)
partitionColumns)
}

override def buildScan(