
[SPARK-47572][SQL] Enforce Window partitionSpec is orderable. #45730

Closed

Conversation

@chenhao-db (Contributor) commented Mar 26, 2024

What changes were proposed in this pull request?

In the `Window` node, both `partitionSpec` and `orderSpec` must be orderable, but the current type check only verifies that `orderSpec` is orderable. This can cause an error in later optimization phases.

Given a query:

```
with t as (select id, map(id, id) as m from range(0, 10))
select rank() over (partition by m order by id) from t
```

Before the PR, it fails with an `INTERNAL_ERROR`:

```
org.apache.spark.SparkException: [INTERNAL_ERROR] grouping/join/window partition keys cannot be map type. SQLSTATE: XX000
at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.needNormalize(NormalizeFloatingNumbers.scala:103)
at org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers$.org$apache$spark$sql$catalyst$optimizer$NormalizeFloatingNumbers$$needNormalize(NormalizeFloatingNumbers.scala:94)
...
```

After the PR, it fails with an `EXPRESSION_TYPE_IS_NOT_ORDERABLE` error, which is expected:

```
  org.apache.spark.sql.catalyst.ExtendedAnalysisException: [EXPRESSION_TYPE_IS_NOT_ORDERABLE] Column expression "m" cannot be sorted because its type "MAP<BIGINT, BIGINT>" is not orderable. SQLSTATE: 42822; line 2 pos 53;
Project [RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
+- Project [id#1L, m#0, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4, RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4]
   +- Window [rank(id#1L) windowspecdefinition(m#0, id#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY m ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#4], [m#0], [id#1L ASC NULLS FIRST]
      +- Project [id#1L, m#0]
         +- SubqueryAlias t
            +- SubqueryAlias t
               +- Project [id#1L, map(id#1L, id#1L) AS m#0]
                  +- Range (0, 10, step=1, splits=None)
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
...
```

How was this patch tested?

Unit test.
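
For illustration only, a minimal sketch of what such a test could look like, assuming a ScalaTest suite backed by `SharedSparkSession`; the suite name, location, and exact assertions here are guesses, not taken from this PR:

```
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite name; the actual test added by this PR may live elsewhere.
class WindowPartitionSpecSuite extends QueryTest with SharedSparkSession {
  test("SPARK-47572: window partitionSpec must be orderable") {
    val query =
      """with t as (select id, map(id, id) as m from range(0, 10))
        |select rank() over (partition by m order by id) from t""".stripMargin
    // Analysis should now reject the map-typed partition key up front.
    val e = intercept[AnalysisException](sql(query).collect())
    assert(e.getErrorClass == "EXPRESSION_TYPE_IS_NOT_ORDERABLE")
  }
}
```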

@cloud-fan (Contributor):
I think ordering and equality are two different capabilities. For example, the map type is not orderable, but it can be used as a grouping key.
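
(A plain-Scala illustration of this distinction, not tied to Spark or to this PR: a type can support equality, and therefore grouping, without having any natural ordering.)

```
// A type with well-defined equality and hashing (usable as a grouping key)
// but no natural ordering (cannot be sorted without supplying one).
case class Point(x: Int, y: Int)

val points = Seq(Point(1, 2), Point(1, 2), Point(3, 4))

// Grouping only needs equality/hashing, which case classes provide:
val counts = points.groupBy(identity).map { case (p, ps) => p -> ps.size }
// counts: Map(Point(1,2) -> 2, Point(3,4) -> 1)

// Sorting needs an Ordering[Point]; this line would not compile as written:
// points.sorted
```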

@cloud-fan (Contributor):
What is the behavior today? Do we allow using the map type as a window partition key?

@chenhao-db (Contributor, Author) commented Mar 29, 2024

Things get a bit more complex after #45549.

> For example, the map type is not orderable, but it can be used as a grouping key.

It can only be used as a grouping key after #45549. However, with the map normalization and comparison code introduced in that PR, the map type could also be made orderable.

> What is the behavior today?

Before #45549, the SQL snippet in my PR description would fail with an `INTERNAL_ERROR`. After #45549, it would execute successfully. When a column is used as a window partition key, it is translated into a local `Sort` over a `HashPartitioning`. Although the current type check in `SortOrder` rejects the map type, that check doesn't run when the `SortOrder` is constructed from a window partition key, so the `Sort` node can successfully compare map inputs, because #45549 supports map comparison.

There are still some issues with sorting maps, because the map normalization code is only inserted for aggregates, not for sorts. But I think they are fixable, and the map type is indeed orderable. However, as long as a type is not considered orderable, it cannot be a window partition key, so the code change in this PR is still necessary.
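
For reference, a small snippet that reproduces the scenario above, e.g. in spark-shell; which of the three outcomes you see depends on whether #45549 and this PR are in the build:

```
// The query from the PR description; outcomes differ by build:
//   - before #45549: INTERNAL_ERROR from NormalizeFloatingNumbers at optimization time
//   - after #45549, without this PR: the query runs, sorting rows by the map column
//   - with this PR: analysis fails with EXPRESSION_TYPE_IS_NOT_ORDERABLE
val df = spark.sql(
  """with t as (select id, map(id, id) as m from range(0, 10))
    |select rank() over (partition by m order by id) from t""".stripMargin)
df.show()
```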

The review thread below is on this hunk from the diff:

```
partitionSpec.foreach { p =>
  if (!RowOrdering.isOrderable(p.dataType)) {
    p.dataTypeMismatch(p, TypeCheckResult.DataTypeMismatch(
      errorSubClass = "INVALID_ORDERING_TYPE",
```
Contributor:
Why not use the error class `EXPRESSION_TYPE_IS_NOT_ORDERABLE`?

Contributor Author:
I cannot really tell the difference between these two error classes, so I just picked one arbitrarily. I have now changed it to `EXPRESSION_TYPE_IS_NOT_ORDERABLE`.

@cloud-fan (Contributor):
@chenhao-db thanks for the analysis! Let's be strict first; we can relax the restriction on the map type later if needed.

@cloud-fan (Contributor):
thanks, merging to master!

@cloud-fan closed this in 609bd48 on Mar 29, 2024.
sweisdb pushed a commit to sweisdb/spark that referenced this pull request Apr 1, 2024
Closes apache#45730 from chenhao-db/SPARK-47572.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>