
[SPARK-45266][PYTHON][FOLLOWUP] Fix to resolve UnresolvedPolymorphicPythonUDTF when the table argument is specified as a named argument #43355

Closed

Conversation

ueshin
Member

@ueshin ueshin commented Oct 12, 2023

What changes were proposed in this pull request?

This is a follow-up of #43042.

This PR fixes the resolution of `UnresolvedPolymorphicPythonUDTF` when the table argument is passed as a named argument.

Why are the changes needed?

The Python UDTF analysis result was not applied when the table argument was passed as a named argument, so the partitioning and ordering requested by `analyze` were ignored.

For example, for the following UDTF:

from pyspark.sql import Row
from pyspark.sql.functions import udtf
from pyspark.sql.types import IntegerType, StructType
from pyspark.sql.udtf import AnalyzeResult, OrderingColumn

@udtf
class TestUDTF:
    def __init__(self):
        self._count = 0
        self._sum = 0
        self._last = None

    @staticmethod
    def analyze(*args, **kwargs):
        return AnalyzeResult(
            schema=StructType()
            .add("count", IntegerType())
            .add("total", IntegerType())
            .add("last", IntegerType()),
            with_single_partition=True,
            order_by=[OrderingColumn("input"), OrderingColumn("partition_col")],
        )

    def eval(self, row: Row):
        # Make sure that the rows arrive in the expected order.
        if self._last is not None and self._last > row["input"]:
            raise Exception(
                f"self._last was {self._last} but the row value was {row['input']}"
            )
        self._count += 1
        self._last = row["input"]
        self._sum += row["input"]

    def terminate(self):
        yield self._count, self._sum, self._last

spark.udtf.register("test_udtf", TestUDTF)

The following query, which passes the table as a named argument, returns an incorrect result:

>>> spark.sql("""
...     WITH t AS (
...       SELECT id AS partition_col, 1 AS input FROM range(1, 21)
...       UNION ALL
...       SELECT id AS partition_col, 2 AS input FROM range(1, 21)
...     )
...     SELECT count, total, last
...     FROM test_udtf(row => TABLE(t))
...     ORDER BY 1, 2
... """).show()
+-----+-----+----+
|count|total|last|
+-----+-----+----+
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    1|   1|
|    1|    2|   2|
|    1|    2|   2|
|    1|    2|   2|
|    1|    2|   2|
|    1|    2|   2|
|    1|    2|   2|
|    1|    2|   2|
|    1|    2|   2|
+-----+-----+----+
only showing top 20 rows

It should equal the result of the same query without the named argument:

>>> spark.sql("""
...     WITH t AS (
...       SELECT id AS partition_col, 1 AS input FROM range(1, 21)
...       UNION ALL
...       SELECT id AS partition_col, 2 AS input FROM range(1, 21)
...     )
...     SELECT count, total, last
...     FROM test_udtf(TABLE(t))
...     ORDER BY 1, 2
... """).show()
+-----+-----+----+
|count|total|last|
+-----+-----+----+
|   40|   60|   2|
+-----+-----+----+
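
For illustration only, the semantics that `analyze` requests here (a single partition, ordered by `input` then `partition_col`) can be simulated in plain Python without Spark. This is a hypothetical sketch, not part of the PR:

```python
# Simulate the UDTF's single-partition, ordered evaluation in plain Python.
# The input mirrors the CTE `t`: 20 rows with input=1 and 20 rows with input=2.
rows = [{"partition_col": pc, "input": 1} for pc in range(1, 21)] + \
       [{"partition_col": pc, "input": 2} for pc in range(1, 21)]

# ORDER BY input, partition_col -- the ordering requested via order_by
# in the AnalyzeResult above.
rows.sort(key=lambda r: (r["input"], r["partition_col"]))

count = total = 0
last = None
for row in rows:
    # Rows must arrive in non-decreasing order of `input`, as eval() checks.
    assert last is None or last <= row["input"]
    count += 1
    total += row["input"]
    last = row["input"]

print((count, total, last))  # (40, 60, 2), matching the expected single row
```

When the analysis result is dropped (the bug), each input partition is processed independently instead, which yields the many small rows shown above.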

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Modified the related tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@ueshin
Member Author

ueshin commented Oct 12, 2023

cc @dtenedor

Contributor

@dtenedor dtenedor left a comment


Good catch, thanks for the fix!

@ueshin
Member Author

ueshin commented Oct 13, 2023

The failed test is not related to this PR:

KafkaSourceStressSuite.stress test with multiple topics and partitions

@ueshin
Member Author

ueshin commented Oct 13, 2023

Thanks! merging to master.

@ueshin ueshin closed this in 12880c8 Oct 13, 2023