[SPARK-52348][CONNECT] Add support for Spark Connect handlers for pipeline commands #51057

Closed
wants to merge 14 commits

Conversation


@jonmio jonmio commented May 30, 2025

What changes were proposed in this pull request?

  • Introduces a PipelinesHandler which handles Spark Connect PipelineCommands. This follows the pattern of MLHandler, where the SparkConnectPlanner delegates any ML commands to the MLHandler (see the sketch after this list)
  • Streams PipelineEvents that are emitted during pipeline execution back to the Spark Connect client
  • Rethrows exceptions that occur during pipeline execution in the StartRun handler so that they are automatically propagated back to the Spark Connect client
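
A minimal sketch of that delegation, assuming illustrative names for the planner hook and the command-type enum (the actual dispatch in this PR may differ):

import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto

// Hypothetical planner-side dispatch: pipeline commands are handed off to a
// dedicated handler, mirroring how ML commands are routed to MLHandler.
// Assumes a sessionHolder is in scope, as it is inside the planner.
def handleCommand(
    cmd: proto.Command,
    responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
  cmd.getCommandTypeCase match {
    case proto.Command.CommandTypeCase.PIPELINE_COMMAND =>
      PipelinesHandler.handlePipelinesCommand(
        cmd.getPipelineCommand, responseObserver, sessionHolder)
    case other =>
      throw new UnsupportedOperationException(s"Unsupported command: $other")
  }
}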

This PR builds off changes in a few open PRs. I have squashed those changes into a single commit at the top of this PR - 49626fb. When reviewing, please ignore that commit and review only the commits after it.

Misc changes:

  • Convert the timestamp field in the PipelineEvent proto from String to google.protobuf.Timestamp
  • Remove references to SerializedException and ErrorDetail in favor of representing errors just as Throwable

Why are the changes needed?

This change is needed to support Spark Declarative Pipelines.

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

New unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@jonmio jonmio changed the title Sc pipelines [WIP] [DRAFT] Sc pipelines May 30, 2025
// The graph to attach this dataset to.
optional string dataflow_graph_id = 1;
// Parses the SQL file and registers all datasets and flows.
message DefineSqlGraphElements {
Author

I think this should live under the PipelineCommand message?

Contributor

There's a PR for this: #51044

@jonmio jonmio changed the title [WIP] [DRAFT] Sc pipelines [WIP] [DRAFT] Add support for Spark Connect handlers for pipeline commands May 30, 2025
@sryza sryza changed the title [WIP] [DRAFT] Add support for Spark Connect handlers for pipeline commands [SPARK-52348] [WIP] [DRAFT] Add support for Spark Connect handlers for pipeline commands Jun 1, 2025
@sryza sryza self-assigned this Jun 1, 2025
@HyukjinKwon HyukjinKwon changed the title [SPARK-52348] [WIP] [DRAFT] Add support for Spark Connect handlers for pipeline commands [SPARK-52348][CONNECT] [WIP] [DRAFT] Add support for Spark Connect handlers for pipeline commands Jun 1, 2025
logInfo(s"Start pipeline cmd received: $cmd")
startRun(cmd.getStartRun, responseObserver, sessionHolder)
defaultResponse
// case proto.PipelineCommand.CommandTypeCase.DEFINE_SQL_GRAPH_ELEMENTS =>
Author

Todo: add this back once dependent PR is merged

Comment on lines 176 to 181
// filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)(
// dataset.getSourceCodeLocation.getFileName
// ),
// line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)(
// dataset.getSourceCodeLocation.getLineNumber
// ),
Author

Todo: add this back once dependent PR is merged

@@ -31,12 +33,12 @@ import org.apache.spark.sql.pipelines.graph.QueryOrigin
*/
case class PipelineEvent(
id: String,
timestamp: String,
timestamp: Timestamp,
Author

Changing to a timestamp type instead of representing the timestamp as a string. This allows for easier formatting and better timezone support.

origin: PipelineEventOrigin,
level: EventLevel,
message: String,
details: EventDetails,
error: Option[ErrorDetail]
error: Option[Throwable]
Author

Changing this to a Throwable instead of a custom class.

Author

Also removing a bunch of tests around the custom class and error serialization since that is no longer needed


package org.apache.spark.sql.pipelines

object QueryOriginType extends Enumeration {
Author

This is included in an upstream PR and can be removed once that is merged

}

// TODO: re-enable when dependency on SQL registration is merged
ignore(
Author

Will reenable this once the SQL registration PR is merged

@jonmio jonmio changed the title [SPARK-52348][CONNECT] [WIP] [DRAFT] Add support for Spark Connect handlers for pipeline commands [SPARK-52348][CONNECT] Add support for Spark Connect handlers for pipeline commands Jun 2, 2025
def withSqlConf[T](spark: SparkSession, pairs: (String, String)*)(f: => T): T = {
val conf = spark.conf
val (keys, values) = pairs.unzip
val currentValues = keys.map(conf.getOption)
Contributor

This returns a default value if the conf is not set and the conf has a default defined for it. This means that in the finally block you are actually setting keys that were unset before. That gets a bit dicey when you use spark.conf.get(key, default) later on.
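
A sketch of a restore path that sidesteps this, modeled on Spark's own SQLHelper.withSQLConf (assumes access to the internal SQLConf; SQLConf.contains reports only explicitly-set keys, unlike getOption, which also surfaces built-in defaults):

import org.apache.spark.sql.internal.SQLConf

def withSqlConf[T](pairs: (String, String)*)(f: => T): T = {
  val conf = SQLConf.get
  val (keys, values) = pairs.unzip
  // Capture only values that were explicitly set, not built-in defaults.
  val currentValues = keys.map { key =>
    if (conf.contains(key)) Some(conf.getConfString(key)) else None
  }
  keys.zip(values).foreach { case (k, v) => conf.setConfString(k, v) }
  try f finally {
    // Restore explicit values; unset keys that were never set, so later
    // spark.conf.get(key, default) calls behave as they did before.
    keys.zip(currentValues).foreach {
      case (key, Some(value)) => conf.setConfString(key, value)
      case (key, None) => conf.unsetConf(key)
    }
  }
}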

Author

Ah, this is actually from this dependent PR: #51050. cc @SCHJonathan

* Holds the latest pipeline execution for each graph ID. This is used to manage the lifecycle of
* pipeline executions.
*/
object PipelineExecutionHolder {
Contributor

How does this tie in with Connect's lifecycle management? If a session gets killed, any pipeline execution associated with that session should also be killed.

Author

How is that handled outside of pipelines, for example with streaming queries?

Contributor

I chatted about this with @hvanhovell, and it sounds like there's a stop method in SessionHolder. He also suggested it could make sense to track the executions inside SessionHolder instead of a global object.

Author

added this in the latest commit
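
Roughly, session-scoped tracking could look like the following sketch (field and method names here are illustrative, not the actual SessionHolder API; PipelineExecutionContext stands in for whatever the execution wrapper is called):

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical session-scoped registry: executions are owned by the session,
// so tearing down the session tears down its pipeline runs too.
private val pipelineExecutions =
  new ConcurrentHashMap[String, PipelineExecutionContext]()

def cachePipelineExecution(graphId: String, ctx: PipelineExecutionContext): Unit =
  pipelineExecutions.put(graphId, ctx)

// Invoked from SessionHolder's stop path, so a killed session also stops
// any pipeline executions associated with it.
private def stopAllPipelineExecutions(): Unit = {
  pipelineExecutions.values().asScala.foreach(_.pipelineExecution.stopPipeline())
  pipelineExecutions.clear()
}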


def stopPipelineExecution(graphId: String): Unit = {
executions.compute(graphId, (_, context) => {
context.pipelineExecution.stopPipeline()
Contributor

How expensive is it to stop a pipeline? This will still block a part of the ConcurrentHashMap.

Author

I would say that it's not extremely cheap but reasonably cheap. Stop will unregister listeners that were created to monitor pipeline execution and then it will interrupt the graph execution thread.

Can we punt on addressing any perf issues around concurrent requests in a followup PR?
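
For example, a sketch that keeps the expensive stop outside the map's per-bin lock (same names as the snippet above, behavior unchanged apart from locking):

// Remove the entry first, then stop outside the ConcurrentHashMap's bin lock,
// so a slow stopPipeline() cannot block unrelated operations on the map.
def stopPipelineExecution(graphId: String): Unit = {
  val context = executions.remove(graphId)
  if (context != null) {
    context.pipelineExecution.stopPipeline()
  }
}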

jon-mio added 3 commits June 7, 2025 19:39
copied tests and protos

fix imports

save before copying event protos and event helpers

connect module building but python is not working

regenerated protos and going to rebase on sandy's python changes

regen protos

save

mostly green

nits

herman

fix
@sryza sryza left a comment (Contributor)

Thanks for making the requested changes @jon-mio. One stylistic nitpick; otherwise LGTM!

@@ -1007,12 +992,10 @@ class TriggeredGraphExecutionSuite extends ExecutionTest {
"Failed to resolve flow due to upstream failure: 'spark_catalog.test_db.table3'"
),
errorChecker = { ex =>
ex.exceptions.exists { ex =>
ex.message.contains(
ex.getMessage.contains(
Contributor

Style nitpick: should the message be indented a block back?

.getOrElse {
logInfo(
s"No default catalog was supplied. Falling back to the session catalog `spark_catalog`).")
"spark_catalog"
Contributor

Do we actually want to fall back to this catalog? Or to the one that is currently the default? The same question applies to the defaultDatabase...

Author

Updated to fall back to the current one.
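
A sketch of that fallback, assuming a SparkSession named spark is in scope (providedCatalog is an illustrative stand-in for however the user-supplied catalog arrives):

val catalog = providedCatalog.getOrElse {
  // Fall back to whatever catalog the session currently has active,
  // rather than hard-coding spark_catalog.
  val current = spark.catalog.currentCatalog()
  logInfo(s"No default catalog was supplied. Falling back to the current catalog `$current`.")
  current
}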

baseOrigin = QueryOrigin(
objectType = Option(QueryOriginType.Table.toString),
objectName = Option(tableIdentifier.unquotedString),
language = Option(Python())),
@hvanhovell hvanhovell (Contributor) Jun 9, 2025

Python? You technically don't know that. To what end do we need to record this information?

Author

Currently all the SQL code goes through defineSqlGraphElement so anything going through this path is Python. However, it's not being used right now so I'm happy to remove it

val graphElementRegistry = DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
// We will use this variable to store the run failure event if it occurs. This will be set
// by the event callback.
var runFailureEvent = Option.empty[PipelineEvent]
Contributor

By which threads is the var accessed?

Contributor

AFAICT it should be marked volatile...

Author

Yup, marked it as volatile, since the callback can be invoked by other threads that add events to the buffer.
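
In Scala that is just the @volatile annotation on the var; a minimal sketch (the failure check is illustrative):

// The callback runs on whichever thread appends to the event buffer, so the
// write must be visible to the thread that inspects the result after the run.
@volatile var runFailureEvent = Option.empty[PipelineEvent]

val onEvent: PipelineEvent => Unit = { event =>
  if (event.error.isDefined) {
    runFailureEvent = Some(event)
  }
}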

.newBuilder()
.setTimestamp(ProtoTimestamp
.newBuilder()
.setSeconds(event.timestamp.getTime / 1000)
Contributor

I am not sure if we have to document this; initially it seemed that this was arbitrarily dropping/adding milliseconds. However, java.sql.Timestamp normalizes its time and nanos fields.

Contributor

This is neat!
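
For reference, a sketch of a conversion that carries the sub-second part explicitly (assumes the event timestamp is a java.sql.Timestamp; Math.floorDiv keeps pre-epoch instants correct, and getNanos holds the normalized fractional second):

import java.sql.Timestamp
import com.google.protobuf.{Timestamp => ProtoTimestamp}

def toProtoTimestamp(ts: Timestamp): ProtoTimestamp =
  ProtoTimestamp
    .newBuilder()
    .setSeconds(Math.floorDiv(ts.getTime, 1000L))
    .setNanos(ts.getNanos)
    .build()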

@@ -426,6 +434,58 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
listenerCache.keySet().asScala.toSeq
}

/**
Contributor

In a follow-up let's just put this in a different class. It is fine for now.

@hvanhovell (Contributor)

Merging to master. Thanks!
