From 598f67feb7c2e37ae3831a2bca04386c63bdfc82 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 26 May 2026 17:35:12 +0000 Subject: [PATCH 1/9] register autocdc flow in pipelineshandler --- .../connect/pipelines/PipelinesHandler.scala | 123 +++++++- .../connect/planner/SparkConnectPlanner.scala | 3 +- .../pipelines/PythonPipelineSuite.scala | 284 +++++++++++++++++- 3 files changed, 403 insertions(+), 7 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 04dbc1a455063..27579a3606bdf 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -26,15 +26,19 @@ import io.grpc.stub.StreamObserver import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation, ResolvedIdentifier} import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, Column} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{Command, CreateNamespace, CreateTable, CreateTableAsSelect, CreateView, DescribeRelation, DescribeTablePartition, DropView, InsertIntoStatement, LogicalPlan, RenameTable, ShowColumns, ShowCreateTable, ShowFunctions, ShowTableProperties, ShowTables, ShowViews} +import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.connect.common.DataTypeProtoConverter import org.apache.spark.sql.connect.service.SessionHolder import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, ShowNamespacesCommand} import org.apache.spark.sql.pipelines.Language.Python +import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ColumnSelection, ScdType, UnqualifiedColumnName} import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED} -import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow} +import org.apache.spark.sql.pipelines.graph.{AllTables, AutoCdcFlow, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow} import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress} import org.apache.spark.sql.types.StructType @@ -52,6 +56,10 @@ private[connect] object PipelinesHandler extends Logging { * @param transformRelationFunc * Function used to convert a relation to a LogicalPlan. This is used when determining the * LogicalPlan that a flow returns. + * @param transformExpressionFunc + * Function used to convert a proto expression to a Catalyst expression. Used for typed flows + * (e.g. AutoCDC) whose definition includes expression-shaped fields such as keys, + * sequence-by, and delete predicates. * @return * The response after handling the command */ @@ -59,7 +67,8 @@ private[connect] object PipelinesHandler extends Logging { sessionHolder: SessionHolder, cmd: proto.PipelineCommand, responseObserver: StreamObserver[ExecutePlanResponse], - transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult = { + transformRelationFunc: Relation => LogicalPlan, + transformExpressionFunc: proto.Expression => Expression): PipelineCommandResult = { // Currently most commands do not include any information in the response. We just send back // an empty response to the client to indicate that the command was handled successfully val defaultResponse = PipelineCommandResult.getDefaultInstance @@ -99,7 +108,11 @@ private[connect] object PipelinesHandler extends Logging { case proto.PipelineCommand.CommandTypeCase.DEFINE_FLOW => logInfo(s"Define pipelines flow cmd received: $cmd") val resolvedFlow = - defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder) + defineFlow( + cmd.getDefineFlow, + transformRelationFunc, + transformExpressionFunc, + sessionHolder) val identifierBuilder = ResolvedIdentifier.newBuilder() resolvedFlow.catalog.foreach(identifierBuilder.setCatalogName) resolvedFlow.database.foreach { ns => @@ -315,6 +328,7 @@ private[connect] object PipelinesHandler extends Logging { private def defineFlow( flow: proto.PipelineCommand.DefineFlow, transformRelationFunc: Relation => LogicalPlan, + transformExpressionFunc: proto.Expression => Expression, sessionHolder: SessionHolder): TableIdentifier = { if (flow.hasOnce) { throw new AnalysisException( @@ -388,13 +402,112 @@ private[connect] object PipelinesHandler extends Logging { objectName = Option(flowIdentifier.unquotedString), language = Some(Python())))) case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS => - throw new UnsupportedOperationException("AutoCdcFlowDetails is not yet implemented.") + val autoCdcDetails = flow.getAutoCdcFlowDetails + graphElementRegistry.registerFlow( + buildAutoCdcFlow( + autoCdcDetails = autoCdcDetails, + flow = flow, + flowIdentifier = flowIdentifier, + destinationIdentifier = destinationIdentifier, + defaultCatalog = defaultCatalog, + defaultDatabase = defaultDatabase, + transformExpressionFunc = transformExpressionFunc)) case other => throw new UnsupportedOperationException(s"Unsupported DefineFlow details case: $other") } flowIdentifier } + /** + * Build an [[AutoCdcFlow]] from the proto-supplied AutoCDC flow details. + * + * The flow's source expression is encoded by the Python client as a streaming-table name; we + * model that on the server side as a streaming [[UnresolvedRelation]] so that pipelines flow + * analysis (which already handles `STREAM(t)` references) can resolve it against the rest of + * the dataflow graph. + */ + private def buildAutoCdcFlow( + autoCdcDetails: proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails, + flow: proto.PipelineCommand.DefineFlow, + flowIdentifier: TableIdentifier, + destinationIdentifier: TableIdentifier, + defaultCatalog: String, + defaultDatabase: String, + transformExpressionFunc: proto.Expression => Expression): AutoCdcFlow = { + val sourcePlan: LogicalPlan = UnresolvedRelation( + multipartIdentifier = scala.collection.immutable.Seq(autoCdcDetails.getSource), + isStreaming = true + ) + + val toColumn: proto.Expression => Column = expr => Column(transformExpressionFunc(expr)) + + // Resolve a proto expression that the Python AutoCDC API treats as an unqualified column + // identifier (e.g. an entry in `keys`, `column_list`, or `except_column_list`) into an + // [[UnqualifiedColumnName]]. We round-trip through [[Expression.sql]] so that + // [[UnqualifiedColumnName.apply]]'s parser-based single-part validation is the authority on + // what is acceptable; non-identifier expressions are rejected by that parser. + val asUnqualifiedColumnName: proto.Expression => UnqualifiedColumnName = + expr => UnqualifiedColumnName(transformExpressionFunc(expr).sql) + + val keys = autoCdcDetails.getKeysList.asScala.toSeq.map(asUnqualifiedColumnName) + + val columnSelection: Option[ColumnSelection] = { + val included = autoCdcDetails.getColumnListList.asScala.toSeq + val excluded = autoCdcDetails.getExceptColumnListList.asScala.toSeq + if (included.nonEmpty && excluded.nonEmpty) { + // The Python API enforces the "at most one" contract; we don't expect both to be set + // here, but reject defensively to surface client/server mismatches loudly. + throw new IllegalArgumentException( + "AutoCDC flow specifies both column_list and except_column_list; at most one " + + "may be provided.") + } else if (included.nonEmpty) { + Some(ColumnSelection.IncludeColumns(included.map(asUnqualifiedColumnName))) + } else if (excluded.nonEmpty) { + Some(ColumnSelection.ExcludeColumns(excluded.map(asUnqualifiedColumnName))) + } else { + None + } + } + + // Get user specified SCD type, or default to SCD1 if unspecified. + val scdType: ScdType = autoCdcDetails.getStoredAsScdType match { + case proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1 | + proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_UNSPECIFIED => + ScdType.Type1 + case other => + throw new UnsupportedOperationException(s"Unsupported AutoCDC SCD type: $other") + } + + val changeArgs = ChangeArgs( + keys = keys, + sequencing = toColumn(autoCdcDetails.getSequenceBy), + storedAsScdType = scdType, + deleteCondition = + Option.when(autoCdcDetails.hasApplyAsDeletes)( + toColumn(autoCdcDetails.getApplyAsDeletes) + ), + columnSelection = columnSelection + ) + + AutoCdcFlow( + identifier = flowIdentifier, + destinationIdentifier = destinationIdentifier, + func = FlowAnalysis.createFlowFunctionFromLogicalPlan(sourcePlan), + sqlConf = flow.getSqlConfMap.asScala.toMap, + queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)), + origin = QueryOrigin( + filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( + flow.getSourceCodeLocation.getFileName), + line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( + flow.getSourceCodeLocation.getLineNumber), + objectType = Some(QueryOriginType.Flow.toString), + objectName = Option(flowIdentifier.unquotedString), + language = Some(Python()) + ), + changeArgs = changeArgs + ) + } + private def startRun( cmd: proto.PipelineCommand.StartRun, responseObserver: StreamObserver[ExecutePlanResponse], diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index db78dc1744ec9..c84eaadaa4537 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2955,7 +2955,8 @@ class SparkConnectPlanner( sessionHolder, command, responseObserver, - transformRelation) + transformRelation, + transformExpression) executeHolder.eventsManager.postFinished() responseObserver.onNext( proto.ExecutePlanResponse diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index fd05b0cc357eb..60044260a7f02 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -32,12 +32,14 @@ import org.scalatest.Tag import org.apache.spark.api.python.PythonUtils import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.ColumnConversions._ import org.apache.spark.sql.connect.PythonTestDepsChecker import org.apache.spark.sql.connect.service.SparkConnectService import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.pipelines.Language.Python +import org.apache.spark.sql.pipelines.autocdc.{ColumnSelection, ScdType, UnqualifiedColumnName} import org.apache.spark.sql.pipelines.common.FlowStatus -import org.apache.spark.sql.pipelines.graph.{DataflowGraph, PipelineUpdateContextImpl, QueryOrigin, QueryOriginType} +import org.apache.spark.sql.pipelines.graph.{AutoCdcFlow, DataflowGraph, PipelineUpdateContextImpl, QueryOrigin, QueryOriginType} import org.apache.spark.sql.pipelines.logging.EventLevel import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin} import org.apache.spark.sql.types.StructType @@ -935,6 +937,286 @@ class PythonPipelineSuite assert(ex.getMessage.contains("table_with_wrong_struct_schema")) } + private def buildAutoCdcFlow(pipelineSource: String): AutoCdcFlow = { + val graph = buildGraph(pipelineSource) + graph.flows + .collectFirst { case f: AutoCdcFlow => f } + .getOrElse( + throw new AssertionError( + s"Expected an AutoCdcFlow in the graph, got: ${graph.flows}")) + } + + test("AutoCDC API: minimal flow registers an AutoCdcFlow with default name and SCD1 default") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + |) + |""".stripMargin) + + assert(flow.identifier == graphIdentifier("target")) + assert(flow.destinationIdentifier == graphIdentifier("target")) + assert(flow.changeArgs.keys == Seq(UnqualifiedColumnName("value"))) + assert(flow.changeArgs.sequencing.expr.sql == "timestamp") + assert(flow.changeArgs.deleteCondition.isEmpty) + assert(flow.changeArgs.columnSelection.isEmpty) + assert(flow.changeArgs.storedAsScdType == ScdType.Type1) + } + + test("AutoCDC API: composite keys are forwarded to ChangeArgs in order") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value", "timestamp"], + | sequence_by = "timestamp", + |) + |""".stripMargin) + + assert(flow.changeArgs.keys == + Seq(UnqualifiedColumnName("value"), UnqualifiedColumnName("timestamp"))) + } + + test("AutoCDC API: apply_as_deletes is forwarded as a delete condition column") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | apply_as_deletes = "value % 2 = 0", + |) + |""".stripMargin) + + val deleteCondition = flow.changeArgs.deleteCondition.getOrElse( + throw new AssertionError("expected apply_as_deletes to populate deleteCondition")) + assert(deleteCondition.expr.sql.contains("value")) + assert(deleteCondition.expr.sql.contains("0")) + } + + test("AutoCDC API: column_list is forwarded as IncludeColumns") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | column_list = ["value", "timestamp"], + |) + |""".stripMargin) + + assert(flow.changeArgs.columnSelection.contains( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("value"), UnqualifiedColumnName("timestamp"))))) + } + + test("AutoCDC API: except_column_list is forwarded as ExcludeColumns") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | except_column_list = ["timestamp"], + |) + |""".stripMargin) + + assert(flow.changeArgs.columnSelection.contains( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("timestamp"))))) + } + + test("AutoCDC API: explicit `name` is honored as the flow identifier") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | name = "my_flow", + |) + |""".stripMargin) + + assert(flow.identifier == graphIdentifier("my_flow")) + assert(flow.destinationIdentifier == graphIdentifier("target")) + } + + test("AutoCDC API: omitted stored_as_scd_type defaults to ScdType.Type1") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + |) + |""".stripMargin) + + assert(flow.changeArgs.storedAsScdType == ScdType.Type1) + } + + test("AutoCDC API: explicit stored_as_scd_type=1 is forwarded as ScdType.Type1") { + val flow = buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | stored_as_scd_type = 1, + |) + |""".stripMargin) + + assert(flow.changeArgs.storedAsScdType == ScdType.Type1) + } + + test("AutoCDC API: multi-part `keys` column is rejected at flow registration") { + val ex = intercept[RuntimeException] { + buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["a.b"], + | sequence_by = "timestamp", + |) + |""".stripMargin) + } + assert(ex.getMessage.contains("AUTOCDC_MULTIPART_COLUMN_IDENTIFIER")) + } + + test("AutoCDC API: multi-part `column_list` entry is rejected at flow registration") { + val ex = intercept[RuntimeException] { + buildAutoCdcFlow( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | column_list = ["nested.field"], + |) + |""".stripMargin) + } + assert(ex.getMessage.contains("AUTOCDC_MULTIPART_COLUMN_IDENTIFIER")) + } + + test("AutoCDC API: Column-object form of keys/sequence_by/apply_as_deletes is honored") { + val flow = buildAutoCdcFlow( + """ + |from pyspark.sql.functions import col, expr + | + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = [col("value")], + | sequence_by = col("timestamp"), + | apply_as_deletes = expr("value % 2 = 0"), + |) + |""".stripMargin) + + assert(flow.changeArgs.keys == Seq(UnqualifiedColumnName("value"))) + assert(flow.changeArgs.sequencing.expr.sql == "timestamp") + val deleteCondition = flow.changeArgs.deleteCondition.getOrElse( + throw new AssertionError("expected apply_as_deletes to populate deleteCondition")) + assert(deleteCondition.expr.sql.contains("value")) + assert(deleteCondition.expr.sql.contains("0")) + } + + test("AutoCDC API: graph resolves with the source streaming table as the flow's input") { + val graph = buildGraph( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + |) + |""".stripMargin).resolve() + + val resolvedFlow = graph.resolvedFlow(graphIdentifier("target")) + assert(resolvedFlow.inputs == Set(graphIdentifier("src"))) + } + /** * Executes Python code in a separate process and returns the exit code. * From 0023022372e82b888e78d119fe1f4f675f97cb9e Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 26 May 2026 17:43:44 +0000 Subject: [PATCH 2/9] cleanup scaladocs --- .../spark/sql/connect/pipelines/PipelinesHandler.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 27579a3606bdf..cd19e71557f42 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -57,9 +57,7 @@ private[connect] object PipelinesHandler extends Logging { * Function used to convert a relation to a LogicalPlan. This is used when determining the * LogicalPlan that a flow returns. * @param transformExpressionFunc - * Function used to convert a proto expression to a Catalyst expression. Used for typed flows - * (e.g. AutoCDC) whose definition includes expression-shaped fields such as keys, - * sequence-by, and delete predicates. + * Function used to convert a proto expression to a Catalyst expression. * @return * The response after handling the command */ @@ -441,11 +439,6 @@ private[connect] object PipelinesHandler extends Logging { val toColumn: proto.Expression => Column = expr => Column(transformExpressionFunc(expr)) - // Resolve a proto expression that the Python AutoCDC API treats as an unqualified column - // identifier (e.g. an entry in `keys`, `column_list`, or `except_column_list`) into an - // [[UnqualifiedColumnName]]. We round-trip through [[Expression.sql]] so that - // [[UnqualifiedColumnName.apply]]'s parser-based single-part validation is the authority on - // what is acceptable; non-identifier expressions are rejected by that parser. val asUnqualifiedColumnName: proto.Expression => UnqualifiedColumnName = expr => UnqualifiedColumnName(transformExpressionFunc(expr).sql) From ff3459b3c88ac695b290e945ff6fdc4084235706 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Tue, 26 May 2026 22:03:53 +0000 Subject: [PATCH 3/9] linting --- .../spark/sql/connect/pipelines/PythonPipelineSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 60044260a7f02..6e6271b30eaff 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -941,9 +941,7 @@ class PythonPipelineSuite val graph = buildGraph(pipelineSource) graph.flows .collectFirst { case f: AutoCdcFlow => f } - .getOrElse( - throw new AssertionError( - s"Expected an AutoCdcFlow in the graph, got: ${graph.flows}")) + .getOrElse(fail(s"Expected an AutoCdcFlow in the graph, got: ${graph.flows}")) } test("AutoCDC API: minimal flow registers an AutoCdcFlow with default name and SCD1 default") { @@ -1012,7 +1010,7 @@ class PythonPipelineSuite |""".stripMargin) val deleteCondition = flow.changeArgs.deleteCondition.getOrElse( - throw new AssertionError("expected apply_as_deletes to populate deleteCondition")) + fail("expected apply_as_deletes to populate deleteCondition")) assert(deleteCondition.expr.sql.contains("value")) assert(deleteCondition.expr.sql.contains("0")) } @@ -1191,7 +1189,7 @@ class PythonPipelineSuite assert(flow.changeArgs.keys == Seq(UnqualifiedColumnName("value"))) assert(flow.changeArgs.sequencing.expr.sql == "timestamp") val deleteCondition = flow.changeArgs.deleteCondition.getOrElse( - throw new AssertionError("expected apply_as_deletes to populate deleteCondition")) + fail("expected apply_as_deletes to populate deleteCondition")) assert(deleteCondition.expr.sql.contains("value")) assert(deleteCondition.expr.sql.contains("0")) } From d517ddff69d43ca54fbd47c86def46cfaaab0012 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 27 May 2026 00:36:27 +0000 Subject: [PATCH 4/9] PR feedback --- .../resources/error/error-conditions.json | 12 ++ .../connect/pipelines/PipelinesHandler.scala | 42 +++-- .../pipelines/PythonPipelineSuite.scala | 167 +++++++++++++++--- .../sql/pipelines/autocdc/ChangeArgs.scala | 8 +- 4 files changed, 189 insertions(+), 40 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 9c9a657bc6e9f..1b0770e6c0b9f 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -191,6 +191,12 @@ ], "sqlState" : "0A000" }, + "AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST" : { + "message" : [ + "AutoCDC flow specifies both `column_list` and `except_column_list`; at most one may be provided." + ], + "sqlState" : "42613" + }, "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : { "message" : [ "Using column name comparison, the following columns are not present in the schema: . Available columns: ." @@ -244,6 +250,12 @@ ], "sqlState" : "42000" }, + "AUTOCDC_NON_COLUMN_IDENTIFIER" : { + "message" : [ + "Expected a column identifier; got the non-attribute expression ``. AutoCDC keys, sequence_by, column_list, and except_column_list must reference unqualified column names." + ], + "sqlState" : "42703" + }, "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : { "message" : [ "The column `` in the schema collides with the reserved AutoCDC column name prefix `` (using column name comparison). Rename or remove the column." diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index cd19e71557f42..b03434ac9c6a7 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.connect.pipelines -import scala.collection.Seq import scala.jdk.CollectionConverters._ import scala.util.Using @@ -25,10 +24,11 @@ import io.grpc.stub.StreamObserver import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation, ResolvedIdentifier} +import org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Column} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{Command, CreateNamespace, CreateTable, CreateTableAsSelect, CreateView, DescribeRelation, DescribeTablePartition, DropView, InsertIntoStatement, LogicalPlan, RenameTable, ShowColumns, ShowCreateTable, ShowFunctions, ShowTableProperties, ShowTables, ShowViews} import org.apache.spark.sql.classic.ClassicConversions._ @@ -400,15 +400,15 @@ private[connect] object PipelinesHandler extends Logging { objectName = Option(flowIdentifier.unquotedString), language = Some(Python())))) case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS => - val autoCdcDetails = flow.getAutoCdcFlowDetails graphElementRegistry.registerFlow( buildAutoCdcFlow( - autoCdcDetails = autoCdcDetails, + autoCdcDetails = flow.getAutoCdcFlowDetails, flow = flow, flowIdentifier = flowIdentifier, destinationIdentifier = destinationIdentifier, defaultCatalog = defaultCatalog, defaultDatabase = defaultDatabase, + sessionHolder = sessionHolder, transformExpressionFunc = transformExpressionFunc)) case other => throw new UnsupportedOperationException(s"Unsupported DefineFlow details case: $other") @@ -425,22 +425,39 @@ private[connect] object PipelinesHandler extends Logging { * the dataflow graph. */ private def buildAutoCdcFlow( - autoCdcDetails: proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails, + autoCdcDetails: AutoCdcFlowDetails, flow: proto.PipelineCommand.DefineFlow, flowIdentifier: TableIdentifier, destinationIdentifier: TableIdentifier, defaultCatalog: String, defaultDatabase: String, + sessionHolder: SessionHolder, transformExpressionFunc: proto.Expression => Expression): AutoCdcFlow = { + // TODO(SPARK-57092): apply_as_truncates is declared on AutoCdcFlowDetails but is not yet + // honored by the engine; wire it through once SCD1 truncate support lands. + // TODO(SPARK-57093): ignore_null_updates_column_list and ignore_null_updates_except_column_list + // are declared on AutoCdcFlowDetails but are not yet honored by the engine; wire them + // through once SCD1 ignore-null support lands. val sourcePlan: LogicalPlan = UnresolvedRelation( - multipartIdentifier = scala.collection.immutable.Seq(autoCdcDetails.getSource), + multipartIdentifier = GraphIdentifierManager + .parseTableIdentifier( + name = autoCdcDetails.getSource, + spark = sessionHolder.session + ).nameParts, isStreaming = true ) val toColumn: proto.Expression => Column = expr => Column(transformExpressionFunc(expr)) - val asUnqualifiedColumnName: proto.Expression => UnqualifiedColumnName = - expr => UnqualifiedColumnName(transformExpressionFunc(expr).sql) + val asUnqualifiedColumnName: proto.Expression => UnqualifiedColumnName = expr => + transformExpressionFunc(expr) match { + case a: UnresolvedAttribute => UnqualifiedColumnName(a.nameParts) + case other => + throw new AnalysisException( + "AUTOCDC_NON_COLUMN_IDENTIFIER", + Map("expression" -> other.sql) + ) + } val keys = autoCdcDetails.getKeysList.asScala.toSeq.map(asUnqualifiedColumnName) @@ -448,11 +465,10 @@ private[connect] object PipelinesHandler extends Logging { val included = autoCdcDetails.getColumnListList.asScala.toSeq val excluded = autoCdcDetails.getExceptColumnListList.asScala.toSeq if (included.nonEmpty && excluded.nonEmpty) { - // The Python API enforces the "at most one" contract; we don't expect both to be set - // here, but reject defensively to surface client/server mismatches loudly. - throw new IllegalArgumentException( - "AutoCDC flow specifies both column_list and except_column_list; at most one " + - "may be provided.") + throw new AnalysisException( + "AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST", + Map.empty + ) } else if (included.nonEmpty) { Some(ColumnSelection.IncludeColumns(included.map(asUnqualifiedColumnName))) } else if (excluded.nonEmpty) { diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 6e6271b30eaff..e1e5eef3a8370 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.pipelines.Language.Python import org.apache.spark.sql.pipelines.autocdc.{ColumnSelection, ScdType, UnqualifiedColumnName} import org.apache.spark.sql.pipelines.common.FlowStatus -import org.apache.spark.sql.pipelines.graph.{AutoCdcFlow, DataflowGraph, PipelineUpdateContextImpl, QueryOrigin, QueryOriginType} +import org.apache.spark.sql.pipelines.graph.{AutoCdcFlow, AutoCdcMergeFlow, DataflowGraph, PipelineUpdateContextImpl, QueryOrigin, QueryOriginType} import org.apache.spark.sql.pipelines.logging.EventLevel import org.apache.spark.sql.pipelines.utils.{EventVerificationTestHelpers, TestPipelineUpdateContextMixin} import org.apache.spark.sql.types.StructType @@ -53,10 +53,15 @@ class PythonPipelineSuite with TestPipelineUpdateContextMixin with EventVerificationTestHelpers { - def buildGraph(pythonText: String): DataflowGraph = { + def buildGraph( + pythonText: String, + defaultCatalog: Option[String] = None, + defaultDatabase: Option[String] = None): DataflowGraph = { val indentedPythonText = pythonText.linesIterator.map(" " + _).mkString("\n") // create a unique identifier to allow identifying the session and dataflow graph val customSessionIdentifier = UUID.randomUUID().toString + val defaultCatalogPyExpr = defaultCatalog.map(c => s""""$c"""").getOrElse("None") + val defaultDatabasePyExpr = defaultDatabase.map(d => s""""$d"""").getOrElse("None") val pythonCode = s""" |from pyspark.sql import SparkSession @@ -80,8 +85,8 @@ class PythonPipelineSuite | |dataflow_graph_id = create_dataflow_graph( | spark, - | default_catalog=None, - | default_database=None, + | default_catalog=$defaultCatalogPyExpr, + | default_database=$defaultDatabasePyExpr, | sql_conf={}, |) | @@ -1082,26 +1087,6 @@ class PythonPipelineSuite assert(flow.destinationIdentifier == graphIdentifier("target")) } - test("AutoCDC API: omitted stored_as_scd_type defaults to ScdType.Type1") { - val flow = buildAutoCdcFlow( - """ - |@dp.table - |def src(): - | return spark.readStream.format("rate").load() - | - |dp.create_streaming_table("target") - | - |dp.create_auto_cdc_flow( - | target = "target", - | source = "src", - | keys = ["value"], - | sequence_by = "timestamp", - |) - |""".stripMargin) - - assert(flow.changeArgs.storedAsScdType == ScdType.Type1) - } - test("AutoCDC API: explicit stored_as_scd_type=1 is forwarded as ScdType.Type1") { val flow = buildAutoCdcFlow( """ @@ -1215,6 +1200,140 @@ class PythonPipelineSuite assert(resolvedFlow.inputs == Set(graphIdentifier("src"))) } + test("AutoCDC API: single-part `source` inherits the pipeline's default catalog/database") { + // Source registers with the pipeline's defaults; the AutoCDC flow references it by single-part + // name. The resolved input should carry the qualified TableIdentifier, demonstrating that + // FlowAnalysis qualifies the UnresolvedRelation against the flow's QueryContext (same path + // used by RelationFlowDetails flows). + val graph = buildGraph( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + |) + |""".stripMargin, + defaultCatalog = Some("spark_catalog"), + defaultDatabase = Some("default")).resolve() + + val resolvedFlow = graph.resolvedFlow( + TableIdentifier("target", Some("default"), Some("spark_catalog"))) + assert( + resolvedFlow.inputs == + Set(TableIdentifier("src", Some("default"), Some("spark_catalog")))) + } + + test("AutoCDC API: multi-part `source` resolves to the corresponding qualified dataset") { + val graph = buildGraph( + """ + |@dp.table(name = "some_catalog.some_schema.src") + |def irrelevant(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table(name = "some_catalog.some_schema.target") + | + |dp.create_auto_cdc_flow( + | target = "some_catalog.some_schema.target", + | source = "some_catalog.some_schema.src", + | keys = ["value"], + | sequence_by = "timestamp", + |) + |""".stripMargin).resolve() + + val targetIdent = TableIdentifier("target", Some("some_schema"), Some("some_catalog")) + val srcIdent = TableIdentifier("src", Some("some_schema"), Some("some_catalog")) + val resolvedFlow = graph.resolvedFlow(targetIdent) + assert(resolvedFlow.inputs == Set(srcIdent)) + } + + test("AutoCDC API: non-attribute expression in keys is rejected") { + val ex = intercept[RuntimeException] { + buildGraph( + """ + |from pyspark.sql.functions import expr + | + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = [expr("value + 1")], + | sequence_by = "timestamp", + |) + |""".stripMargin) + } + assert(ex.getMessage.contains("AUTOCDC_NON_COLUMN_IDENTIFIER")) + } + + test("AutoCDC API: specifying both column_list and except_column_list is rejected") { + // The Python create_auto_cdc_flow API does not currently enforce the "at most one" contract + // client-side, so the proto carries both lists to the server, where the structured error is + // raised. If/when a Python-side check is added, this test guards against the server-side + // defense being silently bypassed. + val ex = intercept[RuntimeException] { + buildGraph( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | column_list = ["value"], + | except_column_list = ["timestamp"], + |) + |""".stripMargin) + } + assert(ex.getMessage.contains("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST")) + } + + test("AutoCDC API: registered flow survives graph resolution and validation end-to-end") { + val graph = buildGraph( + """ + |@dp.table + |def src(): + | return spark.readStream.format("rate").load() + | + |dp.create_streaming_table("target") + | + |dp.create_auto_cdc_flow( + | target = "target", + | source = "src", + | keys = ["value"], + | sequence_by = "timestamp", + | apply_as_deletes = "value % 2 = 0", + | column_list = ["value", "timestamp"], + |) + |""".stripMargin).resolve().validate() + + val resolvedFlow = graph.resolvedFlow(graphIdentifier("target")) + assert(resolvedFlow.isInstanceOf[AutoCdcMergeFlow]) + val mergeFlow = resolvedFlow.asInstanceOf[AutoCdcMergeFlow] + assert(mergeFlow.changeArgs.keys == Seq(UnqualifiedColumnName("value"))) + assert(mergeFlow.changeArgs.sequencing.expr.sql == "timestamp") + assert(mergeFlow.changeArgs.deleteCondition.isDefined) + assert(mergeFlow.changeArgs.columnSelection.contains( + ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("value"), UnqualifiedColumnName("timestamp"))))) + assert(mergeFlow.changeArgs.storedAsScdType == ScdType.Type1) + } + /** * Executes Python code in a separate process and returns the exit code. * diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala index b975e06807f57..915423fc6f0e2 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala @@ -32,14 +32,16 @@ case class UnqualifiedColumnName private (name: String) { } object UnqualifiedColumnName { - def apply(input: String): UnqualifiedColumnName = { - val nameParts = CatalystSqlParser.parseMultipartIdentifier(input) + def apply(nameParts: Seq[String]): UnqualifiedColumnName = { if (nameParts.length != 1) { - throw multipartColumnIdentifierError(input, nameParts) + throw multipartColumnIdentifierError(nameParts.mkString("."), nameParts) } new UnqualifiedColumnName(nameParts.head) } + def apply(input: String): UnqualifiedColumnName = + apply(CatalystSqlParser.parseMultipartIdentifier(input)) + private def multipartColumnIdentifierError( columnName: String, nameParts: Seq[String] From e51b3cb445cd9fae0fb64011bacde113f1bb8d60 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 27 May 2026 04:14:12 +0000 Subject: [PATCH 5/9] PR feedback --- .../resources/error/error-conditions.json | 16 +++- .../connect/pipelines/PipelinesHandler.scala | 38 ++++---- .../pipelines/PythonPipelineSuite.scala | 95 ++++++------------- .../pipelines/autocdc/ChangeArgsSuite.scala | 6 +- 4 files changed, 63 insertions(+), 92 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 1b0770e6c0b9f..81dba20e2a9be 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -238,11 +238,23 @@ }, "sqlState" : "22000" }, + "AUTOCDC_MISSING_SEQUENCE_BY" : { + "message" : [ + "AutoCDC flow is missing a required `sequence_by` expression. Specify a `sequence_by` column or expression that orders incoming change events." + ], + "sqlState" : "22023" + }, + "AUTOCDC_MISSING_SOURCE" : { + "message" : [ + "AutoCDC flow is missing a required `source` table name. Specify the name of the streaming source table the flow should read from." + ], + "sqlState" : "22023" + }, "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : { "message" : [ "Expected a single column identifier; got the multi-part identifier (parts: )." ], - "sqlState" : "42703" + "sqlState" : "22023" }, "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : { "message" : [ @@ -254,7 +266,7 @@ "message" : [ "Expected a column identifier; got the non-attribute expression ``. AutoCDC keys, sequence_by, column_list, and except_column_list must reference unqualified column names." ], - "sqlState" : "42703" + "sqlState" : "22023" }, "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : { "message" : [ diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index b03434ac9c6a7..fd21691f80cd6 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -438,14 +438,19 @@ private[connect] object PipelinesHandler extends Logging { // TODO(SPARK-57093): ignore_null_updates_column_list and ignore_null_updates_except_column_list // are declared on AutoCdcFlowDetails but are not yet honored by the engine; wire them // through once SCD1 ignore-null support lands. + + if (!autoCdcDetails.hasSource) { + throw new AnalysisException("AUTOCDC_MISSING_SOURCE", Map.empty) + } + if (!autoCdcDetails.hasSequenceBy) { + throw new AnalysisException("AUTOCDC_MISSING_SEQUENCE_BY", Map.empty) + } + val sourcePlan: LogicalPlan = UnresolvedRelation( multipartIdentifier = GraphIdentifierManager - .parseTableIdentifier( - name = autoCdcDetails.getSource, - spark = sessionHolder.session - ).nameParts, - isStreaming = true - ) + .parseTableIdentifier(name = autoCdcDetails.getSource, spark = sessionHolder.session) + .nameParts, + isStreaming = true) val toColumn: proto.Expression => Column = expr => Column(transformExpressionFunc(expr)) @@ -455,8 +460,7 @@ private[connect] object PipelinesHandler extends Logging { case other => throw new AnalysisException( "AUTOCDC_NON_COLUMN_IDENTIFIER", - Map("expression" -> other.sql) - ) + Map("expression" -> other.sql)) } val keys = autoCdcDetails.getKeysList.asScala.toSeq.map(asUnqualifiedColumnName) @@ -465,10 +469,7 @@ private[connect] object PipelinesHandler extends Logging { val included = autoCdcDetails.getColumnListList.asScala.toSeq val excluded = autoCdcDetails.getExceptColumnListList.asScala.toSeq if (included.nonEmpty && excluded.nonEmpty) { - throw new AnalysisException( - "AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST", - Map.empty - ) + throw new AnalysisException("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST", Map.empty) } else if (included.nonEmpty) { Some(ColumnSelection.IncludeColumns(included.map(asUnqualifiedColumnName))) } else if (excluded.nonEmpty) { @@ -492,11 +493,8 @@ private[connect] object PipelinesHandler extends Logging { sequencing = toColumn(autoCdcDetails.getSequenceBy), storedAsScdType = scdType, deleteCondition = - Option.when(autoCdcDetails.hasApplyAsDeletes)( - toColumn(autoCdcDetails.getApplyAsDeletes) - ), - columnSelection = columnSelection - ) + Option.when(autoCdcDetails.hasApplyAsDeletes)(toColumn(autoCdcDetails.getApplyAsDeletes)), + columnSelection = columnSelection) AutoCdcFlow( identifier = flowIdentifier, @@ -511,10 +509,8 @@ private[connect] object PipelinesHandler extends Logging { flow.getSourceCodeLocation.getLineNumber), objectType = Some(QueryOriginType.Flow.toString), objectName = Option(flowIdentifier.unquotedString), - language = Some(Python()) - ), - changeArgs = changeArgs - ) + language = Some(Python())), + changeArgs = changeArgs) } private def startRun( diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index e1e5eef3a8370..1e18abbcf9cd9 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -950,8 +950,7 @@ class PythonPipelineSuite } test("AutoCDC API: minimal flow registers an AutoCdcFlow with default name and SCD1 default") { - val flow = buildAutoCdcFlow( - """ + val flow = buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -976,8 +975,7 @@ class PythonPipelineSuite } test("AutoCDC API: composite keys are forwarded to ChangeArgs in order") { - val flow = buildAutoCdcFlow( - """ + val flow = buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -992,13 +990,13 @@ class PythonPipelineSuite |) |""".stripMargin) - assert(flow.changeArgs.keys == - Seq(UnqualifiedColumnName("value"), UnqualifiedColumnName("timestamp"))) + assert( + flow.changeArgs.keys == + Seq(UnqualifiedColumnName("value"), UnqualifiedColumnName("timestamp"))) } test("AutoCDC API: apply_as_deletes is forwarded as a delete condition column") { - val flow = buildAutoCdcFlow( - """ + val flow = buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1021,8 +1019,7 @@ class PythonPipelineSuite } test("AutoCDC API: column_list is forwarded as IncludeColumns") { - val flow = buildAutoCdcFlow( - """ + val flow = buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1038,14 +1035,13 @@ class PythonPipelineSuite |) |""".stripMargin) - assert(flow.changeArgs.columnSelection.contains( - ColumnSelection.IncludeColumns( + assert( + flow.changeArgs.columnSelection.contains(ColumnSelection.IncludeColumns( Seq(UnqualifiedColumnName("value"), UnqualifiedColumnName("timestamp"))))) } test("AutoCDC API: except_column_list is forwarded as ExcludeColumns") { - val flow = buildAutoCdcFlow( - """ + val flow = buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1061,13 +1057,13 @@ class PythonPipelineSuite |) |""".stripMargin) - assert(flow.changeArgs.columnSelection.contains( - ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("timestamp"))))) + assert( + flow.changeArgs.columnSelection.contains( + ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("timestamp"))))) } test("AutoCDC API: explicit `name` is honored as the flow identifier") { - val flow = buildAutoCdcFlow( - """ + val flow = buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1087,31 +1083,9 @@ class PythonPipelineSuite assert(flow.destinationIdentifier == graphIdentifier("target")) } - test("AutoCDC API: explicit stored_as_scd_type=1 is forwarded as ScdType.Type1") { - val flow = buildAutoCdcFlow( - """ - |@dp.table - |def src(): - | return spark.readStream.format("rate").load() - | - |dp.create_streaming_table("target") - | - |dp.create_auto_cdc_flow( - | target = "target", - | source = "src", - | keys = ["value"], - | sequence_by = "timestamp", - | stored_as_scd_type = 1, - |) - |""".stripMargin) - - assert(flow.changeArgs.storedAsScdType == ScdType.Type1) - } - test("AutoCDC API: multi-part `keys` column is rejected at flow registration") { val ex = intercept[RuntimeException] { - buildAutoCdcFlow( - """ + buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1131,8 +1105,7 @@ class PythonPipelineSuite test("AutoCDC API: multi-part `column_list` entry is rejected at flow registration") { val ex = intercept[RuntimeException] { - buildAutoCdcFlow( - """ + buildAutoCdcFlow(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1152,8 +1125,7 @@ class PythonPipelineSuite } test("AutoCDC API: Column-object form of keys/sequence_by/apply_as_deletes is honored") { - val flow = buildAutoCdcFlow( - """ + val flow = buildAutoCdcFlow(""" |from pyspark.sql.functions import col, expr | |@dp.table @@ -1180,8 +1152,7 @@ class PythonPipelineSuite } test("AutoCDC API: graph resolves with the source streaming table as the flow's input") { - val graph = buildGraph( - """ + val graph = buildGraph(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1201,10 +1172,6 @@ class PythonPipelineSuite } test("AutoCDC API: single-part `source` inherits the pipeline's default catalog/database") { - // Source registers with the pipeline's defaults; the AutoCDC flow references it by single-part - // name. The resolved input should carry the qualified TableIdentifier, demonstrating that - // FlowAnalysis qualifies the UnresolvedRelation against the flow's QueryContext (same path - // used by RelationFlowDetails flows). val graph = buildGraph( """ |@dp.table @@ -1220,19 +1187,18 @@ class PythonPipelineSuite | sequence_by = "timestamp", |) |""".stripMargin, - defaultCatalog = Some("spark_catalog"), - defaultDatabase = Some("default")).resolve() + defaultCatalog = Some("my_catalog"), + defaultDatabase = Some("my_db")).resolve() - val resolvedFlow = graph.resolvedFlow( - TableIdentifier("target", Some("default"), Some("spark_catalog"))) + val resolvedFlow = + graph.resolvedFlow(TableIdentifier("target", Some("my_db"), Some("my_catalog"))) assert( resolvedFlow.inputs == - Set(TableIdentifier("src", Some("default"), Some("spark_catalog")))) + Set(TableIdentifier("src", Some("my_db"), Some("my_catalog")))) } test("AutoCDC API: multi-part `source` resolves to the corresponding qualified dataset") { - val graph = buildGraph( - """ + val graph = buildGraph(""" |@dp.table(name = "some_catalog.some_schema.src") |def irrelevant(): | return spark.readStream.format("rate").load() @@ -1255,8 +1221,7 @@ class PythonPipelineSuite test("AutoCDC API: non-attribute expression in keys is rejected") { val ex = intercept[RuntimeException] { - buildGraph( - """ + buildGraph(""" |from pyspark.sql.functions import expr | |@dp.table @@ -1282,8 +1247,7 @@ class PythonPipelineSuite // raised. If/when a Python-side check is added, this test guards against the server-side // defense being silently bypassed. val ex = intercept[RuntimeException] { - buildGraph( - """ + buildGraph(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1304,8 +1268,7 @@ class PythonPipelineSuite } test("AutoCDC API: registered flow survives graph resolution and validation end-to-end") { - val graph = buildGraph( - """ + val graph = buildGraph(""" |@dp.table |def src(): | return spark.readStream.format("rate").load() @@ -1328,8 +1291,8 @@ class PythonPipelineSuite assert(mergeFlow.changeArgs.keys == Seq(UnqualifiedColumnName("value"))) assert(mergeFlow.changeArgs.sequencing.expr.sql == "timestamp") assert(mergeFlow.changeArgs.deleteCondition.isDefined) - assert(mergeFlow.changeArgs.columnSelection.contains( - ColumnSelection.IncludeColumns( + assert( + mergeFlow.changeArgs.columnSelection.contains(ColumnSelection.IncludeColumns( Seq(UnqualifiedColumnName("value"), UnqualifiedColumnName("timestamp"))))) assert(mergeFlow.changeArgs.storedAsScdType == ScdType.Type1) } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala index 1de2120a8f915..7be111003762f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala @@ -326,7 +326,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { UnqualifiedColumnName("a.b") }, condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", - sqlState = "42703", + sqlState = "22023", parameters = Map( "columnName" -> "a.b", "nameParts" -> "a, b" @@ -340,7 +340,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { UnqualifiedColumnName("src.x") }, condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", - sqlState = "42703", + sqlState = "22023", parameters = Map( "columnName" -> "src.x", "nameParts" -> "src, x" @@ -354,7 +354,7 @@ class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession { UnqualifiedColumnName("a.b.c") }, condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER", - sqlState = "42703", + sqlState = "22023", parameters = Map( "columnName" -> "a.b.c", "nameParts" -> "a, b, c" From ddf76ec98ad8f32e63f37d9101c35a0a3aa1cca0 Mon Sep 17 00:00:00 2001 From: Anish Mahto Date: Wed, 27 May 2026 07:09:17 +0000 Subject: [PATCH 6/9] test fix; can't change in-memory catalog --- .../sql/connect/pipelines/PythonPipelineSuite.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 1e18abbcf9cd9..403d019a71958 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -1171,7 +1171,10 @@ class PythonPipelineSuite assert(resolvedFlow.inputs == Set(graphIdentifier("src"))) } - test("AutoCDC API: single-part `source` inherits the pipeline's default catalog/database") { + test("AutoCDC API: single-part `source` inherits the pipeline's default database") { + // Pick a non-default database (`my_db`) inside the registered `spark_catalog` so the + // pipeline-default database differs from the connect session's current database (`default`). + spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.my_db") val graph = buildGraph( """ |@dp.table @@ -1187,14 +1190,14 @@ class PythonPipelineSuite | sequence_by = "timestamp", |) |""".stripMargin, - defaultCatalog = Some("my_catalog"), + defaultCatalog = Some("spark_catalog"), defaultDatabase = Some("my_db")).resolve() val resolvedFlow = - graph.resolvedFlow(TableIdentifier("target", Some("my_db"), Some("my_catalog"))) + graph.resolvedFlow(TableIdentifier("target", Some("my_db"), Some("spark_catalog"))) assert( resolvedFlow.inputs == - Set(TableIdentifier("src", Some("my_db"), Some("my_catalog")))) + Set(TableIdentifier("src", Some("my_db"), Some("spark_catalog")))) } test("AutoCDC API: multi-part `source` resolves to the corresponding qualified dataset") { From 793a2b1c6f575dd78d3e8605ace15a51f0a48d8e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 27 May 2026 11:44:48 +0000 Subject: [PATCH 7/9] Address review comments: extract flowOrigin helper, exercise catalog inheritance - Factor the duplicated QueryOrigin block in PipelinesHandler into a private flowOrigin helper shared by the RELATION_FLOW_DETAILS and AUTO_CDC_FLOW_DETAILS paths. - Rewrite the pipeline-default inheritance test to exercise both catalog and database halves: register a V2 in-memory `my_catalog` via sparkConf, plumb a setupSql parameter through buildGraph so the test creates the `my_db` namespace on the same Connect session that subsequently creates the dataflow graph, and assert against `my_catalog.my_db.{src,target}` (neither of which is the session default). --- .../connect/pipelines/PipelinesHandler.scala | 29 +++++++------- .../pipelines/PythonPipelineSuite.scala | 38 ++++++++++++++----- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index fd21691f80cd6..15da12237e441 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -391,14 +391,7 @@ private[connect] object PipelinesHandler extends Logging { sqlConf = flow.getSqlConfMap.asScala.toMap, once = false, queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)), - origin = QueryOrigin( - filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( - flow.getSourceCodeLocation.getFileName), - line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( - flow.getSourceCodeLocation.getLineNumber), - objectType = Some(QueryOriginType.Flow.toString), - objectName = Option(flowIdentifier.unquotedString), - language = Some(Python())))) + origin = flowOrigin(flow, flowIdentifier))) case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS => graphElementRegistry.registerFlow( buildAutoCdcFlow( @@ -502,17 +495,21 @@ private[connect] object PipelinesHandler extends Logging { func = FlowAnalysis.createFlowFunctionFromLogicalPlan(sourcePlan), sqlConf = flow.getSqlConfMap.asScala.toMap, queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)), - origin = QueryOrigin( - filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( - flow.getSourceCodeLocation.getFileName), - line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( - flow.getSourceCodeLocation.getLineNumber), - objectType = Some(QueryOriginType.Flow.toString), - objectName = Option(flowIdentifier.unquotedString), - language = Some(Python())), + origin = flowOrigin(flow, flowIdentifier), changeArgs = changeArgs) } + private def flowOrigin( + flow: proto.PipelineCommand.DefineFlow, + flowIdentifier: TableIdentifier): QueryOrigin = QueryOrigin( + filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( + flow.getSourceCodeLocation.getFileName), + line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( + flow.getSourceCodeLocation.getLineNumber), + objectType = Some(QueryOriginType.Flow.toString), + objectName = Option(flowIdentifier.unquotedString), + language = Some(Python())) + private def startRun( cmd: proto.PipelineCommand.StartRun, responseObserver: StreamObserver[ExecutePlanResponse], diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 403d019a71958..6431105d4583d 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -29,13 +29,14 @@ import scala.util.Try import org.scalactic.source.Position import org.scalatest.Tag +import org.apache.spark.SparkConf import org.apache.spark.api.python.PythonUtils import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.classic.ColumnConversions._ import org.apache.spark.sql.connect.PythonTestDepsChecker import org.apache.spark.sql.connect.service.SparkConnectService -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, TableCatalog} import org.apache.spark.sql.pipelines.Language.Python import org.apache.spark.sql.pipelines.autocdc.{ColumnSelection, ScdType, UnqualifiedColumnName} import org.apache.spark.sql.pipelines.common.FlowStatus @@ -53,15 +54,27 @@ class PythonPipelineSuite with TestPipelineUpdateContextMixin with EventVerificationTestHelpers { + // Register a V2 in-memory catalog so AutoCDC tests can exercise pipeline-default-catalog + // inheritance against a name that is never the session default `spark_catalog`. The V2 in-memory + // catalog doesn't support streaming reads, but the AutoCDC tests that touch it only run graph + // resolution -- not pipeline execution -- so this is sufficient. + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.my_catalog", classOf[InMemoryTableCatalog].getName) + def buildGraph( pythonText: String, defaultCatalog: Option[String] = None, - defaultDatabase: Option[String] = None): DataflowGraph = { + defaultDatabase: Option[String] = None, + setupSql: Seq[String] = Nil): DataflowGraph = { val indentedPythonText = pythonText.linesIterator.map(" " + _).mkString("\n") // create a unique identifier to allow identifying the session and dataflow graph val customSessionIdentifier = UUID.randomUUID().toString val defaultCatalogPyExpr = defaultCatalog.map(c => s""""$c"""").getOrElse("None") val defaultDatabasePyExpr = defaultDatabase.map(d => s""""$d"""").getOrElse("None") + val setupSqlLines = setupSql + .map(stmt => s"""spark.sql(\"\"\"$stmt\"\"\")""") + .map("|" + _) + .mkString("\n") val pythonCode = s""" |from pyspark.sql import SparkSession @@ -83,6 +96,8 @@ class PythonPipelineSuite | .config("spark.custom.identifier", "$customSessionIdentifier") \\ | .create() | + $setupSqlLines + | |dataflow_graph_id = create_dataflow_graph( | spark, | default_catalog=$defaultCatalogPyExpr, @@ -1171,10 +1186,12 @@ class PythonPipelineSuite assert(resolvedFlow.inputs == Set(graphIdentifier("src"))) } - test("AutoCDC API: single-part `source` inherits the pipeline's default database") { - // Pick a non-default database (`my_db`) inside the registered `spark_catalog` so the - // pipeline-default database differs from the connect session's current database (`default`). - spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.my_db") + test("AutoCDC API: single-part `source` inherits the pipeline's default catalog and database") { + // Use `my_catalog` (registered in `sparkConf`) so the pipeline-default catalog differs from + // the session default (`spark_catalog`), and a non-default namespace `my_db` so the + // pipeline-default database differs from the session default (`default`). The CREATE NAMESPACE + // runs on the same Connect session that subsequently creates the dataflow graph, so the + // namespace is visible to that session's per-session V2 catalog instance. val graph = buildGraph( """ |@dp.table @@ -1190,14 +1207,15 @@ class PythonPipelineSuite | sequence_by = "timestamp", |) |""".stripMargin, - defaultCatalog = Some("spark_catalog"), - defaultDatabase = Some("my_db")).resolve() + defaultCatalog = Some("my_catalog"), + defaultDatabase = Some("my_db"), + setupSql = Seq("CREATE NAMESPACE IF NOT EXISTS my_catalog.my_db")).resolve() val resolvedFlow = - graph.resolvedFlow(TableIdentifier("target", Some("my_db"), Some("spark_catalog"))) + graph.resolvedFlow(TableIdentifier("target", Some("my_db"), Some("my_catalog"))) assert( resolvedFlow.inputs == - Set(TableIdentifier("src", Some("my_db"), Some("spark_catalog")))) + Set(TableIdentifier("src", Some("my_db"), Some("my_catalog")))) } test("AutoCDC API: multi-part `source` resolves to the corresponding qualified dataset") { From fe9c02dd484a31b19459340e70a7f9f8b424e558 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 27 May 2026 11:50:54 +0000 Subject: [PATCH 8/9] Simplify setupSql interpolation in buildGraph Move the stripMargin `|` marker from each generated statement back into the template, so we only need one mkString("\n") instead of mapping each statement to "|" + _. --- .../sql/connect/pipelines/PythonPipelineSuite.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 6431105d4583d..1858e2cde51a7 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -65,16 +65,13 @@ class PythonPipelineSuite pythonText: String, defaultCatalog: Option[String] = None, defaultDatabase: Option[String] = None, - setupSql: Seq[String] = Nil): DataflowGraph = { + setupSql: Option[String] = None): DataflowGraph = { val indentedPythonText = pythonText.linesIterator.map(" " + _).mkString("\n") // create a unique identifier to allow identifying the session and dataflow graph val customSessionIdentifier = UUID.randomUUID().toString val defaultCatalogPyExpr = defaultCatalog.map(c => s""""$c"""").getOrElse("None") val defaultDatabasePyExpr = defaultDatabase.map(d => s""""$d"""").getOrElse("None") - val setupSqlLines = setupSql - .map(stmt => s"""spark.sql(\"\"\"$stmt\"\"\")""") - .map("|" + _) - .mkString("\n") + val setupSqlLine = setupSql.map(stmt => s"""spark.sql(\"\"\"$stmt\"\"\")""").getOrElse("") val pythonCode = s""" |from pyspark.sql import SparkSession @@ -96,7 +93,7 @@ class PythonPipelineSuite | .config("spark.custom.identifier", "$customSessionIdentifier") \\ | .create() | - $setupSqlLines + |$setupSqlLine | |dataflow_graph_id = create_dataflow_graph( | spark, @@ -1209,7 +1206,7 @@ class PythonPipelineSuite |""".stripMargin, defaultCatalog = Some("my_catalog"), defaultDatabase = Some("my_db"), - setupSql = Seq("CREATE NAMESPACE IF NOT EXISTS my_catalog.my_db")).resolve() + setupSql = Some("CREATE NAMESPACE IF NOT EXISTS my_catalog.my_db")).resolve() val resolvedFlow = graph.resolvedFlow(TableIdentifier("target", Some("my_db"), Some("my_catalog"))) From e5ae73f9c4d4328b16dcbf286d5905906d0b655d Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 27 May 2026 15:16:03 +0000 Subject: [PATCH 9/9] Fix CI: scalafmt formatting and stale line-number assertions The setupSqlLine/blank line added to the python template in buildGraph shifts every subsequent generated line by +2, so update the source-location assertions in PythonPipelineSuite accordingly. Also apply scalafmt to PipelinesHandler. --- .../spark/sql/connect/pipelines/PipelinesHandler.scala | 4 ++-- .../sql/connect/pipelines/PythonPipelineSuite.scala | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index 15da12237e441..f8edbc9928000 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -502,8 +502,8 @@ private[connect] object PipelinesHandler extends Logging { private def flowOrigin( flow: proto.PipelineCommand.DefineFlow, flowIdentifier: TableIdentifier): QueryOrigin = QueryOrigin( - filePath = Option.when(flow.getSourceCodeLocation.hasFileName)( - flow.getSourceCodeLocation.getFileName), + filePath = + Option.when(flow.getSourceCodeLocation.hasFileName)(flow.getSourceCodeLocation.getFileName), line = Option.when(flow.getSourceCodeLocation.hasLineNumber)( flow.getSourceCodeLocation.getLineNumber), objectType = Some(QueryOriginType.Flow.toString), diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 1858e2cde51a7..834e2d8144e13 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -170,7 +170,7 @@ class PythonPipelineSuite QueryOrigin( language = Option(Python()), filePath = Option(""), - line = Option(34), + line = Option(36), objectName = Option("spark_catalog.default.table1"), objectType = Option(QueryOriginType.Flow.toString))), errorChecker = ex => @@ -222,7 +222,7 @@ class PythonPipelineSuite QueryOrigin( language = Option(Python()), filePath = Option(""), - line = Option(40), + line = Option(42), objectName = Option("spark_catalog.default.mv2"), objectType = Option(QueryOriginType.Flow.toString))), expectedEventLevel = EventLevel.INFO) @@ -236,7 +236,7 @@ class PythonPipelineSuite QueryOrigin( language = Option(Python()), filePath = Option(""), - line = Option(44), + line = Option(46), objectName = Option("spark_catalog.default.mv"), objectType = Option(QueryOriginType.Flow.toString))), expectedEventLevel = EventLevel.INFO) @@ -254,7 +254,7 @@ class PythonPipelineSuite QueryOrigin( language = Option(Python()), filePath = Option(""), - line = Option(34), + line = Option(36), objectName = Option("spark_catalog.default.table1"), objectType = Option(QueryOriginType.Flow.toString))), expectedEventLevel = EventLevel.INFO) @@ -268,7 +268,7 @@ class PythonPipelineSuite QueryOrigin( language = Option(Python()), filePath = Option(""), - line = Option(49), + line = Option(51), objectName = Option("spark_catalog.default.standalone_flow1"), objectType = Option(QueryOriginType.Flow.toString))), expectedEventLevel = EventLevel.INFO)