
[SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server #40256

Closed · wants to merge 5 commits

Conversation

@vicennial (Contributor) commented on Mar 2, 2023

What changes were proposed in this pull request?

This PR introduces a mechanism to transfer artifacts (currently, local `.jar` + `.class` files) from a Spark Connect JVM/Scala client over to the server side of Spark Connect. The mechanism follows the protocol defined in #40147 and supports batching (for multiple "small" artifacts) and chunking (for large artifacts).

Note: Server-side artifact handling is not covered in this PR.

Why are the changes needed?

In the decoupled client-server architecture of Spark Connect, a remote client may use a local JAR or a new class in their UDF that may not be present on the server. To handle these cases of missing "artifacts", we implement a mechanism to transfer artifacts from the client side over to the server side as per the protocol defined in #40147.

Does this PR introduce any user-facing change?

Yes, users would be able to use the `addArtifact` and `addArtifacts` methods (via a `SparkSession` instance) to transfer local files (`.jar` and `.class` extensions).
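A minimal usage sketch (hedged: the connection string, paths, and builder call are illustrative, and the vararg-`URI` shape of `addArtifacts` is an assumption; only the method names come from this PR):

```scala
import java.net.URI
import org.apache.spark.sql.SparkSession

// Illustrative connection string and file paths.
val spark = SparkSession.builder().remote("sc://localhost").build()

spark.addArtifact("/path/to/my-udfs.jar")                                   // single local .jar
spark.addArtifacts(new URI("/path/to/A.class"), new URI("/path/to/b.jar")) // batched transfer (assumed URI varargs)
```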

How was this patch tested?

Unit tests, located in `ArtifactSuite`.

/**
 * Currently only local files with extensions .jar and .class are supported.
 */
def addArtifact(path: String): Unit = client.addArtifact(path)
Contributor: Can you mark these as experimental?
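For reference, marking the method experimental would look roughly like this (a sketch; `Experimental` is Spark's existing annotation in `org.apache.spark.annotation`):

```scala
import org.apache.spark.annotation.Experimental

/**
 * Currently only local files with extensions .jar and .class are supported.
 */
@Experimental
def addArtifact(path: String): Unit = client.addArtifact(path)
```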

  writeBatch()
}
stream.onCompleted()
ThreadUtils.awaitResult(promise.future, Duration.Inf)
Contributor: I am a bit on the fence about this one. This is fine for now, but in the not-so-distant future we shouldn't block indefinitely.

vicennial (author): I agree, we need a timeout policy. Handling this as part of https://issues.apache.org/jira/browse/SPARK-42658 (along with a retry policy).
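A sketch of what a bounded wait could look like once SPARK-42658 lands (the timeout value and where it comes from are assumptions):

```scala
import scala.concurrent.duration._

// Illustrative only: bound the wait instead of blocking forever. The concrete
// timeout (and whether it is configurable) is left to SPARK-42658.
private val transferTimeout = 5.minutes
ThreadUtils.awaitResult(promise.future, transferTimeout)
```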

}
stream.onCompleted()
ThreadUtils.awaitResult(promise.future, Duration.Inf)
// TODO: Handle responses containing CRC failures.
Contributor: File a ticket, please.
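A hypothetical shape for that handling (the `ArtifactSummary` fields are assumed from the #40147 protocol; this is a sketch, not the eventual implementation):

```scala
import scala.collection.JavaConverters._

// Hypothetical: response is the proto.AddArtifactsResponse returned by the
// server. Inspect per-artifact CRC results and fail (or retransmit) the
// artifacts whose checksums did not match on the server side.
val failed = response.getArtifactsList.asScala.filterNot(_.getIsCrcSuccessful)
if (failed.nonEmpty) {
  throw new RuntimeException(
    s"Artifact CRC check failed for: ${failed.map(_.getName).mkString(", ")}")
}
```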

}

trait ClassFinder {
  def findClasses(): Iterator[Artifact]
Contributor: We should document this a bit better. For example, is this method returning all REPL-generated classes, or only the new ones?


}
}

trait ClassFinder {
Contributor: Move it to its own source file?

vicennial (author): Deleting the `ClassFinder`-related code for now; will add it as part of https://issues.apache.org/jira/browse/SPARK-42657 (since we don't use it in this PR).

/**
 * Payload stored on this machine.
 */
sealed trait LocalData extends Storage {
Contributor: I think we can flatten this hierarchy for now. There is no other data than local data.

vicennial (author): Yeah, makes sense. Keeping the name `LocalData` (rather than renaming it to, say, `Data`) to make it explicit that the data needs to be present locally for the transfer to take place (for now).
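After flattening, the hierarchy could look roughly like this (a sketch based on the discussion above; member and class names are illustrative):

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.file.{Files, Path}

// LocalData is now the root of the (small) hierarchy: a file-backed variant
// and an in-memory variant, both readable as a stream for the transfer.
sealed trait LocalData {
  def stream: InputStream
  def size: Long
}

case class LocalFile(path: Path) extends LocalData {
  override def stream: InputStream = Files.newInputStream(path)
  override def size: Long = Files.size(path)
}

case class InMemory(bytes: Array[Byte]) extends LocalData {
  override def stream: InputStream = new ByteArrayInputStream(bytes)
  override def size: Long = bytes.length
}
```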

 * @param chunk
 * @return
 */
private def checkChunkDataAndCrc(
Contributor: A bit of a high-level point: you are now using the same code to compute the CRC and to verify it. Is it possible to create more separation here? I would consider checking in known CRCs, or creating a file with known CRC segments.

vicennial (author): Added truth/golden files 👍
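A sketch of how such a golden-file check might read (illustrative; the suite's actual file layout and helpers may differ):

```scala
import java.nio.file.{Files, Path}
import java.util.zip.CRC32
import scala.collection.JavaConverters._

// Compute per-chunk CRC32s of a test artifact and compare them against
// pre-computed values checked in next to the artifact (the "golden" file).
def chunkCrcs(artifact: Path, chunkSize: Int): Seq[Long] =
  Files.readAllBytes(artifact).grouped(chunkSize).map { chunk =>
    val crc = new CRC32()
    crc.update(chunk)
    crc.getValue
  }.toSeq

def assertCrcsMatchGolden(artifact: Path, goldenFile: Path, chunkSize: Int): Unit = {
  val expected = Files.readAllLines(goldenFile).asScala.map(_.toLong).toSeq
  assert(chunkCrcs(artifact, chunkSize) == expected)
}
```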

@hvanhovell left a comment: Looks pretty good overall. Left a couple of small comments.

val data = proto.AddArtifactsRequest.ArtifactChunk
  .newBuilder()
  .setData(ByteString.readFrom(in))
  .setCrc(in.getChecksum.getValue)
@amaliujia (Mar 3, 2023): I am not an expert on networking, so just a question for my own education: is gRPC-level byte transmission not 100% reliable, so that we need another CRC to check nothing is corrupted?

Contributor: I am not sure about gRPC's guarantees. However, I have seen network transfers go wrong, and then checksums are your friend.

Contributor: Sounds good. Thanks.
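For context, the per-chunk CRC in the snippet above comes from reading through a checked stream; a minimal sketch of that pattern (the file path is a placeholder):

```scala
import java.io.FileInputStream
import java.util.zip.{CRC32, CheckedInputStream}
import com.google.protobuf.ByteString

// A CheckedInputStream accumulates a CRC32 as bytes flow through it, so the
// checksum of everything read so far is available from getChecksum.
val in = new CheckedInputStream(new FileInputStream("/path/to/artifact.jar"), new CRC32())
val data = ByteString.readFrom(in) // reads the stream, updating the CRC as a side effect
val crc = in.getChecksum.getValue  // CRC32 of exactly the bytes just read
```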

@amaliujia: Overall looks good. Thank you!

@hvanhovell left a comment: LGTM

hvanhovell pushed a commit that referenced this pull request on Mar 3, 2023; its commit message duplicates the PR description above.

Closes #40256 from vicennial/SPARK-42653.

Authored-by: vicennial <venkata.gudesa@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 8a0d626)
Signed-off-by: Herman van Hovell <herman@databricks.com>
hvanhovell closed this in 8a0d626 on Mar 3, 2023.
  .readAllLines(artifactCrcPath.resolve(crcFileName))
  .asScala
  .map(_.toLong)
Member: The Scala 2.13 build is broken by this. I made a quick followup: #40267

vicennial (author): Thank you!
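The usual shape of that fix (a sketch; see #40267 for the actual followup): in Scala 2.13, `Seq` means `immutable.Seq`, so the mutable `Buffer` returned by `asScala` no longer conforms and needs an explicit `.toSeq`.

```scala
import java.nio.file.Files
import scala.collection.JavaConverters._ // deprecated in 2.13, but cross-builds with 2.12

// artifactCrcPath and crcFileName come from the surrounding test code.
val expectedCrcs: Seq[Long] = Files
  .readAllLines(artifactCrcPath.resolve(crcFileName))
  .asScala
  .map(_.toLong)
  .toSeq // materialize the Buffer; required for Scala 2.13's immutable Seq
```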

hvanhovell pushed a commit that referenced this pull request Mar 23, 2023
### What changes were proposed in this pull request?

This PR adds server-side artifact management as a follow up to the client-side artifact transfer introduced in #40256.

Note: The artifacts added on the server are visible to **all users** of the cluster. This is a limitation of the current spark architecture (unisolated classloaders).

Apart from storing generic artifacts, we handle jars and classfiles in specific ways:

- Jars:
  - Jars may be added but not removed or overwritten.
  - Added jars would be visible to **all** users/tasks/queries.
- Classfiles:
  - Classfiles may not be explicitly removed but are allowed to be overwritten.
  - We piggyback on top of the REPL architecture to serve classfiles to the executors
    - If a REPL is initialized, classfiles are stored in the existing `spark.repl.class.outputDir` and served via the URI in `spark.repl.class.uri`.
    - If a REPL is not being used, we use a custom directory (root: `sparkContext.sparkConnectArtifactDirectory`) to store classfiles and point `spark.repl.class.uri` towards it.
  - Class files are visible to **all** users/tasks/queries.

### Why are the changes needed?

#40256 implements the client-side transfer of artifacts to the server but currently, the server does not process these requests.

We need to implement a server-side management mechanism to handle the storage of these artifacts on the driver as well as perform further processing (such as adding jars and moving class files to the right directories).

### Does this PR introduce _any_ user-facing change?

Yes, a new experimental API but no behavioural changes: a new method, `sparkConnectArtifactDirectory`, is accessible through `SparkContext` and returns the directory storing all artifacts from Spark Connect.

### How was this patch tested?

New unit tests.

Closes #40368 from vicennial/SPARK-42748.

Lead-authored-by: vicennial <venkata.gudesa@databricks.com>
Co-authored-by: Venkata Sai Akhil Gudesa <venkata.gudesa@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
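A rough sketch of the no-REPL path described above (illustrative wiring only; `classServerUri` is hypothetical, and the config keys are the internal ones named in the commit message):

```scala
// Illustrative only: without a REPL, the server designates its own artifact
// directory as the class output dir and points the class-serving URI at it,
// so executors fetch client-uploaded classfiles the same way they would
// fetch REPL-generated classes.
val artifactDir = sparkContext.sparkConnectArtifactDirectory
conf.set("spark.repl.class.outputDir", artifactDir.toString)
conf.set("spark.repl.class.uri", classServerUri) // hypothetical URI serving artifactDir
```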
val buf = new Array[Byte](CHUNK_SIZE)
var bytesRead = 0
var count = 0
while (count != -1 && bytesRead < CHUNK_SIZE) {
Member: qq: why do we need this while loop? It seems like

    count = in.read(buf, 0, CHUNK_SIZE)
    if (count == 0) ByteString.empty()
    else ByteString.copyFrom(buf, 0, count)

would be good enough, because read blocks until it meets EOF, IIRC.
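For context, `InputStream.read` may return fewer bytes than requested even before EOF, which is presumably why the loop keeps reading until the buffer is full; a reconstruction of its likely shape (not the exact PR code):

```scala
import java.io.InputStream

// Loop until the buffer is full or the stream ends: a single read(...) call
// is allowed to return a short count, so one call may not fill buf.
def readChunk(in: InputStream, chunkSize: Int): (Array[Byte], Int) = {
  val buf = new Array[Byte](chunkSize)
  var bytesRead = 0
  var count = 0
  while (count != -1 && bytesRead < chunkSize) {
    count = in.read(buf, bytesRead, chunkSize - bytesRead)
    if (count != -1) bytesRead += count
  }
  (buf, bytesRead)
}
```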

HyukjinKwon added a commit that referenced this pull request May 23, 2023
…in Python client

### What changes were proposed in this pull request?

This PR implements `SparkSession.addArtifact(s)`. The logic is basically translated from Scala (#40256) to Python here.

One difference is that it does not support `class` files or `cache` (#40827), because it is not realistic for the Python client to add `class` files. For `cache`, this implementation will serve as base work.

This PR is also base work for implementing the sending of py-files and archive files.

### Why are the changes needed?

For feature parity with the Scala client. In addition, this is also base work for the `cache` implementation and for Python dependency management (https://www.databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html).

### Does this PR introduce _any_ user-facing change?

Yes, this exposes an API `SparkSession.addArtifact(s)`.

### How was this patch tested?

Unit tests were added; also manually tested.

Closes #41250 from HyukjinKwon/python-addArtifacts.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request on Jun 20, 2023; its commit message duplicates the PR description above.

Closes apache#40256 from vicennial/SPARK-42653.

Authored-by: vicennial <venkata.gudesa@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 8a0d626)
Signed-off-by: Herman van Hovell <herman@databricks.com>