
[SPARK-43474][SS][CONNECT] Add Spark Connect access to runtime DataFrames by ID. #41580

Closed
wants to merge 17 commits

Conversation

rangadi
Contributor

@rangadi rangadi commented Jun 13, 2023

[This is a continuation of #41146, to change the author of the PR. Retains the description.]

What changes were proposed in this pull request?

This change adds a new Spark Connect relation type, CachedRemoteRelation, which can represent a DataFrame that has been cached on the server side.

On the server side, each SessionHolder has a cache that maintains the mapping from DataFrame ID to the actual DataFrame.

On the client side, a new relation type and function are added. The new function can create a DataFrame reference given a key. The key is the ID of a cached DataFrame, which is usually passed from the server to the client. When transforming the DataFrame reference, the server finds the actual DataFrame in the cache and replaces the reference with it.

One use case for this function is streaming foreachBatch(): the server needs to call the user function for every batch, and that function takes a DataFrame as an argument. With the new function, we can cache the DataFrame on the server and pass its ID back to the client, which can then create the DataFrame reference.
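The cache-then-resolve mechanics described above can be sketched as follows. This is a standalone illustration, not the actual Spark implementation: the method names mirror `cacheDataFrameById` and `getDataFrameOrThrow` from the PR, but the `DataFrame` stand-in (a plain object), the cache, and `InvalidPlanInput` here are simplified assumptions.

```python
import threading
import uuid


class InvalidPlanInput(Exception):
    """Stand-in for the server-side InvalidPlanInput error."""


class CachedDataFrameManager:
    """Toy per-session cache mapping DataFrame reference ID -> cached DataFrame."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cache = {}

    def cache_dataframe(self, df):
        """Cache a DataFrame (any object here) and return the ID sent to the client."""
        df_id = str(uuid.uuid4())
        with self._lock:
            self._cache[df_id] = df
        return df_id

    def get_dataframe_or_throw(self, df_id):
        """Resolve a client-side DataFrame reference back to the cached DataFrame."""
        with self._lock:
            if df_id not in self._cache:
                raise InvalidPlanInput(f"No DataFrame with id {df_id} found")
            return self._cache[df_id]


# Server caches the per-batch DataFrame and hands the ID to the client;
# the client's DataFrame reference later resolves through the same ID.
manager = CachedDataFrameManager()
ref_id = manager.cache_dataframe({"plan": "streaming batch 0"})
assert manager.get_dataframe_or_throw(ref_id) == {"plan": "streaming batch 0"}
```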

Why are the changes needed?

This change is needed to support streaming foreachBatch() in Spark Connect.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Scala unit test.
Manual test.
(More end-to-end tests will be added when foreachBatch() is supported. Currently there is no way to add a DataFrame to the server cache using Python.)

// Represents a remote relation that has been cached on server.
message CachedRemoteRelation {
// (Required) ID of the remote relation (assigned by the service).
string relation_id = 3;
Contributor Author

[continuation of the comment here]
@grundprinzip removed user_id & 'session_id'.

@@ -788,6 +790,14 @@ class SparkConnectPlanner(val session: SparkSession) extends Logging {
.logicalPlan
}

private def transformCachedRemoteRelation(
session: SparkSession,
Contributor

btw the session is already a class member of SparkConnectPlanner.


// Session.sessionUUID -> Map[DF Reference ID -> DF]
@GuardedBy("this")
private val dataFrameCache = mutable.Map[String, mutable.Map[String, DataFrame]]()
Contributor
Once you move this into the session holder, please use a ConcurrentHashMap instead, because you then only need one map.
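The suggestion above (one flat map keyed by DataFrame ID once the cache lives inside the session holder) hinges on `putIfAbsent` semantics for duplicate detection. A minimal Python analog, with hypothetical names and a lock standing in for Java's `ConcurrentHashMap`:

```python
import threading


class DataFrameCache:
    """One instance per session holder, so keys are just DataFrame IDs."""

    def __init__(self):
        self._lock = threading.Lock()
        self._map = {}

    def put_if_absent(self, df_id, df):
        """Insert df under df_id unless the key is already present.

        Returns the previous value, or None if the insert happened,
        mirroring java.util.concurrent.ConcurrentHashMap.putIfAbsent.
        """
        with self._lock:
            previous = self._map.get(df_id)
            if previous is None:
                self._map[df_id] = df
            return previous


cache = DataFrameCache()
assert cache.put_if_absent("df-1", "first") is None      # inserted
assert cache.put_if_absent("df-1", "second") == "first"  # duplicate detected
```

A non-None return value is exactly the condition the server uses to raise "a dataframe is already associated with id ...".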

import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.test.SharedSparkSession

class SparkConnectCachedDataFrameManagerSuite extends SharedSparkSession {
Contributor
I think it makes sense to add tests that, when you have different sessions with different users, you don't get access to a "guessed" cached plan. This is the key part for security.

hvanhovell pushed a commit that referenced this pull request Jun 16, 2023
### What changes were proposed in this pull request?

This passes SessionHolder, rather than just SparkSession, to `SparkConnectPlanner`, to allow access to session-specific state at the Connect server level. Note that this is Spark-Connect-specific session state and is not stored with SparkSession.

E.g.
  * Mapping from _dataframe reference id_ to actual dataframe in  #41580
  * File and archives stored with session in #41495

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
 - Existing unit tests.

Closes #41618 from rangadi/session-holder.

Authored-by: Raghu Angadi <raghu.angadi@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
Contributor

@bogao007 bogao007 left a comment

LGTM

*/
private[connect] def cacheDataFrameById(dfId: String, df: DataFrame): Unit = {
if (dataFrameCache.putIfAbsent(dfId, df) != null) {
throw new IllegalArgumentException(s"A dataframe is already associated with id $dfId")
Member

Can't you use the error framework for this?

Contributor Author

Any existing ones I can use? This should be rare since it would only be caused by our bug. This is not very user visible.
I didn't see much of framework errors used in connect server yet.

Member

Any existing ones I can use?

Add a new error class to error-classes.json, and raise a SparkException.

it would only be caused by our bug

Then we should consider SparkException.internalError

I didn't see much of framework errors used in connect server yet.

We should start doing that if we are going to transfer errors/exceptions from server to client in a consistent way.

Contributor Author

@rangadi rangadi Jun 28, 2023

I can use SparkException.internalError. Seems like it is not used anywhere in connect yet.

Member

So far, we are migrating to error classes, and have converted 72 cases already:

$ find . -name "*.scala" -print0|xargs -0 grep 'SparkException.internalError'|wc -l
      72

Contributor Author

Nice. Updated it to use SparkException.internalError.

private[connect] def getDataFrameOrThrow(dfId: String): DataFrame = {
Option(dataFrameCache.get(dfId))
.getOrElse {
throw InvalidPlanInput(s"No DataFrame with id $dfId is found in the session $sessionId")
Member

Same here: how about introducing an error class?

Contributor Author

Maybe not needed, since InvalidPlanInput is widely used for this exact purpose and this is less user-visible.

@HyukjinKwon
Member

Merged to master.

dongjoon-hyun added a commit that referenced this pull request Nov 4, 2023
…scala` to `SparkConnectSessionHolderSuite.scala`

### What changes were proposed in this pull request?

This PR aims to fix a typo `Hodler` in file name.
- `SparkConnectSessionHodlerSuite.scala` (from)
- `SparkConnectSessionHolderSuite.scala` (to)

The file name also doesn't match the class name inside the file, since the class name itself is correct.

https://github.com/apache/spark/blob/3363c2af3f6a59363135451d251f25e328a4fddf/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala#L37

### Why are the changes needed?

This is a typo from the original PR.
- #41580

Since the original PR shipped in Apache Spark 3.5.0, I created a JIRA instead of a follow-up. We need to backport this patch to `branch-3.5`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43657 from dongjoon-hyun/SPARK-45791.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 6d669fa)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
*/
private[connect] def cacheDataFrameById(dfId: String, df: DataFrame): Unit = {
if (dataFrameCache.putIfAbsent(dfId, df) != null) {
SparkException.internalError(s"A dataframe is already associated with id $dfId")
Member

The internalError call just creates a SparkException, so apparently it needs to be thrown. Here is PR #44400 with a minor fix of this mistake and another one.

HyukjinKwon added a commit that referenced this pull request May 21, 2024
…k Connect

### What changes were proposed in this pull request?

This PR proposes to add the `DataFrame.checkpoint` and `DataFrame.localCheckpoint` APIs in Spark Connect.

#### Overview

![Screenshot 2024-05-16 at 10 39 25 AM](https://github.com/apache/spark/assets/6477701/c5c4754f-3d5e-4f4a-8f9d-a7218ce49320)

1. Spark Connect Client invokes [local]checkpoint
    - Connects to the server, stores (Session ID, UUID) <> Checkpointed DataFrame
2. Execute [local]checkpoint
3. Returns the UUID for the checkpointed DataFrame.
   - The client side holds the UUID, with the protobuf message truncated (replaced)
4. When the DataFrame on the client side is garbage-collected, a request is made to clear the state within the Spark Connect server.
5. If the checkpointed RDD is not referred to anymore (e.g., not even by a temp view), it is cleaned by ContextCleaner (which runs separately, and periodically)
6. *When the session is closed, it attempts to clear all mapped state in the Spark Connect server (because `DataFrame.__del__` in Python is not guaranteed to be called upon garbage-collection)
7. *If the checkpointed RDD is not referred to anymore (e.g., not even by a temp view), it is cleaned by ContextCleaner (which runs separately, and periodically)

*In 99.999% of cases, the state (map<(session_id, uuid), c'p'dataframe>) will be cleared when the DataFrame is garbage-collected, e.g., unless there are crashes. Practically, Py4J also leverages this approach to clean up its Java objects. For the remaining 0.001% of cases, steps 6 and 7 address them. Both steps happen when the session is closed and the session holder is released; see also [#41580](#41580).
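Step 4 above (clearing server-side state when the client handle is garbage-collected) can be sketched with Python's `weakref.finalize`, which is the standard way to attach cleanup to an object's collection. Everything here is illustrative: the `server_cache` dict stands in for the real server-side map, and `RemoteDataFrameRef` is a hypothetical stand-in for the client-side plan holding the server-assigned UUID.

```python
import gc
import weakref

# Stand-in for the server-side map<(session_id, uuid), checkpointed dataframe>.
server_cache = {}


class RemoteDataFrameRef:
    """Client-side handle holding only the server-assigned UUID."""

    def __init__(self, df_id):
        self.df_id = df_id
        # When this handle is garbage-collected, ask the "server" to drop
        # its entry. The callback must not reference self, or it would keep
        # the handle alive forever.
        weakref.finalize(self, server_cache.pop, df_id, None)


server_cache["uuid-1"] = "checkpointed dataframe"
ref = RemoteDataFrameRef("uuid-1")

del ref        # handle goes away on the client...
gc.collect()   # ...and the finalizer clears the server-side entry
assert "uuid-1" not in server_cache
```

This also shows why step 6 is still needed: a finalizer only runs if the interpreter gets the chance, so closing the session must clear any entries that were never finalized.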

#### Command/RPCs

Reuse `CachedRemoteRelation` (from [#41580](#41580))

```proto
message Command {
  oneof command_type {
    ...
    CheckpointCommand checkpoint_command = 14;
    RemoveCachedRemoteRelationCommand remove_cached_remote_relation_command = 15;
    ...
  }
}

// Command to remove `CachedRemoteRelation`
message RemoveCachedRemoteRelationCommand {
  // (Required) The cached remote relation to be removed
  CachedRemoteRelation relation = 1;
}

message CheckpointCommand {
  // (Required) The logical plan to checkpoint.
  Relation relation = 1;

  // (Optional) Locally checkpoint using a local temporary
  // directory in Spark Connect server (Spark Driver)
  optional bool local = 2;

  // (Optional) Whether to checkpoint this dataframe immediately.
  optional bool eager = 3;
}

message CheckpointCommandResult {
  // (Required) The logical plan checkpointed.
  CachedRemoteRelation relation = 1;
}
```

```proto
message ExecutePlanResponse {

  ...

  oneof response_type {

    ...

    CheckpointCommandResult checkpoint_command_result = 19;
  }

  ...

  message Checkpoint {
    // (Required) The logical plan checkpointed.
    CachedRemoteRelation relation = ...;
  }
}
```

#### Usage

```bash
./sbin/start-connect-server.sh --conf spark.checkpoint.dir=/path/to/checkpoint
```

```python
spark.range(1).localCheckpoint()
spark.range(1).checkpoint()
```

### Why are the changes needed?

For feature parity with Spark without Spark Connect.

### Does this PR introduce _any_ user-facing change?

Yes, it adds both `DataFrame.checkpoint` and `DataFrame.localCheckpoint` API in Spark Connect.

### How was this patch tested?

Unittests, and manually tested as below:

**Code**

```bash
./bin/pyspark --remote "local[*]"
```

```python
>>> df = spark.range(1).localCheckpoint()
>>> df.explain(True)
== Parsed Logical Plan ==
LogicalRDD [id#1L], false

== Analyzed Logical Plan ==
id: bigint
LogicalRDD [id#1L], false

== Optimized Logical Plan ==
LogicalRDD [id#1L], false

== Physical Plan ==
*(1) Scan ExistingRDD[id#1L]

>>> df._plan
<pyspark.sql.connect.plan.CachedRemoteRelation object at 0x147734a50>
>>> del df
```

**Logs**

```
...
{"ts":"2024-05-14T06:18:01.711Z","level":"INFO","msg":"Caching DataFrame with id 7316f315-d20d-446d-b5e7-ac848870e280","context":{"dataframe_id":"7316f315-d20d-446d-b5e7-ac848870e280"},"logger":"SparkConnectAnalyzeHandler"}
...
{"ts":"2024-05-14T06:18:11.718Z","level":"INFO","msg":"Removing DataFrame with id 7316f315-d20d-446d-b5e7-ac848870e280 from the cache","context":{"dataframe_id":"7316f315-d20d-446d-b5e7-ac848870e280"},"logger":"SparkConnectPlanner"}
...
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46570 from HyukjinKwon/SPARK-48258.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>