[SPARK-33480][SQL] Support char/varchar type #30412

Closed
wants to merge 12 commits into from
Closed
2 changes: 2 additions & 0 deletions docs/sql-ref-datatypes.md
@@ -37,6 +37,8 @@ Spark SQL and DataFrames support the following data types:
- `DecimalType`: Represents arbitrary-precision signed decimal numbers. Backed internally by `java.math.BigDecimal`. A `BigDecimal` consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
* String type
- `StringType`: Represents character string values.
- `VarcharType(length)`: A variant of `StringType` that has a length limitation. Data writing will fail if the input string exceeds the length limitation. Note: this type can only be used in a table schema, not in functions/operators.
- `CharType(length)`: A variant of `VarcharType(length)` that is fixed length. Reading a column of type `CharType(n)` always returns string values of length `n`. Char type column comparison pads the shorter one to the length of the longer one.
* Binary type
- `BinaryType`: Represents byte sequence values.
* Boolean type
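To make the documented semantics concrete, here is a small usage sketch in Scala. It assumes only the behavior described above; exact error messages and failure modes depend on the final implementation in this PR.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the documented char/varchar behavior.
object CharVarcharDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("char-varchar-demo")
      .getOrCreate()
    import org.apache.spark.sql.functions.{col, length}

    spark.sql("CREATE TABLE t (c CHAR(5), v VARCHAR(3)) USING parquet")
    spark.sql("INSERT INTO t VALUES ('ab', 'ab')")

    // Reading a CHAR(5) column always yields strings of length 5 (space padded).
    spark.table("t").select(length(col("c"))).show()

    // Writing a value longer than the declared VARCHAR length fails:
    // spark.sql("INSERT INTO t VALUES ('ab', 'toolong')")  // would throw at write time

    spark.stop()
  }
}
```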
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
@@ -3097,7 +3097,12 @@ class Analyzer(override val catalogManager: CatalogManager)
val projection = TableOutputResolver.resolveOutputColumns(
v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
if (projection != v2Write.query) {
v2Write.withNewQuery(projection)
val cleanedTable = v2Write.table match {
case r: DataSourceV2Relation =>
r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
Contributor (author):
We remove the char/varchar metadata after the length-check expressions are added, so that we don't add them repeatedly and this rule stays idempotent.
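For readers unfamiliar with the metadata trick, here is a minimal sketch of what cleaning an attribute's metadata could look like. The metadata key and the actual body of `CharVarcharUtils.cleanAttrMetadata` in this PR may differ.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.MetadataBuilder

// Assumed metadata key; the PR may use a different constant.
val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"

// Drop the recorded raw char/varchar type so the rule does not add
// length checks again on a second pass.
def cleanAttrMetadata(attr: AttributeReference): AttributeReference = {
  val cleaned = new MetadataBuilder()
    .withMetadata(attr.metadata)
    .remove(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)
    .build()
  attr.copy(metadata = cleaned)(exprId = attr.exprId, qualifier = attr.qualifier)
}
```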

Member:

Does the current implementation assume the analyzer removes the metadata from plans before the optimizer phase? If so, how about checking in CheckAnalysis that plans don't carry the metadata?

Contributor (author):
No, it doesn't. The metadata itself is harmless. We only need to watch out for the specific rules that look at the char/varchar metadata and make sure they are idempotent.

In fact, the added cast and length-check expression is wrapped in an Alias that retains the char/varchar metadata, so the output attributes of the Project above the v2 relation still carry it. That is necessary because we rely on it later to pad char type columns for comparison.
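To illustrate the padding the author refers to, here is a hedged sketch of how a later rule might use the retained raw type to pad char operands before comparison. The helper name, parameters, and placement are illustrative, not the PR's exact code.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, StringRPad}
import org.apache.spark.sql.types.{CharType, DataType}

// If both sides were declared as CHAR with different lengths, pad the shorter
// side with spaces so that e.g. 'ab' in CHAR(2) compares equal to 'ab  ' in CHAR(4).
def padCharOperands(
    left: Expression,
    right: Expression,
    leftRawType: Option[DataType],
    rightRawType: Option[DataType]): (Expression, Expression) = {
  (leftRawType, rightRawType) match {
    case (Some(CharType(l)), Some(CharType(r))) if l != r =>
      val target = math.max(l, r)
      (StringRPad(left, Literal(target), Literal(" ")),
        StringRPad(right, Literal(target), Literal(" ")))
    case _ => (left, right)
  }
}
```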

case other => other
}
v2Write.withNewQuery(projection).withNewTable(cleanedTable)
} else {
v2Write
}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.internal.SQLConf
@@ -94,6 +94,10 @@ trait CheckAnalysis extends PredicateHelper {

case p if p.analyzed => // Skip already analyzed sub-plans

case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
cloud-fan marked this conversation as resolved.
Contributor (author):
@maropu I changed it back as it's pretty risky to get output from arbitrary logical plans. An example of the error:

Caused by: sbt.ForkMain$ForkError: java.lang.AssertionError: assertion failed: Scalar subquery should have only one column
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.dataType(subquery.scala:229)
	at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:181)
	at org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:61)

throw new IllegalStateException(
"[BUG] logical plan should not have output of char/varchar type: " + leaf)

case u: UnresolvedNamespace =>
u.failAnalysis(s"Namespace not found: ${u.multipartIdentifier.quoted}")

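For reference, the `LeafNode` guard above keys on a recursive type check. A minimal sketch of such a helper, assuming the PR's `CharVarcharUtils.hasCharVarchar` does something along these lines:

```scala
import org.apache.spark.sql.types._

// Recursively look for char/varchar anywhere inside a (possibly nested) type.
def hasCharVarchar(dt: DataType): Boolean = dt match {
  case _: CharType | _: VarcharType => true
  case s: StructType => s.fields.exists(f => hasCharVarchar(f.dataType))
  case a: ArrayType => hasCharVarchar(a.elementType)
  case m: MapType => hasCharVarchar(m.keyType) || hasCharVarchar(m.valueType)
  case _ => false
}
```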
@@ -35,7 +35,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
@@ -49,7 +48,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
@@ -72,7 +70,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
a.dataType.foreach(failNullType)
a.dataType.foreach(failCharType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
@@ -145,7 +142,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -173,7 +169,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
ReplaceTable(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
@@ -66,7 +67,8 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {

val partValues = partSchema.map { part =>
val raw = normalizedSpec.get(part.name).orNull
Cast(Literal.create(raw, StringType), part.dataType, Some(conf.sessionLocalTimeZone)).eval()
val dt = CharVarcharUtils.replaceCharVarcharWithString(part.dataType)
Cast(Literal.create(raw, StringType), dt, Some(conf.sessionLocalTimeZone)).eval()
}
InternalRow.fromSeq(partValues)
}
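The partition-value cast above needs a plain string-backed type. A hedged sketch of what a recursive replacement like `CharVarcharUtils.replaceCharVarcharWithString` could look like (the PR's version may differ):

```scala
import org.apache.spark.sql.types._

// Replace char/varchar with StringType everywhere inside a (possibly nested) type,
// so downstream code such as Cast only ever sees types the engine supports.
def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
  case _: CharType | _: VarcharType => StringType
  case ArrayType(et, containsNull) =>
    ArrayType(replaceCharVarcharWithString(et), containsNull)
  case MapType(kt, vt, valueContainsNull) =>
    MapType(replaceCharVarcharWithString(kt), replaceCharVarcharWithString(vt), valueContainsNull)
  case StructType(fields) =>
    StructType(fields.map(f => f.copy(dataType = replaceCharVarcharWithString(f.dataType))))
  case _ => dt
}
```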
@@ -22,6 +22,7 @@ import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, Attribute, Cast, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.DataType
@@ -93,19 +94,17 @@ object TableOutputResolver {
tableAttr.metadata == queryExpr.metadata) {
Some(queryExpr)
} else {
// Renaming is needed for handling the following cases like
// 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
// 2) Target tables have column metadata
storeAssignmentPolicy match {
val casted = storeAssignmentPolicy match {
case StoreAssignmentPolicy.ANSI =>
Some(Alias(
AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
case _ =>
Some(Alias(
Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
}
val exprWithStrLenCheck = CharVarcharUtils.stringLengthCheck(casted, tableAttr)
// Renaming is needed for handling the following cases like
// 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
// 2) Target tables have column metadata
Some(Alias(exprWithStrLenCheck, tableAttr.name)(explicitMetadata = Some(tableAttr.metadata)))
}

storeAssignmentPolicy match {
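To show where `stringLengthCheck` slots in, here is a simplified sketch of a length-check wrapper around the cast built above. The real `CharVarcharUtils.stringLengthCheck` fails the write for over-long input; this stand-in substitutes NULL instead, purely to keep the sketch self-contained and illustrate the expression shape.

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GreaterThan, If, Length, Literal}
import org.apache.spark.sql.types.StringType

// Wrap a string-typed expression (e.g. the Cast above) with a check against the
// declared varchar length. The real implementation raises an error in the
// "too long" branch; NULL is used here only as a placeholder.
def varcharLengthCheckSketch(casted: Expression, limit: Int): Expression = {
  If(
    GreaterThan(Length(casted), Literal(limit)),
    Cast(Literal(null), StringType),
    casted)
}
```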
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, ImplicitCastInputTypes}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
@@ -473,7 +473,10 @@ class SessionCatalog(
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
val t = externalCatalog.getTable(db, table)
// We replace char/varchar with "annotated" string type in the table schema, as the query
// engine doesn't support char/varchar yet.
t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
}
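The "annotated" string type mentioned in the comment can be pictured as follows. The metadata key and helper names are assumptions; the PR's `CharVarcharUtils` is the authoritative version.

```scala
import org.apache.spark.sql.types._

// Assumed metadata key used to remember the original char/varchar type.
val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"

// Turn char/varchar fields into plain StringType while recording the original
// type string in the field metadata.
def annotateField(f: StructField): StructField = f.dataType match {
  case dt @ (_: CharType | _: VarcharType) =>
    val meta = new MetadataBuilder()
      .withMetadata(f.metadata)
      .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, dt.catalogString)
      .build()
    f.copy(dataType = StringType, metadata = meta)
  case _ => f
}

// Later rules (e.g. V2WriteCommand.outputResolved below) can read the raw type back.
def getRawTypeString(metadata: Metadata): Option[String] =
  if (metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) {
    Some(metadata.getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY))
  } else {
    None
  }
```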

/**
@@ -36,8 +36,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -99,7 +99,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

override def visitSingleTableSchema(ctx: SingleTableSchemaContext): StructType = {
withOrigin(ctx)(StructType(visitColTypeList(ctx.colTypeList)))
val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
StructType(visitColTypeList(ctx.colTypeList)))
withOrigin(ctx)(schema)
}

def parseRawDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) {
@@ -2216,7 +2218,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
* Create a Spark DataType.
*/
private def visitSparkDataType(ctx: DataTypeContext): DataType = {
HiveStringType.replaceCharType(typedVisit(ctx))
CharVarcharUtils.replaceCharVarcharWithString(typedVisit(ctx))
}

/**
@@ -2291,16 +2293,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
builder.putString("comment", _)
}

// Add Hive type string to metadata.
val rawDataType = typedVisit[DataType](ctx.dataType)
val cleanedDataType = HiveStringType.replaceCharType(rawDataType)
if (rawDataType != cleanedDataType) {
builder.putString(HIVE_TYPE_STRING, rawDataType.catalogString)
}

StructField(
name = colName.getText,
dataType = cleanedDataType,
dataType = typedVisit[DataType](ctx.dataType),
nullable = NULL == null,
metadata = builder.build())
}
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, Res
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
@@ -45,9 +46,10 @@ trait V2WriteCommand extends Command {
table.skipSchemaResolution || (query.output.size == table.output.size &&
query.output.zip(table.output).forall {
case (inAttr, outAttr) =>
val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
// names and types must match, nullability must be compatible
inAttr.name == outAttr.name &&
DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&
DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outType) &&
(outAttr.nullable || !inAttr.nullable)
})
}