[SPARK-42585][CONNECT] Streaming of local relations #40827

Closed
wants to merge 61 commits

Conversation


@MaxGekk MaxGekk commented Apr 17, 2023

What changes were proposed in this pull request?

In this PR, I propose to transfer a local relation to the server in a streaming way when it exceeds the size defined by the SQL config spark.sql.session.localRelationCacheThreshold (64MB by default). In particular (a minimal sketch of the client-side flow follows this list):

  1. The client applies the sha256 function to the Arrow form of the local relation;
  2. It checks the presence of the relation on the server side by sending the relation's hash to the server;
  3. If the server does not have the local relation, the client transfers it as an artifact named cache/<sha256>;
  4. Once the relation is already present at the server, or has just been transferred, the client transforms the logical plan by replacing the LocalRelation node with a CachedLocalRelation that carries the hash;
  5. The server, in turn, converts CachedLocalRelation back to LocalRelation by retrieving the relation body from its local cache.
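Below is a minimal sketch of this client-side flow, written against assumed names: LocalRelation/CachedLocalRelation here are plain placeholders rather than the real protobuf messages, and isCached/uploadArtifact stand in for the ArtifactStatus and AddArtifacts RPCs described in the next section.

```scala
import java.security.MessageDigest

object LocalRelationStreamingSketch {
  // Default of spark.sql.session.localRelationCacheThreshold (64MB).
  val cacheThresholdBytes: Long = 64L * 1024 * 1024

  // Placeholder plan nodes; the real client builds protobuf relations.
  sealed trait RelationNode
  final case class LocalRelation(arrowData: Array[Byte]) extends RelationNode
  final case class CachedLocalRelation(hash: String) extends RelationNode

  // Step 1: sha256 over the Arrow form of the relation.
  def sha256Hex(bytes: Array[Byte]): String =
    MessageDigest.getInstance("SHA-256").digest(bytes).map("%02x".format(_)).mkString

  def planLocalRelation(
      arrowData: Array[Byte],
      isCached: String => Boolean,                   // step 2: ask the server for the hash
      uploadArtifact: (String, Array[Byte]) => Unit  // step 3: upload as cache/<sha256>
  ): RelationNode = {
    if (arrowData.length <= cacheThresholdBytes) {
      // Small relations are still sent inline inside the plan.
      LocalRelation(arrowData)
    } else {
      val hash = sha256Hex(arrowData)
      if (!isCached(hash)) uploadArtifact(s"cache/$hash", arrowData)
      // Step 4: reference the cached relation by hash instead of inlining the data.
      CachedLocalRelation(hash)
    }
  }
}
```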

Details of the implementation

The client sends a new command, ArtifactStatusesRequest, to check whether the local relation is cached at the server. The command goes through a new RPC endpoint, ArtifactStatus, and the server answers with a new message, ArtifactStatusesResponse; see base.proto.
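The following is a hedged sketch of that exchange; the case classes mirror the message names above but are not the generated proto classes, and the response shape (artifact name mapped to an "exists" flag) is an assumption made for illustration.

```scala
object ArtifactStatusSketch {
  final case class ArtifactStatusesRequest(names: Seq[String])
  final case class ArtifactStatusesResponse(statuses: Map[String, Boolean])

  // Ask the ArtifactStatus endpoint whether the artifact cache/<sha256> already exists.
  def isRelationCached(
      hash: String,
      sendRequest: ArtifactStatusesRequest => ArtifactStatusesResponse): Boolean = {
    val name = s"cache/$hash"
    sendRequest(ArtifactStatusesRequest(Seq(name))).statuses.getOrElse(name, false)
  }
}
```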

The client transfers the serialized (Arrow) body of the local relation and its schema via the RPC endpoint AddArtifacts. The server, in turn, stores the received artifact in the block manager under an id, CacheId, which consists of three parts (sketched below):

  • userId - the identifier of the user who created the local relation,
  • sessionId - the identifier of the session that the relation belongs to,
  • hash - a SHA-256 hash of the relation body.

See SparkConnectArtifactManager.addArtifact().
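For illustration, the three-part id might be shaped like the case class below; this is only a sketch of its structure, not the actual BlockId subclass used on the server.

```scala
// Sketch of the three-part cache id (field types are assumptions).
final case class CacheId(userId: String, sessionId: String, hash: String) {
  // Name under which the client uploads the relation body via AddArtifacts.
  def artifactName: String = s"cache/$hash"
}
```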

The current query is blocked until the local relation has been cached on the server side.

When the server receives the query, it retrieves userId, sessionId, and hash from CachedLocalRelation, and gets the local relation data from the block manager. See SparkConnectPlanner.transformCachedLocalRelation().

The occupied blocks in the block manager are removed when a user session is invalidated in userSessionMapping. See SparkConnectService.RemoveSessionListener and BlockManager.removeCache().
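A minimal server-side sketch of these two paragraphs, under assumed names: an in-memory map stands in for the block manager, resolveCachedRelation plays the role of SparkConnectPlanner.transformCachedLocalRelation(), and onSessionRemoved mirrors what the remove-session listener does when a user session is invalidated.

```scala
import scala.collection.concurrent.TrieMap

object RelationCacheServerSketch {
  final case class CacheId(userId: String, sessionId: String, hash: String)

  // Stand-in for the block manager's storage of cached relation bodies.
  private val blocks = TrieMap.empty[CacheId, Array[Byte]]

  // AddArtifacts path: store the relation body under its CacheId.
  def store(id: CacheId, arrowData: Array[Byte]): Unit = {
    blocks.put(id, arrowData)
  }

  // Resolve a CachedLocalRelation(userId, sessionId, hash) back to the relation body.
  def resolveCachedRelation(id: CacheId): Array[Byte] =
    blocks.getOrElse(id, throw new NoSuchElementException(s"No cached relation for $id"))

  // Free every block owned by the (userId, sessionId) pair when the session goes away.
  def onSessionRemoved(userId: String, sessionId: String): Unit =
    blocks.keys
      .filter(id => id.userId == userId && id.sessionId == sessionId)
      .foreach(id => blocks.remove(id))
}
```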

Why are the changes needed?

To allow creating a DataFrame from a large local collection. Without the changes, spark.createDataFrame(...) fails with the following error:

23/04/21 20:32:20 WARN NettyServerStream: Exception processing message
org.sparkproject.connect.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 134217728: 268435456
	at org.sparkproject.connect.grpc.Status.asRuntimeException(Status.java:526)

Does this PR introduce any user-facing change?

No. The changes extend the existing proto API.

How was this patch tested?

By running the new tests:

$ build/sbt "test:testOnly *.ArtifactManagerSuite"
$ build/sbt "test:testOnly *.ClientE2ETestSuite"
$ build/sbt "test:testOnly *.ArtifactStatusesHandlerSuite"

@MaxGekk MaxGekk changed the title from [WIP][SPARK-42585][CONNECT] Streaming the createDataFrame implementation to [WIP][SPARK-42585][CONNECT] Streaming of local relations on Apr 17, 2023
tmpFile = tmpFile,
blockSize = tmpFile.length())
updater.save()
}(catchBlock = {tmpFile.delete()})
Contributor

Will the Connect server remove the temp files after the Connect session is closed?
I guess we could add the session id and user id to the blockId, and release all the related blocks when a session ends.

Member Author

Yep, makes sense. Let me try that.

Member Author

done

@grundprinzip
Contributor

Are the Python changes done in a follow-up?

val arrowData = ConvertToArrow(encoder, data, timeZoneId, allocator)
localRelationBuilder.setData(arrowData)
val (arrowData, arrowDataSize) = ConvertToArrow(encoder, data, timeZoneId, allocator)
if (arrowDataSize <= conf.get(SQLConf.LOCAL_RELATION_CACHE_THRESHOLD.key).toInt) {
Contributor

It's kind of weird that we're using an internal API for the client-side confs.

Ideally, we'd leverage the existing stub configs in connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala for now?

Member Author

It's kind of weird that we're using an internal API for the client-side confs.

  1. I thought the caching approach could be implemented not only in Connect.
  2. The place you pointed out contains only constants, but I want to give users some control over this feature.

Contributor

Yes, absolutely, you're right that this needs to be configurable. My point is mostly that we don't have Spark confs on the client. In Python, for example, we don't have the JVM to parse them on startup; you can set them via spark.conf.set, but that's it.

My general recommendation would be to avoid pulling additional SQL/Core dependencies into the client.

Member Author

My point is mostly that we don't have Spark confs on the client.

Ahh, you meant this:

Suggested change
if (arrowDataSize <= conf.get(SQLConf.LOCAL_RELATION_CACHE_THRESHOLD.key).toInt) {
if (arrowDataSize <= conf.get("spark.sql.session.localRelationCacheThreshold").toInt) {

like @hvanhovell did above:

val timeZoneId = conf.get("spark.sql.session.timeZone")
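For illustration only (not the PR's final code), reading the threshold on the client by its string key could look like the sketch below; sessionConf is a stand-in for whatever key/value conf accessor the client exposes, and the 64MB fallback follows the default named in the description.

```scala
// Hypothetical helper: avoid a compile-time dependency on SQLConf by using the
// plain string key, falling back to the 64MB default when the conf is not set.
def localRelationCacheThreshold(sessionConf: Map[String, String]): Long =
  sessionConf
    .get("spark.sql.session.localRelationCacheThreshold")
    .map(_.toLong)
    .getOrElse(64L * 1024 * 1024)
```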

@MaxGekk
Member Author

MaxGekk commented May 1, 2023

Are the Python changes done in a follow-up?

Yep, in a separate PR.

@HyukjinKwon
Member

Merged to master.

LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request May 10, 2023
Closes apache#40827 from MaxGekk/streaming-createDataFrame-2.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request May 23, 2023
…in Python client

### What changes were proposed in this pull request?

This PR implements `SparkSession.addArtifact(s)`. The logic is basically translated from Scala (#40256) to Python here.

One difference is that it does not support `class` files and `cache` (#40827), because it's not realistic for the Python client to add `class` files. For `cache`, this implementation will be used as base work.

This PR is also base work for implementing the sending of py-files and archive files.

### Why are the changes needed?

For feature parity with the Scala client. In addition, this is also base work for the `cache` implementation and for Python dependency management (https://www.databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html)

### Does this PR introduce _any_ user-facing change?

Yes, this exposes an API `SparkSession.addArtifact(s)`.

### How was this patch tested?

Unittests were added. Also manually tested.

Closes #41250 from HyukjinKwon/python-addArtifacts.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
MaxGekk added a commit that referenced this pull request Jun 8, 2023
### What changes were proposed in this pull request?
In this PR, I propose to extend the Artifact API of the Python Connect client with two new methods, similarly to #40827:
1. `is_cached_artifact()` checks whether the cache entry for the given hash is present at the server side.
2. `cache_artifact()` caches a blob in memory at the server side.

### Why are the changes needed?
To allow creating a DataFrame from a large local collection. Without the changes, `spark.createDataFrame(...)` fails with the following error:
```python
pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.RESOURCE_EXHAUSTED
	details = "Sent message larger than max (629146388 vs. 134217728)"
	debug_error_string = "UNKNOWN:Error received from peer localhost:58218 {grpc_message:"Sent message larger than max (629146388 vs. 134217728)", grpc_status:8, created_time:"2023-06-05T18:35:50.912817+03:00"}"
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the new tests:
```
$ python/run-tests --parallelism=1 --testnames 'pyspark.sql.tests.connect.client.test_artifact ArtifactTests'
```

Closes #41465 from MaxGekk/streaming-createDataFrame-python-3.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
MaxGekk added a commit that referenced this pull request Jun 9, 2023
…reaming manner

### What changes were proposed in this pull request?
In this PR, I propose to transfer a local relation from **the Python connect client** to the server in a streaming way when it exceeds the size defined by the SQL config `spark.sql.session.localRelationCacheThreshold`. The implementation is similar to #40827. In particular:
1. The client applies the `sha256` function to **the proto form** of the local relation;
2. It checks the presence of the relation on the server side by sending the relation's hash to the server;
3. If the server does not have the local relation, the client transfers it as an artifact named `cache/<sha256>`;
4. Once the relation is already present at the server, or has just been transferred, the client transforms the logical plan by replacing the `LocalRelation` node with a `CachedLocalRelation` that carries the hash;
5. The server, in turn, converts `CachedLocalRelation` back to `LocalRelation` by retrieving the relation body from its local cache.

### Why are the changes needed?
To fix the issue of creating a large DataFrame from a local collection:
```python
pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.RESOURCE_EXHAUSTED
	details = "Sent message larger than max (134218508 vs. 134217728)"
	debug_error_string = "UNKNOWN:Error received from peer localhost:50982 {grpc_message:"Sent message larger than max (134218508 vs. 134217728)", grpc_status:8, created_time:"2023-06-09T15:34:08.362797+03:00"}
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the new test:
```
$ python/run-tests --parallelism=1 --testnames 'pyspark.sql.tests.connect.test_connect_basic SparkConnectBasicTests.test_streaming_local_relation'
```

Closes #41537 from MaxGekk/streaming-createDataFrame-python-4.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023