From a5260459e3d32b4b8233eb9e46c715b612475498 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Tue, 18 Nov 2025 11:16:32 +0800
Subject: [PATCH 1/2] test

---
 .../pipelines/PythonPipelineSuite.scala       | 23 +++++++++++--------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
index 826e2338589d..74dc9d560e6e 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
@@ -26,6 +26,9 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
 import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -39,6 +42,7 @@ import org.apache.spark.sql.pipelines.logging.EventLevel
 import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin}
 import org.apache.spark.sql.types.StructType
 
+
 /**
  * Test suite that starts a Spark Connect server and executes Spark Declarative Pipelines Python
  * code to define tables in the pipeline.
@@ -49,7 +53,6 @@ class PythonPipelineSuite
     with EventVerificationTestHelpers {
 
   def buildGraph(pythonText: String): DataflowGraph = {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val indentedPythonText = pythonText.linesIterator.map(" " + _).mkString("\n")
     // create a unique identifier to allow identifying the session and dataflow graph
     val customSessionIdentifier = UUID.randomUUID().toString
@@ -530,7 +533,6 @@ class PythonPipelineSuite
       "eager analysis or execution will fail")(
     Seq("""spark.sql("SELECT * FROM src")""", """spark.read.table("src").collect()""")) {
     command =>
-      assume(PythonTestDepsChecker.isConnectDepsAvailable)
       val ex = intercept[RuntimeException] {
         buildGraph(s"""
                      |@dp.materialized_view
@@ -549,7 +551,6 @@ class PythonPipelineSuite
   }
 
   test("create dataset with the same name will fail") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[AnalysisException] {
       buildGraph(s"""
                    |@dp.materialized_view
@@ -623,7 +624,6 @@ class PythonPipelineSuite
   }
 
   test("create datasets with three part names") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val graphTry = Try {
       buildGraph(s"""
                    |@dp.table(name = "some_catalog.some_schema.mv")
@@ -676,7 +676,6 @@ class PythonPipelineSuite
   }
 
   test("create named flow with multipart name will fail") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[RuntimeException] {
       buildGraph(s"""
                    |@dp.table
@@ -825,7 +824,6 @@ class PythonPipelineSuite
   }
 
   test("create pipeline without table will throw RUN_EMPTY_PIPELINE exception") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     checkError(
       exception = intercept[AnalysisException] {
         buildGraph(s"""
@@ -837,7 +835,6 @@ class PythonPipelineSuite
   }
 
   test("create pipeline with only temp view will throw RUN_EMPTY_PIPELINE exception") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     checkError(
       exception = intercept[AnalysisException] {
         buildGraph(s"""
@@ -851,7 +848,6 @@ class PythonPipelineSuite
   }
 
   test("create pipeline with only flow will throw RUN_EMPTY_PIPELINE exception") {
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     checkError(
       exception = intercept[AnalysisException] {
         buildGraph(s"""
@@ -1048,7 +1044,6 @@ class PythonPipelineSuite
 
   gridTest("Unsupported SQL command outside query function should result in a failure")(
     unsupportedSqlCommandList) { unsupportedSqlCommand =>
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[RuntimeException] {
       buildGraph(s"""
                    |spark.sql("$unsupportedSqlCommand")
@@ -1063,7 +1058,6 @@ class PythonPipelineSuite
 
   gridTest("Unsupported SQL command inside query function should result in a failure")(
     unsupportedSqlCommandList) { unsupportedSqlCommand =>
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
     val ex = intercept[RuntimeException] {
       buildGraph(s"""
                    |@dp.materialized_view()
@@ -1111,4 +1105,13 @@ class PythonPipelineSuite
         |  return spark.range(5)
         |""".stripMargin)
   }
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    if (PythonTestDepsChecker.isConnectDepsAvailable) {
+      super.test(testName, testTags: _*)(testFun)
+    } else {
+      super.ignore(testName, testTags: _*)(testFun)
+    }
+  }
 }

From 02a8e74b579ba98ae273104025f0b34f336069b4 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Tue, 18 Nov 2025 11:19:14 +0800
Subject: [PATCH 2/2] format

---
 .../apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
index 74dc9d560e6e..98b33c3296fa 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.pipelines.logging.EventLevel
 import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin}
 import org.apache.spark.sql.types.StructType
 
-
 /**
  * Test suite that starts a Spark Connect server and executes Spark Declarative Pipelines Python
  * code to define tables in the pipeline.
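
For reference, the technique the first patch applies in isolation: ScalaTest's `test(...)` registration hook can be overridden so that an entire suite degrades to ignored tests when an optional dependency is missing, instead of each test body calling `assume(...)` at run time. Below is a minimal standalone sketch, assuming a plain `AnyFunSuite` and a hypothetical `DepsChecker` object standing in for `PythonTestDepsChecker.isConnectDepsAvailable`:

    import org.scalactic.source.Position
    import org.scalatest.Tag
    import org.scalatest.funsuite.AnyFunSuite

    // Hypothetical stand-in for PythonTestDepsChecker.isConnectDepsAvailable.
    object DepsChecker {
      val isAvailable: Boolean = sys.env.contains("CONNECT_DEPS_AVAILABLE")
    }

    class ConditionalSuite extends AnyFunSuite {

      // Reroute every test registered via test(...) to ignore(...) when the
      // optional dependency is absent, so the tests show up as ignored in
      // reports rather than each cancelling itself at run time.
      override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
          pos: Position): Unit = {
        if (DepsChecker.isAvailable) {
          super.test(testName, testTags: _*)(testFun)
        } else {
          super.ignore(testName, testTags: _*)(testFun)
        }
      }

      test("runs only when deps are present") {
        assert(1 + 1 == 2)
      }
    }

One practical difference from per-test `assume(...)`: the override decides at registration time and marks the tests ignored, whereas `assume` cancels each test while it runs. Both keep the suite green, but ignored tests are more visible in test reports and skip test-body setup entirely.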