[SPARK-44006][CONNECT][PYTHON] Support cache artifacts #41465

Closed

Conversation

MaxGekk (Member) commented Jun 5, 2023

What changes were proposed in this pull request?

In this PR, I propose to extend the Artifact API of the Python Connect client with two new methods, similar to #40827:

  1. is_cached_artifact() checks whether an artifact with the given hash is present in the server-side cache.
  2. cache_artifact() caches a blob in memory on the server side.
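
For illustration, a minimal sketch of how the two methods fit together. The method names come from this PR; reaching them via `spark.client` and the connection URL are assumptions made for the example:

```python
import hashlib

from pyspark.sql.connect.session import SparkSession

# Hedged sketch: spark.client as the accessor path is an assumption.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

blob = b"a large serialized payload"
blob_hash = hashlib.sha256(blob).hexdigest()

if not spark.client.is_cached_artifact(blob_hash):
    # cache_artifact() stores the blob in the server-side in-memory
    # cache and returns its hash.
    assert spark.client.cache_artifact(blob) == blob_hash
```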

Why are the changes needed?

To allow creating a DataFrame from a large local collection. Without the changes, spark.createDataFrame(...) fails with the following error:

```
pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.RESOURCE_EXHAUSTED
	details = "Sent message larger than max (629146388 vs. 134217728)"
	debug_error_string = "UNKNOWN:Error received from peer localhost:58218 {grpc_message:"Sent message larger than max (629146388 vs. 134217728)", grpc_status:8, created_time:"2023-06-05T18:35:50.912817+03:00"}"
```
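
For context, the 134217728-byte cap in the error appears to be the Connect server's default maximum inbound gRPC message size (128 MiB), which a single-message serialization of the relation blows past. A quick size check using the numbers from the error:

```python
# Sizes taken from the error message above.
sent = 629_146_388    # serialized local relation in one gRPC message
limit = 134_217_728   # 128 * 1024 * 1024 bytes
print(f"{sent / 2**20:.1f} MiB sent vs {limit / 2**20:.0f} MiB limit")
# -> 600.0 MiB sent vs 128 MiB limit
```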

Does this PR introduce any user-facing change?

No.

How was this patch tested?

By running new tests:

```
$ python/run-tests --parallelism=1 --testnames 'pyspark.sql.tests.connect.client.test_artifact ArtifactTests'
```

```diff
@@ -623,6 +623,20 @@ def test_with_local_list(self):
         ):
             self.connect.createDataFrame(data, "col1 int, col2 int, col3 int")

+    def test_streaming_local_relation(self):
+        import random
+        import string
```
Member:
we can just import them at the top
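
For context on the hunk above: test_streaming_local_relation evidently builds a large random local collection. A hedged sketch of the idea (the schema, sizes, and helper names are illustrative assumptions, not the PR's actual test body):

```python
import random
import string

# Build a local collection large enough that, serialized, it would have
# exceeded the old single-message gRPC cap; sizes are illustrative.
def random_payload(width: int = 64) -> str:
    return "".join(random.choices(string.ascii_letters, k=width))

data = [(i, random_payload()) for i in range(100_000)]
# With artifact caching in place, something like
#   spark.createDataFrame(data, "id long, payload string")
# can ship the relation via the cache instead of one oversized message.
```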

```diff
@@ -271,6 +271,17 @@ def func(x):
         self.spark.addArtifacts(f"{archive_path}.zip#my_files", archive=True)
         self.assertEqual(self.spark.range(1).select(func("id")).first()[0], "hello world!")

+    def test_cache_artifact(self):
+        import hashlib
```
Member:
ditto
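
Judging by the hashlib import and the is_cached_artifact()/cache_artifact() pair, the cache is addressed by the blob's hash (SHA-256 is an assumption here). A toy in-memory model of those semantics, not Spark code:

```python
import hashlib

class BlobCache:
    """Toy stand-in for the server-side artifact cache."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def cache_artifact(self, blob: bytes) -> str:
        # Content-addressed: the key is the blob's hash, so
        # re-caching the same bytes is idempotent.
        h = hashlib.sha256(blob).hexdigest()
        self._blobs.setdefault(h, bytes(blob))
        return h

    def is_cached_artifact(self, h: str) -> bool:
        return h in self._blobs

cache = BlobCache()
h = cache.cache_artifact(b"Hello, World!")
assert cache.is_cached_artifact(h)
```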

```diff
@@ -294,6 +295,15 @@ def func(x):
         self.spark.addArtifacts(file_path, file=True)
         self.assertEqual(self.spark.range(1).select(func("id")).first()[0], "Hello world!!")

+    def test_cache_artifact(self):
```
MaxGekk (Member, Author):
@HyukjinKwon For some unknown reason, the test suite freezes after the test completes successfully.

I have added a similar test for the Scala client; please review it: #41493

@MaxGekk changed the title from "[WIP][CONNECT][PYTHON] Support Python's createDataFrame in streaming manner" to "[SPARK-44006][CONNECT][PYTHON] Support cache artifacts" on Jun 8, 2023
@MaxGekk marked this pull request as ready for review on Jun 8, 2023 08:19
MaxGekk (Member, Author) commented Jun 8, 2023

Merging to master. Thank you, @HyukjinKwon, for the review.

@MaxGekk closed this in 958b854 on Jun 8, 2023
czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
Closes apache#41465 from MaxGekk/streaming-createDataFrame-python-3.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>