
[SPARK-50661][CONNECT][SS] Fix Spark Connect Scala foreachBatch impl. to support Dataset[T].#49323

Closed
haiyangsun-db wants to merge 4 commits into apache:master from haiyangsun-db:SPARK-50661

Conversation

Contributor

@haiyangsun-db haiyangsun-db commented Dec 27, 2024

What changes were proposed in this pull request?

This PR fixes the incorrect implementation of Scala streaming foreachBatch in Spark Connect mode when the input dataset is not a DataFrame but a Dataset[T].

Note that this only affects Scala.

In DataStreamWriter:

  • Serialize the foreachBatch function together with the dataset's encoder.
  • Reuse ForeachWriterPacket for foreachBatch, since both are sink operations that only require a function/writer object and the encoder of the input. Optionally, ForeachWriterPacket could be renamed to something more general that covers both cases.

In SparkConnectPlanner / StreamingForeachBatchHelper:

  • Use the encoder passed from the client to recover the Dataset[T] object, so the foreachBatch function is called with the correct type.
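The shape of this design can be sketched outside of Spark: the client serializes the user function together with a description of the encoder, and the server deserializes both so it can rebuild the typed view before invoking the function. The names below (BatchFnPacket, toBytes, fromBytes) are hypothetical stand-ins for illustration, not the actual Spark Connect classes; the real ForeachWriterPacket carries an AgnosticEncoder rather than a string.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for ForeachWriterPacket: the user function is
// shipped together with the input dataset's encoder, not alone.
case class BatchFnPacket(fn: AnyRef, encoderName: String) extends Serializable

object BatchFnPacketDemo {
  // Client side: serialize the function and encoder info together.
  def toBytes(p: BatchFnPacket): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(p)
    oos.close()
    bos.toByteArray
  }

  // Server side: recover both pieces, so the planner can rebuild a typed
  // Dataset[T] with the original encoder instead of handing the user
  // function an untyped DataFrame.
  def fromBytes(b: Array[Byte]): BatchFnPacket =
    new ObjectInputStream(new ByteArrayInputStream(b))
      .readObject().asInstanceOf[BatchFnPacket]

  def main(args: Array[String]): Unit = {
    // Scala function literals are serializable, which is what Spark relies on.
    val fn: (Seq[Long], Long) => Unit =
      (batch, id) => println(s"batch $id sum=${batch.sum}")
    val packet = fromBytes(toBytes(BatchFnPacket(fn, "LongEncoder")))
    println(packet.encoderName) // prints "LongEncoder"
    packet.fn.asInstanceOf[(Seq[Long], Long) => Unit](Seq(1L, 2L, 3L), 0L) // prints "batch 0 sum=6"
  }
}
```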

Why are the changes needed?

Without the fix, Scala foreachBatch will fail or give wrong results when the input dataset is not a DataFrame.

Below is a simple reproduction:

import org.apache.spark.sql._

spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/test")

val q = spark.readStream
  .format("parquet")
  .schema("id LONG")
  .load("/tmp/test")
  .as[java.lang.Long]
  .writeStream
  .foreachBatch((ds: Dataset[java.lang.Long], batchId: Long) =>
    println(ds.collect().map(_.asInstanceOf[Long]).sum))
  .start()

Thread.sleep(1000)
q.stop()

The code above should print 45 from inside the foreachBatch function. Without the fix, the code fails because the foreachBatch function is called with a DataFrame object instead of a Dataset[java.lang.Long].
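As a sanity check on the expected value: the parquet source was written from spark.range(10), so each micro-batch that picks it up contains the ids 0 through 9, and their sum is 45. This can be verified without Spark:

```scala
// spark.range(10) produces the ids 0..9; the repro sums them per batch.
val expected = (0L until 10L).sum
println(expected) // prints 45
```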

Does this PR introduce any user-facing change?

Yes. This PR changes the Spark Connect client around the foreachBatch API: the dataset's encoder is now serialized together with the foreachBatch function.

How was this patch tested?

  1. Ran an end-to-end test with spark-shell, with a Spark Connect server and a client running in connect mode.
  2. Added new and updated unit tests that would have failed without the fix.

Was this patch authored or co-authored using generative AI tooling?

No.

@haiyangsun-db haiyangsun-db changed the title [SPARK-50661] Fix Spark Connect Scala foreachBatch impl. to support Dataset[T]. [SPARK-50661][CONNECT][SS] Fix Spark Connect Scala foreachBatch impl. to support Dataset[T]. Dec 27, 2024
Contributor

WweiL commented Dec 27, 2024

Thank you for the fix!

The change LGTM, for test complexity, can you add a custom class test case like here?
https://github.com/apache/spark/blob/master/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala#L426-L448

Contributor

WweiL commented Dec 27, 2024

I believe this makes a 3.5 Scala client running Scala foreachBatch unable to run against a 4.0 Spark server. But for 3.5, Scala streaming support is still under development, so this should be fine. Still, this breaking change is worth noting somewhere. cc @HyukjinKwon

Contributor Author

haiyangsun-db commented Dec 27, 2024

Added a new test case for using a custom class with foreachBatch (as simple as the existing test case for foreach), which is probably good enough for now.
I did test the custom-class case with a more complicated scenario locally, by launching a Spark Connect client against a Spark Connect server, but somehow the same code does not work in the unit-testing environment. I can try to improve that part in a follow-up.

Contributor

@WweiL WweiL left a comment


+1

@HyukjinKwon
Member

Merged to master.
