Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Nov 19, 2020
1 parent 99a6f21 commit 17d13e5
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ trait CheckAnalysis extends PredicateHelper {

case p if p.analyzed => // Skip already analyzed sub-plans

case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
case p if p.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
throw new IllegalStateException(
"[BUG] leaf logical plan should not have output of char/varchar type: " + leaf)
"[BUG] logical plan should not have output of char/varchar type: " + p)

case u: UnresolvedNamespace =>
u.failAnalysis(s"Namespace not found: ${u.multipartIdentifier.quoted}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ object TableOutputResolver {
case _ =>
Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
}
val strLenChecked = CharVarcharUtils.stringLengthCheck(casted, tableAttr)
val exprWithStrLenCheck = CharVarcharUtils.stringLengthCheck(casted, tableAttr)
// Renaming is needed for handling the following cases like
// 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
// 2) Target tables have column metadata
Some(Alias(strLenChecked, tableAttr.name)(explicitMetadata = Some(tableAttr.metadata)))
Some(Alias(exprWithStrLenCheck, tableAttr.name)(explicitMetadata = Some(tableAttr.metadata)))
}

storeAssignmentPolicy match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,12 +473,9 @@ class SessionCatalog(
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
removeCharVarcharFromTableSchema(externalCatalog.getTable(db, table))
}

// We replace char/varchar with string type in the table schema, as Spark's type system doesn't
// support char/varchar yet.
private def removeCharVarcharFromTableSchema(t: CatalogTable): CatalogTable = {
val t = externalCatalog.getTable(db, table)
// We replace char/varchar with "annotated" string type in the table schema, as the query
// engine doesn't support char/varchar yet.
t.copy(schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(t.schema))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,26 @@ object CharVarcharUtils {
}.getOrElse(expr)
}

/**
 * Builds the expression that reports a char/varchar length violation.
 *
 * The message is a `Concat` of a literal prefix, the offending input expression and a literal
 * suffix that names the type and its length limit.
 *
 * @param expr     the string expression whose value is embedded in the error message
 * @param typeName the type name interpolated into the message (callers pass "char" or "varchar")
 * @param length   the declared length limit interpolated into the message
 * @return the `RaiseError` expression wrapped in a `Cast` to `StringType`
 */
private def raiseError(expr: Expression, typeName: String, length: Int): Expression = {
  val prefix = Literal("input string '")
  val suffix = Literal(s"' exceeds $typeName type length limitation: $length")
  val message = Concat(Seq(prefix, expr, suffix))
  // NOTE(review): the cast presumably lets this sit next to a string-typed branch in `If`.
  Cast(RaiseError(message), StringType)
}

private def stringLengthCheck(expr: Expression, dt: DataType): Expression = dt match {
case CharType(length) =>
val trimmed = StringTrimRight(expr)
val errorMsg = Concat(Seq(
Literal("input string '"),
expr,
Literal(s"' exceeds char type length limitation: $length")))
// Trailing spaces do not count in the length check. We don't need to retain the trailing
// spaces, as we will pad char type columns/fields at read time.
If(
GreaterThan(Length(trimmed), Literal(length)),
Cast(RaiseError(errorMsg), StringType),
raiseError(expr, "char", length),
trimmed)

case VarcharType(length) =>
val trimmed = StringTrimRight(expr)
val errorMsg = Concat(Seq(
Literal("input string '"),
expr,
Literal(s"' exceeds varchar type length limitation: $length")))
// Trailing spaces do not count in the length check. We need to retain the trailing spaces
// (truncate to length N), as there is no read-time padding for varchar type.
// TODO: create a special TrimRight function that can trim to a certain length.
Expand All @@ -179,7 +179,7 @@ object CharVarcharUtils {
expr,
If(
GreaterThan(Length(trimmed), Literal(length)),
Cast(RaiseError(errorMsg), StringType),
raiseError(expr, "varchar", length),
StringRPad(trimmed, Literal(length))))

case StructType(fields) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object DataSourceV2Relation {
identifier: Option[Identifier],
options: CaseInsensitiveStringMap): DataSourceV2Relation = {
// The v2 source may return schema containing char/varchar type. We replace char/varchar
// with string type here as Spark's type system doesn't support char/varchar yet.
// with "annotated" string type here as the query engine doesn't support char/varchar yet.
val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.schema)
DataSourceV2Relation(table, schema.toAttributes, catalog, identifier, options)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.unsafe.types.UTF8String

@Experimental
case class CharType(length: Int) extends AtomicType {
require(length >= 0, "The length if char type cannot be negative.")

private[sql] type InternalType = UTF8String
@transient private[sql] lazy val tag = typeTag[InternalType]
private[sql] val ordering = implicitly[Ordering[InternalType]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.unsafe.types.UTF8String

@Experimental
case class VarcharType(length: Int) extends AtomicType {
require(length >= 0, "The length if varchar type cannot be negative.")

private[sql] type InternalType = UTF8String
@transient private[sql] lazy val tag = typeTag[InternalType]
private[sql] val ordering = implicitly[Ordering[InternalType]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ case class LogicalRelation(
object LogicalRelation {
/**
 * Creates a [[LogicalRelation]] for a v1 relation that has no catalog table attached.
 *
 * @param relation    the v1 relation whose schema provides the output attributes
 * @param isStreaming forwarded unchanged to the created relation; defaults to `false`
 */
def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation = {
  // A v1 source may report char/varchar columns in its schema. The query engine doesn't support
  // char/varchar yet, so they are replaced with "annotated" string type before the output
  // attributes are derived.
  val output =
    CharVarcharUtils.replaceCharVarcharWithStringInSchema(relation.schema).toAttributes
  LogicalRelation(relation, output, None, isStreaming)
}

/**
 * Creates a non-streaming [[LogicalRelation]] for a v1 relation backed by a catalog table.
 *
 * @param relation the v1 relation whose schema provides the output attributes
 * @param table    the catalog table recorded on the created relation
 */
def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation = {
  // A v1 source may report char/varchar columns in its schema. The query engine doesn't support
  // char/varchar yet, so they are replaced with "annotated" string type before the output
  // attributes are derived.
  val output =
    CharVarcharUtils.replaceCharVarcharWithStringInSchema(relation.schema).toAttributes
  LogicalRelation(relation, output, Some(table), false)
}
Expand Down

0 comments on commit 17d13e5

Please sign in to comment.