Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@
],
"sqlState" : "0A000"
},
"AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST" : {
"message" : [
"AutoCDC flow specifies both `column_list` and `except_column_list`; at most one may be provided."
],
"sqlState" : "42613"
},
"AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : {
"message" : [
"Using <caseSensitivity> column name comparison, the following columns are not present in the <schemaName> schema: <missingColumns>. Available columns: <availableColumns>."
Expand Down Expand Up @@ -232,18 +238,36 @@
},
"sqlState" : "22000"
},
"AUTOCDC_MISSING_SEQUENCE_BY" : {
"message" : [
"AutoCDC flow is missing a required `sequence_by` expression. Specify a `sequence_by` column or expression that orders incoming change events."
],
"sqlState" : "22023"
},
"AUTOCDC_MISSING_SOURCE" : {
"message" : [
"AutoCDC flow is missing a required `source` table name. Specify the name of the streaming source table the flow should read from."
],
"sqlState" : "22023"
},
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)."
],
"sqlState" : "42703"
"sqlState" : "22023"
},
"AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
"message" : [
"Invalid AutoCDC destination <tableName> with multiple flows: <flows>. An AutoCDC target table must have exactly one flow writing to it."
],
"sqlState" : "42000"
},
"AUTOCDC_NON_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a column identifier; got the non-attribute expression `<expression>`. AutoCDC keys, sequence_by, column_list, and except_column_list must reference unqualified column names."
],
"sqlState" : "22023"
},
"AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
"message" : [
"The column `<columnName>` in the <schemaName> schema collides with the reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using <caseSensitivity> column name comparison). Rename or remove the column."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@

package org.apache.spark.sql.connect.pipelines

import scala.collection.Seq
import scala.jdk.CollectionConverters._
import scala.util.Using

import io.grpc.stub.StreamObserver

import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanResponse, PipelineCommandResult, Relation, ResolvedIdentifier}
import org.apache.spark.connect.proto.PipelineCommand.DefineFlow.AutoCdcFlowDetails
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.{AnalysisException, Column}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Command, CreateNamespace, CreateTable, CreateTableAsSelect, CreateView, DescribeRelation, DescribeTablePartition, DropView, InsertIntoStatement, LogicalPlan, RenameTable, ShowColumns, ShowCreateTable, ShowFunctions, ShowTableProperties, ShowTables, ShowViews}
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, ShowNamespacesCommand}
import org.apache.spark.sql.pipelines.Language.Python
import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ColumnSelection, ScdType, UnqualifiedColumnName}
import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow}
import org.apache.spark.sql.pipelines.graph.{AllTables, AutoCdcFlow, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow}
import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
import org.apache.spark.sql.types.StructType

Expand All @@ -52,14 +56,17 @@ private[connect] object PipelinesHandler extends Logging {
* @param transformRelationFunc
* Function used to convert a relation to a LogicalPlan. This is used when determining the
* LogicalPlan that a flow returns.
* @param transformExpressionFunc
* Function used to convert a proto expression to a Catalyst expression.
* @return
* The response after handling the command
*/
def handlePipelinesCommand(
sessionHolder: SessionHolder,
cmd: proto.PipelineCommand,
responseObserver: StreamObserver[ExecutePlanResponse],
transformRelationFunc: Relation => LogicalPlan): PipelineCommandResult = {
transformRelationFunc: Relation => LogicalPlan,
transformExpressionFunc: proto.Expression => Expression): PipelineCommandResult = {
// Currently most commands do not include any information in the response. We just send back
// an empty response to the client to indicate that the command was handled successfully
val defaultResponse = PipelineCommandResult.getDefaultInstance
Expand Down Expand Up @@ -99,7 +106,11 @@ private[connect] object PipelinesHandler extends Logging {
case proto.PipelineCommand.CommandTypeCase.DEFINE_FLOW =>
logInfo(s"Define pipelines flow cmd received: $cmd")
val resolvedFlow =
defineFlow(cmd.getDefineFlow, transformRelationFunc, sessionHolder)
defineFlow(
cmd.getDefineFlow,
transformRelationFunc,
transformExpressionFunc,
sessionHolder)
val identifierBuilder = ResolvedIdentifier.newBuilder()
resolvedFlow.catalog.foreach(identifierBuilder.setCatalogName)
resolvedFlow.database.foreach { ns =>
Expand Down Expand Up @@ -315,6 +326,7 @@ private[connect] object PipelinesHandler extends Logging {
private def defineFlow(
flow: proto.PipelineCommand.DefineFlow,
transformRelationFunc: Relation => LogicalPlan,
transformExpressionFunc: proto.Expression => Expression,
sessionHolder: SessionHolder): TableIdentifier = {
if (flow.hasOnce) {
throw new AnalysisException(
Expand Down Expand Up @@ -379,22 +391,125 @@ private[connect] object PipelinesHandler extends Logging {
sqlConf = flow.getSqlConfMap.asScala.toMap,
once = false,
queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)),
origin = QueryOrigin(
filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
flow.getSourceCodeLocation.getFileName),
line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
flow.getSourceCodeLocation.getLineNumber),
objectType = Some(QueryOriginType.Flow.toString),
objectName = Option(flowIdentifier.unquotedString),
language = Some(Python()))))
origin = flowOrigin(flow, flowIdentifier)))
case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS =>
throw new UnsupportedOperationException("AutoCdcFlowDetails is not yet implemented.")
graphElementRegistry.registerFlow(
Comment thread
AnishMahto marked this conversation as resolved.
buildAutoCdcFlow(
autoCdcDetails = flow.getAutoCdcFlowDetails,
flow = flow,
flowIdentifier = flowIdentifier,
destinationIdentifier = destinationIdentifier,
defaultCatalog = defaultCatalog,
defaultDatabase = defaultDatabase,
sessionHolder = sessionHolder,
transformExpressionFunc = transformExpressionFunc))
case other =>
throw new UnsupportedOperationException(s"Unsupported DefineFlow details case: $other")
}
flowIdentifier
}

/**
 * Build an [[AutoCdcFlow]] from the proto-supplied AutoCDC flow details.
 *
 * The flow's source expression is encoded by the Python client as a streaming-table name; we
 * model that on the server side as a streaming [[UnresolvedRelation]] so that pipelines flow
 * analysis (which already handles `STREAM(t)` references) can resolve it against the rest of
 * the dataflow graph.
 *
 * @param autoCdcDetails
 *   AutoCDC-specific details of the flow: source, keys, sequencing, delete condition, column
 *   selection and SCD type.
 * @param flow
 *   The enclosing DefineFlow command; supplies the SQL conf map and the source code location.
 * @param flowIdentifier
 *   Identifier of the flow being built.
 * @param destinationIdentifier
 *   Identifier of the dataset the flow writes to.
 * @param defaultCatalog
 *   Catalog recorded in the flow's [[QueryContext]].
 * @param defaultDatabase
 *   Database recorded in the flow's [[QueryContext]].
 * @param sessionHolder
 *   Holder of the session used to parse the source table name.
 * @param transformExpressionFunc
 *   Function used to convert a proto expression to a Catalyst expression.
 * @return
 *   The [[AutoCdcFlow]] described by the given details.
 * @throws AnalysisException
 *   If `source` or `sequence_by` is missing, if both `column_list` and `except_column_list`
 *   are non-empty, or if a key / column-list entry does not convert to an
 *   [[UnresolvedAttribute]].
 * @throws UnsupportedOperationException
 *   If the requested SCD type is anything other than SCD type 1 (or unspecified).
 */
private def buildAutoCdcFlow(
    autoCdcDetails: AutoCdcFlowDetails,
    flow: proto.PipelineCommand.DefineFlow,
    flowIdentifier: TableIdentifier,
    destinationIdentifier: TableIdentifier,
    defaultCatalog: String,
    defaultDatabase: String,
    sessionHolder: SessionHolder,
    transformExpressionFunc: proto.Expression => Expression): AutoCdcFlow = {
  // TODO(SPARK-57092): apply_as_truncates is declared on AutoCdcFlowDetails but is not yet
  // honored by the engine; wire it through once SCD1 truncate support lands.
  // TODO(SPARK-57093): ignore_null_updates_column_list and ignore_null_updates_except_column_list
  // are declared on AutoCdcFlowDetails but are not yet honored by the engine; wire them
  // through once SCD1 ignore-null support lands.

  // Required fields are checked up front so that a missing field surfaces as a dedicated
  // error condition before any parsing or expression conversion is attempted.
  if (!autoCdcDetails.hasSource) {
    throw new AnalysisException("AUTOCDC_MISSING_SOURCE", Map.empty)
  }
  if (!autoCdcDetails.hasSequenceBy) {
    throw new AnalysisException("AUTOCDC_MISSING_SEQUENCE_BY", Map.empty)
  }

  // The source is a table name; represent it as a streaming relation to be resolved later.
  val sourcePlan: LogicalPlan = UnresolvedRelation(
    multipartIdentifier = GraphIdentifierManager
      .parseTableIdentifier(name = autoCdcDetails.getSource, spark = sessionHolder.session)
      .nameParts,
    isStreaming = true)

  // Arbitrary expressions (sequence_by, apply_as_deletes) are carried as Columns.
  val toColumn: proto.Expression => Column = expr => Column(transformExpressionFunc(expr))

  // Keys and column lists must be plain column references; anything else is rejected.
  val asUnqualifiedColumnName: proto.Expression => UnqualifiedColumnName = expr =>
    transformExpressionFunc(expr) match {
      case a: UnresolvedAttribute => UnqualifiedColumnName(a.nameParts)
      case other =>
        throw new AnalysisException(
          "AUTOCDC_NON_COLUMN_IDENTIFIER",
          Map("expression" -> other.sql))
    }

  val keys = autoCdcDetails.getKeysList.asScala.toSeq.map(asUnqualifiedColumnName)

  // At most one of column_list / except_column_list may be supplied; neither means no
  // explicit column selection.
  val columnSelection: Option[ColumnSelection] = {
    val included = autoCdcDetails.getColumnListList.asScala.toSeq
    val excluded = autoCdcDetails.getExceptColumnListList.asScala.toSeq
    if (included.nonEmpty && excluded.nonEmpty) {
      throw new AnalysisException("AUTOCDC_BOTH_COLUMN_LIST_AND_EXCEPT_COLUMN_LIST", Map.empty)
    } else if (included.nonEmpty) {
      Some(ColumnSelection.IncludeColumns(included.map(asUnqualifiedColumnName)))
    } else if (excluded.nonEmpty) {
      Some(ColumnSelection.ExcludeColumns(excluded.map(asUnqualifiedColumnName)))
    } else {
      None
    }
  }

  // Get user specified SCD type, or default to SCD1 if unspecified.
  val scdType: ScdType = autoCdcDetails.getStoredAsScdType match {
    case proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1 |
        proto.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_UNSPECIFIED =>
      ScdType.Type1
    case other =>
      throw new UnsupportedOperationException(s"Unsupported AutoCDC SCD type: $other")
  }

  val changeArgs = ChangeArgs(
    keys = keys,
    sequencing = toColumn(autoCdcDetails.getSequenceBy),
    storedAsScdType = scdType,
    deleteCondition =
      Option.when(autoCdcDetails.hasApplyAsDeletes)(toColumn(autoCdcDetails.getApplyAsDeletes)),
    columnSelection = columnSelection)

  AutoCdcFlow(
    identifier = flowIdentifier,
    destinationIdentifier = destinationIdentifier,
    func = FlowAnalysis.createFlowFunctionFromLogicalPlan(sourcePlan),
    sqlConf = flow.getSqlConfMap.asScala.toMap,
    queryContext = QueryContext(Option(defaultCatalog), Option(defaultDatabase)),
    origin = flowOrigin(flow, flowIdentifier),
    changeArgs = changeArgs)
}

/**
 * Assemble the [[QueryOrigin]] describing where a flow was defined.
 *
 * The file path and line number are read from the flow's source code location and are only
 * populated when the corresponding field is present on the proto. The origin is always tagged
 * as a flow object defined from Python and named after the flow identifier.
 *
 * @param flow
 *   The DefineFlow command carrying the source code location.
 * @param flowIdentifier
 *   Identifier of the flow; its unquoted string becomes the origin's object name.
 * @return
 *   The origin to attach to the flow.
 */
private def flowOrigin(
    flow: proto.PipelineCommand.DefineFlow,
    flowIdentifier: TableIdentifier): QueryOrigin = {
  val location = flow.getSourceCodeLocation
  val definedInFile = if (location.hasFileName) Some(location.getFileName) else None
  val definedAtLine = if (location.hasLineNumber) Some(location.getLineNumber) else None
  QueryOrigin(
    filePath = definedInFile,
    line = definedAtLine,
    objectType = Some(QueryOriginType.Flow.toString),
    objectName = Option(flowIdentifier.unquotedString),
    language = Some(Python()))
}

private def startRun(
cmd: proto.PipelineCommand.StartRun,
responseObserver: StreamObserver[ExecutePlanResponse],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2955,7 +2955,8 @@ class SparkConnectPlanner(
sessionHolder,
command,
responseObserver,
transformRelation)
transformRelation,
transformExpression)
executeHolder.eventsManager.postFinished()
responseObserver.onNext(
proto.ExecutePlanResponse
Expand Down
Loading