[SPARK-52334][CORE][K8S] Update all files, jars, and pyFiles to reference the working directory after they are downloaded #51037
Conversation
Hi @dongjoon-hyun,
Do you think you can write a test case, @TongWei1105 ?
  "/home/thejar.jar",
  "arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val _ = submit.prepareSubmitEnvironment(appArgs)
Can you update this test to handle archives as well?
Done
log4j2.properties is not an archive file, and so it ends up getting copied to the destination (a variant of existing cases). I am trying to ensure that the if (isArchive) { branch works as expected when the file actually results in unpacking.
The isArchive logic does get triggered in this case — the test has been updated to cover that scenario accordingly.
Thank you for your suggestion.
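The archive-vs-copy distinction discussed above can be modeled with a small sketch. Everything here is illustrative: the suffix list, object, and helper names are assumptions, not Spark's actual implementation; only the shape of the if (isArchive) branch mirrors the code under review, where archives are unpacked into the working directory while plain files such as log4j2.properties are copied as-is.

```scala
// Sketch of the download step under test (hypothetical helper names):
// archives get unpacked into the working directory, plain files get
// copied verbatim. The suffix list is an assumption for illustration.
object ArchiveHandlingSketch {
  private val archiveSuffixes = Seq(".zip", ".tar", ".tar.gz", ".tgz")

  // Decide whether a downloaded file should be treated as an archive.
  def isArchive(name: String): Boolean =
    archiveSuffixes.exists(name.endsWith)

  // Returns a description of the action taken, for illustration only.
  def handleDownloaded(name: String): String =
    if (isArchive(name)) s"unpacked $name into working dir"
    else s"copied $name to working dir"

  def main(args: Array[String]): Unit = {
    // An archive exercises the unpack branch ...
    assert(handleDownloaded("deps.zip").startsWith("unpacked"))
    // ... while log4j2.properties exercises the copy branch.
    assert(handleDownloaded("log4j2.properties").startsWith("copied"))
  }
}
```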
Force-pushed 26b6316 to 818ff74.
When you have a moment, could you please take another look at this PR? Thanks!
What changes were proposed in this pull request?
This PR fixes a bug where submitting a Spark job with the --files option and then calling SparkContext.addFile() for a file with the same name causes Spark to throw an exception:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text)
Why are the changes needed?
bin/spark-submit --files s3://bucket/a.text --class testDemo app.jar
sc.addFile("a.text", true)
This works correctly in YARN mode, but throws an error in Kubernetes mode.
After SPARK-33782, in Kubernetes mode, --files, --jars, --archiveFiles, and --pyFiles are all downloaded to the working directory.
However, the code sets args.files = filesLocalFiles, and filesLocalFiles refers to the temporary download path, not the working directory.
This causes issues when user code like testDemo calls sc.addFile("a.text", true), resulting in an error such as:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text)
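The failure mode above can be modeled with a minimal sketch of a name-to-path registration check like the one SparkContext.addFile performs. The object and map below are hypothetical stand-ins, not Spark's actual implementation; only the error message shape follows the exception quoted above. It shows why two different paths for the same filename fail, and why rewriting args.files to the working directory makes both registrations agree.

```scala
// Minimal model (assumption, not Spark's code) of the
// "already registered with a different path" check in addFile.
object AddFileCheckSketch {
  private val registered = scala.collection.mutable.Map.empty[String, String]

  def addFile(name: String, path: String): Unit =
    registered.get(name) match {
      case Some(old) =>
        // A second registration must use the same path, or it fails.
        require(old == path,
          s"File $name was already registered with a different path " +
          s"(old path = $old, new path = $path)")
      case None =>
        registered(name) = path
    }

  def main(args: Array[String]): Unit = {
    // After the fix: --files and the user's addFile both resolve to
    // the working-directory path, so re-registration is a no-op.
    addFile("a.text", "/opt/spark/work-dir/a.text")
    addFile("a.text", "/opt/spark/work-dir/a.text") // no exception

    // Before the fix: --files registered the temp download path, so the
    // later addFile with the working-directory path throws.
    addFile("b.text", "/tmp/spark-download/b.text")
    val failed =
      try { addFile("b.text", "/opt/spark/work-dir/b.text"); false }
      catch { case _: IllegalArgumentException => true }
    assert(failed)
  }
}
```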
Does this PR introduce any user-facing change?
Yes, the issue described above is resolved after this PR.
How was this patch tested?
Existing unit tests, updated to cover this scenario.
Was this patch authored or co-authored using generative AI tooling?
No.