@@ -26,6 +26,9 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

+import org.scalactic.source.Position
+import org.scalatest.Tag
+
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -49,7 +52,6 @@ class PythonPipelineSuite
with EventVerificationTestHelpers {

def buildGraph(pythonText: String): DataflowGraph = {
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
val indentedPythonText = pythonText.linesIterator.map(" " + _).mkString("\n")
// create a unique identifier to allow identifying the session and dataflow graph
val customSessionIdentifier = UUID.randomUUID().toString
@@ -530,7 +532,6 @@ class PythonPipelineSuite
"eager analysis or execution will fail")(
Seq("""spark.sql("SELECT * FROM src")""", """spark.read.table("src").collect()""")) {
command =>
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
val ex = intercept[RuntimeException] {
buildGraph(s"""
|@dp.materialized_view
@@ -549,7 +550,6 @@ class PythonPipelineSuite
}

test("create dataset with the same name will fail") {
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
val ex = intercept[AnalysisException] {
buildGraph(s"""
|@dp.materialized_view
@@ -623,7 +623,6 @@ class PythonPipelineSuite
}

test("create datasets with three part names") {
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
val graphTry = Try {
buildGraph(s"""
|@dp.table(name = "some_catalog.some_schema.mv")
@@ -676,7 +675,6 @@ class PythonPipelineSuite
}

test("create named flow with multipart name will fail") {
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
val ex = intercept[RuntimeException] {
buildGraph(s"""
|@dp.table
@@ -825,7 +823,6 @@ class PythonPipelineSuite
}

test("create pipeline without table will throw RUN_EMPTY_PIPELINE exception") {
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
checkError(
exception = intercept[AnalysisException] {
buildGraph(s"""
@@ -837,7 +834,6 @@ class PythonPipelineSuite
}

test("create pipeline with only temp view will throw RUN_EMPTY_PIPELINE exception") {
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
checkError(
exception = intercept[AnalysisException] {
buildGraph(s"""
@@ -851,7 +847,6 @@ class PythonPipelineSuite
}

test("create pipeline with only flow will throw RUN_EMPTY_PIPELINE exception") {
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
checkError(
exception = intercept[AnalysisException] {
buildGraph(s"""
@@ -1048,7 +1043,6 @@ class PythonPipelineSuite

gridTest("Unsupported SQL command outside query function should result in a failure")(
unsupportedSqlCommandList) { unsupportedSqlCommand =>
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
val ex = intercept[RuntimeException] {
buildGraph(s"""
|spark.sql("$unsupportedSqlCommand")
@@ -1063,7 +1057,6 @@ class PythonPipelineSuite

gridTest("Unsupported SQL command inside query function should result in a failure")(
unsupportedSqlCommandList) { unsupportedSqlCommand =>
-assume(PythonTestDepsChecker.isConnectDepsAvailable)
val ex = intercept[RuntimeException] {
buildGraph(s"""
|@dp.materialized_view()
@@ -1111,4 +1104,13 @@ class PythonPipelineSuite
| return spark.range(5)
|""".stripMargin)
}

+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    if (PythonTestDepsChecker.isConnectDepsAvailable) {

Contributor:
Would using assume here work?

Contributor Author:
Let me have a try.

Contributor Author:
I tried the following modification:

  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    assume(PythonTestDepsChecker.isConnectDepsAvailable)
    super.test(testName, testTags: _*)(testFun)
  }

then ran build/sbt clean "connect/testOnly org.apache.spark.sql.connect.pipelines.PythonPipelineSuite", but it failed:

[info] PythonPipelineSuite:
[info] org.apache.spark.sql.connect.pipelines.PythonPipelineSuite *** ABORTED *** (9 milliseconds)
[info]   org.apache.spark.sql.connect.PythonTestDepsChecker.isConnectDepsAvailable was false (PythonPipelineSuite.scala:121)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.test(PythonPipelineSuite.scala:1110)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.<init>(PythonPipelineSuite.scala:121)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
[info]   at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info]   at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
[info]   at java.base/java.lang.reflect.ReflectAccess.newInstance(ReflectAccess.java:128)
[info]   at java.base/jdk.internal.reflect.ReflectionFactory.newInstance(ReflectionFactory.java:347)
[info]   at java.base/java.lang.Class.newInstance(Class.java:645)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:454)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 1 second, 500 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error during tests:
[error] 	org.apache.spark.sql.connect.pipelines.PythonPipelineSuite
[error] (connect / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful

It seems that assume cannot be used directly. @sryza
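Note: the reason this pattern aborts the suite is that the overridden test method runs while the suite's constructor is still registering tests, so the TestCanceledException thrown by assume escapes the constructor and ScalaTest marks the whole suite as aborted (see the stack trace above, which points at PythonPipelineSuite.<init>). A minimal sketch of an alternative, assuming one wanted each test reported as canceled at run time rather than registered as ignored, would defer the check into the test body by wrapping testFun; this is not what the PR ends up doing:

  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    super.test(testName, testTags: _*) {
      // assume now runs when the test executes, not while it is being registered,
      // so a missing dependency cancels the individual test instead of aborting
      // the suite during construction.
      assume(PythonTestDepsChecker.isConnectDepsAvailable)
      testFun
    }
  }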

Contributor Author:
Do you have a better way to do this? @sryza

Contributor:
Hmm, when I tried this it succeeded (and actually ran the tests). Is there something you need to do to make the connect deps not available?

Contributor Author (@LuciferYang), Nov 19, 2025:
Have you verified the scenario where at least one pyconnect dependency is missing, so that PythonTestDepsChecker.isConnectDepsAvailable returns false? In that scenario, all the tests in this file should be skipped rather than executed. However, if we switch to the pattern you proposed, an exception is thrown in my test environment.

Contributor Author:
WARNING: Using incubator modules: jdk.incubator.vector
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 105, in require_minimum_zstandard_version
    import zstandard  # noqa
    ^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'zstandard'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 40, in check_dependencies
    require_minimum_zstandard_version()
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 107, in require_minimum_zstandard_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] zstandard >= 0.25.0 must be installed; however, it was not found.
[info] PythonPipelineSuite:
[info] org.apache.spark.sql.connect.pipelines.PythonPipelineSuite *** ABORTED *** (8 milliseconds)
[info]   org.apache.spark.sql.connect.PythonTestDepsChecker.isConnectDepsAvailable was false (PythonPipelineSuite.scala:121)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.test(PythonPipelineSuite.scala:1110)
[info]   at org.apache.spark.sql.connect.pipelines.PythonPipelineSuite.<init>(PythonPipelineSuite.scala:121)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
[info]   at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[info]   at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
[info]   at java.base/java.lang.reflect.ReflectAccess.newInstance(ReflectAccess.java:128)
[info]   at java.base/jdk.internal.reflect.ReflectionFactory.newInstance(ReflectionFactory.java:347)
[info]   at java.base/java.lang.Class.newInstance(Class.java:645)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:454)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 4 seconds, 395 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error during tests:
[error] 	org.apache.spark.sql.connect.pipelines.PythonPipelineSuite
[error] (connect / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 205 s (0:03:25.0), completed Nov 19, 2025, 8:25:50 AM
yangjie01@localhost spark-mine-sbt % git diff
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
index 98b33c3296f..9d9aee4e994 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
@@ -1107,10 +1107,7 @@ class PythonPipelineSuite
 
   override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
       pos: Position): Unit = {
-    if (PythonTestDepsChecker.isConnectDepsAvailable) {
-      super.test(testName, testTags: _*)(testFun)
-    } else {
-      super.ignore(testName, testTags: _*)(testFun)
-    }
+    assume(PythonTestDepsChecker.isConnectDepsAvailable)
+    super.test(testName, testTags: _*)(testFun)
   }
 }
yangjie01@localhost spark-mine-sbt % build/sbt clean "connect/testOnly org.apache.spark.sql.connect.pipelines.PythonPipelineSuite"

Contributor Author:
With this PR, when a pyconnect dependency is missing:

WARNING: Using incubator modules: jdk.incubator.vector
Traceback (most recent call last):
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 105, in require_minimum_zstandard_version
    import zstandard  # noqa
    ^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'zstandard'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 40, in check_dependencies
    require_minimum_zstandard_version()
  File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/connect/utils.py", line 107, in require_minimum_zstandard_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] zstandard >= 0.25.0 must be installed; however, it was not found.
[info] PythonPipelineSuite:
[info] - basic !!! IGNORED !!!
[info] - failed flow progress event has correct python source code location !!! IGNORED !!!
[info] - flow progress events have correct python source code location !!! IGNORED !!!
[info] - basic with inverted topological order !!! IGNORED !!!
[info] - flows !!! IGNORED !!!
[info] - external sink !!! IGNORED !!!
[info] - referencing internal datasets !!! IGNORED !!!
[info] - referencing external datasets !!! IGNORED !!!
[info] - referencing internal datasets failed !!! IGNORED !!!
[info] - referencing external datasets failed !!! IGNORED !!!
[info] - reading external datasets outside query function works !!! IGNORED !!!
[info] - reading internal datasets outside query function that don't trigger eager analysis or execution !!! IGNORED !!!
[info] - reading internal datasets outside query function that trigger eager analysis or execution will fail (spark.sql("SELECT * FROM src")) !!! IGNORED !!!
[info] - reading internal datasets outside query function that trigger eager analysis or execution will fail (spark.read.table("src").collect()) !!! IGNORED !!!
[info] - create dataset with the same name will fail !!! IGNORED !!!
[info] - create datasets with fully/partially qualified names !!! IGNORED !!!
[info] - create datasets with three part names !!! IGNORED !!!
[info] - temporary views works !!! IGNORED !!!
[info] - create named flow with multipart name will fail !!! IGNORED !!!
[info] - create flow with multipart target and no explicit name succeeds !!! IGNORED !!!
[info] - create named flow with multipart target succeeds !!! IGNORED !!!
[info] - groupby and rollup works with internal datasets, referencing with (col, str) !!! IGNORED !!!
[info] - MV/ST with partition columns works !!! IGNORED !!!
[info] - create pipeline without table will throw RUN_EMPTY_PIPELINE exception !!! IGNORED !!!
[info] - create pipeline with only temp view will throw RUN_EMPTY_PIPELINE exception !!! IGNORED !!!
[info] - create pipeline with only flow will throw RUN_EMPTY_PIPELINE exception !!! IGNORED !!!
[info] - table with string schema !!! IGNORED !!!
[info] - table with StructType schema !!! IGNORED !!!
[info] - string schema validation error - schema mismatch !!! IGNORED !!!
[info] - StructType schema validation error - schema mismatch !!! IGNORED !!!
[info] - empty cluster_by list should work and create table with no clustering !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (SET CATALOG some_catalog) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (USE SCHEMA some_schema) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (SET `test_conf` = `true`) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE TABLE some_table (id INT)) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (INSERT INTO some_table VALUES (1)) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (ALTER TABLE some_table RENAME TO some_new_table) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE NAMESPACE some_namespace) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (DROP VIEW some_view) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE MATERIALIZED VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command outside query function should result in a failure (CREATE STREAMING TABLE some_table AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (SET CATALOG some_catalog) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (USE SCHEMA some_schema) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (SET `test_conf` = `true`) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE TABLE some_table (id INT)) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (INSERT INTO some_table VALUES (1)) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (ALTER TABLE some_table RENAME TO some_new_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE NAMESPACE some_namespace) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (DROP VIEW some_view) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE MATERIALIZED VIEW some_view AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Unsupported SQL command inside query function should result in a failure (CREATE STREAMING TABLE some_table AS SELECT * FROM some_table) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (DESCRIBE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW TABLES) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW TBLPROPERTIES spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW NAMESPACES) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW COLUMNS FROM spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW FUNCTIONS) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW VIEWS) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW CATALOGS) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SHOW CREATE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SELECT * FROM RANGE(5)) !!! IGNORED !!!
[info] - Supported SQL command outside query function should work (SELECT * FROM spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (DESCRIBE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW TABLES) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW TBLPROPERTIES spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW NAMESPACES) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW COLUMNS FROM spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW FUNCTIONS) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW VIEWS) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW CATALOGS) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SHOW CREATE TABLE spark_catalog.default.src) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SELECT * FROM RANGE(5)) !!! IGNORED !!!
[info] - Supported SQL command inside query function should work (SELECT * FROM spark_catalog.default.src) !!! IGNORED !!!
[info] Run completed in 3 seconds, 595 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 75, pending 0
[info] No tests were executed.
[success] Total time: 190 s (0:03:10.0), completed Nov 19, 2025, 8:40:22 AM
yangjie01@localhost spark-mine-sbt % build/sbt clean "connect/testOnly org.apache.spark.sql.connect.pipelines.PythonPipelineSuite"

+      super.test(testName, testTags: _*)(testFun)
+    } else {
+      super.ignore(testName, testTags: _*)(testFun)
+    }
+  }
}
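Note on the pattern this PR lands on: because the override falls back to super.ignore when the Python Connect dependencies are unavailable, every test is still registered and shows up in the report as ignored (the "!!! IGNORED !!!" run above, with the suite completing rather than aborting), and the per-test assume calls that previously guarded each test body are no longer needed.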