Skip to content

Commit

Permalink
[SPARK-40816][CONNECT][PYTHON] Rename LogicalPlan.collect to LogicalPlan.to_proto
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

The `collect` method in `class LogicalPlan` actually generates the Connect proto plan. Naming it `collect` is confusing because it overlaps with `collect` in the DataFrame API, which returns materialized data.

This PR proposes to rename this method to `to_proto` to match its implementation.

### Why are the changes needed?

Improve codebase readability.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests (UT): the existing tests were updated to call `to_proto` instead of `collect`.

Closes #38279 from amaliujia/rename_logical_plan_collect2.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
amaliujia authored and HyukjinKwon committed Oct 19, 2022
1 parent 7af39b6 commit 646d716
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
4 changes: 2 additions & 2 deletions python/pyspark/sql/connect/dataframe.py
Expand Up @@ -247,12 +247,12 @@ def toPandas(self) -> Optional["pandas.DataFrame"]:
raise Exception("Cannot collect on empty plan.")
if self._session is None:
raise Exception("Cannot collect on empty session.")
query = self._plan.collect(self._session)
query = self._plan.to_proto(self._session)
return self._session._to_pandas(query)

def explain(self) -> str:
if self._plan is not None:
query = self._plan.collect(self._session)
query = self._plan.to_proto(self._session)
if self._session is None:
raise Exception("Cannot analyze without RemoteSparkSession.")
return self._session.analyze(query).explain_string
Expand Down
12 changes: 11 additions & 1 deletion python/pyspark/sql/connect/plan.py
Expand Up @@ -80,9 +80,19 @@ def _verify(self, session: "RemoteSparkSession") -> bool:

return test_plan == plan

def collect(
def to_proto(
self, session: Optional["RemoteSparkSession"] = None, debug: bool = False
) -> proto.Plan:
"""
Generates connect proto plan based on this LogicalPlan.
Parameters
----------
session : :class:`RemoteSparkSession`, optional.
a session that connects remote spark cluster.
debug: bool
if enabled, the proto plan will be printed.
"""
plan = proto.Plan()
plan.root.CopyFrom(self.plan(session))

Expand Down
Expand Up @@ -46,7 +46,7 @@ def test_simple_column_expressions(self):
def test_column_literals(self):
df = c.DataFrame.withPlan(p.Read("table"))
lit_df = df.select(fun.lit(10))
self.assertIsNotNone(lit_df._plan.collect(None))
self.assertIsNotNone(lit_df._plan.to_proto(None))

self.assertIsNotNone(fun.lit(10).to_plan(None))
plan = fun.lit(10).to_plan(None)
Expand Down
8 changes: 4 additions & 4 deletions python/pyspark/sql/tests/connect/test_connect_plan_only.py
Expand Up @@ -27,13 +27,13 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
generation but do not call Spark."""

def test_simple_project(self):
plan = self.connect.readTable(table_name=self.tbl_name)._plan.collect(self.connect)
plan = self.connect.readTable(table_name=self.tbl_name)._plan.to_proto(self.connect)
self.assertIsNotNone(plan.root, "Root relation must be set")
self.assertIsNotNone(plan.root.read)

def test_filter(self):
df = self.connect.readTable(table_name=self.tbl_name)
plan = df.filter(df.col_name > 3)._plan.collect(self.connect)
plan = df.filter(df.col_name > 3)._plan.to_proto(self.connect)
self.assertIsNotNone(plan.root.filter)
self.assertTrue(
isinstance(
Expand All @@ -45,7 +45,7 @@ def test_filter(self):

def test_relation_alias(self):
df = self.connect.readTable(table_name=self.tbl_name)
plan = df.alias("table_alias")._plan.collect(self.connect)
plan = df.alias("table_alias")._plan.to_proto(self.connect)
self.assertEqual(plan.root.common.alias, "table_alias")

def test_simple_udf(self):
Expand All @@ -59,7 +59,7 @@ def test_simple_udf(self):
def test_all_the_plans(self):
df = self.connect.readTable(table_name=self.tbl_name)
df = df.select(df.col1).filter(df.col2 == 2).sort(df.col3.asc())
plan = df._plan.collect(self.connect)
plan = df._plan.to_proto(self.connect)
self.assertIsNotNone(plan.root, "Root relation must be set")
self.assertIsNotNone(plan.root.read)

Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/sql/tests/connect/test_connect_select_ops.py
Expand Up @@ -24,7 +24,7 @@
class SparkConnectToProtoSuite(PlanOnlyTestFixture):
def test_select_with_literal(self):
df = DataFrame.withPlan(Read("table"))
self.assertIsNotNone(df.select(col("name"))._plan.collect())
self.assertIsNotNone(df.select(col("name"))._plan.to_proto())
self.assertRaises(InputValidationError, df.select, "name")

def test_join_with_join_type(self):
Expand All @@ -39,7 +39,7 @@ def test_join_with_join_type(self):
("leftanti", proto.Join.JoinType.JOIN_TYPE_LEFT_ANTI),
("leftsemi", proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI),
]:
joined_df = df_left.join(df_right, on=col("name"), how=join_type_str)._plan.collect()
joined_df = df_left.join(df_right, on=col("name"), how=join_type_str)._plan.to_proto()
self.assertEqual(joined_df.root.join.join_type, join_type)


Expand Down

0 comments on commit 646d716

Please sign in to comment.