From 80bba5ead0601f3ef4b05fff5391d07a61e06341 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 3 Jan 2017 03:06:21 +0000 Subject: [PATCH 1/3] Fix SparkSession initialization when previous SparkContext is stopped and new SparkContext is created. --- python/pyspark/sql/session.py | 8 +++++++- python/pyspark/sql/tests.py | 23 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 1e40b9c39fc4f..73801d5c1f524 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -162,7 +162,7 @@ def getOrCreate(self): from pyspark.context import SparkContext from pyspark.conf import SparkConf session = SparkSession._instantiatedContext - if session is None: + if session is None or session._sc._jsc is None: sparkConf = SparkConf() for key, value in self._options.items(): sparkConf.set(key, value) @@ -216,6 +216,12 @@ def __init__(self, sparkContext, jsparkSession=None): install_exception_handler() if SparkSession._instantiatedContext is None: SparkSession._instantiatedContext = self + else: + # If we had an instantiated SparkSession attached with a SparkContext + # which is stopped now, we need to renew the instantiated SparkSession. + # Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. 
+ if SparkSession._instantiatedContext._sc._jsc is None: + SparkSession._instantiatedContext = self @since(2.0) def newSession(self): diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 18fd68ec5ef5d..d1782857e6d0d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -46,6 +46,7 @@ else: import unittest +from pyspark import SparkContext from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type @@ -1886,6 +1887,28 @@ def test_hivecontext(self): self.assertTrue(os.path.exists(metastore_path)) +class SQLTests2(ReusedPySparkTestCase): + + @classmethod + def setUpClass(cls): + ReusedPySparkTestCase.setUpClass() + cls.spark = SparkSession(cls.sc) + + @classmethod + def tearDownClass(cls): + ReusedPySparkTestCase.tearDownClass() + cls.spark.stop() + + # We can't include this test in SQLTests because it stops the class's SparkContext, which + # would cause other tests to fail. + def test_sparksession_with_stopped_sparkcontext(self): + self.sc.stop() + sc = SparkContext('local[4]', self.sc.appName) + spark = SparkSession.builder.getOrCreate() + df = spark.createDataFrame([(1, 2)], ["c", "c"]) + df.collect() + + class HiveContextSQLTests(ReusedPySparkTestCase): @classmethod From a22a08fd8655862fbbadc7b376b1a0ad8acfaadb Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 6 Jan 2017 04:28:36 +0000 Subject: [PATCH 2/3] Rename SparkSession._instantiatedContext to SparkSession._instantiatedSession. 
--- python/pyspark/sql/session.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 73801d5c1f524..1e6822ec2ebda 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -161,7 +161,7 @@ def getOrCreate(self): with self._lock: from pyspark.context import SparkContext from pyspark.conf import SparkConf - session = SparkSession._instantiatedContext + session = SparkSession._instantiatedSession if session is None or session._sc._jsc is None: sparkConf = SparkConf() for key, value in self._options.items(): @@ -183,7 +183,7 @@ def getOrCreate(self): builder = Builder() - _instantiatedContext = None + _instantiatedSession = None @ignore_unicode_prefix def __init__(self, sparkContext, jsparkSession=None): @@ -214,14 +214,14 @@ def __init__(self, sparkContext, jsparkSession=None): self._wrapped = SQLContext(self._sc, self, self._jwrapped) _monkey_patch_RDD(self) install_exception_handler() - if SparkSession._instantiatedContext is None: - SparkSession._instantiatedContext = self + if SparkSession._instantiatedSession is None: + SparkSession._instantiatedSession = self else: # If we had an instantiated SparkSession attached with a SparkContext # which is stopped now, we need to renew the instantiated SparkSession. # Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. - if SparkSession._instantiatedContext._sc._jsc is None: - SparkSession._instantiatedContext = self + if SparkSession._instantiatedSession._sc._jsc is None: + SparkSession._instantiatedSession = self @since(2.0) def newSession(self): @@ -601,7 +601,7 @@ def stop(self): """Stop the underlying :class:`SparkContext`. 
""" self._sc.stop() - SparkSession._instantiatedContext = None + SparkSession._instantiatedSession = None @since(2.0) def __enter__(self): From 6078c7dd37152301859d31789ca571a6be92f4eb Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 6 Jan 2017 13:24:15 +0000 Subject: [PATCH 3/3] Simplify the condition. --- python/pyspark/sql/session.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 1e6822ec2ebda..9f4772eec9f2a 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -214,14 +214,12 @@ def __init__(self, sparkContext, jsparkSession=None): self._wrapped = SQLContext(self._sc, self, self._jwrapped) _monkey_patch_RDD(self) install_exception_handler() - if SparkSession._instantiatedSession is None: + # If we had an instantiated SparkSession attached with a SparkContext + # which is stopped now, we need to renew the instantiated SparkSession. + # Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. + if SparkSession._instantiatedSession is None \ + or SparkSession._instantiatedSession._sc._jsc is None: SparkSession._instantiatedSession = self - else: - # If we had an instantiated SparkSession attached with a SparkContext - # which is stopped now, we need to renew the instantiated SparkSession. - # Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. - if SparkSession._instantiatedSession._sc._jsc is None: - SparkSession._instantiatedSession = self @since(2.0) def newSession(self):