[SPARK-52348][CONNECT] Add support for Spark Connect handlers for pipeline commands #51057
Conversation
// The graph to attach this dataset to.
optional string dataflow_graph_id = 1;
// Parses the SQL file and registers all datasets and flows.
message DefineSqlGraphElements {
I think this should live under the PipelineCommand message?
There's a PR for this: #51044
logInfo(s"Start pipeline cmd received: $cmd")
startRun(cmd.getStartRun, responseObserver, sessionHolder)
defaultResponse
// case proto.PipelineCommand.CommandTypeCase.DEFINE_SQL_GRAPH_ELEMENTS =>
Todo: add this back once dependent PR is merged
// filePath = Option.when(dataset.getSourceCodeLocation.hasFileName)(
//   dataset.getSourceCodeLocation.getFileName
// ),
// line = Option.when(dataset.getSourceCodeLocation.hasLineNumber)(
//   dataset.getSourceCodeLocation.getLineNumber
// ),
Todo: add this back once dependent PR is merged
@@ -31,12 +33,12 @@ import org.apache.spark.sql.pipelines.graph.QueryOrigin
 */
case class PipelineEvent(
    id: String,
    timestamp: String,
    timestamp: Timestamp,
Changing to a timestamp type instead of representing the timestamp as a string. This allows for easier formatting and better support for timezones.
    origin: PipelineEventOrigin,
    level: EventLevel,
    message: String,
    details: EventDetails,
    error: Option[ErrorDetail]
    error: Option[Throwable]
Changing this to a Throwable instead of a custom class.
Also removing a bunch of tests around the custom class and error serialization since they are no longer needed.
package org.apache.spark.sql.pipelines
object QueryOriginType extends Enumeration { |
This is included in an upstream PR and can be removed once that is merged
}

// TODO: re-enable when dependency on SQL registration is merged
ignore( |
Will reenable this once the SQL registration PR is merged
def withSqlConf[T](spark: SparkSession, pairs: (String, String)*)(f: => T): T = {
  val conf = spark.conf
  val (keys, values) = pairs.unzip
  val currentValues = keys.map(conf.getOption)
This returns a default value if the conf is not set and the conf has a default defined for it. This means that in the finally block you are actually setting keys that were unset before. That gets a bit dicey when you use spark.conf.get(key, default) later on.
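For concreteness, a minimal sketch of the pitfall being described, assuming an active SparkSession and using spark.sql.shuffle.partitions (which has a built-in default) as the example key:

```scala
import org.apache.spark.sql.SparkSession

object ConfRestoreSketch {
  // Sketch only: illustrates how conf.getOption can return a key's default
  // value even when the key was never explicitly set, so the "restore" branch
  // in the finally block explicitly sets a previously-unset key.
  def illustrateRestorePitfall(spark: SparkSession): Unit = {
    val key = "spark.sql.shuffle.partitions"  // has a built-in default of 200
    val before = spark.conf.getOption(key)    // Some("200") even if never set explicitly
    try {
      spark.conf.set(key, "10")
      // ... test body runs with the overridden value ...
    } finally {
      before match {
        case Some(v) => spark.conf.set(key, v)  // key is now explicitly pinned to "200"
        case None    => spark.conf.unset(key)   // never reached for keys with defaults
      }
    }
  }
}
```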
Ah this is actually from this dependent PR - #51050 cc: @SCHJonathan
 * Holds the latest pipeline execution for each graph ID. This is used to manage the lifecycle of
 * pipeline executions.
 */
object PipelineExecutionHolder {
How does this tie in with Connect's lifecycle management? If a session gets killed, any pipeline execution associated with that session should also be killed.
How is that handled outside of pipelines, for example with streaming queries?
I chatted about this with @hvanhovell, and it sounds like there's a stop method in SessionHolder. He also suggested it could make sense to track the executions inside SessionHolder instead of a global object.
added this in the latest commit
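For illustration, a rough sketch of that approach; the names here (PipelineExecutions, PipelineExecution) are hypothetical stand-ins, not the actual SessionHolder API:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Minimal stand-in for the real execution type.
trait PipelineExecution { def stopPipeline(): Unit }

// Rough sketch: executions are tracked per session so that tearing down the
// session (e.g. the session's stop path) also stops any pipeline runs it
// started, instead of leaving them in a global registry.
class PipelineExecutions {
  private val executions = new ConcurrentHashMap[String, PipelineExecution]()

  def track(graphId: String, execution: PipelineExecution): Unit =
    executions.put(graphId, execution)

  // Invoked from the session's cleanup path.
  def stopAll(): Unit = {
    executions.values().asScala.foreach(_.stopPipeline())
    executions.clear()
  }
}
```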
def stopPipelineExecution(graphId: String): Unit = {
  executions.compute(graphId, (_, context) => {
    context.pipelineExecution.stopPipeline()
How expensive is it to stop a pipeline? This will still block a part of the ConcurrentHashMap.
I would say that it's not extremely cheap but reasonably cheap. Stop will unregister listeners that were created to monitor pipeline execution and then it will interrupt the graph execution thread. Can we punt on addressing any perf issues around concurrent requests in a followup PR?
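For what it's worth, one way to keep the stop work out of the map's bucket lock would be to remove the entry first and stop afterwards; a hypothetical sketch with a simplified stand-in type, not what this PR does:

```scala
import java.util.concurrent.ConcurrentHashMap

object StopOutsideLockSketch {
  // Minimal stand-in for the real execution context type.
  trait StoppableExecution { def stopPipeline(): Unit }

  // Hypothetical alternative to calling stopPipeline() inside compute():
  // remove the entry first (a short critical section), then do the listener
  // teardown and thread interruption without holding any map lock.
  def stopPipelineExecution(
      executions: ConcurrentHashMap[String, StoppableExecution],
      graphId: String): Unit = {
    val context = executions.remove(graphId)  // brief lock: just unlinks the entry
    if (context != null) {
      context.stopPipeline()                  // runs lock-free
    }
  }
}
```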
Thanks for making the requested changes @jon-mio. One stylistic nitpick; otherwise LGTM!
@@ -1007,12 +992,10 @@ class TriggeredGraphExecutionSuite extends ExecutionTest {
"Failed to resolve flow due to upstream failure: 'spark_catalog.test_db.table3'"
),
errorChecker = { ex =>
ex.exceptions.exists { ex =>
ex.message.contains(
ex.getMessage.contains(
Style nitpick: should the message be indented a block back?
.getOrElse {
  logInfo(
    s"No default catalog was supplied. Falling back to the session catalog `spark_catalog`).")
  "spark_catalog"
Do we actually want to fall back to this catalog? Or to the one that is currently the default? The same question applies to the defaultDatabase...
Updated to fall back to the current one.
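For reference, a small sketch of that fallback (assuming an active SparkSession and an optional client-supplied catalog name; the method name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object CatalogFallbackSketch {
  // Prefer an explicitly supplied catalog; otherwise fall back to whatever
  // catalog the session currently considers its default, rather than a
  // hard-coded "spark_catalog".
  def resolveDefaultCatalog(spark: SparkSession, supplied: Option[String]): String =
    supplied.getOrElse(spark.catalog.currentCatalog())
}
```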
baseOrigin = QueryOrigin(
  objectType = Option(QueryOriginType.Table.toString),
  objectName = Option(tableIdentifier.unquotedString),
  language = Option(Python())),
Python? You technically don't know that. To what end do we need to record this information?
Currently all the SQL code goes through defineSqlGraphElement, so anything going through this path is Python. However, it's not being used right now so I'm happy to remove it.
val graphElementRegistry = DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
// We will use this variable to store the run failure event if it occurs. This will be set
// by the event callback.
var runFailureEvent = Option.empty[PipelineEvent]
By which threads is the var accessed?
AFAICT it should be marked volatile...
Yup, marked as volatile since this can be set by other threads that add events to the buffer, causing the callback to be invoked.
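A tiny sketch of the pattern (the event type and failure check are simplified stand-ins for the real ones):

```scala
object RunFailureSketch {
  // Simplified stand-in for the real pipeline event type.
  final case class PipelineEvent(message: String, isFailure: Boolean)

  class RunState {
    // Written by whichever thread appends the failure event to the buffer
    // (which triggers the callback), read later on the handler thread, so it
    // is marked @volatile for cross-thread visibility.
    @volatile private var runFailureEvent: Option[PipelineEvent] = None

    // Event callback: may be invoked from event-buffer threads.
    def onEvent(event: PipelineEvent): Unit = {
      if (event.isFailure) {
        runFailureEvent = Some(event)
      }
    }

    // Read on the handler thread once the run finishes.
    def failure: Option[PipelineEvent] = runFailureEvent
  }
}
```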
.newBuilder()
.setTimestamp(ProtoTimestamp
  .newBuilder()
  .setSeconds(event.timestamp.getTime / 1000)
I am not sure if we have to document this; initially it seemed that this was arbitrarily dropping/adding milliseconds. However, java.sql.Timestamp normalizes its time and nanos fields.
This is neat!
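For reference, a self-contained sketch of the conversion being discussed; the setNanos line is an assumption about how the fractional part would be carried over, mirroring the snippet above:

```scala
import java.sql.Timestamp
import com.google.protobuf.{Timestamp => ProtoTimestamp}

object TimestampConversionSketch {
  // java.sql.Timestamp keeps its millis (getTime) and nanos (getNanos) fields
  // normalized with each other, so splitting it into whole seconds plus nanos
  // does not silently drop or invent milliseconds.
  def toProtoTimestamp(ts: Timestamp): ProtoTimestamp =
    ProtoTimestamp
      .newBuilder()
      .setSeconds(ts.getTime / 1000)  // whole seconds since the epoch
      .setNanos(ts.getNanos)          // fractional second, including the millis
      .build()
}
```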
@@ -426,6 +434,58 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
  listenerCache.keySet().asScala.toSeq
}

/**
In a follow-up let's just put this in a different class. It is fine for now.
Merging to master. Thanks!
What changes were proposed in this pull request?
- Adds a PipelinesHandler which handles SparkConnect PipelineCommands. This follows the pattern of MLHandler, where the SparkConnectPlanner delegates any ML commands to the MLHandler (a rough sketch of this delegation pattern follows below).
- PipelineEvents that are emitted during pipeline execution are sent back to the SparkConnect client by the StartRun handler so that they are automatically propagated back to the SC client.

This PR builds off changes in a few open PRs. I have squashed those changes into a single commit at the top of this PR - 49626fb. When reviewing, please ignore that commit and just review all commits after that one.
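As a rough illustration of that delegation pattern (all names and signatures below are simplified stand-ins, not the actual SparkConnectPlanner or handler APIs):

```scala
object PlannerDelegationSketch {
  // Simplified stand-ins for the real protobuf command types.
  sealed trait Command
  final case class PipelineCommand(payload: String) extends Command
  final case class MlCommand(payload: String) extends Command

  object PipelinesHandler { def handle(cmd: PipelineCommand): Unit = println(s"pipeline: ${cmd.payload}") }
  object MLHandler { def handle(cmd: MlCommand): Unit = println(s"ml: ${cmd.payload}") }

  // The planner stays thin: domain-specific commands are routed to dedicated
  // handlers, mirroring how ML commands are delegated to MLHandler.
  def handleCommand(cmd: Command): Unit = cmd match {
    case p: PipelineCommand => PipelinesHandler.handle(p)
    case m: MlCommand       => MLHandler.handle(m)
  }
}
```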
Misc changes:
- Changed the timestamp in the PipelineEvent proto from String to google.protobuf.Timestamp.
- Removed SerializedException and ErrorDetail in favor of representing errors just as Throwable.
Why are the changes needed?
This change is needed to support Spark Declarative Pipelines.
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
New unit tests
Was this patch authored or co-authored using generative AI tooling?
No