
Conversation

Member

@xinrong-meng xinrong-meng commented Dec 4, 2024

What changes were proposed in this pull request?

Support DataFrame conversion to table arguments in Spark Classic, and enable UDTFs to accept table arguments in both PySpark and Scala.

Spark Connect support will be a follow-up, with the goal of completing it by the end of this month.

Why are the changes needed?

Part of SPARK-50391.
Table-Valued Functions (TVFs) and User-Defined Table Functions (UDTFs) are widely used in Spark workflows. These functions often require a table argument, which Spark internally represents as a Catalyst expression. While Spark SQL supports constructs like TABLE() for this purpose, there is no direct API in PySpark or Scala to convert a DataFrame into a table argument. We therefore propose supporting DataFrame conversion to table arguments (in Spark Classic first) and enabling UDTFs to accept table arguments in both PySpark and Scala.

Does this PR introduce any user-facing change?

Yes. DataFrame conversion to a table argument is now supported in Spark Classic, and UDTFs accept table arguments in both PySpark and Scala.

>>> from pyspark.sql.functions import udtf
>>> from pyspark.sql import Row
>>> 
>>> @udtf(returnType="a: int")
... class TestUDTF:
...     def eval(self, row: Row):
...         if row[0] > 5:
...             yield row[0],
... 
>>> df = spark.range(8)
>>> 
>>> TestUDTF(df.asTable()).show()
+---+                                                                           
|  a|
+---+
|  6|
|  7|
+---+

>>> TestUDTF(df.asTable().partitionBy(df.id)).show()
+---+
|  a|
+---+
|  6|
|  7|
+---+

>>> TestUDTF(df.asTable().partitionBy(df.id).orderBy(df.id)).show()
+---+
|  a|
+---+
|  6|
|  7|
+---+

>>> TestUDTF(df.asTable().withSinglePartition()).show()
+---+
|  a|
+---+
|  6|
|  7|
+---+

>>> TestUDTF(df.asTable().partitionBy(df.id).withSinglePartition()).show()
Traceback (most recent call last):
...
pyspark.errors.exceptions.captured.IllegalArgumentException: Cannot call withSinglePartition() after partitionBy() has been called.
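The partition-wise execution model behind partitionBy can be illustrated with a plain-Python sketch. This is a simulation, not Spark code: run_udtf and CountPerPartition are hypothetical names, and the assumed calling convention (matching UDTF semantics) is one UDTF instance per partition, eval() per row, terminate() once per partition:

```python
from itertools import groupby

# Hypothetical pure-Python simulation of partition-wise UDTF execution:
# one instance per partition, eval() per row, terminate() per partition.
class CountPerPartition:
    def __init__(self):
        self.count = 0

    def eval(self, row):
        self.count += 1  # called once per input row

    def terminate(self):
        yield (self.count,)  # called once at the end of the partition

def run_udtf(udtf_cls, rows, partition_key):
    out = []
    # Simulate partitionBy: group rows by key, one UDTF instance per group.
    for _, part in groupby(sorted(rows, key=partition_key), key=partition_key):
        instance = udtf_cls()
        for row in part:
            instance.eval(row)
        out.extend(instance.terminate())
    return out

rows = [(1, "a"), (1, "b"), (2, "c")]
print(run_udtf(CountPerPartition, rows, partition_key=lambda r: r[0]))
# → [(2,), (1,)]
```

With df.asTable().partitionBy(...), Spark performs the grouping itself; the sketch only mirrors the calling convention the UDTF class sees.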

How was this patch tested?

Unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@xinrong-meng xinrong-meng changed the title [WIP][SPARK-50392][PYTHON] DataFrame conversion to table argument in Spark Classic [SPARK-50392][PYTHON] DataFrame conversion to table argument in Spark Classic Dec 13, 2024
@xinrong-meng xinrong-meng marked this pull request as ready for review December 13, 2024 01:46

@HyukjinKwon HyukjinKwon left a comment

I reviewed the design, and LGTM


@ueshin ueshin Dec 16, 2024

How about TVFArgument, TableFunctionArgument, or TableValuedFunctionArgument?
In SQL, this applies not only to UDTFs but to TVFs in general, although currently no built-in TVF supports table arguments.
cc @dtenedor @allisonwang-db

Member Author

Good point! UDTFs are a type of TVF; how about TableValuedFunctionArgument?

Contributor

TableValuedFunctionArgument sounds good. This way we don't need to limit it to user-defined table functions.

@xinrong-meng xinrong-meng requested a review from ueshin December 19, 2024 02:25
Member

What's the behavior of this? In SQL, ORDER BY alone is not allowed, IIRC. cc @dtenedor

Member

Similarly, what happens with, e.g., func(df.asTable().partitionBy(df.key).orderBy(df.value).partitionBy())?

Member Author

It is allowed here:

>>> TestUDTF(df.asTable().partitionBy("id").orderBy("id").partitionBy()).show()
+---+
|  a|
+---+
|  6|
|  7|
+---+

Member

@ueshin ueshin Dec 26, 2024

@dtenedor What's the behavior of this? I don't think we should allow it.

Member Author

@xinrong-meng xinrong-meng Dec 27, 2024

ORDER BY alone is not supported in SQL:

[PARSE_SYNTAX_ERROR] Syntax error at or near 'ORDER'. SQLSTATE: 42601 (line 1, pos 59)

== SQL ==
SELECT * FROM test_udtf(TABLE (SELECT id FROM range(0, 8)) ORDER BY id)
-----------------------------------------------------------^^^

Adjusted.

Member Author

@xinrong-meng xinrong-meng Dec 27, 2024

Multiple PARTITION BY clauses are not supported in SQL:

[PARSE_SYNTAX_ERROR] Syntax error at or near 'PARTITION'. SQLSTATE: 42601 (line 1, pos 87)

== SQL ==

SELECT * FROM test_udtf(TABLE (SELECT id FROM range(0, 8)) PARTITION BY id ORDER BY id PARTITION BY id)
---------------------------------------------------------------------------------------^^^

or

== SQL ==
SELECT * FROM test_udtf(TABLE (SELECT id FROM range(0, 8)) PARTITION BY id PARTITION BY id)
---------------------------------------------------------------------------^^^

Adjusted.

Member

Could you add tests with named arguments?

Member Author

@xinrong-meng xinrong-meng Jan 2, 2025

def partitionBy(self, *cols: "ColumnOrName") -> "TableArg":
does not accept keyword arguments; could you clarify what we are expecting here?

Member

func(row = df.asTable() ...)

Member Author

Got it, included named arguments.

Member Author

I created SPARK-50392 as a follow-up to support named arguments.
It requires a change in PythonSQLUtils.namedArgumentExpression, which depends on the TableArg class in Spark Connect.

Member

@ueshin ueshin left a comment

Otherwise, LGTM, pending #49055 (comment) and tests.

"org.apache.spark.sql.ExtendedExplainGenerator"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.UDTFRegistration"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataSourceRegistration"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.TableArg$"),
Member

Do we need this?

Member Author

@xinrong-meng xinrong-meng Jan 3, 2025

I think so; I added the line because of a test-failure hint.
Let me verify here.

@xinrong-meng
Member Author

Otherwise, LGTM, pending #49055 (comment) and tests.

I appreciate your detailed review. That's very helpful!

Member

@dongjoon-hyun dongjoon-hyun left a comment

Hi, @xinrong-meng , @HyukjinKwon , @ueshin , @allisonwang-db .

The newly added table_arg.py seems to break the Spark Connect Python-only CI. Could you take a look at the failure, please?

  File "/opt/hostedtoolcache/Python/3.11.11/x64/lib/python3.11/site-packages/pyspark/sql/table_arg.py", line 20, in <module>
    from pyspark.sql.classic.column import _to_java_column, _to_seq
ModuleNotFoundError: No module named 'pyspark.sql.classic'

@ueshin
Member

ueshin commented Jan 13, 2025

@dongjoon-hyun I submitted a follow-up PR #49472 to fix it. Thanks. cc @xinrong-meng
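The fix follows the deferred-import pattern: move the import from module level into the method that uses it, so the module itself stays importable in connect-only environments. A minimal sketch of the pattern with hypothetical names (this is not the actual table_arg.py code):

```python
# Sketch of the deferred-import pattern; TableArgSketch is hypothetical.
class TableArgSketch:
    def __init__(self, value):
        self.value = value

    def serialize(self):
        # Deferred import: resolved at call time rather than at module
        # import time, so importing this module never fails merely
        # because a dependency is unavailable in some build.
        import json
        return json.dumps(self.value)

print(TableArgSketch({"a": 1}).serialize())
# → {"a": 1}
```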

@dongjoon-hyun
Member

Thank you!

dongjoon-hyun pushed a commit that referenced this pull request Jan 14, 2025
…onnect-only` builds

### What changes were proposed in this pull request?

Move imports into methods to fix connect-only builds.

### Why are the changes needed?

#49055 broke the connect-only builds: #49055 (review)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49472 from ueshin/issues/SPARK-50392/fup.

Authored-by: Takuya Ueshin <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>