[SPARK-38121][PYTHON][SQL] Use SparkSession instead of SQLContext inside PySpark #35410
Conversation
@@ -104,9 +104,24 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
    .. versionadded:: 1.3.0
    """

    def __init__(self, jdf: JavaObject, sql_ctx: "SQLContext"):
Here's the main change.
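For context, a minimal sketch of the direction this change takes, with a hypothetical class name `LegacyAwareDataFrame`; this is illustrative, not the merged Spark code:

```python
# Illustrative sketch, not the merged Spark source: the constructor starts
# accepting a SparkSession, while the legacy SQLContext path keeps working
# but is treated as internal/deprecated.
import warnings
from typing import Union

from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext

class LegacyAwareDataFrame:
    """Hypothetical stand-in for the constructor logic discussed here."""

    def __init__(self, jdf, sql_ctx: Union[SQLContext, SparkSession]):
        if isinstance(sql_ctx, SQLContext):
            # Legacy path: warn and derive the session from the SQLContext.
            warnings.warn("DataFrame constructor is internal. Do not directly use it.")
            session = sql_ctx.sparkSession
        else:
            session = sql_ctx
        self._jdf = jdf
        self._session = session
```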
Force-pushed from 69e64ce to b76a0bc.
Force-pushed from dc84bec to 58eec26.
Force-pushed from 58eec26 to 30dddf3.
Still WIP ...
Force-pushed from e4deaad to 251fd60.
@property
def _jconf(self) -> SparkConf:
    """Accessor for the JVM SQL-specific configurations"""
    return self._jsparkSession.sessionState().conf()
I think we should get this from `SQLConf.get` to respect the current `SparkSession`. I will do it in a separate PR.
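To illustrate why resolving the conf from the current session matters, here is a hedged sketch (illustrative, not the Spark source): runtime SQL confs are per-session, so a value captured from one session's state can diverge from the active session's value.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
other = spark.newSession()

# Runtime SQL confs are scoped per session.
other.conf.set("spark.sql.shuffle.partitions", "7")

# The two sessions can now disagree, which is why reading the conf of the
# *current* session (what SQLConf.get does on the JVM side) differs from
# reading a conf pinned from one particular session's state.
print(spark.conf.get("spark.sql.shuffle.partitions"))  # e.g. '200' (default)
print(other.conf.get("spark.sql.shuffle.partitions"))  # '7'
```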
Force-pushed from 64d5b11 to abf1fff.
def _sparkContext(self, sc: SparkContext) -> "SparkSession.Builder":
    with self._lock:
        self._sc = sc
        return self
We only have one SparkContext running globally now. It doesn't make sense to allow setting the SparkContext anymore.
And this func is private
I will do some manual tests to make sure, but I believe this is ready for review. Adding @viirya @ueshin @BryanCutler in case you find some time to review.
@@ -718,10 +715,10 @@ def __init__(self, sparkContext: SparkContext, jhiveContext: Optional[JavaObject
        sparkContext._conf.set(  # type: ignore[attr-defined]
            "spark.sql.catalogImplementation", "hive"
        )
        sparkSession = SparkSession.builder._sparkContext(sparkContext).getOrCreate()
    else:
        sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession())
`spark.sql.catalogImplementation` is a static configuration that can only be set for the root parent Spark session. Therefore, we can just get any active child Spark session; it doesn't have to be the Spark session in `HiveContext`.
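As a hedged illustration of the point above (a sketch, not the Spark source): static confs such as `spark.sql.catalogImplementation` are fixed per SparkContext, so every child session reports the same value.

```python
from pyspark.sql import SparkSession

root = SparkSession.builder.getOrCreate()
child = root.newSession()

# Static SQL confs cannot be changed at runtime and are shared by all
# sessions over the same SparkContext, so any active session will do.
assert (
    root.conf.get("spark.sql.catalogImplementation")
    == child.conf.get("spark.sql.catalogImplementation")
)
```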
Took a quick pass, I think this is a good idea to be more consistent with the rest of Spark. LGTM.
Force-pushed from a6d3e04 to c929b14.
Thanks @BryanCutler. I made a few more cleanups here while checking line by line with manual tests.
Force-pushed from c929b14 to 00203a0.
        return cls._get_or_create(sc)

    @classmethod
    def _get_or_create(cls: Type["SQLContext"], sc: SparkContext) -> "SQLContext":
Hmm, the above `getOrCreate` has been deprecated for a while; why add a new `_get_or_create`?
This `_get_or_create` is actually called in `shell.py` ... to avoid an unnecessary warning in the PySpark shell.
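A minimal sketch of the pattern being described, with a hypothetical class name (not the actual Spark source): the deprecated public entry point warns, while the private helper used by `shell.py` does the same work silently.

```python
import warnings

class LegacyContext:
    """Hypothetical stand-in for SQLContext's getOrCreate pattern."""
    _instance = None

    @classmethod
    def getOrCreate(cls, sc):
        # Public, deprecated API: always emits a deprecation warning.
        warnings.warn(
            "Deprecated. Use SparkSession.builder.getOrCreate() instead.",
            FutureWarning,
        )
        return cls._get_or_create(sc)

    @classmethod
    def _get_or_create(cls, sc):
        # Private entry point for shell.py: same behavior, no user-facing warning.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance
```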
Looks okay. `SQLContext` has been deprecated for a while, but we haven't cleaned up its usage in PySpark yet.
Will merge this in a few days after double-checking one more time ... if we're all fine with it.
Typing notes aside, LGTM.
Force-pushed from 00203a0 to e49bf49.
Thanks @zero323. I addressed your comments about type hints. I will take another look before merging this in.

Merged to master.

@JoshRosen when you find some time, mind taking a look as a post-review? Just wanted to make sure I haven't missed anything.

Thanks all for reviewing this.
    else:
        sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession())
    SQLContext.__init__(self, sparkContext, sparkSession, jhiveContext)
    static_conf = {"spark.sql.catalogImplementation": "in-memory"}
Shoot. I pushed a wrong change at the last minute; I was manually testing whether this value is set correctly. It has to be `hive`. This uses an existing session that has `hive` by default in tests, so it doesn't affect the tests, but it would affect production code if users use `HiveContext` directly without an existing Spark session.

Sorry, this was my mistake. I will make a quick followup.
…ation' to 'hive' in HiveContext

### What changes were proposed in this pull request?

This PR is a followup of #35410 to fix a mistake. `HiveContext` should set `spark.sql.catalogImplementation` to `hive` instead of `in-memory`. This PR also includes several changes:

- Make `HiveContext.getOrCreate` work identically to `SQLContext.getOrCreate`.
- Match the signature of `HiveContext.__init__` and `SQLContext.__init__` (neither is supposed to be called directly by users, though).

### Why are the changes needed?

See #35410 (comment)

### Does this PR introduce _any_ user-facing change?

No to end users, because this change has not been released yet. It creates a non-Hive-supported `SparkSession` if there isn't an existing SparkSession running. See also #35410 (comment). Nobody uses `HiveContext` directly anymore, but it's better to keep the behaviours unchanged.

### How was this patch tested?

Manually tested (in the PySpark shell):

```python
spark.stop()
from pyspark import SparkContext
from pyspark.sql import HiveContext
HiveContext.getOrCreate(SparkContext.getOrCreate()).getConf("spark.sql.catalogImplementation")
```

**Before:**

```pyspark
'in-memory'
```

**After:**

```pyspark
'hive'
```

Closes #35517 from HyukjinKwon/SPARK-38121-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
from pyspark.sql.session import SparkSession

if self._session is None:
    self._session = SparkSession._getActiveSessionOrCreate()
This does not look correct. The current DataFrame's `sparkSession` may not be the active session. Why not get the session from `self.__sql_ctx` and assign it to `self._session` in the constructor?
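A hedged sketch of the suggested fix (illustrative, not the followup's exact code): capture the session from the passed-in `SQLContext` when the DataFrame is constructed, instead of falling back to the active session later.

```python
class DataFrameSketch:
    """Illustrative stand-in showing the reviewer's suggestion."""

    def __init__(self, jdf, sql_ctx):
        self._jdf = jdf
        self._sql_ctx = sql_ctx
        # SQLContext keeps a reference to the session that created it, so
        # capture it here rather than resolving the active session lazily.
        self._session = sql_ctx.sparkSession

    @property
    def sparkSession(self):
        # Returns the session that created this DataFrame, not whichever
        # session happens to be active at access time.
        return self._session
```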
Yes, let me make a quick followup after double checking it (see #35575).
For a bit of clarification, this code path is only triggered when third-party libraries or external users directly create `DataFrame(..., SQLContext)`, so standard public APIs won't be affected. The only affected case is when a user manually creates a `DataFrame(..., SQLContext)` and then calls this new API `df.sparkSession`, which is pretty unlikely and rather minor, especially given that 1. I don't think `SQLContext` users would care about `SparkSession`, 2. `SQLContext` was deprecated many years ago, and 3. the `DataFrame(...)` constructor is supposed to be internal.
… session that created DataFrame when SQLContext is used

### What changes were proposed in this pull request?

This PR is a followup of #35410 that makes `df.sparkSession` return the session that created the DataFrame when `SQLContext` is used.

### Why are the changes needed?

See #35410 (comment)

### Does this PR introduce _any_ user-facing change?

No to end users, as it's not released yet. See also #35410 (comment)

### How was this patch tested?

```python
>>> from pyspark.sql import DataFrame
>>> DataFrame(sqlContext.range(1)._jdf, sqlContext).sparkSession
/.../spark/python/pyspark/sql/dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it.
  warnings.warn("DataFrame constructor is internal. Do not directly use it.")
<pyspark.sql.session.SparkSession object at 0x7fa7483972b0>
```

Closes #35575 from HyukjinKwon/SPARK-38121-followup2.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR proposes to use `SparkSession` within PySpark. This is a base work for respecting runtime configurations, etc. Currently, we rely on the old, deprecated `SQLContext` internally, which doesn't respect the Spark session's runtime configurations correctly.

This PR also contains related changes (and a bit of refactoring in the code this PR touches) as below:

- Expose `DataFrame.sparkSession` like the Scala API does (see the usage sketch below).
- Move `SQLContext._conf` -> `SparkSession._jconf`.
- Rename `rdd_array` to `df_array` at `DataFrame.randomSplit`.
- Deprecate `DataFrame.sql_ctx` and `DataFrame(..., sql_ctx)`.
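As a usage sketch of the newly exposed attribute (assuming a running PySpark environment; `spark` is created here for completeness):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# The DataFrame now exposes the session that created it, as in the Scala API.
assert df.sparkSession is spark
```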
Why are the changes needed?

- For consistent API support (e.g., `df.sparkSession`).
- To avoid relying on the deprecated `SQLContext`.
Yes.

- `DataFrame.sql_ctx` and `DataFrame(..., sql_ctx)` are deprecated.
- `DataFrame.sparkSession` is added.
How was this patch tested?
Existing test cases should cover them.