Skip to content

Comments

[SPARK-47685][SQL] Restore the support for Stream type in Dataset#groupBy#45811

Closed
LuciferYang wants to merge 2 commits intoapache:masterfrom
LuciferYang:SPARK-47685
Closed

[SPARK-47685][SQL] Restore the support for Stream type in Dataset#groupBy#45811
LuciferYang wants to merge 2 commits intoapache:masterfrom
LuciferYang:SPARK-47685

Conversation

@LuciferYang
Copy link
Contributor

What changes were proposed in this pull request?

When I reviewed the changes in SPARK-45685, I found an old user case that is no longer supported:

Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id")
[info] - SPARK-38221: group by `Stream` of complex expressions should not fail *** FAILED *** (51 milliseconds)
[info]   org.apache.spark.SparkException: Task not serializable
[info]   at org.apache.spark.util.SparkClosureCleaner$.clean(SparkClosureCleaner.scala:45)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2718)
[info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:908)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:411)
[info]   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:907)
[info]   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:762)
...

Since this is a historical user usage, and although the Stream type has been deprecated after Scala 2.13.0, it has not been removed, so this PR restores the support for Stream type in Dataset#groupBy.

Why are the changes needed?

Restore the support for Stream type in Dataset#groupBy

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Pass GitHub Actions
  • Restored the test case for dataset group by Stream.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Apr 2, 2024
import RelationalGroupedDataset._

private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
@scala.annotation.nowarn("cat=deprecation")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to suppress the use of Stream

}

test("SPARK-38221: group by `Stream` of complex expressions should not fail") {
@scala.annotation.nowarn("cat=deprecation")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to suppress the use of Stream

}

test("SPARK-38221: group by stream of complex expressions should not fail") {
test("SPARK-45685: group by `LazyList` of complex expressions should not fail") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case is essentially added in SPARK-45685, the test case has been renamed to make its description clearer

@LuciferYang
Copy link
Contributor Author

cc @dongjoon-hyun

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM (Pending CIs).

Thank you for finding and fixing the regression, @LuciferYang .

@LuciferYang
Copy link
Contributor Author

Merged into master for Spark 4.0. Thanks @dongjoon-hyun @HyukjinKwon @zhengruifeng

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants