[SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type #27499
Conversation
Test build #118060 has finished for PR 27499 at commit
retest this please
Test build #118062 has finished for PR 27499 at commit
Test build #118077 has finished for PR 27499 at commit
```diff
@@ -352,8 +352,7 @@ object functions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def count(columnName: String): TypedColumn[Any, Long] =
-    count(Column(columnName)).as(ExpressionEncoder[Long]())
+  def count(columnName: String): Column = count(Column(columnName))
```
This seems to me to be wrongly a `TypedColumn`. `Count` is a `DeclarativeAggregate`.
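To make the compatibility stakes concrete, here is a minimal sketch (assuming a `SparkSession` named `spark`; the variable names are illustrative) of how the return type steers overload resolution in `Dataset.select`:

```scala
import org.apache.spark.sql.functions.count

val ds = spark.range(10)  // Dataset[java.lang.Long] with a single "id" column

// Old signature: def count(columnName: String): TypedColumn[Any, Long]
//   -> ds.select(count("id")) picks the typed select overload and is
//      statically a Dataset[Long].
// Signature in this diff: def count(columnName: String): Column
//   -> the same call resolves to the untyped select and is a DataFrame,
//      so callers that bound the result to Dataset[Long] stop compiling.
val counted = ds.select(count("id"))
```

This is the source-compatibility concern raised in the following comments.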
This is a breaking change, right?
At least https://github.com/apache/spark/pull/27499/files#diff-2c67e6ae3d5115b5521681f6ef871b1dR43 is broken.
It seems like the right change, but let's revert this line considering it's the code freeze period.
Ok. :)
project/MimaExcludes.scala (Outdated)

```diff
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.this"),
+
+    // [SPARK-30590][SQL] Untyped select API cannot take typed column expression
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.functions.count")
```
Put it under 3.0 exclude rules temporarily. The version number in the master branch is still 3.0.0.
Test build #118080 has finished for PR 27499 at commit
Test build #118189 has finished for PR 27499 at commit
retest this please
Test build #118196 has finished for PR 27499 at commit
retest this please
Test build #118197 has finished for PR 27499 at commit
retest this please.
Test build #118209 has finished for PR 27499 at commit
Seems fine except #27499 (comment). Might need to update the title and PR description too.
Test build #118777 has finished for PR 27499 at commit
Updated the description. I think the title is still ok?
Test build #118800 has finished for PR 27499 at commit
Test build #118813 has finished for PR 27499 at commit
also cc @dongjoon-hyun
```scala
        }
        if (isSimpleEncoder) {
          // This typed column produces simple type output that can be fit into untyped `DataFrame`.
          typedCol.withInputType(exprEnc, logicalPlan.output)
```
Previously we didn't call `withInputType` for `count`, right?
`count` has no `TypedAggregateExpression`. `withInputType` only works on `TypedAggregateExpression`.
I mean, `df.select(count("*"))` works without calling `withInputType`, right?
Yes, for a `TypedColumn` that doesn't contain a `TypedAggregateExpression`, `withInputType` is a no-op, so you don't need to call it for `df.select(count("*"))`.
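A minimal sketch of the distinction in this exchange (assuming a `SparkSession` named `spark`; `typed.sumLong` from `org.apache.spark.sql.expressions.scalalang` is used here only as an example of a column that does carry a `TypedAggregateExpression`):

```scala
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.expressions.scalalang.typed
import spark.implicits._

val df = spark.range(5).toDF("id")

// count("*") is a TypedColumn, but it wraps a plain Count aggregate with
// no TypedAggregateExpression inside, so withInputType is a no-op and the
// untyped select (forced here by mixing in a plain Column) resolves fine.
df.select(count("*"), lit(1)).show()

// typed.sumLong builds a TypedColumn whose TypedAggregateExpression keeps
// an empty input deserializer until withInputType binds the input encoder;
// the typed select overload does that binding, the untyped one does not.
val needsInput = typed.sumLong[Long](identity)
Seq(1L, 2L, 3L).toDS().select(needsInput).show()
```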
So here we are supporting more cases than before?
Oh, I get your point now. Yea, we should not allow more cases than before.
Test build #118897 has finished for PR 27499 at commit
retest this please.
```scala
   */
  private[sql] def needInputType: Boolean = {
    expr.find {
      case ta: TypedAggregateExpression if ta.inputDeserializer.isEmpty => true
```
nit: `case ta: TypedAggregateExpression => ta.inputDeserializer.isEmpty`
ok.
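Applied, the helper reads as below (a sketch of just this method; the surrounding `TypedColumn` members are elided):

```scala
// Whether the expression contains a typed aggregate whose input type and
// schema still need to be bound by `withInputType`.
private[sql] def needInputType: Boolean = {
  expr.find {
    case ta: TypedAggregateExpression => ta.inputDeserializer.isEmpty
    case _ => false
  }.isDefined
}
```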
LGTM. Let's highlight that it only refines the error message in the PR description.
```scala
    // Passes typed columns to untyped `Dataset.select` API.
    val err = intercept[AnalysisException] {
      df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5), fooAgg(6))
```
Not related to this PR, just a note: we have 5 overloads of typed `select`, and typed `count` is supported in both the typed and untyped `select`. That said, if we add a 6th overload of typed `select`, it can break queries that call the untyped `select` with 6 typed `count`s.

I'm not sure what the best way forward is. Maybe we should add a new method, `typedSelect`, to disambiguate from the untyped version.
Yea, to be clear: if we add a 6th overload of typed `select`, a call to the untyped `select` with 6 typed `count`s could return `Dataset[(Long, Long, ...)]` instead of `DataFrame`.

I think you meant something like the existing `selectUntyped`? Although its naming is confusing.
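A sketch of the hazard (overload signatures abridged from `Dataset`; assume a `Dataset` named `ds` with a `value` column and the `TypedColumn`-returning `count`):

```scala
// Dataset has typed select overloads for exactly 1 to 5 typed columns:
//   def select[U1](c1: TypedColumn[T, U1]): Dataset[U1]
//   ...
//   def select[U1, U2, U3, U4, U5](...): Dataset[(U1, U2, U3, U4, U5)]
// plus the untyped variadic overload:
//   def select(cols: Column*): DataFrame

val c = count("value")       // TypedColumn[Any, Long], which is also a Column

ds.select(c, c, c, c, c)     // typed overload -> Dataset[(Long, Long, Long, Long, Long)]
ds.select(c, c, c, c, c, c)  // falls through to untyped select -> DataFrame

// A hypothetical 6th typed overload would silently retype the second call
// to Dataset[(Long, Long, Long, Long, Long, Long)], a source-breaking change.
```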
Test build #118909 has finished for PR 27499 at commit
Test build #118931 has finished for PR 27499 at commit
retest this please
Test build #118944 has finished for PR 27499 at commit
```diff
-    Project(cols.map(_.named), logicalPlan)
+    val untypedCols = cols.map {
+      case typedCol: TypedColumn[_, _] =>
+        if (!typedCol.needInputType) {
```
Just noticed: why don't we inline this method? Then we can centralize the changes here. The methods in `TypedColumn` can still be accessed by Java users who ignore `private[spark]`, so it's better to avoid adding them if we can.
Oh, OK. I previously didn't want to make `select` look complicated. Inlined it now.
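For reference, a sketch of the inlined check in the untyped `select` (simplified from the final `Dataset.scala` change; the error message wording is illustrative):

```scala
def select(cols: Column*): DataFrame = withPlan {
  val untypedCols = cols.map {
    case typedCol: TypedColumn[_, _] =>
      // Inlined: does this typed column carry a TypedAggregateExpression
      // whose input deserializer has not been bound by withInputType?
      val needsInputType = typedCol.expr.find {
        case ta: TypedAggregateExpression => ta.inputDeserializer.isEmpty
        case _ => false
      }.isDefined
      if (needsInputType) {
        throw new AnalysisException(s"Typed column $typedCol that needs input type and " +
          "schema cannot be passed in untyped `select` API. " +
          "Use the typed `Dataset.select` API instead.")
      }
      typedCol
    case other => other
  }
  Project(untypedCols.map(_.named), logicalPlan)
}
```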
Test build #118984 has finished for PR 27499 at commit
thanks, merging to master/3.0!
Thanks! I will open a JIRA for discussion of the typed select API.
+1 LGTM too
[SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type

Closes #27499 from viirya/SPARK-30590.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 160c144)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Created SPARK-30983 for discussion of the typed select API.
[SPARK-30590][SQL] Untyped select API cannot take typed column expression that needs input type

Closes apache#27499 from viirya/SPARK-30590.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

This patch proposes to throw a clear analysis exception if the untyped `Dataset.select` takes a typed column expression that needs an input type.

### Why are the changes needed?

`Dataset` provides a few typed `select` helper functions to select typed column expressions. The maximum number of typed columns supported is 5. Selecting more than 5 typed columns silently calls the untyped `Dataset.select` and can cause a weird unresolved error, like:

```
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141];;
'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141]
+- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS e#17, _6#11 AS F#18]
   +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
```

However, fully disallowing typed columns as input to the untyped `select` API would break current usage like `count`, which is a `TypedColumn` in `functions`. In order to keep compatibility, we should allow current usage of certain `TypedColumn`s as input to the untyped `select` API. For the `TypedColumn`s that would cause an unresolved exception, we should explicitly let users know that they are incorrectly calling the untyped `select` with typed columns that need an input type.

### Does this PR introduce any user-facing change?

Yes, but this PR only refines the error message. When users call the `Dataset.select` API with a typed column that needs an input type, an analysis exception is thrown; previously an unresolved error was thrown.

### How was this patch tested?

Unit tests.
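As a quick illustration of the refined behavior, a sketch modeled on the test above (assuming a `SparkSession` named `spark` and a ScalaTest context for `intercept`; `FooAgg` here is a stand-in for the aggregator named in the error trace):

```scala
import org.apache.spark.sql.{AnalysisException, Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator
import spark.implicits._

// Sums the i-th column of a Row; its input type (Row) must be bound by
// the typed select machinery before the column can resolve.
case class FooAgg(i: Int) extends Aggregator[Row, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, r: Row): Int = b + r.getInt(i - 1)
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(b: Int): Int = b
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
def fooAgg(i: Int) = FooAgg(i).toColumn.name(s"foo_agg_$i")

val df = Seq((1, 2, 3, 4, 5, 6)).toDF("a", "b", "c", "d", "e", "f")

// Six typed columns overflow the five typed select overloads and fall
// through to the untyped select, which now fails fast with a clear
// AnalysisException instead of an unresolved-operator error.
val err = intercept[AnalysisException] {
  df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5), fooAgg(6))
}
```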