[SPARK-16552] [SQL] Store the Inferred Schemas into External Catalog Tables when Creating Tables #14207

Closed · wants to merge 15 commits
@@ -223,6 +223,9 @@ abstract class Catalog {
* If this table is cached as an InMemoryRelation, drop the original cached version and make the
* new version cached lazily.
*
* If the table's schema is inferred at runtime, infer the schema again and update the schema
@cloud-fan (Contributor) commented on Jul 19, 2016:

cc @rxin, I'm thinking about what the main reason is for allowing the table schema to be inferred at run time. IIRC, it's mainly because we want to save some typing when creating an external data source table by SQL string, which usually has a very long schema, e.g. for JSON files.

If this is true, then the table schema is not supposed to change. If users do want to change it, I'd argue that it's a different table; users should drop this table and create a new one. Then we don't need to make refresh table support schema changes, and thus we don't need to store the DATASOURCE_SCHEMA_ISINFERRED flag.
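For concreteness, a minimal sketch of the kind of statement being discussed, assuming a SparkSession named spark; the table name, format, and path are hypothetical:

// No schema is typed out; it is inferred from the JSON files under the given path.
spark.sql(
  """
    |CREATE TABLE events
    |USING json
    |OPTIONS (path '/data/events')
  """.stripMargin)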

Contributor commented:

refreshTable shouldn't run schema inference. Only run schema inference when creating the table.

And don't make this a config flag. Just run schema inference when creating the table. For managed tables, store the schema explicitly. Users must explicitly change it.

Member (Author) commented:

@rxin @cloud-fan I see. Will make a change

FYI, this will change the existing external behavior.

Contributor commented:

Yes, unfortunately I found out about this one too late. I will add a note to the 2.0 release notes that this change is coming.

Member (Author) commented:

Thanks!

* in the external catalog.
*
* @since 2.0.0
*/
def refreshTable(tableName: String): Unit
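A minimal usage sketch of this API, assuming a SparkSession named spark and a hypothetical table name:

// Refreshes cached metadata/data for the table; per the doc above, an inferred
// schema is re-inferred and written back to the external catalog.
spark.catalog.refreshTable("events")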
@@ -52,7 +52,7 @@ case class CreateDataSourceTableCommand(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
partitionColumns: Array[String],
userSpecifiedPartitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
ignoreIfExists: Boolean,
managedIfNoPath: Boolean)
@@ -95,17 +95,39 @@ case class CreateDataSourceTableCommand(
}

// Create the relation to validate the arguments before writing the metadata to the metastore.
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath).resolveRelation(checkPathExist = false)
val dataSource: HadoopFsRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
bucketSpec = None,
options = optionsWithPath)
.resolveRelation(checkPathExist = false).asInstanceOf[HadoopFsRelation]
@cloud-fan (Contributor) commented on Jul 20, 2016:

Is it safe to cast it to HadoopFsRelation?

@cloud-fan (Contributor) commented on Jul 20, 2016:

I think a safer way is to do a pattern match here: if it's a HadoopFsRelation, get its partition columns; otherwise, there are no partition columns.

Member (Author) commented:

Sure, will do it.
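A sketch of the suggested pattern match, reusing the names from the surrounding diff (the val names here are illustrative):

val relation = DataSource(
  sparkSession = sparkSession,
  userSpecifiedSchema = userSpecifiedSchema,
  className = provider,
  bucketSpec = None,
  options = optionsWithPath).resolveRelation(checkPathExist = false)

// Only file-based relations carry partition columns; any other relation contributes none.
val inferredPartitionColumns: Array[String] = relation match {
  case fs: HadoopFsRelation => fs.partitionSchema.fieldNames
  case _ => Array.empty[String]
}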


if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
s"Schema: ${dataSource.schema.simpleString}; " +
s"Partition columns: ${dataSource.partitionSchema.fieldNames}")
}

val partitionColumns =
if (userSpecifiedSchema.isEmpty) {
dataSource.partitionSchema.fieldNames
} else {
userSpecifiedPartitionColumns
}

CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = userSpecifiedSchema,
schema = dataSource.schema,
Contributor commented:

seems we should still use the user-specified schema, right?

Contributor commented:

I think that, from the code, it is not very clear that dataSource.schema will be userSpecifiedSchema.

@gatorsmile (Member, Author) commented on Aug 8, 2016:

Here, dataSource.schema could be the inferred schema. Previously, we did not store the inferred schema; after this PR, we do, and thus we use dataSource.schema.

Actually, after re-checking the code, I found that the schema might be adjusted a little even if users specify the schema. For example, the nullability could be changed.

I think we should make such a change, but maybe we should test and log it?
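To illustrate the nullability point, a minimal sketch with a hypothetical schema (not code from this patch):

import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A user-specified, non-nullable column...
val userSpecified = StructType(Seq(StructField("id", LongType, nullable = false)))
// ...may come back from the resolved relation with its nullability relaxed:
val relaxed = StructType(userSpecified.map(_.copy(nullable = true)))
// userSpecified("id").nullable == false, relaxed("id").nullable == true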

isSchemaInferred = userSpecifiedSchema.isEmpty,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -256,7 +278,8 @@ case class CreateDataSourceTableAsSelectCommand(
CreateDataSourceTableUtils.createDataSourceTable(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = Some(result.schema),
schema = result.schema,
isSchemaInferred = false,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
@@ -270,7 +293,6 @@ case class CreateDataSourceTableAsSelectCommand(
}
}


object CreateDataSourceTableUtils extends Logging {

val DATASOURCE_PREFIX = "spark.sql.sources."
@@ -279,6 +301,7 @@ object CreateDataSourceTableUtils extends Logging {
val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_ISINFERRED = DATASOURCE_SCHEMA_PREFIX + "isInferred"
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
@@ -303,10 +326,40 @@ object CreateDataSourceTableUtils extends Logging {
matcher.matches()
}

/**
* Saves the schema (including partition info) into the table properties.
* Overwrites the schema if it already exists.
*/
def saveSchema(
sparkSession: SparkSession,
schema: StructType,
partitionColumns: Array[String],
tableProperties: mutable.HashMap[String, String]): Unit = {
// Serialized JSON schema string may be too long to be stored into a single
// metastore SerDe property. In this case, we split the JSON string and store each part as
// a separate table property.
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}

if (partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}
}
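For reference, a sketch of the reverse direction under the same property keys (hypothetical helper; the counterpart in the codebase is DDLUtils.getSchemaFromTableProperties):

// Assumes: import org.apache.spark.sql.types.{DataType, StructType}
def readSchemaFromProperties(props: Map[String, String]): Option[StructType] = {
  props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
    // Reassemble the JSON string from its numbered parts, then parse it back.
    val json = (0 until numParts.toInt)
      .map(i => props(s"$DATASOURCE_SCHEMA_PART_PREFIX$i"))
      .mkString
    DataType.fromJson(json).asInstanceOf[StructType]
  }
}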

def createDataSourceTable(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
schema: StructType,
isSchemaInferred: Boolean,
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
provider: String,
@@ -315,28 +368,10 @@ object CreateDataSourceTableUtils extends Logging {
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put(DATASOURCE_PROVIDER, provider)

// Saves optional user specified schema. Serialized JSON schema string may be too long to be
// stored into a single metastore SerDe property. In this case, we split the JSON string and
// store each part as a separate SerDe property.
userSpecifiedSchema.foreach { schema =>
val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
}
tableProperties.put(DATASOURCE_SCHEMA_ISINFERRED, isSchemaInferred.toString.toUpperCase)
saveSchema(sparkSession, schema, partitionColumns, tableProperties)

if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) {
tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
}
}

if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
if (bucketSpec.isDefined) {
val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get

tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString)
Expand All @@ -353,16 +388,6 @@ object CreateDataSourceTableUtils extends Logging {
}
}

if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
}

val tableType = if (isExternal) {
tableProperties.put("EXTERNAL", "TRUE")
CatalogTableType.EXTERNAL
@@ -375,7 +400,7 @@ object CreateDataSourceTableUtils extends Logging {
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = provider,
@@ -487,6 +487,10 @@ object DDLUtils {
isDatasourceTable(table.properties)
}

def isSchemaInferred(table: CatalogTable): Boolean = {
table.properties.get(DATASOURCE_SCHEMA_ISINFERRED) == Option(true.toString.toUpperCase)
}

/**
* If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
* issue an exception [[AnalysisException]].
@@ -413,15 +413,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
} else {
val metadata = catalog.getTableMetadata(table)

if (DDLUtils.isDatasourceTable(metadata)) {
DDLUtils.getSchemaFromTableProperties(metadata) match {
case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result)
case None => describeSchema(catalog.lookupRelation(table).schema, result)
}
} else {
describeSchema(metadata.schema, result)
}

describeSchema(metadata, result)
if (isExtended) {
describeExtended(metadata, result)
} else if (isFormatted) {
@@ -518,6 +510,19 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}

private def describeSchema(
tableDesc: CatalogTable,
buffer: ArrayBuffer[Row]): Unit = {
if (DDLUtils.isDatasourceTable(tableDesc)) {
DDLUtils.getSchemaFromTableProperties(tableDesc) match {
Contributor commented:

Now getSchemaFromTableProperties should never return None?

Member (Author) commented:

For all types of data source tables, we store the schema in the table properties. Thus, we should not return None unless the table properties have been modified by users using the ALTER TABLE command.

Sorry, forgot to update the message.

Member (Author) commented:

Now, the message is changed to "# Schema of this table is corrupted"

Contributor commented:

Can we make DDLUtils.getSchemaFromTableProperties always return a schema and throw an exception if it's corrupted? I think that's more consistent with the previous behaviour, i.e. throwing an exception if the expected schema properties don't exist.

Member (Author) commented:

Sure, will do.
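A sketch of that direction (hypothetical helper name and message; assumes org.apache.spark.sql.AnalysisException is in scope):

def getRequiredSchemaFromTableProperties(metadata: CatalogTable): StructType = {
  getSchemaFromTableProperties(metadata).getOrElse {
    // Fail loudly instead of silently falling back when the stored schema is unreadable.
    throw new AnalysisException(
      s"Schema stored in the table properties of ${metadata.identifier} is corrupted or missing.")
  }
}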

case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, buffer)
case None => append(buffer, "# Schema of this table is inferred at runtime", "", "")
}
} else {
describeSchema(tableDesc.schema, buffer)
}
}

private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -18,6 +18,7 @@
package org.apache.spark.sql.internal

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.annotation.Experimental
@@ -27,7 +28,8 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifie
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.datasources.CreateTableUsing
import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTableUsing, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType


@@ -350,6 +352,54 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty
}

/**
* Refresh the inferred schema stored in the external catalog for data source tables.
*/
private def refreshInferredSchema(tableIdent: TableIdentifier): Unit = {
val table = sessionCatalog.getTableMetadataOption(tableIdent)
table.foreach { tableDesc =>
if (DDLUtils.isDatasourceTable(tableDesc) && DDLUtils.isSchemaInferred(tableDesc)) {
val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(tableDesc)
val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(tableDesc)
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = None,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
className = tableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
options = tableDesc.storage.serdeProperties)
.resolveRelation().asInstanceOf[HadoopFsRelation]

val schemaProperties = new mutable.HashMap[String, String]
CreateDataSourceTableUtils.saveSchema(
sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, schemaProperties)

def isPropertyForInferredSchema(key: String): Boolean = {
key match {
case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS => true
case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS => true
case _
if key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) ||
key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX)
=> true
case _ => false
}
}

// Keep the properties that are not for schema or partition columns
val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys { k =>
!isPropertyForInferredSchema(k)
}

val newTable = tableDesc.copy(properties = tablePropertiesWithoutSchema ++ schemaProperties)

// Alter the schema-related table properties that are stored in external catalog.
sessionCatalog.alterTable(newTable)
}
}
}

/**
* Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
* is refreshed.
@@ -359,6 +409,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
// Refresh the schema in external catalog, if it is a data source table whose schema is inferred
// at runtime. For user-specified schema, we do not infer and update the schema.
// TODO: Support column-related ALTER TABLE DDL commands, and then users can update
// the user-specified schema.
refreshInferredSchema(tableIdent)
// Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)

// If this table is cached as an InMemoryRelation, drop the original