From 7b5b44364aa6491867ffc8199a12ca4dde4e14af Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 13 Apr 2023 17:58:36 +0800
Subject: [PATCH 1/2] special internal field metadata should not be leaked to
 catalogs

---
 .../sql/catalyst/analysis/Analyzer.scala      |  4 +-
 .../spark/sql/catalyst/util/package.scala     | 29 ++++++++++---
 .../command/createDataSourceTables.scala      |  5 ++-
 .../v2/WriteToDataSourceV2Exec.scala          | 42 +++++++++----------
 .../sql/connector/MetadataColumnSuite.scala   | 16 +++++++
 5 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8821e652a31f0..2638b780102a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.{View => _, _}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -492,7 +492,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case l: Literal => Alias(l, toPrettySQL(l))()
       case e =>
         val metaForAutoGeneratedAlias = new MetadataBuilder()
-          .putString("__autoGeneratedAlias", "true")
+          .putString(AUTO_GENERATED_ALIAS, "true")
           .build()
         Alias(e, toPrettySQL(e))(explicitMetadata = Some(metaForAutoGeneratedAlias))
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index e1ce45c135385..e482b9a8d3d32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -27,7 +27,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
+import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -191,12 +191,13 @@ package object util extends Logging {
 
   val METADATA_COL_ATTR_KEY = "__metadata_col"
 
+  /**
+   * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
+   * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
+   */
+  val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
+
   implicit class MetadataColumnHelper(attr: Attribute) {
-    /**
-     * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
-     * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
-     */
-    val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
 
     def isMetadataCol: Boolean = MetadataAttribute.isValid(attr.metadata)
 
@@ -225,4 +226,20 @@ package object util extends Logging {
       }
     }
   }
+
+  val AUTO_GENERATED_ALIAS = "__autoGeneratedAlias"
+
+  def removeInternalMetadata(schema: StructType): StructType = {
+    StructType(schema.map { field =>
+      val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
+        .remove(METADATA_COL_ATTR_KEY)
+        .remove(QUALIFIED_ACCESS_ONLY)
+        .remove(AUTO_GENERATED_ALIAS)
+        .remove(FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY)
+        .remove(FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY)
+        .remove(FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY)
+        .build()
+      field.copy(metadata = newMetadata)
+    })
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index bf14ef14cf463..3848d5505155e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -22,7 +22,7 @@ import java.net.URI
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
       }
       val result = saveDataIntoTable(
         sparkSession, table, tableLocation, SaveMode.Overwrite, tableExists = false)
-      val tableSchema = CharVarcharUtils.getRawSchema(result.schema, sessionState.conf)
+      val tableSchema = CharVarcharUtils.getRawSchema(
+        removeInternalMetadata(result.schema), sessionState.conf)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 8355ac8e70366..426f33129a6ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,15 +26,16 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, WriteDeltaProjections}
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
@@ -69,7 +70,7 @@ case class CreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -78,14 +79,10 @@ case class CreateTableAsSelectExec(
       if (ifNotExists) {
         return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
-    val table = catalog.createTable(ident, columns,
-      partitioning.toArray, properties.asJava)
+    val table = catalog.createTable(
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -106,7 +103,7 @@ case class AtomicCreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -115,13 +112,10 @@ case class AtomicCreateTableAsSelectExec(
       if (ifNotExists) {
         return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val stagedTable = catalog.stageCreate(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, stagedTable, writeOptions, ident, query)
   }
 }
@@ -144,7 +138,8 @@ case class ReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -164,10 +159,8 @@ case class ReplaceTableAsSelectExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val table = catalog.createTable(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -192,13 +185,13 @@ case class AtomicReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
   override protected def run(): Seq[InternalRow] = {
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
+    val columns = getV2Columns(query.schema)
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
       invalidateCache(catalog, table, ident)
@@ -559,9 +552,14 @@ case class DeltaWithMetadataWritingSparkTask(
   }
 }
 
-private[v2] trait TableWriteExecHelper extends LeafV2CommandExec {
+private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
   override def output: Seq[Attribute] = Nil
 
+  protected def getV2Columns(schema: StructType): Array[Column] = {
+    CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(
+      removeInternalMetadata(schema), conf).asNullable)
+  }
+
   protected def writeToTable(
       catalog: TableCatalog,
       table: Table,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index d03bf402170ca..b043bf2f5be23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -18,8 +18,11 @@ package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, struct}
+import org.apache.spark.sql.types.IntegerType
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
   import testImplicits._
 
@@ -340,4 +343,17 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       assert(relations(0).output != relations(1).output)
     }
   }
+
+  test("SPARK-43123: Metadata column related field metadata should not be leaked to catalogs") {
+    withTable(tbl, "testcat.target") {
+      prepareTable()
+      sql(s"CREATE TABLE testcat.target AS SELECT index FROM $tbl")
+      val cols = catalog("testcat").asTableCatalog.loadTable(
+        Identifier.of(Array.empty, "target")).columns()
+      assert(cols.length == 1)
+      assert(cols.head.name() == "index")
+      assert(cols.head.dataType() == IntegerType)
+      assert(cols.head.metadataInJSON() == null)
+    }
+  }
 }
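The follow-up patch below consolidates the six chained `.remove` calls into a single
INTERNAL_METADATA_KEYS list, so a future internal key only needs to be registered in one
place. For comparison, the same rewrite could avoid the mutable `builder` variable by
folding over the key list; this is a sketch only, not what the patch uses:

    // Equivalent to the loop in the follow-up patch: start from the field's
    // existing metadata and strip every internal key in one pass.
    def removeInternalMetadata(schema: StructType): StructType = {
      StructType(schema.map { field =>
        val cleaned = INTERNAL_METADATA_KEYS.foldLeft(
          new MetadataBuilder().withMetadata(field.metadata))(_.remove(_))
        field.copy(metadata = cleaned.build())
      })
    }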
From 93f9335308552e6b6e35f5dca67c9baa33b7e994 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 14 Apr 2023 12:34:13 +0800
Subject: [PATCH 2/2] address comments

---
 .../spark/sql/catalyst/util/package.scala     | 23 +++++++++++--------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index e482b9a8d3d32..23ddb534af919 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -229,17 +229,22 @@ package object util extends Logging {
 
   val AUTO_GENERATED_ALIAS = "__autoGeneratedAlias"
 
+  val INTERNAL_METADATA_KEYS = Seq(
+    AUTO_GENERATED_ALIAS,
+    METADATA_COL_ATTR_KEY,
+    QUALIFIED_ACCESS_ONLY,
+    FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY,
+    FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY,
+    FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY
+  )
+
   def removeInternalMetadata(schema: StructType): StructType = {
     StructType(schema.map { field =>
-      val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
-        .remove(METADATA_COL_ATTR_KEY)
-        .remove(QUALIFIED_ACCESS_ONLY)
-        .remove(AUTO_GENERATED_ALIAS)
-        .remove(FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY)
-        .remove(FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY)
-        .remove(FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY)
-        .build()
-      field.copy(metadata = newMetadata)
+      var builder = new MetadataBuilder().withMetadata(field.metadata)
+      INTERNAL_METADATA_KEYS.foreach { key =>
+        builder = builder.remove(key)
+      }
+      field.copy(metadata = builder.build())
    })
  }
 }
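To see the fix in isolation, below is a minimal, self-contained sketch (not part of the
patch series) of what removeInternalMetadata does to a schema. The column name, the
schema, and the assertion are made up for illustration; MetadataBuilder, StructField,
and the AUTO_GENERATED_ALIAS key are the real APIs the patches touch.

    import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, AUTO_GENERATED_ALIAS}
    import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StructField, StructType}

    // A query output schema as the analyzer may produce it: auto-generated
    // aliases are tagged by putting AUTO_GENERATED_ALIAS into field metadata.
    val internalMeta = new MetadataBuilder()
      .putString(AUTO_GENERATED_ALIAS, "true")
      .build()
    val querySchema = StructType(Seq(StructField("index", IntegerType, metadata = internalMeta)))

    // Before these patches, CTAS handed querySchema to the catalog as-is, so the
    // internal key was persisted in the table schema. The fix strips it first:
    val cleaned = removeInternalMetadata(querySchema)
    assert(cleaned("index").metadata == Metadata.empty)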