Skip to content

Commit

Permalink
[SPARK-19307][PYSPARK] Make sure user conf is propagated to SparkCont…
Browse files Browse the repository at this point in the history
…ext.

The code was failing to propagate the user conf in the case where the
JVM was already initialized, which happens when a user submits a
python script via spark-submit.

Tested with new unit test and by running a python script in a real cluster.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#16682 from vanzin/SPARK-19307.
  • Loading branch information
Marcelo Vanzin authored and cmonkey committed Feb 15, 2017
1 parent 1307432 commit 41e1a54
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self._conf = conf
else:
self._conf = SparkConf(_jvm=SparkContext._jvm)
if conf is not None:
for k, v in conf.getAll():
self._conf.set(k, v)

self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
Expand Down
20 changes: 20 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,26 @@ def test_single_script_on_cluster(self):
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 4, 6]", out.decode('utf-8'))

def test_user_configuration(self):
"""Make sure user configuration is respected (SPARK-19307)"""
script = self.createTempFile("test.py", """
|from pyspark import SparkConf, SparkContext
|
|conf = SparkConf().set("spark.test_config", "1")
|sc = SparkContext(conf = conf)
|try:
| if sc._conf.get("spark.test_config") != "1":
| raise Exception("Cannot find spark.test_config in SparkContext's conf.")
|finally:
| sc.stop()
""")
proc = subprocess.Popen(
[self.sparkSubmit, "--master", "local", script],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode, msg="Process failed with error:\n {0}".format(out))


class ContextTests(unittest.TestCase):

Expand Down

0 comments on commit 41e1a54

Please sign in to comment.