Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-41127][CONNECT][PYTHON] Implement DataFrame.CreateGlobalView in Python client #38642

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions python/pyspark/sql/connect/client.py
Expand Up @@ -400,6 +400,13 @@ def schema(self, plan: pb2.Plan) -> StructType:
def explain_string(self, plan: pb2.Plan) -> str:
return self._analyze(plan).explain_string

def execute_command(self, command: pb2.Command) -> None:
    """Send a command plan to the server and execute it.

    The command is wrapped in a ``Request`` (tagged with the current
    user id when one is configured) and submitted through the regular
    execute-and-fetch path; any returned result is discarded.

    Parameters
    ----------
    command : pb2.Command
        The proto command to execute on the server.
    """
    request = pb2.Request()
    if self._user_id:
        # Propagate the client identity so the server can scope the command.
        request.user_context.user_id = self._user_id
    request.plan.command.CopyFrom(command)
    self._execute_and_fetch(request)

def _analyze(self, plan: pb2.Plan) -> AnalyzeResult:
req = pb2.Request()
if self._user_id:
Expand Down
42 changes: 42 additions & 0 deletions python/pyspark/sql/connect/dataframe.py
Expand Up @@ -633,6 +633,48 @@ def explain(self) -> str:
else:
return ""

def createGlobalTempView(self, name: str) -> None:
    """Creates a global temporary view with this :class:`DataFrame`.

    The lifetime of this temporary view is tied to this Spark application.
    If a global temporary view with this name already exists, the server
    raises an error (surfaced to the client as a gRPC error).

    .. versionadded:: 3.4.0

    Parameters
    ----------
    name : str
        Name of the view.

    Returns
    -------
    None
    """
    # replace=False: creation must fail if the view name is already taken.
    command = plan.CreateView(
        child=self._plan, name=name, is_global=True, replace=False
    ).command(session=self._session)
    self._session.execute_command(command)

def createOrReplaceGlobalTempView(self, name: str) -> None:
    """Creates or replaces a global temporary view using the given name.

    The lifetime of this temporary view is tied to this Spark application.
    Unlike :meth:`createGlobalTempView`, an existing view with the same
    name is silently replaced.

    .. versionadded:: 3.4.0

    Parameters
    ----------
    name : str
        Name of the view.

    Returns
    -------
    None
    """
    # replace=True: overwrite any existing view registered under `name`.
    command = plan.CreateView(
        child=self._plan, name=name, is_global=True, replace=True
    ).command(session=self._session)
    self._session.execute_command(command)


class DataFrameStatFunctions:
"""Functionality for statistic functions with :class:`DataFrame`.
Expand Down
45 changes: 45 additions & 0 deletions python/pyspark/sql/connect/plan.py
Expand Up @@ -69,6 +69,9 @@ def to_attr_or_expression(
def plan(self, session: "RemoteSparkSession") -> proto.Relation:
...

def command(self, session: "RemoteSparkSession") -> proto.Command:
    """Convert this logical plan node into a proto ``Command``.

    Abstract protocol stub on the base plan class; subclasses that
    represent commands (e.g. ``CreateView``) override this to build
    the concrete proto message.
    """
    ...

def _verify(self, session: "RemoteSparkSession") -> bool:
"""This method is used to verify that the current logical plan
can be serialized to Proto and back and afterwards is identical."""
Expand Down Expand Up @@ -862,3 +865,45 @@ def _repr_html_(self) -> str:
</li>
</ul>
"""


class CreateView(LogicalPlan):
    """Logical plan node that creates a temporary view from a child plan.

    Parameters
    ----------
    child : Optional[LogicalPlan]
        The plan providing the view's data; must be set before
        :meth:`command` is called.
    name : str
        Name of the view to create.
    is_global : bool
        Whether to create a global temp view (application-scoped)
        rather than a session-scoped one.
    replace : bool
        Whether an existing view with the same name is replaced.
    """

    def __init__(
        self, child: Optional["LogicalPlan"], name: str, is_global: bool, replace: bool
    ) -> None:
        super().__init__(child)
        self._name = name
        # Fixed: attribute was previously misspelled `_is_gloal`.
        self._is_global = is_global
        self._replace = replace

    def command(self, session: "RemoteSparkSession") -> proto.Command:
        """Build the proto ``Command`` describing the view creation."""
        assert self._child is not None

        plan = proto.Command()
        plan.create_dataframe_view.replace = self._replace
        plan.create_dataframe_view.is_global = self._is_global
        plan.create_dataframe_view.name = self._name
        plan.create_dataframe_view.input.CopyFrom(self._child.plan(session))
        return plan

    def print(self, indent: int = 0) -> str:
        """Return a single-line text rendering of this node."""
        i = " " * indent
        return (
            f"{i}"
            f"<CreateView name='{self._name}' "
            # Fixed: closing quote after the is_global value was missing.
            f"is_global='{self._is_global}' "
            f"replace='{self._replace}'>"
        )

    def _repr_html_(self) -> str:
        """Return an HTML rendering of this node for notebook display."""
        return f"""
        <ul>
            <li>
                <b>CreateView</b><br />
                name: {self._name} <br />
                is_global: {self._is_global} <br />
                replace: {self._replace} <br />
                {self._child_repr_()}
            </li>
        </ul>
        """
17 changes: 15 additions & 2 deletions python/pyspark/sql/tests/connect/test_connect_basic.py
Expand Up @@ -20,8 +20,9 @@
import tempfile

import grpc # type: ignore
from grpc._channel import _MultiThreadedRendezvous # type: ignore

from pyspark.testing.sqlutils import have_pandas
from pyspark.testing.sqlutils import have_pandas, SQLTestUtils

if have_pandas:
import pandas
Expand All @@ -39,7 +40,7 @@


@unittest.skipIf(not should_test_connect, connect_requirement_message)
class SparkConnectSQLTestCase(ReusedPySparkTestCase):
class SparkConnectSQLTestCase(ReusedPySparkTestCase, SQLTestUtils):
"""Parent test fixture class for all Spark Connect related
test cases."""

Expand Down Expand Up @@ -207,6 +208,18 @@ def test_range(self):
.equals(self.spark.range(start=0, end=10, step=3, numPartitions=2).toPandas())
)

def test_create_global_temp_view(self):
    # SPARK-41127: test global temp view creation.
    with self.tempView("view_1"):
        self.connect.sql("SELECT 1 AS X LIMIT 0").createGlobalTempView("view_1")
        self.connect.sql("SELECT 2 AS X LIMIT 1").createOrReplaceGlobalTempView("view_1")
        self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))

        # Creating (without replace) a view that already exists must fail.
        # The client currently does no error handling, so the server-side
        # error bubbles up as a raw gRPC _MultiThreadedRendezvous.
        self.assertTrue(self.spark.catalog.tableExists("global_temp.view_1"))
        with self.assertRaises(_MultiThreadedRendezvous):
            self.connect.sql("SELECT 1 AS X LIMIT 0").createGlobalTempView("view_1")

def test_empty_dataset(self):
# SPARK-41005: Test arrow based collection with empty dataset.
self.assertTrue(
Expand Down