[SPARK-47545][CONNECT] Dataset observe support for the Scala client #45701
Conversation
Let's make sure this is matched with the Python version.
Thanks Hyukjin! Yes, the behavior largely matches the Python version (internals and user-facing APIs).
Yes, similar to |
```scala
    metrics: Option[Map[String, Any]]): Unit = {
  observationRegistry.get(planId).map { observation =>
    if (observation.setMetricsAndNotify(metrics)) {
      observationRegistry.remove(planId)
```
Should this be tied to whether or not the observation has been successfully updated? Another question: under what circumstances can the metrics be empty?
I had the same question when I looked at the code. In Spark Core we only de-register the Observation when some non-empty metrics are set, so I decided to keep it the same in Connect. I am not sure under which circumstances the metrics can be empty.
I looked at the code. It seems it's valid to check for non-empty metrics in Spark Core:

```scala
private[spark] def onFinish(qe: QueryExecution): Unit = {
  ...
  val row: Option[Row] = qe.observedMetrics.get(name)
  val metrics: Option[Map[String, Any]] =
    row.map(r => r.getValuesMap[Any](r.schema.fieldNames.toImmutableArraySeq))
  if (setMetricsAndNotify(metrics)) {
    unregister()
  }
}
```
The option covers the case where the query finishes without producing the metric. Is this possible?
Nevertheless, we don't need to handle this case in Connect because we look up observations using the plan ID.
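The registry pattern under discussion can be sketched in plain Scala. This is a simplified stand-in for the real Spark Connect classes (`SimpleObservation`, `RegistrySketch`, and `onMetricsArrived` are illustrative names, not the actual API): the observation is removed from the registry only when non-empty metrics were actually delivered.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for the real Observation class: notification
// succeeds only when some non-empty metrics are delivered.
class SimpleObservation {
  @volatile private var metrics: Option[Map[String, Any]] = None

  def setMetricsAndNotify(m: Option[Map[String, Any]]): Boolean = synchronized {
    if (m.exists(_.nonEmpty)) {
      metrics = m
      notifyAll()
      true
    } else {
      false
    }
  }

  def get: Map[String, Any] = metrics.getOrElse(Map.empty)
}

object RegistrySketch {
  private val observationRegistry = new ConcurrentHashMap[Long, SimpleObservation]()

  def register(planId: Long, obs: SimpleObservation): Unit =
    observationRegistry.put(planId, obs)

  def isRegistered(planId: Long): Boolean =
    observationRegistry.containsKey(planId)

  // De-register only when the observation was successfully updated,
  // mirroring the behavior discussed above.
  def onMetricsArrived(planId: Long, metrics: Option[Map[String, Any]]): Unit = {
    Option(observationRegistry.get(planId)).foreach { observation =>
      if (observation.setMetricsAndNotify(metrics)) {
        observationRegistry.remove(planId)
      }
    }
  }
}
```

With this shape, an empty-metrics notification leaves the entry registered, so a later non-empty notification for the same plan ID can still complete it.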
```scala
val observedDf = df.observe(observation, min("id"), avg("id"), max("id"))

// Start a new thread to get the observation
val future = Future(observation.get)(ExecutionContext.global)
```
For the record: IMO the Observation class should have been using a Future from the get-go.
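A Future-based design like the one suggested here could look like the following plain-Scala sketch (`FutureObservation` and its members are illustrative names, not the actual Spark API): the result is a `Promise` completed at most once, so callers can pick between async and blocking consumption without spawning a thread just to wait.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Illustrative only: an observation whose result is a Future from the
// start, instead of a wait/notify-based blocking accessor.
class FutureObservation {
  private val promise = Promise[Map[String, Any]]()

  // Completes the future at most once; returns whether this call did it.
  def setMetrics(metrics: Map[String, Any]): Boolean =
    promise.trySuccess(metrics)

  // Async handle: no extra thread needed to wait on the result.
  def future: Future[Map[String, Any]] = promise.future

  // Blocking accessor, analogous to Observation.get.
  def get(timeout: Duration = Duration.Inf): Map[String, Any] =
    Await.result(promise.future, timeout)
}
```

Callers who want the blocking style keep `get`, while test code can compose on `future` directly instead of wrapping `get` in `Future(...)`.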
```scala
(0 until metric.getKeysCount).map { i =>
  val key = metric.getKeys(i)
  val value = LiteralValueProtoConverter.toCatalystValue(metric.getValues(i))
  schema = schema.add(key, LiteralValueProtoConverter.toDataType(value.getClass))
```
There is a bit of a twist here: LiteralValueProtoConverter returns a tuple for a nested struct, which is not really expected in a Row. We can address this in a follow-up.
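The loop above pairs parallel key and value arrays from the proto message by index. A minimal plain-Scala sketch of that decode pattern, with `MetricMessage` as a hypothetical stand-in for the proto type (the real code additionally infers a Catalyst `DataType` per value to grow the Row schema):

```scala
// Hypothetical stand-in for the proto metrics message: parallel
// sequences of keys and already-decoded values.
final case class MetricMessage(keys: Seq[String], values: Seq[Any])

object MetricDecodeSketch {
  // Pair each key with its value by index, like the
  // (0 until metric.getKeysCount) loop above.
  def decode(metric: MetricMessage): Map[String, Any] = {
    require(metric.keys.length == metric.values.length,
      "keys and values must be parallel arrays")
    metric.keys.zip(metric.values).toMap
  }
}
```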
LGTM
@xupefei there is a genuine test failure. Can you check what is going on?
Merging!
### What changes were proposed in this pull request?
This PR adds support for `Dataset.observe` to the Spark Connect Scala client. Note that the support here does not include listener support, as that runs on the server side. This PR includes a small refactoring of the `Observation` helper class: methods that are not bound to the SparkSession were extracted to `spark-api`, and two subclasses were added in `spark-core` and `spark-jvm-client`.

### Why are the changes needed?
Before this PR, the `DF.observe` method was only supported in the Python client.

### Does this PR introduce _any_ user-facing change?
Yes. The user can now issue `DF.observe(name, metrics...)` or `DF.observe(observationObject, metrics...)` to get stats of columns of a DataFrame.

### How was this patch tested?
Added new e2e tests.

### Was this patch authored or co-authored using generative AI tooling?
Nope.

Closes apache#45701 from xupefei/scala-observe.

Authored-by: Paddy Xu <xupaddy@gmail.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>