Skip to content

Commit

Permalink
[SPARK-25525][SQL][PYSPARK] Do not update conf for existing SparkContext in SparkSession.getOrCreate.
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

In [SPARK-20946](https://issues.apache.org/jira/browse/SPARK-20946), we modified `SparkSession.getOrCreate` so that it does not update the conf of an existing `SparkContext`, because the `SparkContext` is shared by all sessions.
We should not update it on the PySpark side either.

## How was this patch tested?

Added tests.

Closes apache#22545 from ueshin/issues/SPARK-25525/not_update_existing_conf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
  • Loading branch information
ueshin authored and daspalrahul committed Sep 29, 2018
1 parent 220bd3c commit 6862bbb
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
14 changes: 4 additions & 10 deletions python/pyspark/sql/session.py
Expand Up @@ -156,7 +156,7 @@ def getOrCreate(self):
default.
>>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
>>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
>>> s1.conf.get("k1") == "v1"
True
In case an existing SparkSession is returned, the config options specified
Expand All @@ -179,19 +179,13 @@ def getOrCreate(self):
sparkConf = SparkConf()
for key, value in self._options.items():
sparkConf.set(key, value)
sc = SparkContext.getOrCreate(sparkConf)
# This SparkContext may be an existing one.
for key, value in self._options.items():
# we need to propagate the confs
# before we create the SparkSession. Otherwise, confs like
# warehouse path and metastore url will not be set correctly (
# these confs cannot be changed once the SparkSession is created).
sc._conf.set(key, value)
sc = SparkContext.getOrCreate(sparkConf)
# Do not update `SparkConf` for existing `SparkContext`, as it's shared
# by all sessions.
session = SparkSession(sc)
for key, value in self._options.items():
session._jsparkSession.sessionState().conf().setConfString(key, value)
for key, value in self._options.items():
session.sparkContext._conf.set(key, value)
return session

builder = Builder()
Expand Down
46 changes: 45 additions & 1 deletion python/pyspark/sql/tests.py
Expand Up @@ -80,7 +80,7 @@
_have_pyarrow = _pyarrow_requirement_message is None
_test_compiled = _test_not_compiled_message is None

from pyspark import SparkContext
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
from pyspark.sql.types import *
from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
Expand Down Expand Up @@ -283,6 +283,50 @@ def test_invalid_create_row(self):
self.assertRaises(ValueError, lambda: row_class(1, 2, 3))


class SparkSessionBuilderTests(unittest.TestCase):
    """Tests for how ``SparkSession.builder`` options relate to the SparkContext conf."""

    def test_create_spark_context_first_then_spark_session(self):
        """Builder options show up in the session conf but not in a pre-created context."""
        context = None
        spark = None
        try:
            base_conf = SparkConf().set("key1", "value1")
            context = SparkContext('local[4]', "SessionBuilderTests", conf=base_conf)
            spark = SparkSession.builder.config("key2", "value2").getOrCreate()

            # The session sees both the context-level and the builder-level option.
            for key, expected in (("key1", "value1"), ("key2", "value2")):
                self.assertEqual(spark.conf.get(key), expected)
            self.assertEqual(spark.sparkContext, context)

            # The context created up front keeps only the option it was built with.
            self.assertFalse(context.getConf().contains("key2"))
            self.assertEqual(context.getConf().get("key1"), "value1")
        finally:
            # Stop the session first, then the context, skipping whatever was never created.
            for resource in (spark, context):
                if resource is not None:
                    resource.stop()

    def test_another_spark_session(self):
        """A second getOrCreate call shares the context and leaves its conf untouched."""
        first = None
        second = None
        try:
            first = SparkSession.builder.config("key1", "value1").getOrCreate()
            second = SparkSession.builder.config("key2", "value2").getOrCreate()

            # Both options are readable through both session handles.
            for key, expected in (("key1", "value1"), ("key2", "value2")):
                for spark in (first, second):
                    self.assertEqual(spark.conf.get(key), expected)
            self.assertEqual(first.sparkContext, second.sparkContext)

            # Only the option supplied to the first call is present in the context conf.
            self.assertEqual(first.sparkContext.getConf().get("key1"), "value1")
            self.assertFalse(first.sparkContext.getConf().contains("key2"))
        finally:
            for spark in (first, second):
                if spark is not None:
                    spark.stop()


class SQLTests(ReusedSQLTestCase):

@classmethod
Expand Down

0 comments on commit 6862bbb

Please sign in to comment.