# [SPARK-17387][PYSPARK] Creating SparkContext() from python without spark-submit ignores user conf #14959

## Conversation
Test build #64933 has finished for PR 14959 at commit

Test build #64935 has finished for PR 14959 at commit

Test build #64937 has finished for PR 14959 at commit
```
@@ -101,13 +101,31 @@ def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
             self._jconf = _jconf
         else:
             from pyspark.context import SparkContext
             SparkContext._ensure_initialized()
```
Just a thought, but would it be possible to just add the `--conf` args to the env variable before this is called, if the JVM hasn't been created yet? Then you wouldn't need to propagate the confs all the way through to `launch_gateway`.
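(Presumably something along these lines: a hedged sketch, assuming the `PYSPARK_SUBMIT_ARGS` variable that `launch_gateway` already reads; the idea is dropped a couple of comments below.)

```python
import os

# `conf` is the user's SparkConf (illustrative). Fold its entries into the
# submit args before the JVM starts, so launch_gateway() picks them up
# without any signature change.
submit_args = " ".join("--conf %s=%s" % (k, v) for k, v in conf.getAll())
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args + " pyspark-shell"
```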
Do you mean to set the env variable `PYSPARK_SUBMIT_ARGS`?
Nevermind, this wouldn't even be possible here. I didn't realize that `SparkConf` can be created first, launching the JVM, and then configuration is added after.
The conf code looks kinda nasty with the checks for whether a JVM has been set or not... I guess part of it is mandatory because otherwise this wouldn't work, but in particular, I'm not so sure the `SparkContext` side is right: if you just say `SparkContext(conf=conf)`, the context's internal conf ends up being the same instance as the caller's (explained below).
Also someone else more familiar with pyspark (I know Holden has already looked), maybe @davies, should take a look.
@vanzin Thanks for your reviews. I just updated the PR, but I don't understand what your following statement means. Can you explain it? Thanks.
Test build #65551 has finished for PR 14959 at commit
It means that if you do this (snippet reconstructed below), the internal `SparkConf` of the context will not be the same instance as `conf`.
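(A reconstruction of the elided snippet; illustrative only, not the reviewer's exact code.)

```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.app.name", "example")
sc = SparkContext(conf=conf)
# On the Scala side, SparkContext clones the conf it is given, so the
# context's internal SparkConf is a different instance from `conf`.
```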
```
@@ -50,13 +50,18 @@ def launch_gateway():
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
     script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
     command = [os.path.join(SPARK_HOME, script)]
     if conf and conf.getAll():
         conf_items = [['--conf', '%s=%s' % (k, v)] for k, v in conf.getAll()]
```
No point in using a list comprehension if it requires more code...

```python
for k, v in conf.getAll():
    command += ['--conf', '%s=%s' % (k, v)]
```
Correct, will fix it.
@vanzin This is existing behavior where Python differs from Scala, but I think it is correct. I guess the reason why, in Scala, the internal SparkConf of the SparkContext is not the same instance as `conf` is to make sure that changing the SparkConf after the SparkContext is created does not take effect. PySpark is the same in this respect: although in PySpark the internal SparkConf of the SparkContext is the same instance as `conf`, changing `conf` after the SparkContext is created does not take effect, since that is guaranteed on the Scala side.
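(For illustration, a minimal sketch of the guarantee described above, assuming the standard API.)

```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
conf.set("spark.executor.memory", "4g")  # too late: ignored by the running app
# The JVM-side SparkContext cloned the conf at construction, so the
# application keeps running with 2g regardless of the later set().
```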
Test build #65639 has finished for PR 14959 at commit

Test build #65641 has finished for PR 14959 at commit

Test build #65649 has finished for PR 14959 at commit
Are these tests flaky or is the failure related to this change? (Other PRs seem to be passing, so probably the latter?)

Test build #65699 has finished for PR 14959 at commit
LGTM. Let's see if others have any comments.
```python
            self._jconf.set(key, unicode(value))
        else:
            # Don't use unicode for self._conf, otherwise we will get exception when launching jvm.
            self._conf[key] = value
```
Could you just cast the value to unicode here also? Then it would be consistent with the Java class and you wouldn't need to change the doctest above.
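(That is, presumably changing the assignment to something like the following.)

```python
self._conf[key] = unicode(value)
```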
As I mentioned in the comment, `unicode` would cause an exception when launching the JVM using spark-submit.
hmm, I didn't get any exception. Here is what I tried:

```
$ SPARK_HOME=$PWD PYTHONPATH=python:python/lib/py4j-0.10.3-src.zip python
Python 2.7.12 (default, Jul 1 2016, 15:12:24)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from pyspark import SparkContext
>>> from pyspark import SparkConf
>>> conf = SparkConf().set("spark.driver.memory", "4g")
>>> sc = SparkContext(conf=conf)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/09/26 16:53:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/09/26 16:53:58 WARN Utils: Your hostname, resolves to a loopback address: 127.0.1.1; using *.*.*.* instead (on interface )
16/09/26 16:53:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
>>> conf.get("spark.driver.memory")
u'4g'
>>> sc.getConf().get("spark.driver.memory")
u'4g'
```
How would you hit this line if you are using spark-submit? Maybe I'm missing something
Fixed. It might have been an issue from my previous commits.
```
     def set(self, key, value):
         """Set a configuration property."""
-        self._jconf.set(key, unicode(value))
+        # Try to set self._jconf first if JVM is created, set self._conf if JVM is not created yet.
+        if self._jconf:
```
minor: I think it's generally better, and more consistent with the rest of the code, if you make these checks against a `None` value, here and in other places in this PR. For example:

```python
if self._jconf is not None:
```
Fixed
```python
            pairs.append((elem._1(), elem._2()))
        else:
            for k, v in self._conf.items():
                pairs.append((k, v))
```
This could be simplified:

```python
if self._jconf is not None:
    return [(elem._1(), elem._2()) for elem in self._jconf.getAll()]
else:
    return self._conf.items()
```
Fixed
```python
        if self._jconf:
            return self._jconf.toDebugString()
        else:
            return '\n'.join('%s=%s' % (k, v) for k, v in self._conf.items())
```
Maybe add a unit test to make sure the two ways to make a debug string are the same?
They may be different, because `_jconf` has the extra configuration from the JVM side (like spark-defaults.conf), while `self._conf` only has the configuration from the Python side.
```
@@ -41,7 +41,7 @@ def can_convert_list(self, obj):
 ListConverter.can_convert = can_convert_list


-def launch_gateway():
+def launch_gateway(conf=None):
```
It might be helpful to have a docstring with a param description of `conf` here; a possible sketch follows.
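(A possible docstring sketch; the wording is illustrative, not the committed text.)

```python
def launch_gateway(conf=None):
    """
    Launch a py4j gateway to a JVM running spark-submit.

    :param conf: an optional :class:`SparkConf`; its entries are forwarded
        to spark-submit as ``--conf key=value`` arguments.
    """
```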
Fixed
```
         return self

     def setExecutorEnv(self, key=None, value=None, pairs=None):
         """Set an environment variable to be passed to executors."""
         if (key is not None and pairs is not None) or (key is None and pairs is None):
             raise Exception("Either pass one key-value pair or a list of pairs")
         elif key is not None:
-            self._jconf.setExecutorEnv(key, value)
+            self.set("spark.executorEnv." + key, value)
```
Since you duplicated the property prefix `"spark.executorEnv."` from the Scala file, it might be good to make a unit test that ensures calling `setExecutorEnv` in both Python and Scala/Java actually sets a property with the same prefix; see the sketch below.
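(A minimal sketch of such a test, assuming it would sit alongside the existing conf tests; names are illustrative.)

```python
from pyspark import SparkConf

def test_executor_env_uses_scala_prefix():
    conf = SparkConf(loadDefaults=False)
    conf.setExecutorEnv("VAR1", "value1")
    # Must produce the same "spark.executorEnv." key that the Scala
    # SparkConf.setExecutorEnv writes.
    assert conf.get("spark.executorEnv.VAR1") == "value1"
```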
Fixed.
I added a few comments @zjffdu. I also tested this PR out and it looks good.

@zjffdu could you update the PR? Thanks!

I'd also love to see this updated so we can finish the review and make it easier for people to use PySpark this way :)
Test build #66587 has finished for PR 14959 at commit

Test build #66588 has finished for PR 14959 at commit
@vanzin @holdenk @BryanCutler PR is updated, please help review.

Awesome, thanks for updating. I'm at PyData this weekend so will be a bit slow on my end.
```
@@ -121,7 +121,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
     def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                  conf, jsc, profiler_cls):
         self.environment = environment or {}
-        self._conf = conf or SparkConf(_jvm=self._jvm)
+        # java gateway must have been launched at this point.
+        if conf is not None and conf._jconf:
```
Maybe also change this to `and conf._jconf is not None`.
```python
        if self._jconf is not None:
            self._jconf.set(key, unicode(value))
        else:
            # Don't use unicode for self._conf, otherwise we will get exception when launching jvm.
```
you can remove this comment now
Thanks for the updates @zjffdu, just 2 minor comments, otherwise LGTM!

Test build #66700 has finished for PR 14959 at commit
retest this please

Test build #66758 has finished for PR 14959 at commit

LGTM, merging to master.
## What changes were proposed in this pull request?

The root cause of SparkConf being ignored when launching the JVM is that SparkConf requires the JVM to be created first. https://github.com/apache/spark/blob/master/python/pyspark/conf.py#L106

In this PR, I defer launching the JVM until the SparkContext is created, so that the SparkConf can be passed to the JVM correctly.

## How was this patch tested?

Use the example code in the description of SPARK-17387:

```
$ SPARK_HOME=$PWD PYTHONPATH=python:python/lib/py4j-0.10.3-src.zip python
Python 2.7.12 (default, Jul 1 2016, 15:12:24)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from pyspark import SparkContext
>>> from pyspark import SparkConf
>>> conf = SparkConf().set("spark.driver.memory", "4g")
>>> sc = SparkContext(conf=conf)
```

And verify that spark.driver.memory is correctly picked up:

```
...op/ -Xmx4g org.apache.spark.deploy.SparkSubmit --conf spark.driver.memory=4g pyspark-shell
```

Author: Jeff Zhang <zjffdu@apache.org>

Closes apache#14959 from zjffdu/SPARK-17387.
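(A simplified sketch of the deferred-launch flow described in the summary above; not the actual pyspark source. `SparkConf` buffers settings in a plain dict until the JVM exists, and `launch_gateway` turns the buffered entries into `--conf` arguments.)

```python
class SparkConf(object):
    def __init__(self):
        self._conf = {}  # plain dict; no JVM required yet

    def set(self, key, value):
        self._conf[key] = value
        return self

    def getAll(self):
        return list(self._conf.items())


def launch_gateway(conf=None):
    command = ["./bin/spark-submit"]
    if conf and conf.getAll():
        for k, v in conf.getAll():
            command += ["--conf", "%s=%s" % (k, v)]
    command.append("pyspark-shell")
    return command  # the real code spawns this command to start the JVM


# launch_gateway(SparkConf().set("spark.driver.memory", "4g")) yields
# ['./bin/spark-submit', '--conf', 'spark.driver.memory=4g', 'pyspark-shell']
```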