
[SPARK-52346][SQL] Declarative Pipeline DataflowGraph execution and event logging #51050


Closed
wants to merge 20 commits into master from SCHJonathan:graph-execution

Conversation

@SCHJonathan SCHJonathan commented May 29, 2025

What changes were proposed in this pull request?

See the flow chart describing the changes made in this PR: [flow chart link](https://lucid.app/lucidchart/c773b051-c634-4f0e-9a3c-a21e24ae540a/edit?viewport_loc=-4594%2C-78%2C5884%2C3280%2C0_0&invitationId=inv_3f036b9d-1a2a-4dd9-bf50-084cd90e5460)

As described in the [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig), after we parse the user's code and represent datasets and dataflows in a DataflowGraph (from PR #51003), we execute the DataflowGraph. This PR implements this execution.

Main execution steps inside a pipeline run

Step 1: Initialize the raw DataflowGraph

In PipelineExecution::runPipeline(), we first initialize the dataflow graph by topologically sorting the dependencies and figuring out the expected metadata (e.g., schema) for each dataset (DataflowGraph::resolve()). We also run some pre-flight validations to catch early errors such as circular dependencies or a streaming table created from a batch data source (DataflowGraph::validate()).
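Conceptually, the resolution step amounts to a topological sort with cycle detection over the dataset dependency graph. A minimal Python sketch of that idea (the names and data shapes here are illustrative only, not the actual DataflowGraph API):

```python
from collections import deque

def topological_sort(deps):
    """Order datasets so each one comes after its upstream dependencies.

    `deps` maps each dataset name to the list of datasets it reads from.
    Raises ValueError if the graph contains a circular dependency.
    """
    nodes = set(deps) | {u for ups in deps.values() for u in ups}
    in_degree = {n: 0 for n in nodes}
    downstream = {n: [] for n in nodes}
    for node, ups in deps.items():
        for up in ups:
            in_degree[node] += 1
            downstream[up].append(node)
    # Kahn's algorithm: repeatedly emit nodes with no unresolved upstreams.
    queue = deque(sorted(n for n in nodes if in_degree[n] == 0))
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for d in downstream[n]:
            in_degree[d] -= 1
            if in_degree[d] == 0:
                queue.append(d)
    if len(order) != len(nodes):
        raise ValueError("circular dependency detected in the dataflow graph")
    return order
```

A cycle leaves some nodes with nonzero in-degree, which is how the pre-flight validation can surface the error before any execution starts.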

Step 2: Materialize datasets defined in the DataflowGraph to the catalog

After the graph is topologically sorted and validated and every dataset / flow has the correct metadata populated, we publish the corresponding datasets to the catalog (which could be Hive, UC, or others) in DatasetManager::materializeDatasets(). For example, for each Materialized View and Table, we register an empty table in the catalog with the correct metadata (e.g., table schema, table properties, etc.). If the table already exists, we alter it to have the correct metadata.
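The create-or-alter behavior can be modeled with a toy in-memory catalog (everything below is invented for illustration; the real DatasetManager goes through DSv2 catalog APIs):

```python
class ToyCatalog:
    """A toy in-memory stand-in for a table catalog (hypothetical, not DSv2)."""

    def __init__(self):
        self._tables = {}

    def exists(self, name):
        return name in self._tables

    def create_table(self, name, schema, properties):
        # Register an empty table carrying the expected metadata.
        self._tables[name] = {"schema": schema, "properties": properties, "rows": []}

    def alter_table(self, name, schema, properties):
        # Update metadata in place; existing rows are untouched.
        self._tables[name]["schema"] = schema
        self._tables[name]["properties"] = properties

    def schema_of(self, name):
        return self._tables[name]["schema"]


def materialize_datasets(catalog, specs):
    """Register each table as an empty table with the expected metadata,
    or alter it to match the latest definition if it already exists."""
    for name, (schema, properties) in specs.items():
        if catalog.exists(name):
            catalog.alter_table(name, schema, properties)
        else:
            catalog.create_table(name, schema, properties)
```

Running materialization twice is idempotent with respect to metadata: the second pass takes the alter branch and converges the table to the latest definition.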

Step 3: Populate data to the registered tables by executing the DataflowGraph

After datasets have been registered to the catalog, inside TriggeredGraphExecution, we transform each dataflow defined in the DataflowGraph into an actual execution plan that runs the workload and populates data into the empty tables (we transform Flow into FlowExecution through FlowPlanner).

Each FlowExecution will be executed in topological order based on the sorted DataflowGraph, and we parallelize the execution as much as possible. Depending on the type of error, failed flows may be retried as part of execution.
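The scheduling rule above (start a flow only once its upstreams have succeeded, and never start flows downstream of a failure) can be sketched as follows; this is a sequential toy, whereas the real TriggeredGraphExecution parallelizes independent flows and adds retries:

```python
def run_flows(upstreams, run):
    """Run each flow once all of its upstream flows have succeeded.

    `upstreams` maps each flow to the set of flows it depends on; `run`
    executes one flow and returns True on success. Flows whose upstreams
    failed are never started. Returns {flow: succeeded} for every flow
    that was attempted.
    """
    results = {}
    pending = set(upstreams)
    progressed = True
    while pending and progressed:
        progressed = False
        for flow in sorted(pending):
            if all(results.get(up) for up in upstreams[flow]):
                results[flow] = run(flow)
                pending.discard(flow)
                progressed = True
    # Flows still pending had a failed upstream and are skipped.
    return results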

Main components of this PR:

  • Flow execution represents the execution of an individual flow in the dataflow graph. Relevant classes:
    • FlowExecution
    • StreamingFlowExecution
    • BatchFlowExecution
    • FlowPlanner – constructs FlowExecutions from Flow objects
  • Graph execution represents the execution of an entire dataflow graph, i.e. step 3 in the set of steps above. In the future, we will add a ContinuousGraphExecution class, which executes all the streams at once instead of in topological order. Relevant classes:
    • GraphExecution
    • TriggeredGraphExecution – executes flows in topological order, handles retries when necessary
    • BackoffStrategy – used for retries
    • UncaughtExceptionHandler
    • PipelineConf – a few configurations that control graph execution behavior
  • Pipeline execution represents a full "run" including all three execution steps above: graph resolution, catalog materialization, and graph execution. Relevant classes:
    • PipelineExecution
    • RunTerminationReason
    • PipelineUpdateContext – represents the parameters to a pipeline execution
    • PipelineUpdateContextImpl
  • Catalog materialization (step 2 in the execution steps described above) represents datasets in the dataflow graph in the catalog. Uses DSv2 APIs.
    • DatasetManager
  • Graph filtration / selection allows selecting just a subset of the graph to be executed. In a followup, we will add the plumbing that allows specifying this from the CLI. Relevant classes:
    • GraphFilter
  • Events track the progress of a pipeline execution. The event messages are sent to the client for console logging, and the structured events are available for assertions inside tests. Eventually, these could power info in the Spark UI as well. Relevant classes:
    • FlowProgressEventLogger
    • PipelineRunEventBuffer
    • StreamListener
    • ConstructPipelineEvent
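For a concrete picture of the retry timing, an exponential backoff policy in the spirit of BackoffStrategy might look like this (the function name and all constants are illustrative assumptions, not the actual implementation or its defaults):

```python
def backoff_delay_ms(attempt, initial_ms=100, factor=2.0, max_ms=10_000):
    """Delay before retry number `attempt` (1-based).

    The delay grows by `factor` per retry and is capped at `max_ms`,
    so repeated transient failures back off quickly but boundedly.
    """
    return min(int(initial_ms * factor ** (attempt - 1)), max_ms)
```

Capping the delay keeps a long-running triggered execution from stalling indefinitely behind one flaky flow.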

Why are the changes needed?

This PR implements the core functionality for executing a Declarative Pipeline.

Does this PR introduce any user-facing change?

It introduces new behavior, but does not modify existing behavior.

How was this patch tested?

New unit test suites:

  • TriggeredGraphExecutionSuite: tests end-to-end executions of the pipeline under different scenarios (happy path, failure path, etc.) and validates that the proper data has been written and the proper event logs are emitted.
  • MaterializeTablesSuite: tests the logic for registering datasets in the catalog.

Augmented existing test suites:

  • ConstructPipelineEventSuite and PipelineEventSuite to validate the new FlowProgress event log we're introducing.

Was this patch authored or co-authored using generative AI tooling?

No


sryza commented Jun 2, 2025

Heads up: I just rebased this PR on the latest changes inside the underlying graph resolution PR.


sryza commented Jun 2, 2025

I also just added some tests for DatasetManager inside MaterializeTablesSuite

"Please delete the table or change the declared partitioning to match its partitions."
],
"sqlState" : "42000"
},
Contributor Author

The partition column is fixed once persisted; users need to recreate the MV/ST to update the partition column.

* @param context The context for the pipeline update.
* @return The graph with materialized tables.
*/
def materializeDatasets(
Contributor Author

Here we loop through every table defined in the graph and register it in the metastore as an empty table if it does not exist. If the table already exists, we update it with the latest definition.

): DataflowGraph = {
val (_, refreshTableIdentsSet, fullRefreshTableIdentsSet) = {
DatasetManager.constructFullRefreshSet(resolvedDataflowGraph.tables, context)
}
Contributor Author

Refresh tables are tables that need to be updated during the current execution. By default this is all tables, but users can configure the run to update only a subset.

Full-refresh tables are tables that need to be completely recomputed: we wipe out all the historical data and start fresh. By default this set is empty, but users can specify which tables they want to fully refresh.
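The selection semantics described above can be summarized in a small sketch (the function name and signature are hypothetical, loosely modeling what DatasetManager.constructFullRefreshSet computes):

```python
def construct_refresh_sets(all_tables, refresh_selection=None,
                           full_refresh_selection=None):
    """Split tables into (refresh, full_refresh) sets.

    refresh_selection=None means "refresh everything";
    full_refresh_selection=None means "no full refreshes".
    A table selected for full refresh is excluded from the plain
    refresh set, since a full refresh subsumes a refresh.
    """
    all_tables = set(all_tables)
    full_refresh = set(full_refresh_selection or ()) & all_tables
    refresh = set(refresh_selection) if refresh_selection is not None else all_tables
    refresh = (refresh & all_tables) - full_refresh
    return refresh, full_refresh
```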

}

/** A [[FlowExecution]] specifies how to execute a flow and manages its execution. */
trait FlowExecution {
Contributor Author

Each logical dataflow (a Flow in the DataflowGraph) is translated into a concrete flow execution plan (FlowExecution) when we start the pipeline execution (in FlowPlanner.scala).

Here we define the interface for FlowExecution.

@SCHJonathan SCHJonathan requested review from cloud-fan and sryza June 5, 2025 18:43
/**
* Configuration for the pipeline system, which is read from the Spark session's SQL configuration.
*/
class PipelineConf(spark: SparkSession) {
Member

It seems that we can move all these changes to SQLConf and use SQLConf.get to access the configs. We can have a follow-up for this.

Contributor Author

Sounds good! Marked PipelineConf as deprecated with a TODO.

@SCHJonathan SCHJonathan requested a review from gengliangwang June 5, 2025 21:43
.structTypeToV2Columns(new StructType().add("x", IntegerType))
)
}
//
Member

nit: remove this line

@sryza
Copy link
Contributor

sryza commented Jun 6, 2025

I noticed that the build is failing on the docs generation step:

[error] /__w/spark/spark/sql/pipelines/target/java/org/apache/spark/sql/pipelines/graph/Input.java:7:1:  error: element not closed: code
[error]    * Returns a <code>DataFrame</code> that is a result of loading data from this Input<code>.
[error]                                                                                       ^
[error] /__w/spark/spark/sql/pipelines/target/java/org/apache/spark/sql/pipelines/graph/Input.java:9:1:  error: unexpected end tag: </code>
[error]    * @return Streaming or batch </code>DataFrame<code> of this Input's data.
[error]                                 ^
[error] /__w/spark/spark/sql/pipelines/target/java/org/apache/spark/sql/pipelines/graph/Input.java:9:1:  error: element not closed: code
[error]    * @return Streaming or batch </code>DataFrame<code> of this Input's data.

I pushed a change that reverts all the changes in this PR to Flow.scala, FlowAnalysis.scala, and elements.scala, which appeared to be all docstring changes. The docstrings in those files were working fine before this PR, so I think safest to just leave them as they are.

@gengliangwang
Member

All tests passed in https://github.com/SCHJonathan/spark/runs/43591841901
@SCHJonathan @sryza thanks for the work and detailed PR description! Merging to master.

assert(table3.partitioning().toSeq == Seq(Expressions.identity("x")))
}

test("specifying partition column different from existing partitioned table") {
Contributor

The new test makes the Maven daily test fail:


We can reproduce the issue using the following commands:

build/mvn clean install -DskipTests -pl sql/pipelines -am 
build/mvn test -pl sql/pipelines
- specifying partition column different from existing partitioned table *** FAILED ***
  org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000
  at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
  at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:647)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:660)
  at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:171)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:476)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:476)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
  ...
  Cause: java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.SparkSession.sessionState()" because "sparkSession" is null
  at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:156)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:88)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
  at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
  at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
  at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
  ...

@SCHJonathan Could you fix this problem when you have some free time?

also cc @cloud-fan @gengliangwang @sryza

Contributor

Thanks for catching this @LuciferYang – I've got a fix I will post soon

Contributor

Here is a fix: #51112

gengliangwang pushed a commit that referenced this pull request Jun 9, 2025
### What changes were proposed in this pull request?

Fixes the issue identified here: #51050 (comment)

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

```
build/mvn clean install -DskipTests -pl sql/pipelines -am
build/mvn test -pl sql/pipelines
```

### Was this patch authored or co-authored using generative AI tooling?

Closes #51112 from sryza/fix-mvn.

Authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
gengliangwang pushed a commit that referenced this pull request Jun 9, 2025
### What changes were proposed in this pull request?

As suggested by [the comment](#51050 (comment)), directly use `SQLConf` instead of maintaining the thin `PipelineConf` wrapper.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51137 from SCHJonathan/jonathan-chang_data/deprecate-pipeline-conf.

Authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>