Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-39135][SQL] DS V2 aggregate partial push-down should supports group by without aggregate functions #36492

Closed
wants to merge 1 commit into from

Conversation

beliefer
Copy link
Contributor

What changes were proposed in this pull request?

Currently, the SQL show below not supported by DS V2 aggregate partial push-down.
select key from tab group by key

Why are the changes needed?

Make DS V2 aggregate partial push-down supports group by without aggregate functions.

Does this PR introduce any user-facing change?

'No'.
New feature.

How was this patch tested?

New tests

@github-actions github-actions bot added the SQL label May 10, 2022
@beliefer
Copy link
Contributor Author

ping @huaxingao cc @cloud-fan

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.3! (it's kind of a mistake in the agg pushdown)

cloud-fan pushed a commit that referenced this pull request May 10, 2022
…group by without aggregate functions

### What changes were proposed in this pull request?
Currently, the SQL show below not supported by DS V2 aggregate partial push-down.
`select key from tab group by key`

### Why are the changes needed?
Make DS V2 aggregate partial push-down supports group by without aggregate functions.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests

Closes #36492 from beliefer/SPARK-39135.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit decd393)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan cloud-fan closed this in decd393 May 10, 2022
@beliefer
Copy link
Contributor Author

@cloud-fan Thank you!

chenzhx pushed a commit to chenzhx/spark that referenced this pull request May 17, 2022
…group by without aggregate functions

### What changes were proposed in this pull request?
Currently, the SQL show below not supported by DS V2 aggregate partial push-down.
`select key from tab group by key`

### Why are the changes needed?
Make DS V2 aggregate partial push-down supports group by without aggregate functions.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests

Closes apache#36492 from beliefer/SPARK-39135.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
chenzhx pushed a commit to chenzhx/spark that referenced this pull request May 27, 2022
…group by without aggregate functions

### What changes were proposed in this pull request?
Currently, the SQL show below not supported by DS V2 aggregate partial push-down.
`select key from tab group by key`

### Why are the changes needed?
Make DS V2 aggregate partial push-down supports group by without aggregate functions.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests

Closes apache#36492 from beliefer/SPARK-39135.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
chenzhx added a commit to Kyligence/spark that referenced this pull request May 31, 2022
* [SPARK-38997][SQL] DS V2 aggregate push-down supports group by expressions

### What changes were proposed in this pull request?
Currently, Spark DS V2 aggregate push-down only supports group by column.
But the SQL show below is very useful and common.
```
SELECT
  CASE
    WHEN 'SALARY' > 8000.00
      AND 'SALARY' < 10000.00
    THEN 'SALARY'
    ELSE 0.00
  END AS key,
  SUM('SALARY')
FROM "test"."employee"
GROUP BY key
```

### Why are the changes needed?
Let DS V2 aggregate push-down supports group by expressions

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests

Closes apache#36325 from beliefer/SPARK-38997.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

fix ut

* [SPARK-39135][SQL] DS V2 aggregate partial push-down should supports group by without aggregate functions

### What changes were proposed in this pull request?
Currently, the SQL show below not supported by DS V2 aggregate partial push-down.
`select key from tab group by key`

### Why are the changes needed?
Make DS V2 aggregate partial push-down supports group by without aggregate functions.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests

Closes apache#36492 from beliefer/SPARK-39135.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39157][SQL] H2Dialect should override getJDBCType so as make the data type is correct

### What changes were proposed in this pull request?
Currently, `H2Dialect` not implement `getJDBCType` of `JdbcDialect`, so the DS V2 push-down will throw exception show below:
```
Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 13) (jiaan-gengdembp executor driver):
 org.h2.jdbc.JdbcSQLNonTransientException: Unknown data type: "STRING"; SQL statement:
SELECT "DEPT","NAME","SALARY","BONUS","IS_MANAGER" FROM "test"."employee"  WHERE ("BONUS" IS NOT NULL) AND ("DEPT" IS NOT NULL) AND (CAST("BONUS" AS string) LIKE '%30%') AND (CAST("DEPT" AS byte) > 1) AND (CAST("DEPT" AS short) > 1) AND (CAST("BONUS" AS decimal(20,2)) > 1200.00)    [50004-210]
```
H2Dialect should implement `getJDBCType` of `JdbcDialect`.

### Why are the changes needed?
 make the H2 data type is correct.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Fix a bug for `H2Dialect`.

### How was this patch tested?
New tests.

Closes apache#36516 from beliefer/SPARK-39157.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39162][SQL] Jdbc dialect should decide which function could be pushed down

### What changes were proposed in this pull request?
Regardless of whether the functions are ANSI or not, most databases are actually unsure of their support.
So we should add a new API into `JdbcDialect` so that jdbc dialect decide which function could be pushed down.

### Why are the changes needed?
Let function push-down more flexible.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
Exists tests.

Closes apache#36521 from beliefer/SPARK-39162.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: huaxingao <huaxin_gao@apple.com>

* [SPARK-38897][SQL] DS V2 supports push down string functions

### What changes were proposed in this pull request?

Currently, Spark have some string functions of ANSI standard. Please refer

https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L503

These functions show below:
`SUBSTRING,`
`UPPER`,
`LOWER`,
`TRANSLATE`,
`TRIM`,
`OVERLAY`

The mainstream databases support these functions show below.
Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch
-- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
`SUBSTRING` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes
`UPPER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes
`LOWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | YES | Yes | Yes | Yes | Yes
`TRIM` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes
`TRANSLATE` | Yes | No | Yes | No | Yes | Yes | No | No | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | No | No | No | No | No | No
`OVERLAY` | Yes | No | No | No | Yes | No | No | No | No | Yes | Yes | No | No | No | No | No | No | No | No | No | No | No

DS V2 should supports push down these string functions.

### Why are the changes needed?

DS V2 supports push down string functions

### Does this PR introduce _any_ user-facing change?

'No'.
New feature.

### How was this patch tested?

New tests.

Closes apache#36330 from chenzhx/spark-master.

Authored-by: chenzhx <chen@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-28330][SQL] Support ANSI SQL: result offset clause in query expression

### What changes were proposed in this pull request?
This is a ANSI SQL and feature id is `F861`
```
<query expression> ::=
[ <with clause> ] <query expression body>
[ <order by clause> ] [ <result offset clause> ] [ <fetch first clause> ]

<result offset clause> ::=
OFFSET <offset row count> { ROW | ROWS }
```
For example:
```
SELECT customer_name, customer_gender FROM customer_dimension
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name;
    customer_name     | customer_gender
----------------------+-----------------
 Amy X. Lang          | Female
 Anna H. Li           | Female
 Brian O. Weaver      | Male
 Craig O. Pavlov      | Male
 Doug Z. Goldberg     | Male
 Harold S. Jones      | Male
 Jack E. Perkins      | Male
 Joseph W. Overstreet | Male
 Kevin . Campbell     | Male
 Raja Y. Wilson       | Male
 Samantha O. Brown    | Female
 Steve H. Gauthier    | Male
 William . Nielson    | Male
 William Z. Roy       | Male
(14 rows)

SELECT customer_name, customer_gender FROM customer_dimension
   WHERE occupation='Dancer' AND customer_city = 'San Francisco' ORDER BY customer_name OFFSET 8;
   customer_name   | customer_gender
-------------------+-----------------
 Kevin . Campbell  | Male
 Raja Y. Wilson    | Male
 Samantha O. Brown | Female
 Steve H. Gauthier | Male
 William . Nielson | Male
 William Z. Roy    | Male
(6 rows)
```
There are some mainstream database support the syntax.

**Druid**
https://druid.apache.org/docs/latest/querying/sql.html#offset

**Kylin**
http://kylin.apache.org/docs/tutorial/sql_reference.html#QUERYSYNTAX

**Exasol**
https://docs.exasol.com/sql/select.htm

**Greenplum**
http://docs.greenplum.org/6-8/ref_guide/sql_commands/SELECT.html

**MySQL**
https://dev.mysql.com/doc/refman/5.6/en/select.html

**Monetdb**
https://www.monetdb.org/Documentation/SQLreference/SQLSyntaxOverview#SELECT

**PostgreSQL**
https://www.postgresql.org/docs/11/queries-limit.html

**Sqlite**
https://www.sqlite.org/lang_select.html

**Vertica**
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SQLReferenceManual/Statements/SELECT/OFFSETClause.htm?zoom_highlight=offset

The description for design:
**1**. Consider `OFFSET` as the special case of `LIMIT`. For example:
`SELECT * FROM a limit 10;` similar to `SELECT * FROM a limit 10 offset 0;`
`SELECT * FROM a offset 10;` similar to `SELECT * FROM a limit -1 offset 10;`
**2**. Because the current implement of `LIMIT` has good performance. For example:
`SELECT * FROM a limit 10;` parsed to the logic plan as below:
```
GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)
```
and then the physical plan as below:
```
GlobalLimitExec (limit = 10) // Take the first 10 rows globally
|--LocalLimitExec (limit = 10) // Take the first 10 rows locally
```
This operator reduce massive shuffle and has good performance.
Sometimes, the logic plan transformed to the physical plan as:
```
CollectLimitExec (limit = 10) // Take the first 10 rows globally
```
If the SQL contains order by, such as `SELECT * FROM a order by c limit 10;`.
This SQL will be transformed to the physical plan as below:
```
TakeOrderedAndProjectExec (limit = 10) // Take the first 10 rows after sort globally
```

Based on this situation, this PR produces the following operations. For example:
`SELECT * FROM a limit 10 offset 10;` parsed to the logic plan as below:
```
GlobalLimit (limit = 10)
|--LocalLimit (limit = 10)
   |--Offset (offset = 10)
```
After optimization, the above logic plan will be transformed to:
```
GlobalLimitAndOffset (limit = 10, offset = 10) // Limit clause accompanied by offset clause
|--LocalLimit (limit = 20)   // 10 + offset = 20
```

and then the physical plan as below:
```
GlobalLimitAndOffsetExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally
|--LocalLimitExec (limit = 20) // Take the first 20(limit + offset) rows locally
```
Sometimes, the logic plan transformed to the physical plan as:
```
CollectLimitExec (limit = 10, offset = 10) // Skip the first 10 rows and take the next 10 rows globally
```
If the SQL contains order by, such as `SELECT * FROM a order by c limit 10 offset 10;`.
This SQL will be transformed to the physical plan as below:
```
TakeOrderedAndProjectExec (limit = 10, offset 10) // Skip the first 10 rows and take the next 10 rows after sort globally
```
**3**.In addition to the above, there is a special case that is only offset but no limit. For example:
`SELECT * FROM a offset 10;` parsed to the logic plan as below:
```
Offset (offset = 10) // Only offset clause
```
If offset is very large, will generate a lot of overhead. So this PR will refuse use offset clause without limit clause, although we can parse, transform and execute it.

A balanced idea is add a configuration item `spark.sql.forceUsingOffsetWithoutLimit` to force running query when user knows the offset is small enough. The default value of `spark.sql.forceUsingOffsetWithoutLimit` is false. This PR just came up with the idea so that it could be implemented at a better time in the future.

Note: The origin PR to support this feature is apache#25416.
Because the origin PR too old, there exists massive conflict which is hard to resolve. So I open this new PR to support this feature.

### Why are the changes needed?
new feature

### Does this PR introduce any user-facing change?
'No'

### How was this patch tested?
Exists and new UT

Closes apache#35975 from beliefer/SPARK-28330.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39057][SQL] Offset could work without Limit

### What changes were proposed in this pull request?
Currently, `Offset` must work with `Limit`. The behavior not allow to use offset alone and add offset API into `DataFrame`.

If we use `Offset` alone, there are two situations:
1. If `Offset` is the last operator, collect the result to the driver and then drop/skip the first n (offset value) rows. Users can test or debug `Offset` in the way.
2. If `Offset` is the intermediate operator, shuffle all the result to one task and drop/skip the first n (offset value) rows and the result will be passed to the downstream operator.

For example, `SELECT * FROM a offset 10; ` parsed to the logic plan as below:
```
Offset (offset = 10) // Only offset clause
|--Relation
```

and then the physical plan as below:
```
CollectLimitExec(limit = -1, offset = 10) // Collect the result to the driver and skip the first 10 rows
|--JDBCRelation
```
or
```
GlobalLimitAndOffsetExec(limit = -1, offset = 10) // Collect the result and skip the first 10 rows
|--JDBCRelation
```

After this PR merged, users could input the SQL show below:
```
SELECT '' AS ten, unique1, unique2, stringu1
 		FROM onek
 		ORDER BY unique1 OFFSET 990;
```

Note: apache#35975 supports offset clause, it create a logical node named
`GlobalLimitAndOffset`. In fact, we can avoid use this node and use `Offset` instead and the latter is good with unify name.

### Why are the changes needed?
Improve the implement of offset clause.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
Exists test cases.

Closes apache#36417 from beliefer/SPARK-28330_followup2.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-39159][SQL] Add new Dataset API for Offset

### What changes were proposed in this pull request?
Currently, Spark added `Offset` operator.
This PR try to add `offset` API into `Dataset`.

### Why are the changes needed?
`offset` API is very useful and construct test case more easily.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests.

Closes apache#36519 from beliefer/SPARK-39159.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* fix ut

* update spark version

Co-authored-by: Jiaan Geng <beliefer@163.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants