[SPARK-48344][SQL] SQL API change to support execution of compound statements #47403
miland-db wants to merge 23 commits into apache:master from
Conversation
      isInternal = false)
  }

  def execute(executionPlan: Iterator[CompoundStatementExec]): Iterator[Array[Row]] = {
I would maybe make `buildExecutionPlan` private, and then call it from `execute` - it seems like we will never need to call `buildExecutionPlan` from outside?
For example, if we add separate `execute*` functions in the future, they can also call `buildExecutionPlan` internally.
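A minimal sketch of the suggested shape, assuming the types from this PR (`CompoundBody`, `CompoundStatementExec`, `SingleStatementExec`, `shouldCollectResult`); the bodies and the `collectResult()` accessor are placeholders, not the actual implementation:

```scala
import org.apache.spark.sql.Row

class SqlScriptingInterpreter {
  // Plan construction stays private: callers only ever go through execute().
  private def buildExecutionPlan(compoundBody: CompoundBody): Iterator[CompoundStatementExec] = {
    ??? // transform the parsed CompoundBody into execution nodes (placeholder)
  }

  // Public entry point that drives the private plan builder.
  def execute(compoundBody: CompoundBody): Iterator[Array[Row]] = {
    val executionPlan = buildExecutionPlan(compoundBody)
    executionPlan.collect {
      case statement: SingleStatementExec if statement.shouldCollectResult =>
        statement.collectResult() // hypothetical accessor for the statement's rows
    }
  }
}
```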
We are using buildExecutionPlan in SqlScriptingInterpreterSuite. Should we change the testing logic as well?
We can call `buildExecutionPlan` from `execute`, but can we also leave it public?
This probably means that we should change the tests a bit - we are simulating `execute` in the tests as well, because before we didn't have it in the interpreter. Now that we have it, we should probably update the tests.
Let's do what you suggested last for now (leave this comment unresolved) until other folks review the PR, and then we can figure out whether we want to change the tests as well.
Just as a reminder, we should probably include SQL config with this PR as well! Should be fairly simple, but let's talk about it on Monday.
        // execute the plan directly if it is not a single statement
        val lastRow = executeScript(plan).foldLeft(Array.empty[Row])((_, next) => next)
        val attributes = DataTypeUtils.toAttributes(lastRow.head.schema)
        Dataset.ofRows(self, LocalRelation.fromExternalRows(attributes, lastRow.toIndexedSeq))
shall we return the last DataFrame from the script instead of collecting the result at the driver side?
this is a bit of a hard topic at the moment... we decided on this approach for the preview for multiple reasons:
- this is what we will do for the initial version of the changes for Spark Connect execution
- discussions around multiple topics are still open:
  - multiple results API - decisions around this will affect the single result API as well
  - multiple results API - from a correctness perspective, all statements (including SELECTs) need to be executed eagerly. It makes sense then to have the same behavior with the single result API as well - in this case all statements are executed eagerly, but results for all of them except the last one are dropped.
  - there is still an open discussion whether to include a `return` statement - a ton of questions about stored procedures are still an open topic

what will probably happen down the line is:
- `sql()` API remains unchanged and only the last DataFrame is returned (as you suggested). This still requires a lot of work to support Connect execution; the current approach works with Connect already.
- [optional] new API to do what we are doing at the moment.
- new API for multiple results, stored procedures, execute immediate, etc.

since the last part is still an open question, we figured we would do the simplest thing that works e2e in all cases and then, after we gather initial feedback from the preview and understand better what we want to do for stored procedures/multiple results, we can actually commit to implementing all of the API changes.
please let us know your thoughts on this.
> all statements (including SELECTs) need to be executed eagerly
We need to materialize the query result but we don't need to collect the result at the driver side (except for the last statement). E.g. we can write the query result to a noop sink.
What is the noop sink doing? Is it `df.write.format("noop").mode("overwrite").save()`? Is it the same as doing `df.collect()` but it just throws away the result?
We have a hard time determining which statement is the last one. That is the reason why we are doing it this way (we have to save the result of the last DataFrame).
Yea it's the same as doing df.collect() but it just throws away the result
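For context, a hedged sketch of the two options being discussed; the query text is made up, and `spark` is assumed to be an existing SparkSession. The noop data source is a built-in Spark sink that runs the query and discards the output:

```scala
import org.apache.spark.sql.Row

// Hypothetical statement taken from somewhere in a script.
val df = spark.sql("SELECT * FROM some_table")

// Eager execution without collecting at the driver: the plan runs fully,
// but the noop sink discards every output row.
df.write.format("noop").mode("overwrite").save()

// Eager execution that also brings all rows to the driver (what this PR currently
// does for every statement, since the last statement isn't known in advance).
val rows: Array[Row] = df.collect()
```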
      shouldCollectResult = true)
  }

  def execute(compoundBody: CompoundBody): Iterator[Array[Row]] = {
can we decouple the two parts: 1) produce DataFrames from the script. 2) execute the DataFrames one by one w.r.t. the order.
We decided to do it this way because each statement has to be executed as soon as we encounter it, since it can throw an exception. Very soon we will be introducing handlers/conditions to deal with these exceptions.
We did it the way you suggested before, but as soon as we started introducing exception handling, we realized it's hardly extensible in the future. When a SQL statement throws an exception, exception handlers need to be executed immediately, and we need the interpreter for that - hence, we need to do execution within the interpreter as well.
I'm OK with a simple implementation (collect all the result rows) as a beginning, but we should build the basic framework with proper abstraction, otherwise I don't know how to review it. The idea of handlers makes sense.
For now, because we don't know which statement is the last one (which would let us do only one `collect()` and use the noop sink for the rest), we decided to do `collect()` for all statements.
Either way, it is important to execute each statement as soon as we encounter it to be able to handle errors properly. The PR introducing handlers is currently a work in progress and will probably explain why we did things the way we did in this PR.
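A hedged sketch of why execution stays inside the interpreter: each statement runs as soon as it is reached so a (future) condition handler can react immediately. The fragment is assumed to live inside the interpreter with a SparkSession `session` in scope; `dispatchToHandler` is a hypothetical placeholder, not code from this PR:

```scala
import org.apache.spark.sql.{Dataset, Row}

// Hypothetical placeholder for the handler dispatch that a follow-up PR would add.
private def dispatchToHandler(e: Exception): Unit = ()

def executeInternal(executionPlan: Iterator[CompoundStatementExec]): Iterator[Array[Row]] = {
  executionPlan.flatMap {
    case statement: SingleStatementExec =>
      try {
        // Execute eagerly the moment the statement is reached...
        val rows = Dataset.ofRows(session, statement.parsedPlan).collect()
        // ...and keep the rows only if the statement is marked for collection.
        if (statement.shouldCollectResult) Some(rows) else None
      } catch {
        case e: Exception =>
          dispatchToHandler(e) // a handlers/conditions PR would take over here
          None
      }
    case _ => None // non-leaf nodes only drive control flow and produce no rows
  }
}
```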
@cloud-fan any unresolved topics here? Milan won't be available next week - I can finish the PR next week if needed, but let's try to figure out if there are any concerns so Milan can try to sort them out today.
        Dataset.ofRows(self, singleStmtPlan.parsedPlan, tracker)
      case _ =>
        // execute the plan directly if it is not a single statement
        val lastRow = executeScript(plan).foldLeft(Array.empty[Row])((_, next) => next)
let's think about whether we want to do this exactly this way, because:
- `executeScript` is basically a simple one-liner and an alias for the interpreter's `execute` function
- when we introduce multiple results in the future, it seems best to:
  - have `executeMultipleResults` in the interpreter
  - have each function (`execute` and `executeMultipleResults` and maybe something new?) collect data based on the type of data it needs to return

I propose that the `execute` family of methods in the interpreter should be responsible for handling the logic of which data is returned, instead of fetching the last row here in SparkSession.
I didn't write a ton of details here, I'm writing this comment as a reminder and we can discuss more offline.
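A rough sketch of the proposed split, where the interpreter's execute* methods own the decision about what to return and `SparkSession.sql` only wraps it. `executeMultipleResults` is just the name floated in this thread, and `CompoundBody` is assumed from this PR:

```scala
import org.apache.spark.sql.Row

// Each execute* entry point decides which results it collects and returns.
trait SqlScriptingInterpreterApi {
  def execute(compoundBody: CompoundBody): Array[Row]                          // last result only
  def executeMultipleResults(compoundBody: CompoundBody): Iterator[Array[Row]] // one result per statement
}

// SparkSession.sql would then only wrap whatever the interpreter hands back,
// instead of folding over all results to find the last one, roughly:
//   val lastResult = scriptingInterpreter.execute(compoundBody)
//   val attributes = DataTypeUtils.toAttributes(lastResult.head.schema)
//   Dataset.ofRows(self, LocalRelation.fromExternalRows(attributes, lastResult.toIndexedSeq))
```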
      .trim
  }

  test("SQL Scripting not enabled") {
nit: move this above the `// Helper methods` comment
| "feature flag.") | ||
| .version("4.0.0") | ||
| .booleanConf | ||
| .createWithDefault(Utils.isTesting) |
Did we talk about changing this to false by default? Why did we decide not to do it?
  /**
   * Script interpreter that produces execution plan and executes SQL scripts.
   */
  protected lazy val scriptingInterpreter: SqlScriptingInterpreter = {
nit: sqlScriptingInterpreter maybe?
      shouldCollectResult = true)
  }

  def execute(compoundBody: CompoundBody): Iterator[Array[Row]] = {
nit: let's keep public stuff at top, and private below
What changes were proposed in this pull request?
Previous PRs introduced basic changes for SQL Scripting.
This PR is a follow-up to introduce changes to the SQL API in `SparkSession` to support execution of compound statements.

- SQL Config `SQL_SCRIPTING_ENABLED` is added to enable usage of SQL Scripting.
- `CompoundNestedStatementIteratorExec` is removed since it is an unnecessary abstraction layer.
- Statement flags:
  - The `isInternal` flag still exists to indicate whether a statement was added by us (generated during tree transformation, for example to drop variables). We will see in the future if we still need this flag.
  - `shouldCollectResult` indicates whether we should collect and potentially return the result, or throw it away.

Why are the changes needed?
This change is needed to open the possibility for users to execute and get results from sql scripts.
Does this PR introduce any user-facing change?
Users will now be able to execute SQL scripts using the `spark.sql()` API.
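An illustrative usage sketch: the config key `spark.sql.scripting.enabled` is assumed to be the one backing `SQL_SCRIPTING_ENABLED`, and the script body is a made-up example. Per the discussion above, only the last statement's result comes back as a DataFrame:

```scala
// Assumes an existing SparkSession `spark`.
spark.conf.set("spark.sql.scripting.enabled", "true")

val df = spark.sql(
  """BEGIN
    |  SELECT 1;
    |  SELECT 2 AS last_result;
    |END""".stripMargin)

// Expected to show only the result of the last statement (last_result = 2).
df.show()
```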
How was this patch tested?
There are tests for newly introduced execution changes:
- `SqlScriptingExecutionNodeSuite` - updated unit tests for execution nodes.

Was this patch authored or co-authored using generative AI tooling?
No.