[SPARK-28730][SQL] Configurable type coercion policy for table insertion #25453
Conversation
)
  .stringConf
  .transform(_.toUpperCase(Locale.ROOT))
  .checkValues(StoreAssignmentPolicy.values.map(_.toString))
Here the configuration is an ENUM instead of a Boolean type. We will have a new policy "ANSI" after #25239 is finished.
CC @cloud-fan @maropu @rdblue @HyukjinKwon @mccheah
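As an aside for readers, here is a minimal standalone sketch of what an enum-valued policy setting can look like, using a plain Scala `Enumeration` and a hypothetical parse helper rather than Spark's internal `ConfigBuilder`; it only mirrors the `.stringConf.transform(...).checkValues(...)` pattern shown in the diff above.

```scala
import java.util.Locale

// Standalone sketch (not Spark's internal ConfigBuilder): an enumeration-valued
// setting that normalizes case and rejects unknown values.
object StoreAssignmentPolicySketch extends Enumeration {
  val LEGACY, STRICT = Value // an ANSI value is expected once #25239 lands
}

object PolicyConfSketch {
  def parse(raw: String): StoreAssignmentPolicySketch.Value = {
    val normalized = raw.toUpperCase(Locale.ROOT)
    require(
      StoreAssignmentPolicySketch.values.map(_.toString).contains(normalized),
      s"Invalid value for store assignment policy: $raw")
    StoreAssignmentPolicySketch.withName(normalized)
  }
}

// PolicyConfSketch.parse("legacy") == StoreAssignmentPolicySketch.LEGACY
// PolicyConfSketch.parse("ansi") currently throws IllegalArgumentException
```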
I believe this PR is the minimal effort to fix the Spark 3.0 blocker: DS v1 and v2 tables have inconsistent behaviors regarding table insertion. I think we all agree that we need to make table insertion behavior configurable, and IMO legacy mode should be the default instead of the strict mode, as it's the behavior in Spark 1.x and 2.x. For now I think the legacy mode is the most reasonable default, but this may change after we make more progress on new policies and fixing the "return-null" behavior. We can discuss changing the default at that time.
Test build #109108 has finished for PR 25453 at commit
Test build #109109 has finished for PR 25453 at commit
Test build #109347 has finished for PR 25453 at commit
Test build #109348 has finished for PR 25453 at commit
"strict. With legacy policy, Spark allows casting any value to any data type. " + | ||
"The legacy policy is the only behavior in Spark 2.x and it is compatible with Hive. " + | ||
"With strict policy, Spark doesn't allow any possible precision loss or data truncation " + | ||
"in type coercion, e.g. `int` and `long`, `float` -> `double` are not allowed." |
e.g. `int` to `long`, `timestamp` to `date` ...
} else {
  // run the type check first to ensure type errors are present
  val canWrite = DataType.canWrite(
    queryExpr.dataType, tableAttr.dataType, byName, useStrictRules,
Why does `DataType.canWrite` need to take the `useStrictRules` parameter? In this branch `useStrictRules` is true.
    conf: SQLConf,
    addError: String => Unit): Option[NamedExpression] = {

  val useStrictRules = conf.storeAssignmentPolicy == StoreAssignmentPolicy.STRICT
`useStrictRules` -> `useStrictRule`
I would prefer "rules".
If we will add another rule in follow-up activities, it'd be better to use pattern matching here instead of `if`?
what "rules" mean here? From the code we are applying the STRICT rule here. If you want to represent "non-legacy rules", I think it's better to write
val isLegacyMode = conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY
...
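For illustration, here is a toy sketch of the pattern-matching shape being discussed, using hypothetical local names (`Policy`, `resolveColumn`) rather than Spark's actual `SQLConf`/`TableOutputResolver` code; the point is that each policy gets its own case instead of a boolean flag threaded through the resolver.

```scala
// Toy sketch of the pattern-matching alternative, not the real resolver code.
object ResolutionSketch {
  object Policy extends Enumeration {
    val LEGACY, STRICT = Value
  }

  // Adding a future ANSI policy would be one more case here,
  // rather than another boolean parameter.
  def resolveColumn(policy: Policy.Value, source: String, target: String): String =
    policy match {
      case Policy.LEGACY => s"Cast($source as $target)"   // legacy: always insert a plain cast
      case Policy.STRICT => s"UpCast($source as $target)" // strict: type check, then upcast
    }
}

// ResolutionSketch.resolveColumn(ResolutionSketch.Policy.STRICT, "int", "long")
//   == "UpCast(int as long)"
```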
assertAnalysisError(parsedPlan, Seq(
  "Cannot write incompatible data to table", "'table-name'",
  "Cannot write nullable values to non-null column", "'x'", "'y'"))
withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key ->
We can set this config in `beforeAll` and unset it in `afterAll`.
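A minimal sketch of that suggestion, assuming a ScalaTest (3.1+) suite and using a plain mutable map as a stand-in for the suite's SQLConf; a real suite would call its conf's set/unset methods instead.

```scala
import scala.collection.mutable
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

// Sketch only: `conf` below is a stand-in for the suite's SQLConf-like settings.
class StrictPolicySuite extends AnyFunSuite with BeforeAndAfterAll {
  private val conf = mutable.Map.empty[String, String]
  private val PolicyKey = "spark.sql.storeAssignmentPolicy"

  override def beforeAll(): Unit = {
    super.beforeAll()
    conf(PolicyKey) = "STRICT"   // set the policy once for every test in the suite
  }

  override def afterAll(): Unit = {
    conf.remove(PolicyKey)       // restore the default after the suite finishes
    super.afterAll()
  }

  test("policy is strict inside the suite") {
    assert(conf(PolicyKey) == "STRICT")
  }
}
```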
} else {
  // always add an UpCast. it will be removed in the optimizer if it is unnecessary.
  Some(Alias(
    UpCast(queryExpr, tableAttr.dataType), tableAttr.name
Shall we use `Cast` here? The upcast logic is already checked in `DataType.canWrite`. Then we can remove https://github.com/apache/spark/pull/25453/files#diff-7690f56bde3f7a3dd76fab9c136c1494R181
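To make the trade-off concrete, here is a toy model of "always wrap the query column in a cast and let a later pass drop it when it is a no-op"; the mini-expression classes are hypothetical, not catalyst's actual `Cast`/`UpCast` classes.

```scala
// Toy model of the "wrap unconditionally, simplify later" design choice.
object CastSketch {
  sealed trait Expr { def tpe: String }
  case class Col(name: String, tpe: String) extends Expr
  case class CastTo(child: Expr, tpe: String) extends Expr

  // Resolution step: unconditionally add a cast to the table column's type.
  def project(query: Expr, targetType: String): Expr = CastTo(query, targetType)

  // Optimizer-like step: remove the cast if the child already has that type.
  def simplify(e: Expr): Expr = e match {
    case CastTo(child, t) if child.tpe == t => child
    case other => other
  }
}

// CastSketch.simplify(CastSketch.project(CastSketch.Col("x", "int"), "int"))
//   == CastSketch.Col("x", "int")
```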
Test build #109342 has finished for PR 25453 at commit
@@ -2525,7 +2525,8 @@ class Analyzer(
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
     case append @ AppendData(table, query, isByName)
         if table.resolved && query.resolved && !append.outputResolved =>
-      val projection = resolveOutputColumns(table.name, table.output, query, isByName)
+      val projection =
+        TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
The code in lines 2528-2535, 2539-2546, and 2550-2557 is duplicated, so can we merge them into one rule by defining an extractor (`unapply`)?
btw, `ResolveOutputRelation` currently seems to be a thin wrapper for `TableOutputResolver`. Do we need to separate `TableOutputResolver` from `ResolveOutputRelation`?
> The code in lines 2528-2535, 2539-2546, and 2550-2557 is duplicated, so can we merge them into one rule by defining an extractor (`unapply`)?

Eventually we need to call `append.copy(query = projection)` and `overwrite.copy(query = projection)`, so we need to match all the `V2WriteCommand`s anyway. I think it is fine to keep the current code.
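For readers unfamiliar with the extractor idea, here is a toy sketch with hypothetical command classes standing in for the `V2WriteCommand` hierarchy; it also shows why, as noted above, copying the resolved projection back still requires matching each concrete type.

```scala
// Toy illustration only; these are not Spark's actual AppendData/Overwrite classes.
object ExtractorSketch {
  sealed trait WriteCommand { def table: String; def query: String }
  case class Append(table: String, query: String) extends WriteCommand
  case class Overwrite(table: String, query: String, deleteExpr: String) extends WriteCommand

  // One extractor exposes the pieces every write command shares.
  object ResolvableWrite {
    def unapply(cmd: WriteCommand): Option[(String, String)] = Some((cmd.table, cmd.query))
  }

  def resolve(cmd: WriteCommand): WriteCommand = cmd match {
    case ResolvableWrite(table, query) =>
      val projection = s"resolved($query -> $table)"
      // The shared extractor handles matching, but copying the resolved
      // projection back still needs a case per concrete command type.
      cmd match {
        case a: Append    => a.copy(query = projection)
        case o: Overwrite => o.copy(query = projection)
      }
  }
}
```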
> btw, ResolveOutputRelation currently seems to be a thin wrapper for TableOutputResolver. Do we need to separate TableOutputResolver from ResolveOutputRelation?

Otherwise, we can't access the method `resolveOutputColumns` in `PreprocessTableInsertion`.
> If we will add another rule in follow-up activities, it'd be better to use pattern matching here instead of `if`?

We still need
Test build #109424 has finished for PR 25453 at commit
@gengliangwang, in the description, you said:
Why is that? We know that v2 already introduces breaking behavior changes and we can't avoid them. We were previously okay with different behavior between v1 and v2, so I see no reason to support the legacy type coercion in the v2 path. I do think it makes sense to support the SQL standard type coercion along with strict type coercion in v2, though.
@@ -2525,7 +2525,8 @@ class Analyzer(
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
We might need to update the description for `ResolveOutputRelation` and refine "what's a safe cast?".
btw, we might need to move some parts of this description to `TableOutputResolver`.
I have removed the word "safe" in the comment. I think the comment "Detect plans that are not compatible with the output table and throw AnalysisException" already states that there is a type coercion check in the rule.
}

case other =>
  throw new AnalysisException(s"Unsupported store assignment policy: $other")
Do we need this? It seems we have already checked that the mode is valid: https://github.com/apache/spark/pull/25453/files#diff-9a6b543db706f1a90f790783d6930a13R1661
outputField

case StoreAssignmentPolicy.STRICT =>
  // run the type check first to ensure type errors are present
if (queryExpr.nullable && !tableAttr.nullable) {
  addError(s"Cannot write nullable values to non-null column '${tableAttr.name}'")
  None
} else {
  // run the type check first to ensure type errors are present
  val canWrite = DataType.canWrite(
    queryExpr.dataType, tableAttr.dataType, byName, conf.resolver, tableAttr.name, addError)
  if (canWrite) {
    outputField
  } else {
    None
  }
}
?
btw, we don't need the check `queryExpr.nullable && !tableAttr.nullable` in the other modes?
I think this is on purpose in the original code. Running `DataType.canWrite` can expose more errors.
> btw, we don't need the check `queryExpr.nullable && !tableAttr.nullable` in the other modes?
IIRC there is no such check in Spark 2.x
.copy(SQLConf.STORE_ASSIGNMENT_POLICY -> StoreAssignmentPolicy.STRICT)
val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf)
catalog.createDatabase(
  CatalogDatabase("default", "", new URI("loc"), Map.empty),
How about creating a temporary dir for `loc`?
This is actually copied from `AnalysisTest`. I think it should be fine.
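For completeness, a one-line sketch of the temporary-directory alternative the reviewer mentioned (illustrative only, REPL/script style; the test keeps the literal `"loc"` URI).

```scala
import java.nio.file.Files

// Create a throwaway directory and use its URI as the database location.
val dbLocation = Files.createTempDirectory("spark-test-db").toUri
// pass `dbLocation` where the test currently uses `new URI("loc")`
```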
read/write are very basic functionalities and ideally we should make them consistent. I know that some commands like ...

Furthermore, we need to make it configurable, and it will be awkward to have 2 store assignment configs for v1 and v2 tables. To be honest I don't like the legacy mode, but I don't think it's possible to make strict mode the default. Breaking changes have degrees, and to me strict mode is too breaking. How about this: we add a hack in this PR, so that legacy mode is the default for v1 tables and strict mode is the default for v2 tables. The hack can be removed if we decide to make the ANSI SQL mode the default. Then we can still have a single config to configure the store assignment policy.
As per the discussion in the dev list, I think most of us agree that we should make the table insertion behavior configurable. So I assume that you are asking to have two table insertion flags for V1 and V2 data sources. I asked for your opinions one week ago in comment #25453 (comment) before I started the actual code changes. I hope we can move forward on this PR. The ANSI mode will be added right after this. Making the legacy policy the default is the safest for now. We can discuss the default policy after ANSI mode is added.
Test build #109470 has finished for PR 25453 at commit
Test build #109466 has finished for PR 25453 at commit
retest this please.
Test build #109478 has finished for PR 25453 at commit
I don't think that v2 should allow the legacy mode. ANSI and strict modes make sense, but carrying the legacy mode forward just to avoid an extra config property doesn't seem worth it. This mode is user-facing, while the configuration properties are primarily administrator settings. Reducing the administrator settings by one doesn't seem worth corrupting user data in the v2 code path. And the v1 code path will eventually be removed, along with a v1 setting for this mode. If v2 doesn't support legacy mode, then we can remove it as well.
I think it is fine to make ANSI mode the default -- assuming that's what the community votes for. But I don't think that carrying legacy mode forward is the right way to avoid making strict mode the default. If legacy mode is available then we will always need to support it, even when v1 is gone. If it is the default, then we can't actually make progress in this area by combining this with breaking changes for v2.
If we want to have just one property, I think what we can do is use different defaults for v1 and v2, but use the same configuration property. That property should only support configuring ANSI and strict modes. That way, setting the property sets the same mode for v1 and v2, but doesn't allow v2 to use the unsafe legacy mode. v1 would continue to default to the legacy mode (unless we want to replace that with ANSI) and v2 would default to whatever is decided by the vote on the dev list.
This is what I see in that comment:
That's not the same thing as making a setting that allows legacy mode with v2. I'm fine adding a way for v1 to use strict mode, although I doubt that's what we will want to ship in Spark 3.0 as a default.
Totally agree. I prefer using the ANSI mode as default as well. Disallowing the legacy mode in V2 also makes sense. I am glad that we came to an agreement on this :)
Any thoughts on whether we can get rid of legacy mode for v1 if we implement ANSI mode? Maybe we could use the 3.0 release to remove legacy mode.
+1; the approach looks pretty reasonable to me, too.
The general policy is to keep a legacy config for at least one release if a behavior change is made, so I think we can remove it in 3.1. I agree with forbidding legacy mode in v2. For now we can make v1 and v2 have different default policies. Once we make the ANSI policy the default, then v1 and v2 can still have the same default policy.
Test build #109564 has finished for PR 25453 at commit
Test build #109566 has finished for PR 25453 at commit
From this and other comments, it sounds like there is an assumption that ANSI mode will be the default. Just want to remind everyone that I think that decision requires a vote on the dev list.
Yes, we need a vote. #25239 should only add the ANSI SQL policy without changing the default. A separate PR is needed to change the default if the vote passes.
thanks, merging to master!
What changes were proposed in this pull request?
After all the discussions in the dev list: http://apache-spark-developers-list.1001551.n3.nabble.com/Discuss-Follow-ANSI-SQL-on-table-insertion-td27531.html#a27562.
Here I propose that we make the store assignment rules in the analyzer configurable, and that the behavior of V1 and V2 should be consistent.
When inserting a value into a column with a different data type, Spark will perform type coercion. After this PR, we support 2 policies for the type coercion rules: legacy and strict.
With the legacy policy, Spark allows casting any value to any data type. The legacy policy is the only behavior in Spark 2.x and it is compatible with Hive.
With the strict policy, Spark doesn't allow any possible precision loss or data truncation in type coercion, e.g. `int` and `long`, `float` -> `double` are not allowed.
Eventually, the "legacy" mode will be removed, so it is disallowed in data source V2.
To ensure backward compatibility with existing queries, the default store assignment policy for data source V1 is "legacy".
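To make the two policies concrete, here is a deliberately simplified sketch of the contrast; the rules below are assumptions for illustration only, not Spark's actual `DataType.canWrite` logic.

```scala
// Toy contrast: LEGACY accepts every assignment, STRICT rejects anything
// that is not on a (here, trivially small) lossless allow-list.
object PolicyContrastSketch {
  private val lossless = Set("int", "long", "float", "double", "string").map(t => (t, t))

  def canStore(source: String, target: String, policy: String): Boolean = policy match {
    case "LEGACY" => true                                 // a cast is always inserted; may yield null or truncate
    case "STRICT" => lossless.contains((source, target))  // only clearly safe assignments pass the analyzer
    case other    => throw new IllegalArgumentException(s"Unknown store assignment policy: $other")
  }
}

// PolicyContrastSketch.canStore("string", "int", "LEGACY") == true  (checked only at runtime)
// PolicyContrastSketch.canStore("string", "int", "STRICT") == false (rejected at analysis time)
```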
How was this patch tested?
Unit test