
[SPARK-24497][SQL] Support recursive SQL #40744

Open
peter-toth wants to merge 2 commits into master from SPARK-24497-recursive-cte

Conversation

@peter-toth (Contributor) commented Apr 11, 2023

What changes were proposed in this pull request?

This PR adds the recursive query feature to Spark SQL.

A recursive query is defined using the WITH RECURSIVE keywords and refers to the name of the common table expression within the query itself.
The implementation complies with the SQL standard and follows rules similar to those of other relational databases:

  • A query is made of an anchor term followed by a recursive term.
  • The anchor term doesn't contain a self-reference and is used to initialize the query.
  • The recursive term contains a self-reference and is used to expand the current set of rows with new ones.
  • The anchor and recursive terms must be combined with a UNION or UNION ALL operator.
  • New rows can only be derived from the rows added in the previous iteration (or from the initial set of rows of the anchor term). This limitation implies that recursive references can't be used in certain joins, aggregations, or subqueries.

Please see cte-recursive.sql for some examples.
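
To make the anchor/recursive-term structure above concrete, here is a minimal sketch through the Scala API. It is illustrative only: the employees data, names, and session setup are invented, and it assumes the usual hierarchical-traversal pattern (an inner join between the recursive reference and a base table) is supported, as in the examples of the referenced test file.

import org.apache.spark.sql.SparkSession

object RecursiveCteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("recursive-cte").getOrCreate()
    import spark.implicits._

    // A tiny management hierarchy: (id, manager_id); the root has no manager.
    Seq((1, Option.empty[Int]), (2, Some(1)), (3, Some(2)), (42, Some(3)))
      .toDF("id", "manager_id")
      .createOrReplaceTempView("employees")

    spark.sql("""
      WITH RECURSIVE ancestors(id, manager_id, level) AS (
          -- anchor term: no self-reference, seeds the result set
          SELECT id, manager_id, 0 FROM employees WHERE id = 42
        UNION ALL
          -- recursive term: refers to ancestors and expands the rows
          -- produced by the previous iteration
          SELECT e.id, e.manager_id, a.level + 1
          FROM employees e JOIN ancestors a ON e.id = a.manager_id
      )
      SELECT * FROM ancestors ORDER BY level
    """).show()

    spark.stop()
  }
}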

The implementation has the same limitation that SPARK-36447 / #33671 has:

With-CTEs mixed with SQL commands or DMLs will still go through the old inline code path because of our non-standard language specs and not-unified command/DML interfaces.

which means that recursive queries are not supported in SQL commands and DMLs.
With #42036 this restriction is lifted, and a recursive CTE fails only when the CTE is force-inlined (spark.sql.legacy.inlineCTEInCommands=true or the command is a multi-insert statement).

Why are the changes needed?

Recursive queries are an ANSI SQL feature that is useful for processing hierarchical data.

Does this PR introduce any user-facing change?

Yes, it adds the recursive query feature.

How was this patch tested?

Added new UTs and tests in cte-recursion.sql.

@peter-toth (Contributor, Author)

This PR is WIP as it contains #40856. Once that PR is merged I will rebase and remove the WIP flag.

@peter-toth peter-toth force-pushed the SPARK-24497-recursive-cte branch 3 times, most recently from 9302d52 to 38f8324 Compare April 26, 2023 13:51
@peter-toth peter-toth changed the title [WIP][SPARK-24497][SQL] Support recursive SQL [SPARK-24497][SQL] Support recursive SQL Apr 27, 2023
@peter-toth (Contributor, Author)

#40856 got merged and I've rebased this PR. I'm removing the WIP flag and the PR is ready for review.

cc @cloud-fan, @wangyum, @maryannxue, @sigmod

@wangyum (Member) commented May 30, 2023

Thanks @peter-toth. I tested this patch locally, but it seems to throw a StackOverflowError. How to reproduce:

./dev/make-distribution.sh --tgz  -Phive -Phive-thriftserver
tar -zxf spark-3.5.0-SNAPSHOT-bin-3.3.5.tgz
cd spark-3.5.0-SNAPSHOT-bin-3.3.5
bin/spark-sql
spark-sql (default)> WITH RECURSIVE t(n) AS (
                   >     VALUES (1)
                   > UNION ALL
                   >     SELECT n+1 FROM t WHERE n < 100
                   > )
                   > SELECT sum(n) FROM t;
23/05/30 13:21:21 ERROR Executor: Exception in task 0.0 in stage 265.0 (TID 199)
java.lang.StackOverflowError
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

@peter-toth (Contributor, Author) commented May 30, 2023

Thanks for testing this PR @wangyum. Interestingly, I didn't encounter a stack overflow when the recursion level was below 100; in my tests the error starts to appear at around level 170, which I guess depends on your default stack size. Since each iteration of the recursion depends on the previous one, the RDD lineage of the tasks keeps growing, and deserializing those tasks can eventually throw a stack overflow error. Let me amend this PR with optional checkpointing so the RDD lineage can be truncated and deeper recursion can be handled...
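
To make the lineage concern concrete, here is a rough DataFrame-level sketch of the iterative pattern with periodic lineage truncation. This is not the PR's actual code (which works on logical plans); runRecursion, step, and truncateEvery are invented names, and the snippet is meant to be pasted into spark-shell.

import org.apache.spark.sql.DataFrame

def runRecursion(
    anchor: DataFrame,
    step: DataFrame => DataFrame,  // derives the next level from the previous one
    maxLevel: Int,
    truncateEvery: Int = 10): DataFrame = {
  var prev = anchor
  var acc = anchor
  var level = 0
  while (level < maxLevel && prev.count() > 0) {
    var next = step(prev)
    // Each iteration stacks another plan on top of the previous one, so the RDD
    // lineage (and serialized task size) grows with the recursion depth and can
    // eventually overflow the stack during task deserialization.
    if ((level + 1) % truncateEvery == 0) {
      // Periodically cutting the lineage keeps tasks shallow; a shuffle introduced
      // by repartition() has a similar effect (see the review thread below).
      next = next.localCheckpoint()
    }
    acc = acc.union(next)
    prev = next
    level += 1
  }
  acc
}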


private def cacheAndCount(plan: LogicalPlan, limit: Option[Long]) = {
  val limitedPlan = limit.map(l => Limit(Literal(l.toInt), plan)).getOrElse(plan)
  val df = Dataset.ofRows(session, limitedPlan).persist()
Review comment (Member):

Could we replace persist() with repartition() to avoid the stack overflow issue?

@peter-toth (Contributor, Author) commented Jun 1, 2023

repartition() seems to be a good option to truncate the RDD lineage and decrease task sizes to avoid the stack overflow. I added it as the default cache mode in 2c206a0.
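
For context on why the shuffle helps (a sketch, not the PR's code): persist() reuses the cached rows but doesn't cut the RDD dependency graph, so later iterations still drag the whole chain of previous levels into their serialized tasks, while repartition() inserts a shuffle, and since stages end at shuffle boundaries, downstream tasks only serialize the plan back to the shuffle. In spark-shell terms, with levelDf standing in for one recursion level's result:

// Stand-in for one recursion level's result (invented for the sketch).
val levelDf = spark.range(0, 1000).toDF("n")

// persist(): the data is cached, but the narrow-dependency lineage is kept.
val cached = levelDf.persist()
cached.count()  // materializes the cache

// repartition(): adds a shuffle boundary, keeping the serialized plan behind each
// downstream task bounded regardless of recursion depth. The partition count is
// arbitrary here.
val shuffled = levelDf.repartition(8)
shuffled.count()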

var currentLimit = limit.map(_.toLong)
var (prevDF, prevCount) = cacheAndCount(anchor, currentLimit)

var currentLevel = 0
Review comment (Member):

Why is currentLevel 0, not 1?

@peter-toth peter-toth force-pushed the SPARK-24497-recursive-cte branch 2 times, most recently from 02f527d to 206e9a8 Compare June 2, 2023 08:54
@ksn06 commented Jul 6, 2023

Hey folks,
So glad to see this feature being worked on. Do you have any estimate of when this could be released?

@peter-toth (Contributor, Author) commented Jul 6, 2023

Hey folks, so glad to see this feature being worked on. Do you have any estimate of when this could be released?

This feature very likely won't make it into the next release (Spark 3.5) as the branch cut is in two weeks, but I will try to get it into the one after that (Spark 4.0).

@github-actions (bot)

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added Stale and removed CORE labels Oct 15, 2023
@peter-toth peter-toth removed the Stale label Oct 15, 2023
@milimetric

@peter-toth thank you so much for sticking with this over three major versions and three separate pull requests. Recursive queries would be really nice to have in Spark SQL.

@KamilKandzia

@peter-toth Hi, we are very much looking forward to recursive SQL. We hope you will be able to complete this pull request :)

@peter-toth peter-toth force-pushed the SPARK-24497-recursive-cte branch 2 times, most recently from 3dfb1f6 to ae25f5f Compare December 14, 2023 15:30
@peter-toth (Contributor, Author) commented Dec 21, 2023

@milastdbx do you think you can take over this PR?

cc @cloud-fan, @mitkedb, @MaxGekk

@milastdbx (Contributor) commented Dec 25, 2023 via email

@waywtdcc commented Mar 4, 2024

Can this PR be merged? I have also encountered this use case.

@firstim commented Mar 6, 2024

You can try this while the feature is not yet available:
https://pypi.org/project/pyspark-connectby/
