
Conversation

@SCHJonathan (Contributor) commented Nov 12, 2025

What changes were proposed in this pull request?

This PR adds support for the `spark.sql(...)` Python API inside query functions for Spark Declarative Pipelines. Users can now use `spark.sql(...)` to define query functions, and dependencies are correctly tracked.

**Example usage:**

```python
@dp.materialized_view()
def source():
    return spark.sql("SELECT * FROM RANGE(5)")

@dp.materialized_view()
def target():
    return spark.sql("SELECT * FROM source")
```

This PR also adds restrictions on the set of SQL commands users can execute. Unsupported commands (e.g., `spark.sql("CREATE TABLE ...")`) inside query functions will raise an error.
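
For concreteness, a minimal sketch of the restriction (hedged: the exact error class raised for a disallowed command is not specified in this description, and `t` is a hypothetical table name):

```python
@dp.materialized_view()
def bad_flow():
    # DDL is not allowed inside a query function; the server
    # rejects this instead of executing it.
    return spark.sql("CREATE TABLE t AS SELECT 1")  # raises an error
```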

**Implementation details:**

1. Added `PipelineAnalysisContext` to Spark Connect's user context extensions, enabling the server to identify requests originating from Spark Declarative Pipelines and apply appropriate restrictions.
2. The `flow_name` field in `PipelineAnalysisContext` determines execution behavior (see the sketch below):
   - **Inside query functions** (`flow_name` is set): the Spark Connect server treats `spark.sql()` as a no-op and returns the raw logical plan to SDP for deferred analysis as part of the Dataflow Graph.
   - **Outside query functions** (`flow_name` is empty): the Spark Connect server eagerly executes the command, but only SDP-allowlisted commands are permitted.
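
A minimal, self-contained sketch of this dispatch (illustrative Python; the real server-side logic is not shown in this description, and the allowlist and names below are hypothetical):

```python
ALLOWLISTED_COMMANDS = {"SELECT", "SHOW", "DESCRIBE"}  # illustrative, not the real SDP allowlist

def handle_sql(sql_text: str, flow_name: str) -> dict:
    """Dispatch a spark.sql() request based on PipelineAnalysisContext.flow_name."""
    if flow_name:
        # Inside a query function: no eager analysis; hand the raw logical
        # plan back to SDP for deferred analysis in the Dataflow Graph.
        return {"deferred_plan": sql_text}
    command = sql_text.lstrip().split(maxsplit=1)[0].upper()
    if command not in ALLOWLISTED_COMMANDS:
        # Outside a query function, only SDP-allowlisted commands may run.
        raise ValueError(f"Command not allowed by SDP: {command}")
    return {"executed": sql_text}
```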

Why are the changes needed?

`spark.sql(...)` is a common and intuitive pattern for users who are more familiar with SQL to define query functions. Supporting this API improves usability and allows SQL-first developers to work more naturally with Spark Declarative Pipelines.

Does this PR introduce any user-facing change?

Yes. Previously, `spark.sql(...)` inside query functions was not supported and users would see an `ATTEMPT_ANALYSIS_IN_PIPELINE_QUERY_FUNCTION` exception. This PR lifts that restriction.

How was this patch tested?

New test cases in the `PythonPipelineSuite` unit test suite.

Was this patch authored or co-authored using generative AI tooling?

No

@sryza (Contributor) left a comment

There are a bunch of code cleanup changes here that seem great but are outside the critical path of the main goal of this PR (supporting spark.sql inside pipelines). Would it be difficult to move those changes into a separate PR to reduce risk?

@SCHJonathan (Contributor, Author) commented Nov 12, 2025

> There are a bunch of code cleanup changes here that seem great but are outside the critical path of the main goal of this PR (supporting spark.sql inside pipelines). Would it be difficult to move those changes into a separate PR to reduce risk?

@sryza I will polish up the PR more to reflect this, but unfortunately I think most of the changes are necessary:

  1. We currently don't support eager analysis/execution outside the flow function when it needs to go through pipeline analysis (e.g., `spark.sql("SELECT * FROM external_table")` outside the flow function, or `spark.read.table("external_table").schema`). These need to go through pipeline analysis; otherwise identifiers won't be correctly qualified with the current catalog/schema tracked inside the pipeline. I introduced an `ExternalQueryAnalysisContext` to handle that (see the sketch after this list).
  2. Currently, changing the current catalog/schema is a SQL-only concept, and the related current catalog/schema tracking logic lives inside `SqlGraphRegistrationContext`; I need to port that to `GraphRegistrationContext`, since Python uses it.
  3. There are a few unrelated formatting changes caused by my local scalafmt; I will revert these before requesting formal review.
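
For illustration of point 1, a hedged sketch (`my_catalog.my_schema` and `external_table` are hypothetical names): suppose the pipeline's tracked current schema is `my_catalog.my_schema`. Then, outside a flow function:

```python
# Must be analyzed through the pipeline so the unqualified identifier
# resolves against the pipeline's tracked current catalog/schema,
# i.e. my_catalog.my_schema.external_table, rather than whatever
# catalog/schema the plain Spark session happens to default to.
df = spark.sql("SELECT * FROM external_table")
print(spark.read.table("external_table").schema)
```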

@dongjoon-hyun dongjoon-hyun marked this pull request as draft November 12, 2025 23:31
@dongjoon-hyun (Member) left a comment

Hi, @SCHJonathan.

If you don't mind, please file a JIRA issue in the ASF community repository. It will help prevent potential accidents like the following commits.

  • c707f59 Add PipelineAnalysisContext message to support pipeline analysis during Spark Connect query execution
  • f03c644 Fix: SparkML-connect can't load SparkML (legacy mode) saved model

@SCHJonathan (Contributor, Author) commented

@dongjoon-hyun Absolutely! Thanks for the reminder!

@sryza (Contributor) commented Nov 13, 2025

I think there's an existing JIRA for this: https://issues.apache.org/jira/browse/SPARK-54020

@dongjoon-hyun (Member) commented

> I think there's an existing JIRA for this: https://issues.apache.org/jira/browse/SPARK-54020

Please lead the contributor to update the PR title before starting review, @sryza .

@SCHJonathan SCHJonathan force-pushed the jonathan-chang_data/spark-sql branch from 17800b5 to e515b85 on November 13, 2025 03:13
@SCHJonathan SCHJonathan changed the title Jonathan chang data/spark sql [SPARK-54020] Support spark.sql(...) Python API for Spark Declarative Pipeline Nov 13, 2025
@SCHJonathan SCHJonathan changed the title [SPARK-54020] Support spark.sql(...) Python API for Spark Declarative Pipeline [SPARK-54020] Support spark.sql(...) Python API inside query functions for Spark Declarative Pipeline Nov 13, 2025
@SCHJonathan SCHJonathan requested a review from sryza November 13, 2025 03:36
@dongjoon-hyun dongjoon-hyun marked this pull request as ready for review November 13, 2025 04:32
@sryza (Contributor) left a comment

Thanks for this, @SCHJonathan! This closes a significant issue with declarative pipelines; it would be great to get it in this week and have it fixed for Spark 4.1.

@SCHJonathan SCHJonathan requested a review from sryza November 13, 2025 06:08
@sryza (Contributor) left a comment

LGTM! I'll wait until the underlying user context extensions PR gets merged before merging this.



```python
@contextmanager
def block_spark_connect_execution_and_analysis() -> Generator[None, None, None]:
```
Contributor left a comment

Now that we have the context on the server side, it might make more sense to block these operations there – then we don't need to replicate this weird monkeypatching logic across all the clients when we add support for other languages. Doesn't need to be part of this PR though.
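
For context, a generic sketch of the monkeypatching pattern under discussion (hypothetical helper; the real client-side context manager patches Spark Connect's analysis/execution entry points):

```python
from contextlib import contextmanager
from typing import Any, Generator

@contextmanager
def block_method(obj: Any, method_name: str, message: str) -> Generator[None, None, None]:
    """Temporarily replace obj.method_name so any call raises instead of running."""
    original = getattr(obj, method_name)

    def _blocked(*args: Any, **kwargs: Any) -> None:
        raise RuntimeError(message)

    setattr(obj, method_name, _blocked)
    try:
        yield
    finally:
        # Always restore the original method, even if the body raised.
        setattr(obj, method_name, original)
```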

@dongjoon-hyun (Member) commented

Could you make the CI happy, @SCHJonathan? There are several test pipeline failures.

@SCHJonathan (Contributor, Author) commented

> Could you make the CI happy, @SCHJonathan? There are several test pipeline failures.

Absolutely, just fixed

@dongjoon-hyun (Member) commented

Thank you. Could you check these too?

```
[info] *** 26 TESTS FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.connect.pipelines.PythonPipelineSuite
[error] 	org.apache.spark.sql.connect.pipelines.SparkDeclarativePipelinesServerSuite
```

@dongjoon-hyun (Member) commented

Sounds good to me, too.

@github-actions github-actions bot added the INFRA label Nov 14, 2025
@dongjoon-hyun (Member) commented

It turns out this revealed more failures, which the Python failures were hiding from us.

```
[info] *** 21 TESTS FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.connect.pipelines.EndToEndAPISuite
```

@SCHJonathan (Contributor, Author) commented Nov 14, 2025

```
[PIPELINE_STORAGE_ROOT_INVALID] Pipeline storage root must be an absolute path with a URI scheme (e.g., file://, s3a://, hdfs://). Got: `/home/runner/work/spark/spark/target/tmp/spark-06c9bfe0-9410-4887-a376-2eec929a70de/storage`. SQLSTATE: 42K03
```

Working on a fix.
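
(For illustration of the fix direction implied by the error message, a hedged snippet: convert the bare temp path into an absolute URI with an explicit scheme.)

```python
import pathlib

# Hypothetical illustration: turn a bare filesystem path into a file:// URI
# so it satisfies the "absolute path with a URI scheme" requirement.
storage_root = pathlib.Path("/tmp/pipeline-storage").resolve().as_uri()
print(storage_root)  # file:///tmp/pipeline-storage
```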

@SCHJonathan (Contributor, Author) commented

@dongjoon-hyun, validated that the CI fix works. Filed a ticket and created a CI fix PR: #53058

dongjoon-hyun pushed a commit that referenced this pull request Nov 14, 2025
…ndard==0.25.0`

### What changes were proposed in this pull request?

In #53024 (comment), the PR CI Python unit tests failed due to
```
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] zstandard >= 0.25.0 must be installed; however, it was not found.
```
This PR adds the required dependency to the pre-merge CI.

### Why are the changes needed?

Recover the Python unit test CI

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

PR #53024's Python CI came back to healthy with this change

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #53058 from SCHJonathan/jonathan-chang_data/fix-python-ci-dep.

Authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Nov 14, 2025
…ndard==0.25.0`


Closes #53058 from SCHJonathan/jonathan-chang_data/fix-python-ci-dep.

Authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit a916690)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@github-actions github-actions bot removed the INFRA label Nov 14, 2025
test("reading external datasets outside query function works") {
sql("CREATE TABLE spark_catalog.default.src AS SELECT * FROM RANGE(5)")
val graph = buildGraph(s"""
|spark_sql_df = spark.sql("SELECT * FROM spark_catalog.default.src")
Member left a comment

Indentation?

@SCHJonathan (Contributor, Author) commented

@sryza @dongjoon-hyun Hi all, I managed to get an all-green CI, and it's ready to be merged.

@dongjoon-hyun (Member) left a comment

+1, LGTM. Thank you, @SCHJonathan and @sryza .

Merged to master/4.1 for Apache Spark 4.1.0.

dongjoon-hyun pushed a commit that referenced this pull request Nov 15, 2025
…ons for Spark Declarative Pipeline


Closes #53024 from SCHJonathan/jonathan-chang_data/spark-sql.

Authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit cc72c64)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun (Member) commented

@SCHJonathan, what is your Apache JIRA ID? I want to assign SPARK-54020 to you.
