
[SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in Yarn client mode #21267

Closed
wants to merge 3 commits into from

Conversation

HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented May 8, 2018

What changes were proposed in this pull request?

Problem

When we run the PySpark shell in Yarn client mode, the specified `--py-files` are not recognized on the driver side.

Here are the steps I took to check:

```bash
$ cat /home/spark/tmp.py
def testtest():
    return 1
$ ./bin/pyspark --master yarn --deploy-mode client --py-files /home/spark/tmp.py
```

```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()  # executor side
[1]
>>> test()  # driver side
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```

How did it happen?

Unlike Yarn cluster mode, and Yarn client mode launched via spark-submit, the PySpark shell in Yarn client mode specifically:

1. First runs the Python shell via `SparkSubmitCommandBuilder` (https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L158), as pointed out by @tgravescs in the JIRA.

2. This triggers shell.py, which submits another application to launch a Py4J gateway:

```python
SPARK_HOME = _find_spark_home()
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
command = [os.path.join(SPARK_HOME, script)]
if conf:
    for k, v in conf.getAll():
        command += ['--conf', '%s=%s' % (k, v)]
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
if os.environ.get("SPARK_TESTING"):
    submit_args = ' '.join([
        "--conf spark.ui.enabled=false",
        submit_args
    ])
command = command + shlex.split(submit_args)
```

3. It runs a Py4J gateway:

```scala
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
```

4. It copies (or downloads) `--py-files` into a local temp directory:

```scala
var localPyFiles: String = null
if (deployMode == CLIENT) {
  localPrimaryResource = Option(args.primaryResource).map {
    downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
  }.orNull
  localJars = Option(args.jars).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
  }.orNull
  localPyFiles = Option(args.pyFiles).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
  }.orNull
}
```

and these files are then set as `spark.submit.pyFiles`.

5. The Py4J JVM is launched, and then the Python paths are set via:

```python
# Deploy code dependencies set by spark-submit; these will already have been added
# with SparkContext.addFile, so we just need to add them to the PYTHONPATH
for path in self._conf.get("spark.submit.pyFiles", "").split(","):
    if path != "":
        (dirname, filename) = os.path.split(path)
        if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
            self._python_includes.append(filename)
            sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
```

However, these paths are not actually set up, because the files were copied into a temp directory in step 4, whereas this code path looks under `SparkFiles.getRootDirectory()`, where files are stored only when `SparkContext.addFile()` is called.
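This mismatch can be reproduced in plain Python, outside Spark; a minimal sketch with hypothetical module and directory names:

```python
import os
import sys
import tempfile

# Reproduce the mismatch outside Spark (hypothetical names): the module is
# downloaded into one temp directory, as spark-submit does in step 4, but
# sys.path gets an entry under a different root, as context.py does in step 5.
root_dir = tempfile.mkdtemp()      # stands in for SparkFiles.getRootDirectory()
download_dir = tempfile.mkdtemp()  # stands in for spark-submit's temp directory

with open(os.path.join(download_dir, "tmp_module_21945.py"), "w") as f:
    f.write("def testtest():\n    return 1\n")

sys.path.insert(1, root_dir)  # the downloaded file is not under this root
try:
    import tmp_module_21945
    imported = True
except ImportError:
    imported = False  # import fails: sys.path points at the wrong directory
```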

In other cluster modes, `spark.files` is set via:

```scala
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
  confKey = "spark.files"),
```

and those files are explicitly added via:

```scala
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
```

So we are fine in other modes.

In the case of Yarn client and cluster mode with spark-submit, these are handled manually; in particular, #6360 added most of that logic. There, the Python path is set manually via, for example, `deploy.PythonRunner`; `spark.files` is not used.

How does the PR fix the problem?

I tried to keep the approach as isolated as possible: simply copy the `.py` or `.zip` files into `SparkFiles.getRootDirectory()` on the driver side if they are not already there. Another possible way is to set `spark.files`, but that does unnecessary extra work and sounds a bit invasive.
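The approach can be sketched roughly as follows; a simplified illustration, not the exact patch, where `root_dir` and the helper name are hypothetical stand-ins (with `root_dir` playing the role of `SparkFiles.getRootDirectory()`):

```python
import os
import shutil
import sys

def add_py_file_to_path(path, root_dir, package_extensions=(".zip", ".egg", ".jar")):
    """Copy a --py-files entry under root_dir (if missing) and put it on sys.path.

    Hypothetical helper sketching the SPARK-21945 fix; root_dir stands in for
    SparkFiles.getRootDirectory().
    """
    (dirname, filename) = os.path.split(path)
    filepath = os.path.join(root_dir, filename)
    if not os.path.exists(filepath):
        # In Yarn client shell mode the file was only downloaded to a temp
        # directory, never added via SparkContext.addFile, so copy it over.
        shutil.copyfile(path, filepath)
    if filename.lower().endswith(package_extensions):
        # Archives are importable directly, so the archive itself goes on the path.
        sys.path.insert(1, filepath)
    else:
        # A plain .py file needs its containing directory on the path instead.
        sys.path.insert(1, root_dir)
```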

Before

```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```

After

```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
1
```

How was this patch tested?

I manually tested in standalone and Yarn cluster modes with the PySpark shell. `.zip` and `.py` files were also tested with steps similar to the above. It is difficult to add an automated test for this.

@HyukjinKwon
Member Author

cc @vanzin, @jerryshao and @tgravescs, could you take a look and see if it makes sense please?

@HyukjinKwon HyukjinKwon changed the title [SPARK-21945][YARN][PYTHON] Make --py-files work in PySpark shell in Yarn client mode [SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in Yarn client mode May 8, 2018
```python
            sys.path.insert(1, filepath)
        except Exception as e:
            from pyspark import util
            warnings.warn(
```
Member Author

Log was also tested manually:

.../python/pyspark/context.py:230: RuntimeWarning: Python file [/home/spark/tmp.py] specified in 'spark.submit.pyFiles' failed to be added in the Python path, excluding this in the Python path.
  : ...

Member Author

BTW, this should now be safer in any case, since we no longer add non-existent files to the path and we print out warnings.
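The guard can be illustrated in isolation; a hypothetical sketch mirroring the behaviour described here (existing files are added, missing ones produce a `RuntimeWarning`), not the patch itself:

```python
import os
import sys
import warnings

def add_if_exists(path):
    # Only put existing files on the Python path; otherwise warn and keep
    # going (hypothetical helper name, mirroring the guard discussed above).
    if os.path.exists(path):
        sys.path.insert(1, path)
    else:
        warnings.warn(
            "Python file [%s] specified in 'spark.submit.pyFiles' failed "
            "to be added in the Python path, excluding this in the Python "
            "path." % path,
            RuntimeWarning)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    add_if_exists("/no/such/file.py")  # missing file: warns instead of failing
```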

@SparkQA

SparkQA commented May 8, 2018

Test build #90364 has finished for PR 21267 at commit 68be3ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```python
# not added via SparkContext.addFile. Here we check if the file exists,
# try to copy and then add it to the path. See SPARK-21945.
shutil.copyfile(path, filepath)
if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
```
Member

Am I missing anything? Looks like `PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')`, so `.py` does not seem to be in that?

Member Author

The root directory is added to the path above; a `.py` file needs its parent directory on the path, not the file itself.

Member

Oh, I see.
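For background on the exchange above: Python's import machinery accepts directories and zip archives on `sys.path`, but not a bare `.py` file path, which is why the parent directory is needed. A quick illustration with hypothetical module names:

```python
import os
import sys
import tempfile
import zipfile

workdir = tempfile.mkdtemp()

# A plain .py file: its *parent directory* must go on sys.path.
with open(os.path.join(workdir, "plainmod_21945.py"), "w") as f:
    f.write("VALUE = 1\n")
sys.path.insert(1, workdir)  # note: the directory, not .../plainmod_21945.py

# A .zip archive: the archive path itself can go on sys.path (zipimport).
zip_path = os.path.join(workdir, "zipped_21945.zip")
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("zippedmod_21945.py", "VALUE = 2\n")
sys.path.insert(1, zip_path)

import plainmod_21945
import zippedmod_21945
```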

@jerryshao
Contributor

Does it only happen in the yarn client PySpark shell? I would suggest fixing this on the SparkSubmit side, treating it as a special case and setting the proper config.

@HyukjinKwon
Member Author

Yea, this is specific to the yarn client PySpark shell. Yarn client and cluster modes with spark-submit are handled specially via #6360, but I think the PySpark shell in yarn client mode was missed. The launch path diverges, if I understood correctly.

@HyukjinKwon
Member Author

(I have tried to explain why it's specific to the PySpark shell with Yarn client mode in the PR description.)

@HyukjinKwon
Member Author

Will try to put this into SparkSubmit.

@HyukjinKwon
Member Author

Hm .. @jerryshao, it seems a bit difficult to do so. The simplest way would be just to copy the files into `SparkFiles.getRootDirectory()`; however, `SparkEnv` is inaccessible at this stage in SparkSubmit ..

Another way might be to set `spark.files` so that the files are added via `addFile` later, which puts them in `SparkFiles.getRootDirectory()` on the driver side too, but .. I wonder if it makes sense to set a configuration that Yarn doesn't use.

```python
# In case of YARN with shell mode, 'spark.submit.pyFiles' files are
# not added via SparkContext.addFile. Here we check if the file exists,
# try to copy and then add it to the path. See SPARK-21945.
shutil.copyfile(path, filepath)
```
Contributor

Is this copy necessary? Couldn't you just add path to sys.path (instead of adding filepath) and that would solve the problem?

Member Author

That's the initial approach I tried. The thing is, for a `.py` file in the configuration, it needs its parent directory on the path (not the `.py` file itself), and that would also add any other `.py` files in that directory.

Member

For file types in PACKAGE_EXTENSIONS, do we need to copy?

Member Author

I don't think so, but that's already being done in the other cluster/client modes. The copies are made via `addFile` in those modes but not in this case specifically, so I think it's better to copy consistently.

Member

Are 'spark.submit.pyFiles' files only missing on driver side? I mean, if they are not added by SparkContext.addFile, shouldn't they also be missing on executors?

Member Author

Yup, they are only missing on the driver side in this mode specifically. Yarn doesn't add them since `spark.files` is not set, if I understood correctly. They are handled specially in the spark-submit case, but the shell case seems to have been missed.

I described a bit in the PR description too.

> In case of Yarn client and cluster with submit, these are manually being handled. In particular #6360 added most of the logics. In this case, the Python path looks manually set via, for example, `deploy.PythonRunner`. We don't use `spark.files` here.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented May 14, 2018

Test build #90565 has finished for PR 21267 at commit 68be3ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```python
        except Exception as e:
            from pyspark import util
            warnings.warn(
                "Python file [%s] specified in 'spark.submit.pyFiles' failed "
```
Contributor

Simplify this message?

"Failed to add file [%s] speficied in 'spark.submit.pyFiles' to Python path:\n %s"

@vanzin
Contributor

vanzin commented May 14, 2018

Looks good aside from the log message.

```python
            sys.path.insert(1, filepath)
        except Exception:
            from pyspark import util
            warnings.warn(
```
Member Author

Likewise, I checked the warning manually:

.../pyspark/context.py:229: RuntimeWarning: Failed to add file [/home/spark/tmp.py] speficied in 'spark.submit.pyFiles' to Python path:

...
  /usr/lib64/python27.zip
  /usr/lib64/python2.7
... 

@SparkQA

SparkQA commented May 15, 2018

Test build #90614 has finished for PR 21267 at commit b9e312e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 15, 2018

Test build #90616 has finished for PR 21267 at commit ef3555e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented May 17, 2018

Test build #90704 has finished for PR 21267 at commit ef3555e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

Merged to master.

@asfgit asfgit closed this in 9a641e7 May 17, 2018
asfgit pushed a commit that referenced this pull request May 17, 2018
[SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in Yarn client mode

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21267 from HyukjinKwon/SPARK-21945.

(cherry picked from commit 9a641e7)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@HyukjinKwon HyukjinKwon deleted the SPARK-21945 branch October 16, 2018 12:43