
[SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts #41415

Closed
HyukjinKwon wants to merge 1 commit into apache:master from HyukjinKwon:addFile

Conversation

@HyukjinKwon (Member) commented on Jun 1, 2023:

What changes were proposed in this pull request?

This PR proposes to add support for regular files in `SparkSession.addArtifacts`.

Why are the changes needed?

So that users can make regular files available on the worker nodes.

Does this PR introduce any user-facing change?

Yes, it adds support for adding arbitrary regular files via `SparkSession.addArtifacts`.

How was this patch tested?

Added a couple of unit tests.

Also manually tested in `local-cluster` mode:

```bash
./sbin/start-connect-server.sh --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar` --master "local-cluster[2,2,1024]"
./bin/pyspark --remote "sc://localhost:15002"
```
```python
import os
import tempfile
from pyspark.sql.functions import udf
from pyspark import SparkFiles

with tempfile.TemporaryDirectory() as d:
    file_path = os.path.join(d, "my_file.txt")
    with open(file_path, "w") as f:
        f.write("Hello world!!")

    @udf("string")
    def func(x):
        with open(
            os.path.join(SparkFiles.getRootDirectory(), "my_file.txt"), "r"
        ) as my_file:
            return my_file.read().strip()

    spark.addArtifacts(file_path, file=True)
    spark.range(1).select(func("id")).show()
```
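For context, `file=True` is the flag added by this PR; `addArtifacts` already accepted `pyfile` and `archive` options. A minimal sketch of the three variants (illustrative file names, run in the same `pyspark --remote` shell):

```python
# Illustrative only: pyfile/archive predate this PR; file=True is the new option.
spark.addArtifacts("dep_module.py", pyfile=True)  # Python dependency, importable on executors
spark.addArtifacts("data.zip", archive=True)      # archive, unpacked on executors
spark.addArtifacts("my_file.txt", file=True)      # new: plain file, shipped to executors
```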

@HyukjinKwon (Member, Author) commented:

cc @hvanhovell @vicennial, mind taking a look please?

```diff
@@ -154,6 +154,8 @@ class SparkConnectArtifactManager private[connect] {
       val canonicalUri =
         fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri)
       sessionHolder.session.sparkContext.addArchive(canonicalUri.toString)
+    } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
+      sessionHolder.session.sparkContext.addFile(target.toString)
```
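For readers skimming the diff, here is a rough Python mirror of the dispatch above. It is illustrative only: the real implementation is the Scala `SparkConnectArtifactManager`, and the function name here is hypothetical.

```python
import os

def dispatch_artifact(remote_relative_path: str, target: str, sc) -> None:
    # Hypothetical mirror of the Scala dispatch: uploaded artifacts are
    # namespaced by path prefix (e.g. "archives/", and now "files/").
    if remote_relative_path.startswith("archives" + os.sep):
        sc.addArchive(target)  # pre-existing branch
    elif remote_relative_path.startswith("files" + os.sep):
        sc.addFile(target)     # new branch added by this PR
```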
A contributor commented on the diff:

We are going to add session isolation for Scala UDFs soon. How do you think we should implement file support when multiple users upload files with the same name?

@HyukjinKwon (Author) replied:

I believe jars have the same problem, and we could share the same fix.

The contributor replied:

My only problem here is that we need to design this for the Python side. In practice, artifacts for a session are exposed in a session-specific location. How would a Python user interact with these files? Through `org.apache.spark.SparkFiles`?

@HyukjinKwon (Author) replied:

For regular files and archives, I don't intend to expose `org.apache.spark.SparkFiles` for now.
Since files and archives are always stored in the current working directory of the executors in production, I was simply thinking of creating a session-dedicated directory and changing the current working directory to it (during Python UDF execution).

Meaning that end users would keep accessing their files with `./myfile.txt` or `./myarchive`.
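A minimal sketch of that idea, with hypothetical helper names (the actual follow-up PR may differ):

```python
import os
from contextlib import contextmanager

@contextmanager
def session_cwd(root_dir: str, session_id: str):
    # Hypothetical sketch: give each Spark Connect session its own directory
    # on the worker and chdir into it while a Python UDF runs, so relative
    # paths like "./myfile.txt" keep resolving per session.
    session_dir = os.path.join(root_dir, session_id)
    os.makedirs(session_dir, exist_ok=True)
    prev_cwd = os.getcwd()
    os.chdir(session_dir)
    try:
        yield session_dir
    finally:
        os.chdir(prev_cwd)
```

The point of the design is that existing user code reading relative paths keeps working unmodified once sessions are isolated.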

@HyukjinKwon (Author) added:

(SparkFiles is used in the test case here, but that's something of a hack to ensure cleanup, etc.)

@HyukjinKwon (Author) added:

Just to be extra clear, each Spark Connect session will have a dedicated directory on worker nodes.

@HyukjinKwon (Author) added:

I will make a follow-up PR right after this.

@HyukjinKwon (Author) added:

Discussed offline with Herman. Will merge this first and make a follow-up PR to address it.

@HyukjinKwon (Author) commented:

Merged to master.

czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
[SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts


Closes apache#41415 from HyukjinKwon/addFile.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon deleted the addFile branch on January 15, 2024 00:48