
Conversation

@LuciferYang (Contributor)

What changes were proposed in this pull request?

This PR aims to make the test cases in PythonPipelineSuite perform a default check for PythonTestDepsChecker.isConnectDepsAvailable.

Why are the changes needed?

Simplify the dependency checks for Python modules in test cases within PythonPipelineSuite.
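
Concretely, the suite overrides ScalaTest's test registration method so each test case is registered normally when the Python Connect test dependencies are available, and registered as ignored otherwise. The sketch below is reconstructed from the diff quoted later in this thread; the two imports are added here for completeness:

    import org.scalactic.source.Position
    import org.scalatest.Tag

    // Inside PythonPipelineSuite: route every test through one dependency check,
    // so individual test cases no longer need their own guards.
    override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
        pos: Position): Unit = {
      if (PythonTestDepsChecker.isConnectDepsAvailable) {
        super.test(testName, testTags: _*)(testFun)
      } else {
        super.ignore(testName, testTags: _*)(testFun)
      }
    }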

Does this PR introduce any user-facing change?

No

How was this patch tested?

Pass GitHub Actions

Was this patch authored or co-authored using generative AI tooling?

No

@LuciferYang LuciferYang marked this pull request as draft November 18, 2025 03:25
@LuciferYang LuciferYang changed the title [CONNECT][TESTS] Make PythonPipelineSuite perform a default check for PythonTestDepsChecker.isConnectDepsAvailable [SPARK-XXXX][CONNECT][TESTS] Make PythonPipelineSuite perform a default check for PythonTestDepsChecker.isConnectDepsAvailable Nov 18, 2025
@LuciferYang LuciferYang changed the title [SPARK-XXXX][CONNECT][TESTS] Make PythonPipelineSuite perform a default check for PythonTestDepsChecker.isConnectDepsAvailable [SPARK-54375][CONNECT][TESTS][FOLLOWUP] Make PythonPipelineSuite perform a default check for PythonTestDepsChecker.isConnectDepsAvailable Nov 18, 2025
@LuciferYang LuciferYang marked this pull request as ready for review November 18, 2025 03:29
@sryza (Contributor) left a comment

Thanks for this @LuciferYang.


    override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
        pos: Position): Unit = {
      if (PythonTestDepsChecker.isConnectDepsAvailable) {
Contributor:

Would using assume here work?

Contributor Author:

Let me give it a try.

Contributor Author:

I tried the following modification:

  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    assume(PythonTestDepsChecker.isConnectDepsAvailable)
    super.test(testName, testTags: _*)(testFun)
  }

then ran build/sbt clean "connect/testOnly org.apache.spark.sql.connect.pipelines.PythonPipelineSuite", but it failed:

[info] PythonPipelineSuite:
[info] org.apache.spark.sql.connect.pipelines.PythonPipelineSuite *** ABORTED *** (9 milliseconds)
[info]   org.apache.spark.sql.connect.PythonTestDepsChecker.isConnectDepsAvailable was false (PythonPipelineSuite.scala:121)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.test(PythonPipelineSuite.scala:1110)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.<init>(PythonPipelineSuite.scala:121)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
[info]   at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info]   at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
[info]   at java.base/java.lang.reflect.ReflectAccess.newInstance(ReflectAccess.java:128)
[info]   at java.base/jdk.internal.reflect.ReflectionFactory.newInstance(ReflectionFactory.java:347)
[info]   at java.base/java.lang.Class.newInstance(Class.java:645)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:454)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 1 second, 500 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error during tests:
[error] 	org.apache.spark.sql.connect.pipelines.PythonPipelineSuite
[error] (connect / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful

It seems that assume cannot be used directly. @sryza
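
For context on why this fails: ScalaTest registers tests while the suite class is being constructed, so the overridden test method runs inside the constructor, and an assume there throws TestCanceledException during instantiation, which sbt reports as an aborted suite (note the PythonPipelineSuite.<init> frame in the stack trace above). A variant that defers the check to execution time, so a missing dependency cancels each individual test instead of aborting the suite, might look like the following sketch (an untested alternative, not what this PR merged):

    override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
        pos: Position): Unit = {
      super.test(testName, testTags: _*) {
        // Runs when the test executes, not when it is registered.
        assume(PythonTestDepsChecker.isConnectDepsAvailable)
        testFun
      }
    }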

Contributor Author:

Do you have a better way to do this? @sryza

Contributor:

Hmm, when I tried this it succeeded (and actually ran the tests). Is there something you need to do to make the connect deps not available?

Contributor Author @LuciferYang (Nov 19, 2025):

Have you verified the scenario where at least one pyconnect dependency is missing, so that PythonTestDepsChecker.isConnectDepsAvailable returns false? In that scenario, all the tests in this file should be skipped rather than executed. With the pattern you proposed, however, an exception is thrown in my test environment.

Contributor Author:

WARNING: Using incubator modules: jdk.incubator.vector
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 105, in require_minimum_zstandard_version
    import zstandard  # noqa
    ^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'zstandard'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 40, in check_dependencies
    require_minimum_zstandard_version()
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 107, in require_minimum_zstandard_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] zstandard >= 0.25.0 must be installed; however, it was not found.
[info] PythonPipelineSuite:
[info] org.apache.spark.sql.connect.pipelines.PythonPipelineSuite *** ABORTED *** (8 milliseconds)
[info]   org.apache.spark.sql.connect.PythonTestDepsChecker.isConnectDepsAvailable was false (PythonPipelineSuite.scala:121)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.test(PythonPipelineSuite.scala:1110)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.<init>(PythonPipelineSuite.scala:121)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
[info]   at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info]   at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
[info]   at java.base/java.lang.reflect.ReflectAccess.newInstance(ReflectAccess.java:128)
[info]   at java.base/jdk.internal.reflect.ReflectionFactory.newInstance(ReflectionFactory.java:347)
[info]   at java.base/java.lang.Class.newInstance(Class.java:645)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:454)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 4 seconds, 395 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error during tests:
[error] 	org.apache.spark.sql.connect.pipelines.PythonPipelineSuite
[error] (connect / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 205 s (0:03:25.0), completed Nov 19, 2025, 8:25:50 AM
yangjie01@localhost spark-mine-sbt % git diff
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
index 98b33c3296f..9d9aee4e994 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
@@ -1107,10 +1107,7 @@ class PythonPipelineSuite
 
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
-    if (PythonTestDepsChecker.isConnectDepsAvailable) {
-      super.test(testName, testTags: _*)(testFun)
-    } else {
-      super.ignore(testName, testTags: _*)(testFun)
-    }
+    assume(PythonTestDepsChecker.isConnectDepsAvailable)
+    super.test(testName, testTags: _*)(testFun)
   }
 }
yangjie01@localhost spark-mine-sbt % build/sbt clean "connect/testOnly org.apache.spark.sql.connect.pipelines.PythonPipelineSuite"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this PR, when a pyconnect dependency is missing:

WARNING: Using incubator modules: jdk.incubator.vector
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 105, in require_minimum_zstandard_version
    import zstandard  # noqa
    ^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'zstandard'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 40, in check_dependencies
    require_minimum_zstandard_version()
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 107, in require_minimum_zstandard_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] zstandard >= 0.25.0 must be installed; however, it was not found.
[info] PythonPipelineSuite:
[info] - basic !!! IGNORED !!!
[info] - failed flow progress event has correct python source code location !!! IGNORED !!!
[info] - flow progress events have correct python source code location !!! IGNORED !!!
[info] - basic with inverted topological order !!! IGNORED !!!
[info] - flows !!! IGNORED !!!
[info] - external sink !!! IGNORED !!!
[info] - referencing internal datasets !!! IGNORED !!!
[info] - referencing external datasets !!! IGNORED !!!
[info] - referencing internal datasets failed !!! IGNORED !!!
[info] - referencing external datasets failed !!! IGNORED !!!
[info] - reading external datasets outside query function works !!! IGNORED !!!
[info] - reading internal datasets outside query function that don't trigger eager analysis or execution !!! IGNORED !!!
[info] - reading internal datasets outside query function that trigger eager analysis or execution will fail (spark.sql("SELECT * FROM src")) !!! IGNORED !!!
[info] - reading internal datasets outside query function that trigger eager analysis or execution will fail (spark.read.table("src").collect()) !!! IGNORED !!!
[info] - create dataset with the same name will fail !!! IGNORED !!!
[info] - create datasets with fully/partially qualified names !!! IGNORED !!!
[info] - create datasets with three part names !!! IGNORED !!!
[info] - temporary views works !!! IGNORED !!!
[info] - create named flow with multipart name will fail !!! IGNORED !!!
[info] - create flow with multipart target and no explicit name succeeds !!! IGNORED !!!
[info] - create named flow with multipart target succeeds !!! IGNORED !!!
[info] - groupby and rollup works with internal datasets, referencing with (col, str) !!! IGNORED !!!
[info] - MV/ST with partition columns works !!! IGNORED !!!
[info] - create pipeline without table will throw RUN_EMPTY_PIPELINE exception !!! IGNORED !!!
[info] - create pipeline with only temp view will throw RUN_EMPTY_PIPELINE exception !!! IGNORED !!!
[info] - create pipeline with only flow will throw RUN_EMPTY_PIPELINE exception !!! IGNORED !!!
[info] - table with string schema !!! IGNORED !!!
[info] - table with StructType schema !!! IGNORED !!!
[info] - string schema validation error - schema mismatch !!! IGNORED !!!
[info] - StructType schema validation error - schema mismatch !!! IGNORED !!!
[info] - empty cluster_by list should work and create table with no clustering !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (SET CATALOG some_catalog) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (USE SCHEMA some_schema) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (SET `test_conf` = `true`) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE TABLE some_table (id INT)) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (INSERT INTO some_table VALUES (1)) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (ALTER TABLE some_table RENAME TO some_new_table) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE NAMESPACE some_namespace) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (DROP VIEW some_view) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE MATERIALIZED VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE STREAMING TABLE some_table AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (SET CATALOG some_catalog) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (USE SCHEMA some_schema) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (SET `test_conf` = `true`) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE TABLE some_table (id INT)) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (INSERT INTO some_table VALUES (1)) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (ALTER TABLE some_table RENAME TO some_new_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE NAMESPACE some_namespace) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (DROP VIEW some_view) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE MATERIALIZED VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE STREAMING TABLE some_table AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (DESCRIBE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW TABLES) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW TBLPROPERTIES spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW NAMESPACES) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW COLUMNS FROM spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW FUNCTIONS) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW VIEWS) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW CATALOGS) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW CREATE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SELECT * FROM RANGE(5)) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SELECT * FROM spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (DESCRIBE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW TABLES) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW TBLPROPERTIES spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW NAMESPACES) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW COLUMNS FROM spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW FUNCTIONS) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW VIEWS) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW CATALOGS) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW CREATE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SELECT * FROM RANGE(5)) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SELECT * FROM spark_catalog.default.src) !!! IGNORED !!!
[info] Run completed in 3 seconds, 595 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 75, pending 0
[info] No tests were executed.
[success] Total time: 190 s (0:03:10.0), completed Nov 19, 2025, 8:40:22 AM
yangjie01@localhost spark-mine-sbt % build/sbt clean "connect/testOnly org.apache.spark.sql.connect.pipelines.PythonPipelineSuite"

@dongjoon-hyun (Member) left a comment

+1 for the direction and for Apache Spark 4.1.0.

@sryza (Contributor) left a comment

Approving in current form. I'll try to experiment more with assume if I find the time.

LuciferYang added a commit that referenced this pull request Nov 19, 2025
[SPARK-54375][CONNECT][TESTS][FOLLOWUP] Make `PythonPipelineSuite` perform a default check for `PythonTestDepsChecker.isConnectDepsAvailable`

### What changes were proposed in this pull request?
This pr aims to make the test cases in `PythonPipelineSuite` perform a default check for `PythonTestDepsChecker.isConnectDepsAvailable`.

### Why are the changes needed?
Simplify the dependency checks for Python modules in test cases within `PythonPipelineSuite`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #53106 from LuciferYang/refactor-PythonPipelineSuite.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 1aa8d5a)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
@LuciferYang (Contributor Author):

Merged into master and branch-4.1. Thanks @dongjoon-hyun, @HyukjinKwon, @zhengruifeng, and @sryza ~
