[SPARK-52346][SQL] Declarative Pipeline DataflowGraph execution and event logging #51050
Conversation
Heads up: I just rebased this PR on the latest changes inside the underlying graph resolution PR.
I also just added some tests for …
"Please delete the table or change the declared partitioning to match its partitions." | ||
], | ||
"sqlState" : "42000" | ||
}, |
The partition column is fixed once persisted; users need to recreate the MV/ST to update the partition column.
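For readers following along, the check behind this error has roughly the following shape (a hypothetical sketch: the method name, parameters, and exception type are illustrative assumptions; only the message text comes from the diff above):

```scala
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform

// Hypothetical sketch: persisted partitioning cannot be altered in place, so a
// mismatch between the existing table and the new definition must be an error.
def assertPartitioningUnchanged(existing: Table, declared: Seq[Transform]): Unit = {
  if (existing.partitioning().toSeq != declared) {
    throw new IllegalStateException(
      s"Table '${existing.name()}' is partitioned by " +
        s"${existing.partitioning().mkString("[", ", ", "]")}, but the new definition " +
        s"declares ${declared.mkString("[", ", ", "]")}. Please delete the table or " +
        "change the declared partitioning to match its partitions.")
  }
}
```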
```scala
   * @param context The context for the pipeline update.
   * @return The graph with materialized tables.
   */
  def materializeDatasets(
```
Here we loop through every table defined in the graph and register it in the metastore as an empty table if it does not exist. If the table already exists, we update it with the latest definition.
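A minimal sketch of that create-or-update loop against the DSv2 `TableCatalog` API might look like this (the `TableDefinition` shape and its field names are assumptions for illustration, not the PR's actual types):

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical stand-in for the resolved table metadata carried by the graph.
case class TableDefinition(
    namespace: Seq[String],
    name: String,
    schema: StructType,
    partitioning: Seq[Transform],
    properties: Map[String, String])

def materializeTables(catalog: TableCatalog, tables: Seq[TableDefinition]): Unit = {
  tables.foreach { table =>
    val ident = Identifier.of(table.namespace.toArray, table.name)
    if (!catalog.tableExists(ident)) {
      // Register an empty table carrying the resolved metadata; data is
      // populated later, during graph execution.
      catalog.createTable(ident, table.schema, table.partitioning.toArray,
        table.properties.asJava)
    } else {
      // The table already exists: align its metadata with the latest definition.
      val changes = table.properties.map { case (k, v) => TableChange.setProperty(k, v) }.toSeq
      catalog.alterTable(ident, changes: _*)
    }
  }
}
```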
```scala
  ): DataflowGraph = {
    val (_, refreshTableIdentsSet, fullRefreshTableIdentsSet) = {
      DatasetManager.constructFullRefreshSet(resolvedDataflowGraph.tables, context)
    }
```
Refresh tables are tables that need to be updated during the current execution. By default this is all tables, but users can configure the run to update only a subset.
Full-refresh tables are tables that need to be completely recomputed: we wipe out all the historical data and start anew. By default this set is empty, but users can also specify which tables they want to fully refresh.
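In other words, the split amounts to something like the following (a sketch of the described semantics; the selection-parameter names are assumptions, not the PR's actual signature):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical helper mirroring constructFullRefreshSet's described behavior.
def splitRefreshSets(
    allTables: Set[TableIdentifier],
    refreshSelection: Option[Set[TableIdentifier]],     // None => refresh everything
    fullRefreshSelection: Option[Set[TableIdentifier]]  // None => full-refresh nothing
): (Set[TableIdentifier], Set[TableIdentifier]) = {
  val fullRefresh = fullRefreshSelection.getOrElse(Set.empty)
  // Fully refreshed tables are recomputed from scratch, so they are excluded
  // from the incremental refresh set.
  val refresh = refreshSelection.getOrElse(allTables) -- fullRefresh
  (refresh, fullRefresh)
}
```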
```scala
}

/** A [[FlowExecution]] specifies how to execute a flow and manages its execution. */
trait FlowExecution {
```
Each logical dataflow (`DataflowGraph::Flow`) is translated into a concrete flow execution plan (`FlowExecution`) when we start the pipeline execution (in FlowPlanner.scala).
Here we define the interface for `FlowExecution`.
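As a rough sketch, the interface described here has approximately this shape (the member names below are assumptions based on the discussion, not the exact trait in the PR):

```scala
import scala.concurrent.Future

// Illustrative shape of the FlowExecution interface described above.
trait FlowExecutionSketch {
  /** Name of the logical flow this execution was planned from. */
  def flowName: String

  /** Identifier of the destination table this flow writes to. */
  def destination: String

  /** Whether this execution runs a streaming query or a batch query. */
  def isStreaming: Boolean

  /** Starts executing the flow asynchronously. */
  def start(): Unit

  /** Completes when the flow stops, successfully or with an error. */
  def completion: Future[Unit]

  /** Requests that a running flow stop. */
  def stop(): Unit
}
```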
```scala
/**
 * Configuration for the pipeline system, which is read from the Spark session's SQL configuration.
 */
class PipelineConf(spark: SparkSession) {
```
It seems that we can move all these changes to SQLConf and use SQLConf.get to access the configs. We can have a follow-up for this.
SG! Marked `PipelineConf` as deprecated with a TODO!
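For reference, the suggested follow-up could look roughly like this inside `SQLConf` (the config key, doc string, version, and default below are made up for illustration; `buildConf` and `SQLConf.get` are the existing registration and lookup mechanisms):

```scala
// Registered alongside the other entries in org.apache.spark.sql.internal.SQLConf.
val PIPELINES_MAX_CONCURRENT_FLOWS =
  buildConf("spark.sql.pipelines.execution.maxConcurrentFlows")
    .doc("Maximum number of flows that may execute concurrently in a pipeline run.")
    .version("4.1.0")
    .intConf
    .createWithDefault(16)

// Call sites then read the value through the active session's conf instead of
// threading a PipelineConf instance around:
val maxConcurrentFlows = SQLConf.get.getConf(PIPELINES_MAX_CONCURRENT_FLOWS)
```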
```scala
        .structTypeToV2Columns(new StructType().add("x", IntegerType))
    )
  }
  //
```
nit: remove this line
I noticed that the build is failing on the docs generation step:
I pushed a change that reverts all the changes in this PR to Flow.scala, FlowAnalysis.scala, and elements.scala, which appeared to be all docstring changes. The docstrings in those files were working fine before this PR, so I think it's safest to just leave them as they are.
All tests passed in https://github.com/SCHJonathan/spark/runs/43591841901
```scala
    assert(table3.partitioning().toSeq == Seq(Expressions.identity("x")))
  }

  test("specifying partition column different from existing partitioned table") {
```
The new test makes the Maven daily test fail. We can reproduce the issue using the following commands:

```
build/mvn clean install -DskipTests -pl sql/pipelines -am
build/mvn test -pl sql/pipelines
```

```
- specifying partition column different from existing partitioned table *** FAILED ***
  org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
  at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:647)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:660)
  at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:171)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:476)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:476)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
  ...
Cause: java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.SparkSession.sessionState()" because "sparkSession" is null
  at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:156)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:88)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
  at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
  at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
  at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
  ...
```
@SCHJonathan Could you fix this problem when you have some free time?
also cc @cloud-fan @gengliangwang @sryza
Thanks for catching this @LuciferYang – I've got a fix I will post soon
Here is a fix: #51112
Fixes the issue identified here: #51050 (comment). No user-facing change. Tested with `build/mvn clean install -DskipTests -pl sql/pipelines -am` followed by `build/mvn test -pl sql/pipelines`. Closes #51112 from sryza/fix-mvn. Authored-by: Sandy Ryza <sandy.ryza@databricks.com>. Signed-off-by: Gengliang Wang <gengliang@apache.org>.
Follow-up: as suggested by the comment above (#51050 (comment)), directly use `SQLConf` instead of maintaining the thin `PipelineConf` wrapper. No user-facing change; covered by existing tests. Closes #51137 from SCHJonathan/jonathan-chang_data/deprecate-pipeline-conf. Authored-by: Yuheng Chang <jonathanyuheng@gmail.com>. Signed-off-by: Gengliang Wang <gengliang@apache.org>.
### What changes were proposed in this pull request?

**See the flow chart describing the changes made in this PR: [flow chart link](https://lucid.app/lucidchart/c773b051-c634-4f0e-9a3c-a21e24ae540a/edit?viewport_loc=-4594%2C-78%2C5884%2C3280%2C0_0&invitationId=inv_3f036b9d-1a2a-4dd9-bf50-084cd90e5460)**

As described in the [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig), after we parse the user's code and represent datasets and dataflows in a `DataflowGraph` (from PR #51003), we execute the `DataflowGraph`. This PR implements this execution.

## Main execution steps inside a pipeline run
### Step 1: Initialize the raw `DataflowGraph`

In `PipelineExecution::runPipeline()`, we first initialize the dataflow graph by topologically sorting the dependencies and figuring out the expected metadata (e.g., schema) for each dataset (`DataflowGraph::resolve()`). We also run some pre-flight validations to catch early errors such as circular dependencies or creating a streaming table from a batch data source (`DataflowGraph::validate()`).

### Step 2: Materialize datasets defined in the `DataflowGraph` to the catalog

After the graph is topologically sorted and validated and every dataset / flow has correct metadata populated, we publish the corresponding datasets to the catalog (which could be Hive, UC, or others) in `DatasetManager::materializeDatasets()`. For example, for each Materialized View and Table, we register an empty table in the catalog with the correct metadata (e.g., table schema, table properties, etc.). If the table already exists, we alter it to have the correct metadata.

### Step 3: Populate data to the registered tables by executing the `DataflowGraph`

After datasets have been registered to the catalog, inside `TriggeredGraphExecution`, we transform each dataflow defined in the `DataflowGraph` into an actual execution plan that runs the actual workload and populates data into the empty tables (we transform each `Flow` into a `FlowExecution` through the `FlowPlanner`).

Each `FlowExecution` is executed in topological order based on the sorted `DataflowGraph`, and we parallelize the execution as much as possible. Depending on the type of error, failed flows may be retried as part of execution.
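Condensed into code, the three steps amount to something like the sketch below. The class and method names follow the description above (`DataflowGraph::resolve()`, `DataflowGraph::validate()`, `DatasetManager::materializeDatasets()`, `TriggeredGraphExecution`), but the exact signatures, and the `awaitCompletion()` helper, are assumptions for illustration:

```scala
// Hedged sketch of PipelineExecution::runPipeline(); not the PR's exact code.
def runPipeline(graph: DataflowGraph, context: PipelineUpdateContext): Unit = {
  // Step 1: topologically sort, resolve per-dataset metadata, and validate.
  val resolved = graph.resolve()
  resolved.validate() // circular dependencies, streaming tables over batch sources, etc.

  // Step 2: register or update every dataset in the catalog (Hive, UC, ...).
  val materialized = DatasetManager.materializeDatasets(resolved, context)

  // Step 3: plan each Flow into a FlowExecution and run the flows in
  // topological order, parallelizing wherever dependencies allow.
  val execution = new TriggeredGraphExecution(materialized, context)
  execution.start()
  execution.awaitCompletion() // assumed helper; failed flows may be retried internally
}
```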
## Main components of this PR:

- **Flow execution** represents the execution of an individual flow in the dataflow graph. Relevant classes:
  - `FlowExecution`
  - `StreamingFlowExecution`
  - `BatchFlowExecution`
  - `FlowPlanner` – constructs `FlowExecution`s from `Flow` objects
- **Graph execution** represents the execution of an entire dataflow graph, i.e. step 3 in the set of steps above. In the future, we will add a `ContinuousGraphExecution` class, which executes all the streams at once instead of in topological order. Relevant classes:
  - `GraphExecution`
  - `TriggeredGraphExecution` – executes flows in topological order, handles retries when necessary (see the retry sketch after this list)
  - `BackoffStrategy` – used for retries
  - `UncaughtExceptionHandler`
  - `PipelineConf` – a few configurations that control graph execution behavior
- **Pipeline execution** represents a full "run" including all three execution steps above: graph resolution, catalog materialization, and graph execution. Relevant classes:
  - `PipelineExecution`
  - `RunTerminationReason`
  - `PipelineUpdateContext` – represents the parameters to a pipeline execution
  - `PipelineUpdateContextImpl`
- **Catalog materialization** – step 2 in the execution steps described above – represents datasets in the dataflow graph in the catalog. Uses DSv2 APIs. Relevant class:
  - `DatasetManager`
- **Graph filtration / selection** allows selecting just a subset of the graph to be executed. In a followup, we will add the plumbing that allows specifying this from the CLI. Relevant class:
  - `GraphFilter`
- **Events** track the progress of a pipeline execution. The event messages are sent to the client for console logging, and the structured events are available for assertions inside tests. Eventually, these could power info in the Spark UI as well. Relevant classes:
  - `FlowProgressEventLogger`
  - `PipelineRunEventBuffer`
  - `StreamListener`
  - `ConstructPipelineEvent`
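To make the retry behavior concrete, here is a minimal exponential-backoff sketch in the spirit of `BackoffStrategy`; the constants and the exact shape are illustrative assumptions, not the PR's actual values:

```scala
import scala.concurrent.duration._

// Illustrative backoff policy: delays grow geometrically and are capped.
final case class RetryBackoff(
    initialDelay: FiniteDuration = 1.second,
    maxDelay: FiniteDuration = 1.minute,
    factor: Double = 2.0) {

  /** Delay to wait before retry attempt `attempt` (0-based). */
  def delayFor(attempt: Int): FiniteDuration = {
    val millis = initialDelay.toMillis * math.pow(factor, attempt.toDouble)
    math.min(millis, maxDelay.toMillis.toDouble).toLong.millis
  }
}

// e.g. RetryBackoff().delayFor(0) == 1.second, delayFor(3) == 8.seconds,
// and attempts beyond the cap all wait maxDelay.
```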
### Why are the changes needed?

This PR implements the core functionality for executing a Declarative Pipeline.

### Does this PR introduce _any_ user-facing change?

It introduces new behavior, but does not modify existing behavior.
### How was this patch tested?

New unit test suites:

- `TriggeredGraphExecutionSuite`: tests end-to-end executions of the pipeline under different scenarios (happy path, failure path, etc.) and validates that the proper data has been written and the proper event log is emitted.
- `MaterializeTablesSuite`: tests the logic for registering datasets in the catalog.

Augmented existing test suites:

- `ConstructPipelineEventSuite` and `PipelineEventSuite` to validate the new `FlowProgress` event log we're introducing.

### Was this patch authored or co-authored using generative AI tooling?

No