[SPARK-33480][SQL] Support char/varchar type #30412
Conversation
v2Write.withNewQuery(projection)
val cleanedTable = v2Write.table match {
  case r: DataSourceV2Relation =>
    r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
We remove the char/varchar metadata after length check expressions are added, so that we don't do it repeatedly and this rule is idempotent.
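A toy model of the idempotency argument (Attr, CharVarcharKey, and applyRule are illustrative stand-ins, not Spark's real classes): once the metadata key is stripped from the attributes, re-running the rule is a no-op.

```scala
// Attributes carry a metadata map; cleaning removes the char/varchar key,
// so applying the rule a second time changes nothing.
case class Attr(name: String, metadata: Map[String, String])

val CharVarcharKey = "__CHAR_VARCHAR_TYPE_STRING"

def cleanAttrMetadata(a: Attr): Attr =
  a.copy(metadata = a.metadata - CharVarcharKey)

def applyRule(attrs: Seq[Attr]): Seq[Attr] = attrs.map(cleanAttrMetadata)
```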
Does the current implementation assume the analyzer removes the metadata in plans before the optimizer phase? If so, how about checking in CheckAnalysis that plans don't have the metadata?
No, it doesn't. The metadata is fine as it's harmless. We only need to watch out for the specific rules that look at the char/varchar metadata, and make sure they are idempotent.
In fact, the added cast and length check expression is wrapped in an Alias, which retains the char/varchar metadata. So the output attributes of the Project above the v2 relation still have the metadata. This is necessary, as we rely on it later to do padding for char type column comparison.
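The comparison-padding behavior referenced here can be sketched in plain Scala (padForComparison is an illustrative helper, not Spark API): the shorter side is right-padded with spaces to the longer declared char length before comparing.

```scala
// Pad both operands of a char-type comparison to the wider declared length,
// so "ab" in a char(2) column compares equal to "ab " in a char(3) column.
def padForComparison(a: String, lenA: Int, b: String, lenB: Int): (String, String) = {
  val target = math.max(lenA, lenB)
  (a.padTo(target, ' '), b.padTo(target, ' '))
}
```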
if (projectList == r.output) {
  r -> Nil
} else {
  val cleanedOutput = r.output.map(CharVarcharUtils.cleanAttrMetadata)
Same here: we remove the char/varchar metadata to make this rule idempotent.
@@ -1000,18 +995,10 @@ private[hive] object HiveClientImpl extends Logging {
  /** Builds the native StructField from Hive's FieldSchema. */
  def fromHiveColumn(hc: FieldSchema): StructField = {
    val columnType = getSparkSQLDataType(hc)
    val replacedVoidType = HiveVoidType.replaceVoidType(columnType)
This comes from 339eec5#diff-45c9b065d76b237bcfecda83b8ee08c1ff6592d6f85acca09c0fa01472e056afR1009 but is actually unused. The tests still pass without it.
import org.apache.spark.unsafe.types.UTF8String

@Experimental
case class CharType(length: Int) extends AtomicType {
Shall we add SPARK-6412 (Add Char support in dataTypes) into the PR title as a secondary JIRA? SPARK-6412 was resolved as "Later".
The PR description says "To be safe, this PR doesn't add char/varchar type to the core type system.", but this adds CharType into org.apache.spark.sql.types. Could you revise the PR description a little? It seems that you wanted to mention the SQL execution parts.
It's already there: https://github.com/apache/spark/pull/30412/files#diff-aacc5ed42589c636615a3c09a44fa6a5248195242c9fbe0e996db17471cd35fdL70 I just moved it to a new file and removed the parent class.
And type system means a lot more, e.g. expression type check, InternalRow.get, etc. Maybe I should say "type system framework"?
PR description updated
val output = table.schema().toAttributes
DataSourceV2Relation(table, output, catalog, identifier, options)
// The v2 source may return schema containing char/varchar type. We replace char/varchar
// with string type here as Spark's type system doesn't support char/varchar yet.
`with string type` -> `with string type with metadata`?
Kubernetes integration test starting
Kubernetes integration test status success
Test build #131291 has finished for PR 30412 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
import org.apache.spark.unsafe.types.UTF8String

@Experimental
case class VarcharType(length: Int) extends AtomicType {
How about checking that length has a valid value in the constructor of char/varchar?

if (length < 0) {
  throw new AnalysisException("XXX")
}
Also, how about setting a reasonable max length for these types, like PostgreSQL does?
postgres=# create table t (v char(100000000));
ERROR: length for type char cannot exceed 10485760
LINE 1: create table t (v char(100000000));
VarcharType was already there: https://github.com/apache/spark/pull/30412/files#diff-aacc5ed42589c636615a3c09a44fa6a5248195242c9fbe0e996db17471cd35fdL79 I think it's a good idea to reject negative values, but adding a max length should be done separately.
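A minimal sketch of the constructor-time validation agreed on here (this VarcharType is a simplified stand-in for the real class, kept only to show the `require` guard):

```scala
// Reject negative lengths at construction time; require throws
// IllegalArgumentException for invalid input.
case class VarcharType(length: Int) {
  require(length >= 0, "The length of varchar type cannot be negative.")
}
```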
}
val strLenChecked = CharVarcharUtils.stringLengthCheck(casted, tableAttr)
To avoid accidentally adding the length check exprs again, can't we remove the metadata at the same time as adding the exprs?
nit: `strLenChecked` -> `exprWithStrLenCheck`?
> To avoid accidentally adding the length check exprs again, we cannot remove the metadata at the same time as adding the exprs?
We can't access the table relation here, only table output attrs.
> We can't access the table relation here, only table output attrs.
Oh, I see.
private def stringLengthCheck(expr: Expression, dt: DataType): Expression = dt match {
  case CharType(length) =>
    val trimmed = StringTrimRight(expr)
    val errorMsg = Concat(Seq(
nit: How about extracting `errorMsg` into a private method?
private def raiseError(expr: Expression, typeName: String, length: Int): Expression = {
  val errorMsg = Concat(Seq(
    Literal("input string '"),
    expr,
    Literal(s"' exceeds $typeName type length limitation: $length")))
  Cast(RaiseError(errorMsg), StringType)
}

private def stringLengthCheck(expr: Expression, dt: DataType): Expression = dt match {
  case CharType(length) =>
    val trimmed = StringTrimRight(expr)
    // Trailing spaces do not count in the length check. We don't need to retain the trailing
    // spaces, as we will pad char type columns/fields at read time.
    If(
      GreaterThan(Length(trimmed), Literal(length)),
      raiseError(expr, "char", length),
      trimmed)
  ...
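The write-side semantics the expression tree above encodes can be modeled in plain Scala (charWriteCheck is an illustrative name, not Spark API, and this is only a sketch of the behavior, assuming only spaces are trimmed): trailing spaces don't count toward a char column's length, and values still too long after trimming raise an error.

```scala
// Right-trim trailing spaces, then enforce the declared char length.
def charWriteCheck(value: String, length: Int): String = {
  val trimmed = value.reverse.dropWhile(_ == ' ').reverse
  if (trimmed.length > length) {
    throw new IllegalArgumentException(
      s"input string '$value' exceeds char type length limitation: $length")
  }
  trimmed
}
```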
// We replace char/varchar with string type in the table schema, as Spark's type system doesn't
// support char/varchar yet.
private def removeCharVarcharFromTableSchema(t: CatalogTable): CatalogTable = {
Could you inline this in L476?
Test build #131356 has finished for PR 30412 at commit
@cloud-fan You should rebase on master to see the errors #30412 (comment) locally. The check came from #30351.
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Test build #131365 has finished for PR 30412 at commit
Kubernetes integration test status success
Do you have a plan to update the doc: https://github.com/apache/spark/blob/master/docs/sql-ref-datatypes.md ? It would be nice to describe something about char/varchar there, I think.
 * Re-construct the original StructType from the type strings in the metadata of StructFields.
 * This is needed when dealing with char/varchar columns/fields.
 */
def getRawSchema(schema: StructType): StructType = {
not used now?
removed.
  }
}

test("char type comparison: nested in array of array") {
Could you add char comparison tests for a map case, too?
map type is not comparable.
Ah, I see. If so, why does the function typeWithWiderCharLength have an entry for Map? https://github.com/apache/spark/pull/30412/files#diff-16753285e80505e04c445ea8ccee1dde7ae601ed7bae224e212d90d395f57928R225-R226 It seems the function is only used for addPaddingInStringComparison.
You are right, I should remove it.
withTable("t") {
  sql(s"CREATE TABLE t(i STRING, c CHAR(5)) USING $format")
  sql("INSERT INTO t VALUES ('1', 'a')")
  checkAnswer(spark.table("t"), Row("1", "a" + " " * 4))
How about checking an output schema, too, in these tests?
scala> sql("CREATE TABLE t(i STRING, c CHAR(5)) USING parquet PARTITIONED BY (c)")
scala> spark.table("t").printSchema
root
|-- i: string (nullable = true)
|-- c: string (nullable = true) <---- this check
btw, how do users check a char length after defining a table? In pg, users can check a char length via some commands, e.g., `\d`:
postgres=# create table t (c char(5));
CREATE TABLE
postgres=# \d t
Table "public.t"
Column | Type | Collation | Nullable | Default
--------+--------------+-----------+----------+---------
c | character(5) | | |
We ensure that LogicalPlan.output won't have char/varchar type, and we have a UT to verify it. It seems unnecessary to check it again here.
For DDL command support, can we leave it to a followup? This PR is already very big. I can create a ticket for it.
The followup looks fine.
@@ -94,6 +94,10 @@ trait CheckAnalysis extends PredicateHelper {

    case p if p.analyzed => // Skip already analyzed sub-plans

    case p if p.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
      throw new IllegalStateException(
        "[BUG] logical plan should not have output of char/varchar type: " + p)
In the case below, could we use AnalysisException instead?
scala> sql("""SELECT from_json("{'a': 'aaa'}", "a char(3)")""").printSchema()
java.lang.IllegalStateException: [BUG] logical plan should not have output of char/varchar type: Project [from_json(StructField(a,CharType(3),true), {'a': 'aaa'}, Some(Asia/Tokyo)) AS from_json({'a': 'aaa'})#37]
+- OneRowRelation
I chose IllegalStateException because this should not happen. Thanks for catching this corner case; I've fixed it in DataType.fromDDL.
Kubernetes integration test starting
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #131786 has finished for PR 30412 at commit
Test build #131803 has finished for PR 30412 at commit
Test build #131797 has finished for PR 30412 at commit
Test build #131798 has finished for PR 30412 at commit
docs/sql-ref-datatypes.md
@@ -37,6 +37,8 @@ Spark SQL and DataFrames support the following data types:
  - `DecimalType`: Represents arbitrary-precision signed decimal numbers. Backed internally by `java.math.BigDecimal`. A `BigDecimal` consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
* String type
  - `StringType`: Represents character string values.
  - `VarcharType(length)`: A variant of `StringType` which has a length limitation. Data writing will fail if the input string exceeds the length limitation. Note: this type can only be used in table schema, not functions/operators.
  - `CharType(length)`: A variant of `VarcharType(length)` which is fixed length. Reading column of type `VarcharType(n)` always returns string values of length `n`. Char type column comparison will pad the short one to the longer length.
`Reading column of type VarcharType(n)` -> `Reading column of type CharType(n)`?
}

/**
 * Returns an expression to apply write-side char type padding for the given expression. A string
char type padding -> char type checking?

@Experimental
case class VarcharType(length: Int) extends AtomicType {
  require(length >= 0, "The length if varchar type cannot be negative.")
if -> of

@Experimental
case class CharType(length: Int) extends AtomicType {
  require(length >= 0, "The length if char type cannot be negative.")
if -> of
def cast(to: DataType): Column = withExpr {
  Cast(expr, CharVarcharUtils.replaceCharVarcharWithString(to))
}
So we can do cast(CharType)? It actually casts to StringType? But don't we lose the length info?
If you do col.cast("char(5)") before this PR, Spark already silently treats it as a cast to string type. I don't want to change this behavior here. We can make it better later, by failing explicitly and saying that cast to char type is not supported.
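At the type-string level, the replacement being discussed can be sketched like this (the regex-based helper is illustrative only; it is not how Spark's CharVarcharUtils.replaceCharVarcharWithString is actually implemented, which works on DataType objects):

```scala
// Map a char(n)/varchar(n) type string to plain "string"; leave others alone.
def replaceCharVarcharWithString(typeStr: String): String =
  if (typeStr.trim.matches("""(?i)(char|varchar)\(\s*\d+\s*\)""")) "string"
  else typeStr
```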
Test build #131827 has finished for PR 30412 at commit
Okay, it looks good to me. Btw, will this new feature land in the v3.1 release? There is still some remaining work (e.g., #30412 (comment) and #30412 (comment)) though. cc: @HyukjinKwon as the v3.1 release manager
Note that, before this PR, people could already create varchar type columns, but there was no length check during write. People could already create char type columns with Hive tables, but the input string was truncated during write. This PR doesn't make things worse.
Test build #131866 has finished for PR 30412 at commit
Test build #131881 has finished for PR 30412 at commit
retest this please
Test build #131876 has finished for PR 30412 at commit
Test build #131884 has finished for PR 30412 at commit
thanks for the review, merging to master!
Thank you, @cloud-fan and all!
…rovider for CREATE TABLE command

### What changes were proposed in this pull request?

For CREATE TABLE [AS SELECT] command, creates a native Parquet table if neither USING nor STORED AS is specified and `spark.sql.legacy.createHiveTableByDefault` is false. This is a retry after we unify the CREATE TABLE syntax. It partially reverts d2bec5e

This PR allows `CREATE EXTERNAL TABLE` when `LOCATION` is present. This was not allowed for data source tables before, which is an unnecessary behavior difference with hive tables.

### Why are the changes needed?

Changing from Hive text table to native Parquet table has many benefits:
1. be consistent with `DataFrameWriter.saveAsTable`.
2. better performance
3. better support for nested types (Hive text table doesn't work well with nested types, e.g. `insert into t values struct(null)` actually inserts a null value, not `struct(null)`, if `t` is a Hive text table, which leads to wrong results)
4. better interoperability as Parquet is a more popular open file format.

### Does this PR introduce _any_ user-facing change?

No by default. If the config is set, the behavior change is described below:

Behavior-wise, the change is very small as the native Parquet table is also Hive-compatible. All the Spark DDL commands that work for hive tables also work for native Parquet tables, with two exceptions: `ALTER TABLE SET [SERDE | SERDEPROPERTIES]` and `LOAD DATA`.

char/varchar behavior has been taken care of by #30412, and there is no behavior difference between data source and hive tables.

One potential issue is `CREATE TABLE ... LOCATION ...` while users want to directly access the files later. It's more like a corner case and the legacy config should be good enough.

Another potential issue is users may use Spark to create the table and then use Hive to add partitions with a different serde. This is not allowed for Spark native tables.

### How was this patch tested?

Re-enable the tests

Closes #30554 from cloud-fan/create-table.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

This is a followup of #30412. This PR updates the error message of the char/varchar table insertion length check, to not expose user data.

### Why are the changes needed?

It is risky to expose user data in the error message, especially string data, as it may contain sensitive data.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated tests

Closes #30653 from cloud-fan/minor2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?

This PR adds the char/varchar type, which is kind of a variant of string type:

To implement the char/varchar semantics, this PR:

To simplify the implementation, this PR doesn't propagate char/varchar type info through functions/operators (e.g. substring). That said, a column can only be char/varchar type if it's a table column, not a derived column like SELECT substring(col).

To be safe, this PR doesn't add char/varchar type to the query engine (expression input check, internal row framework, codegen framework, etc.). We will replace char/varchar type by string type with metadata (Attribute.metadata or StructField.metadata) that includes the original type string before it goes into the query engine. That said, the existing code will not see char/varchar type but only string type.

char/varchar type may come from several places:
- spark.read.schema and spark.readStream.schema
- Column.cast
- from_json, pandas UDF, etc. These places use the SQL parser, which replaces char/varchar with string already, even before this PR.

This PR covers all the above cases, and implements the length check and padding feature by looking at string type with special metadata.
Why are the changes needed?
char and varchar are standard SQL types. varchar is widely used in other databases instead of string type.
Does this PR introduce any user-facing change?
For hive tables: now the table insertion fails if the value exceeds char/varchar length. Previously we truncate the value silently.
For other tables:
How was this patch tested?
new tests