support char/varchar type

cloud-fan committed Nov 18, 2020
1 parent 74bd046 commit a7f0730
Showing 35 changed files with 1,036 additions and 299 deletions.

[File: Analyzer]
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
@@ -3088,7 +3088,12 @@ class Analyzer(override val catalogManager: CatalogManager)
       val projection = TableOutputResolver.resolveOutputColumns(
         v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
       if (projection != v2Write.query) {
-        v2Write.withNewQuery(projection)
+        val cleanedTable = v2Write.table match {
+          case r: DataSourceV2Relation =>
+            r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
+          case other => other
+        }
+        v2Write.withNewQuery(projection).withNewTable(cleanedTable)
       } else {
         v2Write
       }
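
Once the write query is resolved, the rule also strips the char/varchar metadata from the relation's output, so the analyzed plan no longer advertises raw char/varchar types. A minimal sketch of what such a cleaning helper can look like (the metadata key and the exact helper shape are assumptions, not the commit's code):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.MetadataBuilder

    // Assumed metadata key under which the raw char/varchar type string is stashed.
    val RAW_TYPE_KEY = "__CHAR_VARCHAR_TYPE_STRING"

    // Drop only the stashed raw-type entry; keep any other metadata (e.g. comments).
    def cleanAttrMetadata(attr: AttributeReference): AttributeReference = {
      val cleaned = new MetadataBuilder()
        .withMetadata(attr.metadata)
        .remove(RAW_TYPE_KEY)
        .build()
      attr.copy(metadata = cleaned)(attr.exprId, attr.qualifier)
    }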

[File: CheckAnalysis]
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.internal.SQLConf
@@ -94,6 +94,10 @@ trait CheckAnalysis extends PredicateHelper {
 
       case p if p.analyzed => // Skip already analyzed sub-plans
 
+      case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
+        throw new IllegalStateException(
+          "[BUG] leaf logical plan should not have output of char/varchar type: " + leaf)
+
       case u: UnresolvedNamespace =>
         u.failAnalysis(s"Namespace not found: ${u.multipartIdentifier.quoted}")
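
This new guard encodes the commit's core invariant: by the end of analysis, no leaf plan may expose char/varchar in its output; those types must already have been replaced with string (with the original type kept in metadata). A hedged sketch of the recursive predicate, assuming the obvious shape:

    import org.apache.spark.sql.types._

    // True if the type is char/varchar, or contains one nested anywhere.
    def hasCharVarchar(dt: DataType): Boolean = dt match {
      case _: CharType | _: VarcharType => true
      case s: StructType => s.fields.exists(f => hasCharVarchar(f.dataType))
      case a: ArrayType  => hasCharVarchar(a.elementType)
      case m: MapType    => hasCharVarchar(m.keyType) || hasCharVarchar(m.valueType)
      case _             => false
    }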

[File: ResolveCatalogs]
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableCatalog, TableChange}
 
 /**
  * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
@@ -35,7 +35,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case AlterTableAddColumnsStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
       cols.foreach(c => failNullType(c.dataType))
-      cols.foreach(c => failCharType(c.dataType))
       val changes = cols.map { col =>
         TableChange.addColumn(
           col.name.toArray,
@@ -49,7 +48,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case AlterTableReplaceColumnsStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
       cols.foreach(c => failNullType(c.dataType))
-      cols.foreach(c => failCharType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(table) =>
           // REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
@@ -72,7 +70,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case a @ AlterTableAlterColumnStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
       a.dataType.foreach(failNullType)
-      a.dataType.foreach(failCharType)
       val colName = a.column.toArray
       val typeChange = a.dataType.map { newDataType =>
         TableChange.updateColumnType(colName, newDataType)
@@ -145,7 +142,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
       assertNoNullTypeInSchema(c.tableSchema)
-      assertNoCharTypeInSchema(c.tableSchema)
       CreateV2Table(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -173,7 +169,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case c @ ReplaceTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
       assertNoNullTypeInSchema(c.tableSchema)
-      assertNoCharTypeInSchema(c.tableSchema)
       ReplaceTable(
         catalog.asTableCatalog,
         tbl.asIdentifier,
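
Every deletion in this file points the same way: v2 DDL no longer rejects char/varchar up front. For illustration only (the catalog name testcat and provider foo are hypothetical, and a SparkSession named spark with that catalog registered is assumed), statements like these now pass analysis:

    // Hypothetical names; illustrative only.
    spark.sql("CREATE TABLE testcat.ns.t (c CHAR(5), v VARCHAR(10)) USING foo")
    spark.sql("ALTER TABLE testcat.ns.t ADD COLUMNS (v2 VARCHAR(3))")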

[File: TableOutputResolver]
@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, Attribute, Cast, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.DataType
@@ -93,19 +94,17 @@ object TableOutputResolver {
         tableAttr.metadata == queryExpr.metadata) {
       Some(queryExpr)
     } else {
-      // Renaming is needed for handling the following cases like
-      // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
-      // 2) Target tables have column metadata
-      storeAssignmentPolicy match {
+      val casted = storeAssignmentPolicy match {
         case StoreAssignmentPolicy.ANSI =>
-          Some(Alias(
-            AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
-            tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
+          AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
         case _ =>
-          Some(Alias(
-            Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
-            tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
+          Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
       }
+      val strLenChecked = CharVarcharUtils.stringLengthCheck(casted, tableAttr)
+      // Renaming is needed for handling the following cases like
+      // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
+      // 2) Target tables have column metadata
+      Some(Alias(strLenChecked, tableAttr.name)(explicitMetadata = Some(tableAttr.metadata)))
     }
 
     storeAssignmentPolicy match {
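
The cast alone cannot enforce a length limit, because char/varchar is stored as plain string; stringLengthCheck wraps the casted expression in a runtime check derived from the raw type kept in the target attribute's metadata. The DataFrame-level sketch below illustrates the intended varchar semantics only; it is not the catalyst expression the commit builds (raise_error ships in the same 3.1 release):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions._

    // Illustrative VARCHAR(n) semantics: pass values within the limit through and
    // fail the query at runtime otherwise. (CHAR(n) additionally pads shorter
    // values with spaces up to length n.)
    def varcharLengthCheck(input: Column, n: Int): Column =
      when(length(input) <= n, input).otherwise(raise_error(concat(
        lit("input string of length "),
        length(input).cast("string"),
        lit(s" exceeds varchar type length limit: $n"))))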

[File: SessionCatalog]
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, ImplicitCastInputTypes}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
-import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
@@ -473,7 +473,13 @@ class SessionCatalog(
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.getTable(db, table)
+    removeCharVarcharFromTableSchema(externalCatalog.getTable(db, table))
   }
+
+  // We replace char/varchar with string type in the table schema, as Spark's type system doesn't
+  // support char/varchar yet.
+  private def removeCharVarcharFromTableSchema(t: CatalogTable): CatalogTable = {
+    t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
+  }
 
   /**
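
Reading a table back through the session catalog therefore never surfaces char/varchar directly: the schema is rewritten to string, and the original type string is kept in field metadata where later phases can recover it. A self-contained sketch of that rewrite, with the metadata key assumed and nested types elided:

    import org.apache.spark.sql.types._

    // Assumed metadata key; the real constant lives in CharVarcharUtils.
    val RAW_TYPE_KEY = "__CHAR_VARCHAR_TYPE_STRING"

    def replaceCharVarcharWithStringInSchema(schema: StructType): StructType =
      StructType(schema.map {
        case f @ StructField(_, CharType(_) | VarcharType(_), _, _) =>
          val md = new MetadataBuilder()
            .withMetadata(f.metadata)
            .putString(RAW_TYPE_KEY, f.dataType.catalogString)
            .build()
          f.copy(dataType = StringType, metadata = md)
        case f => f // char/varchar nested in struct/array/map handled similarly
      })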

[File: AstBuilder]
@@ -36,8 +36,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
-import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -2201,7 +2201,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    * Create a Spark DataType.
    */
   private def visitSparkDataType(ctx: DataTypeContext): DataType = {
-    HiveStringType.replaceCharType(typedVisit(ctx))
+    CharVarcharUtils.replaceCharVarcharWithString(typedVisit(ctx))
   }
 
   /**
@@ -2276,16 +2276,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       builder.putString("comment", _)
     }
 
-    // Add Hive type string to metadata.
-    val rawDataType = typedVisit[DataType](ctx.dataType)
-    val cleanedDataType = HiveStringType.replaceCharType(rawDataType)
-    if (rawDataType != cleanedDataType) {
-      builder.putString(HIVE_TYPE_STRING, rawDataType.catalogString)
-    }
-
     StructField(
       name = colName.getText,
-      dataType = cleanedDataType,
+      dataType = typedVisit[DataType](ctx.dataType),
       nullable = NULL == null,
       metadata = builder.build())
   }
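
With the HIVE_TYPE_STRING workaround gone, the parser keeps real CharType/VarcharType in parsed schemas, and only the places that need a pure Spark type (such as visitSparkDataType above) replace them with string. A hedged sketch of the recursive replacement, assuming the obvious shape:

    import org.apache.spark.sql.types._

    // Swap char/varchar for string everywhere, preserving nullability and nesting.
    def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
      case CharType(_) | VarcharType(_) => StringType
      case ArrayType(et, containsNull) =>
        ArrayType(replaceCharVarcharWithString(et), containsNull)
      case MapType(kt, vt, valueContainsNull) =>
        MapType(replaceCharVarcharWithString(kt), replaceCharVarcharWithString(vt), valueContainsNull)
      case StructType(fields) =>
        StructType(fields.map(f => f.copy(dataType = replaceCharVarcharWithString(f.dataType))))
      case _ => dt
    }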

[File: V2WriteCommand]
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, Res
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
 import org.apache.spark.sql.connector.expressions.Transform
@@ -45,9 +46,10 @@ trait V2WriteCommand extends Command {
     table.skipSchemaResolution || (query.output.size == table.output.size &&
       query.output.zip(table.output).forall {
         case (inAttr, outAttr) =>
+          val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
           // names and types must match, nullability must be compatible
           inAttr.name == outAttr.name &&
-            DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&
+            DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outType) &&
             (outAttr.nullable || !inAttr.nullable)
       })
   }
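
outputResolved now compares the incoming attribute against the column's raw type when one is recorded, so a string-typed query can still line up with a varchar(n) column whose catalog type was rewritten to string. A sketch of getRawType under the same assumed metadata key:

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    import org.apache.spark.sql.types.{DataType, Metadata}

    // Assumed metadata key, matching the sketches above.
    val RAW_TYPE_KEY = "__CHAR_VARCHAR_TYPE_STRING"

    // Recover the original char/varchar type stashed in a column's metadata, if any.
    def getRawType(metadata: Metadata): Option[DataType] =
      if (metadata.contains(RAW_TYPE_KEY)) {
        Some(CatalystSqlParser.parseDataType(metadata.getString(RAW_TYPE_KEY)))
      } else {
        None
      }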

[28 more changed files not shown]