
[SPARK-47336][SQL][CONNECT] Provide to PySpark a functionality to get estimated size of DataFrame in bytes #46368

Open · wants to merge 19 commits into master from size_in_bytes_api
Conversation

SemyonSinchenko
What changes were proposed in this pull request?

In PySpark Connect there is no access to the JVM, so queryExecution().optimizedPlan.stats cannot be called; the only way to get the estimated size in bytes of a plan is to regex-parse the output of explain. This PR fills that gap by exposing a sizeInBytesApproximation method in the JVM, PySpark Classic, and PySpark Connect APIs. Under the hood it is just a call to queryExecution().optimizedPlan.stats.sizeInBytes. The JVM and PySpark Classic APIs were updated only for parity. The changes, step by step (a usage sketch follows the list):

  1. Update Dataset.scala in the Connect JVM client by adding the new API
  2. Update Dataset.scala in the Classic JVM API by adding the new API
  3. Update dataframe.py in sql by adding the signature and docstring of the new API
  4. Update dataframe.py in connect by adding an implementation of the new API
  5. Update dataframe.py in classic by adding an implementation of the new API
  6. Update base.proto by adding a new message to AnalyzeRequest / AnalyzeResponse
  7. Regenerate the Python files from the proto definitions
  8. Update SparkConnectAnalyzeHandler by extending the match and adding a call to queryExecution
  9. Update SparkConnectClient by adding a new method that builds the new request
  10. Update SparkSession by adding a call to the client and parsing of the response
  11. Add/update the corresponding tests
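
For illustration, a rough usage sketch of the proposed API from PySpark Connect; the method name (sizeInBytes, per the later discussion in this thread) and its exact shape are assumptions here, not the final signature:

```python
from pyspark.sql import SparkSession

# Hypothetical endpoint; replace with a real Spark Connect server URL.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

df = spark.range(1_000_000).selectExpr("id", "id % 10 AS bucket")

# On the server this resolves to queryExecution().optimizedPlan.stats.sizeInBytes;
# the exact method name follows the PR/JIRA discussion and may differ.
print(df.sizeInBytes())
```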

Why are the changes needed?

To give PySpark Connect users a way to obtain the DataFrame size estimate at runtime without forcing them to parse the string output of df.explain. The other changes are needed for parity across Connect / Classic and PySpark / JVM Spark.
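
For context, the workaround this replaces looked roughly like the sketch below (illustrative only; the helper and the regex are mine, and the plan-text format varies across Spark versions, which is exactly why this approach is fragile):

```python
import io
import re
from contextlib import redirect_stdout

def size_from_explain(df) -> int:
    # Capture the cost-mode plan, which df.explain prints to stdout,
    # then regex out the optimizer's Statistics(sizeInBytes=...) estimate.
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(mode="cost")
    m = re.search(r"sizeInBytes=([0-9.]+)\s*(B|KiB|MiB|GiB|TiB|PiB|EiB)", buf.getvalue())
    if m is None:
        raise ValueError("no size estimate found in the plan text")
    units = {"B": 1, "KiB": 2**10, "MiB": 2**20, "GiB": 2**30,
             "TiB": 2**40, "PiB": 2**50, "EiB": 2**60}
    return int(float(m.group(1)) * units[m.group(2)])
```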

Does this PR introduce any user-facing change?

Only a new API, aimed mostly at PySpark Connect users.

How was this patch tested?

Because the actual logic lives in queryExecution, I added tests only for the syntax / calls: the tests verify that the size returned for a DataFrame is greater than zero.
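
A minimal sketch of that kind of check (names are illustrative, not the actual test code from this PR):

```python
def test_size_estimate_is_positive(spark):
    # The estimation logic itself lives in queryExecution().optimizedPlan.stats,
    # so this only checks that the call goes through and returns a sane value.
    df = spark.range(100)
    assert df.sizeInBytes() > 0  # method name assumed from the discussion above
```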

Was this patch authored or co-authored using generative AI tooling?

No.

@grundprinzip We discussed this ticket earlier; could you please take a look? Thanks!

Commit history (condensed; the original commit messages were raw git-status dumps from branch size_in_bytes_api):

- Initial changes: connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala, connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala, connector/connect/common/src/main/protobuf/spark/connect/base.proto, connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala, sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
- Relation instead of Plan in base.proto; fix broken ids in base.proto; fix the corresponding parts in the AnalyzeHandler (.gitignore, base.proto, SparkConnectAnalyzeHandler.scala, sql/core Dataset.scala)
- Update naming following the discussion in JIRA (Connect client Dataset.scala, base.proto, SparkConnectAnalyzeHandler.scala)
- Small fixes and tests (Connect client Dataset.scala and SparkSession.scala, ClientE2ETestSuite.scala, base.proto, SparkConnectClient.scala, SparkConnectAnalyzeHandler.scala)
- Python side: python/pyspark/sql/classic/dataframe.py, python/pyspark/sql/connect/client/core.py, python/pyspark/sql/connect/dataframe.py, python/pyspark/sql/connect/proto/base_pb2.py, python/pyspark/sql/connect/proto/base_pb2.pyi, python/pyspark/sql/dataframe.py, python/pyspark/sql/tests/connect/test_connect_basic.py, python/pyspark/sql/tests/test_dataframe.py, sql/core Dataset.scala
- Delete an example because it requires data (python/pyspark/sql/dataframe.py, sql/core Dataset.scala)
- python/pyspark/sql/connect/dataframe.py, python/pyspark/sql/dataframe.py
- python/pyspark/sql/connect/dataframe.py
- change from Long to bytes in the proto
- JVM methods return BigInteger from now on
- on the Python side, the conversion from BigInteger to int goes via bytes
- drop .dir-locals.el from .gitignore
- rename _sizeInBytes -> _size_in_bytes on the Python side
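
For reference, a minimal sketch of what that conversion implies on the Python side, assuming the proto field carries the output of BigInteger.toByteArray() (a big-endian two's-complement encoding); the helper name is hypothetical:

```python
def _big_integer_bytes_to_int(raw: bytes) -> int:
    # BigInteger.toByteArray() emits a big-endian, signed two's-complement
    # byte array, which int.from_bytes decodes natively.
    return int.from_bytes(raw, byteorder="big", signed=True)

assert _big_integer_bytes_to_int(b"\x01\x00") == 256
```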

Files touched by the changes above: .gitignore, Connect client Dataset.scala and SparkSession.scala, ClientE2ETestSuite.scala, base.proto, SparkConnectAnalyzeHandler.scala, python/pyspark/sql/classic/dataframe.py, python/pyspark/sql/connect/client/core.py, python/pyspark/sql/connect/dataframe.py, python/pyspark/sql/connect/proto/base_pb2.py, python/pyspark/sql/connect/proto/base_pb2.pyi, python/pyspark/sql/dataframe.py, sql/core Dataset.scala

Follow-up commits:

- Connect client Dataset.scala and SparkSession.scala, SparkConnectAnalyzeHandler.scala
- sql/core Dataset.scala
- ClientE2ETestSuite.scala
- python/pyspark/sql/classic/dataframe.py
@SemyonSinchenko
Author

New changes:

  • fixes from comments
  • change the type from Long to BigInteger (bytes in the proto)
  • resolve conflicts
  • regenerate the Python proto classes
@SemyonSinchenko
Author

@HyukjinKwon Sorry for the tagging, but could you please take a look again? Thanks in advance!

I updated the docstring for the sizeInBytes method of DataFrame.

Files touched: python/pyspark/sql/connect/proto/base_pb2.py, python/pyspark/sql/dataframe.py
@SemyonSinchenko
Author

Changes from the last two commits:

  • resolve merge conflicts
  • re-generate the proto files for PySpark
  • update the docstring in dataframe.py: fix a typo and extend it to describe how the method works and its corner cases

Files touched: connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala, connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@SemyonSinchenko
Author

@HyukjinKwon I'm sorry for tagging you again, but could you take a look? Thanks in advance!
