
[SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe #39091

Closed
wants to merge 27 commits

Conversation

beliefer
Contributor

What changes were proposed in this pull request?

Implement DataFrame.observe with a proto message

Implement DataFrame.observe for the Scala API
Implement DataFrame.observe for the Python API
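For reference, a minimal usage sketch of the API being mirrored (plain Dataset.observe in Scala; illustrative only, not code from this PR):

```scala
import org.apache.spark.sql.{functions => F, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.range(10).toDF("id")

// Attach named metrics; they are computed when an action materializes the query.
val observed = df.observe("my_metrics", F.count(F.lit(1)).as("rows"), F.max("id").as("max_id"))
observed.collect()
```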

Why are the changes needed?

For Connect API coverage.

Does this PR introduce any user-facing change?

'No'. New API

How was this patch tested?

New test cases.

Column(transformExpression(expr))
}

if (rel.getIsObservation) {
Contributor

What is the difference between the code paths?

Contributor Author

The Observation registers an ObservationListener on the ExecutionListenerManager.

Contributor

This explanation is not fully correct. The Observation class uses event listeners to fetch the metrics as soon as they appear, without waiting for the DS command to finish. Since we don't have an event listener at this point in time, it's not the same thing. Please simplify the PR accordingly.
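For context, a rough sketch of the listener mechanism being described (simplified, with assumed names; not the PR's code):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Classic behaviour in a nutshell: register a QueryExecutionListener and read the
// named entry from QueryExecution.observedMetrics as soon as the query finishes.
class ObservationSketch(name: String) {
  @volatile private var metrics: Option[Row] = None

  def register(spark: SparkSession): Unit = {
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        qe.observedMetrics.get(name).foreach(row => metrics = Some(row))
      }
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
    })
  }

  def get: Option[Row] = metrics
}
```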

Contributor

We don't need Observation here. We just need to send the observed metrics as part of the response stream.

Contributor Author

We don't need Observation here. We just need to send the observed metrics as part of the response stream.

But we should maintain consistency of behavior between the Spark Connect API and the PySpark API. PySpark's observe supports passing an Observation as a parameter, and the doctest checks this consistency.

Maybe we could keep the Connect API accepting an Observation while not using it on the server side, and instead use CollectMetrics directly.

Contributor

Please remove the is_observation code path.

Contributor Author

@hvanhovell is_observation has been removed.

@hvanhovell
Contributor

@beliefer thanks for working on this. I have one question how are we going to get the observed metrics to the client? This seems to be missing from the implementation. One of the approaches would be to send it in a similar way as the metrics in the result code path.

@@ -126,6 +126,20 @@ package object dsl {
Expression.UnresolvedFunction.newBuilder().setFunctionName("min").addArguments(e))
.build()

def proto_max(e: Expression): Expression =
Contributor

Is there a need to add the proto_ prefix? Just call it max?

Contributor

Same for below

Contributor Author

@beliefer beliefer Dec 17, 2022

I just followed the existing proto_min.

@@ -45,7 +45,7 @@ import org.apache.spark.sql.util.QueryExecutionListener
* @param name name of the metric
* @since 3.3.0
*/
class Observation(name: String) {
class Observation(val name: String) {
Contributor

ah without val, name is treated as a method. Nice catch on this.
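A tiny illustration of the difference (not from the PR):

```scala
class WithoutVal(name: String)   // `name` is only a constructor parameter: new WithoutVal("x").name does not compile
class WithVal(val name: String)  // `val` generates a public accessor: new WithVal("x").name == "x"
```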

@beliefer
Contributor Author

beliefer commented Dec 17, 2022

@beliefer thanks for working on this. I have one question how are we going to get the observed metrics to the client? This seems to be missing from the implementation. One of the approaches would be to send it in a similar way as the metrics in the result code path.

Good question. The result of a Dataset can be passed back by the gRPC server, but the ObservationListener runs on the server, so it seems we need another way to get the metrics.

@beliefer beliefer changed the title [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe Dec 17, 2022
@grundprinzip
Contributor

@beliefer thanks for working on this. I have one question how are we going to get the observed metrics to the client? This seems to be missing from the implementation. One of the approaches would be to send it in a similar way as the metrics in the result code path.

Good question. The result of a Dataset can be passed back by the gRPC server, but the ObservationListener runs on the server, so it seems we need another way to get the metrics.

I think it would be possible to add another result batch type for observed metrics and simply pass them at the end.

python/pyspark/sql/connect/dataframe.py (outdated review threads, resolved)
Notes
-----
When ``observation`` is :class:`Observation`, this method only supports batch queries.
When ``observation`` is a string, this method works for both batch and streaming queries.
Contributor

Streaming queries are out of scope for now.


if isinstance(observation, Observation):
    return DataFrame.withPlan(
        plan.CollectMetrics(self._plan, str(observation._name), list(exprs), True),
Contributor

is the implementation in pyspark equivalent to this?

def _on(self, df: DataFrame, *exprs: Column) -> DataFrame:
    """Attaches this observation to the given :class:`DataFrame` to observe aggregations.

    Parameters
    ----------
    df : :class:`DataFrame`
        the :class:`DataFrame` to be observed
    exprs : list of :class:`Column`
        column expressions (:class:`Column`).

    Returns
    -------
    :class:`DataFrame`
        the observed :class:`DataFrame`.
    """
    assert self._jo is None, "an Observation can be used with a DataFrame only once"
    self._jvm = df._sc._jvm
    assert self._jvm is not None
    cls = self._jvm.org.apache.spark.sql.Observation
    self._jo = cls(self._name) if self._name is not None else cls()
    observed_df = self._jo.on(
        df._jdf, exprs[0]._jc, column._to_seq(df._sc, [c._jc for c in exprs[1:]])
    )
    return DataFrame(observed_df, df.sparkSession)

@beliefer
Contributor Author

I think it would be possible to add another result batch type for observed metrics and simply pass them at the end.

I have an idea:

  1. Cache the Observation on the server.
  2. Create a new relation GetObservation for getting the Observation from the cache with a timeout. If we can get the metrics successfully, wrap the metrics in a local relation and return it to the client.

@grundprinzip
Contributor

2. Create a new relation GetObservation for getting the Observation from the cache with a timeout. If we can get the metrics successfully, wrap the metrics in a local relation and return it to the client.

Today we simply stuff the metrics into the pandas DataFrame's pdf['attrs'] property. I'm wondering if we can just do the same here.

I don't have enough experience to say whether it's worth doing another full round trip to the server for that. Can we experiment for now with just immediately returning them? The observed metrics should be relatively small, so it should not be a big deal?

@hvanhovell
Contributor

@beliefer can we just send them as part of the ExecutePlanResponse at the end of the query? Doing another RPC seems a bit wasteful, and it means we have to track query state in the server side session.

@beliefer
Contributor Author

@beliefer can we just send them as part of the ExecutePlanResponse at the end of the query? Doing another RPC seems a bit wasteful, and it means we have to track query state in the server side session.

This has been done.

@beliefer
Contributor Author

I don't have enough experience to say whether it's worth doing another full round trip to the server for that. Can we experiment for now with just immediately returning them? The observed metrics should be relatively small, so it should not be a big deal?

This has been done.

@beliefer
Contributor Author

beliefer commented Dec 21, 2022

Contributor

@grundprinzip grundprinzip left a comment

I think we're getting there. The PR, while big, is becoming really nice. There are still a couple of things to clarify, but we're closing in.

@@ -45,6 +45,7 @@
UnresolvedRegex,
)
from pyspark.sql.connect.functions import col, lit
from pyspark.sql import Observation
Contributor

I'm not sure this is going to work. The tricky part is that the Observation instance heavily depends on the _jvm object, so I'm worried about using the type here because it will not be possible to even construct it when using Spark Connect.

For now, we have two ways out here:

  1. For now we just use a string and add the Observation class as a follow-up.
  2. We add an Observation class now.

Personally, to get this PR moving, I would vote for 1.

Contributor Author

@beliefer beliefer Dec 22, 2022

First, users are used to the Observation from pyspark.sql, so I think we'd better use it too.
Second, this PR only uses the name of the Observation, not any of its actions, so the use is safe here.

Contributor

You can't use it for type annotations here if it's not legal to construct the type. In addition you're using it to access the name IIRC.


@@ -158,6 +158,9 @@ message ExecutePlanResponse {
// batch of results and then represent the overall state of the query execution.
Metrics metrics = 4;

// The metrics observed during the execution of the query plan.
ObservedMetrics observed_metrics = 5;
Contributor

Suggested change
ObservedMetrics observed_metrics = 5;
optional ObservedMetrics observed_metrics = 5;

@@ -181,6 +184,16 @@ message ExecutePlanResponse {
string metric_type = 3;
}
}

message ObservedMetrics {
Contributor

Doc?


message ObservedMetricsObject {
string name = 1;
repeated string values = 2;
Contributor

Is this equivalent to what we do for regular observations?

Contributor Author

Not the same.

Contributor

I'm not sure I understand, can you please expand your answer a bit?

Contributor Author

SQLMetrics have name, value and metricType, while ObservedMetrics have name and values.

Contributor

Why use values to represent the data? You can just use literals. We may need to add a schema as well.

Contributor Author

A metric may have multiple different values.
I will add the schema.

repeated Expression metrics = 3;

// (Optional) Indicates whether an Observation is used.
bool is_observation = 4;
Contributor

What happens if this is false?

Suggested change
bool is_observation = 4;
optional bool is_observation = 4;

.observe(observation, metrics.head, metrics.tail: _*)
.logicalPlan
} else {
CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))
Contributor

Does this actually make sense? Looking at the way this class is used, it seems that only .observe actually creates an instance of CollectMetrics. I'm not sure we're actually exposing this operator as such.

@hvanhovell what's your perspective?

Contributor Author

Many other APIs expose operators directly.

Contributor

Can you please give me an example of where we're exposing the catalyst operator directly in our API (in particular in the Dataset API)?

Contributor Author

For example, Unpivot, UnresolvedHint and so on.

Contributor

As outlined above, only this branch is needed; their behavior is identical.

Contributor

I am sorry, but where is the logicalplan cached in all of this?

Contributor Author

@hvanhovell I'm sorry, the reply just now was confused; I am also working on df.randomSplit. df.observe has no caching behavior, so we can only support the string form for now.
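For reference, a minimal sketch of the single-branch planner transform the reviewers are converging on (assuming the surrounding SparkConnectPlanner helpers transformExpression and transformRelation; not necessarily the exact merged code):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.connect.proto
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, LogicalPlan}

// Sketch: convert every metric expression and wrap the child plan in CollectMetrics,
// exactly as both Dataset.observe overloads do on the classic code path.
private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
  val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
    Column(transformExpression(expr))
  }
  CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))
}
```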

@@ -179,6 +183,29 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
.build()
}

def sendObservedMetricsToResponse(
Contributor

Suggested change
def sendObservedMetricsToResponse(
private def sendObservedMetricsToResponse(

doc?
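A hedged sketch of what such a helper might look like; the builder and field names follow the proto sketched in this thread, and toLiteralProto is a hypothetical stand-in for the value-to-proto conversion, so treat this as an assumption rather than the merged code:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.sql.DataFrame

// Sketch: after execution, copy QueryExecution.observedMetrics (one Row per
// observation name) into a trailing ExecutePlanResponse message.
private def sendObservedMetricsToResponse(
    clientId: String,
    dataframe: DataFrame): ExecutePlanResponse = {
  val observed = dataframe.queryExecution.observedMetrics.map { case (name, row) =>
    ExecutePlanResponse.ObservedMetrics
      .newBuilder()
      .setName(name)
      // toLiteralProto is a placeholder for whatever conversion the PR settles on.
      .addAllValues(row.toSeq.map(toLiteralProto).asJava)
      .build()
  }
  ExecutePlanResponse
    .newBuilder()
    .setClientId(clientId)
    .addAllObservedMetrics(observed.toSeq.asJava)
    .build()
}
```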

@@ -619,6 +619,50 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
comparePlans(connectPlan1, sparkPlan1)
}

test("Test observe") {
Contributor

Please add negative tests for throwing analysis exceptions when submitting non-aggregation functions.

Contributor Author

OK
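A hedged sketch of such a negative test; the DSL helpers (connectTestRelation, observe, protoAttr) and the analyzePlan helper are assumed to mirror the positive test above and are not verified here:

```scala
// Sketch: a bare attribute is not an aggregate expression, so analyzing the
// CollectMetrics node produced by observe should fail with an AnalysisException.
test("Test observe with non-aggregation expression") {
  intercept[AnalysisException] {
    analyzePlan(
      transform(connectTestRelation.observe("my_metric", "id".protoAttr)))
  }
}
```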

repeated Expression metrics = 3;

// (Optional) Indicates whether an Observation is used.
optional bool is_observation = 4;
Contributor

So I have checked the code for how DF.observe works. In Scala it has two different overloads, one for Observation and one for String. Both end up calling the same underlying method on the DataFrame, and both end up using CollectMetrics and wrapping it around the logical plan.

There is no need to have this special type for using the Observation. The special handling for Observation should be done on the client side.
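For reference, a condensed sketch of the two Scala overloads being described (simplified from Dataset; close to, but not verbatim, the actual source):

```scala
// String overload: wraps the logical plan in CollectMetrics directly.
def observe(name: String, expr: Column, exprs: Column*): Dataset[T] = withTypedPlan {
  CollectMetrics(name, (expr +: exprs).map(_.named), logicalPlan)
}

// Observation overload: delegates to the same mechanism, and additionally
// registers a listener so that Observation.get can return the metrics later.
def observe(observation: Observation, expr: Column, exprs: Column*): Dataset[T] = {
  observation.on(this, expr, exprs: _*)
}
```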

Contributor Author

@beliefer beliefer Dec 23, 2022

If we don't use the Observation, the Connect API will not be consistent with the PySpark API.
cc @zhengruifeng @HyukjinKwon @cloud-fan

Contributor

Yeah you should be able to remove this.

Contributor Author

OK

@beliefer beliefer force-pushed the SPARK-41527 branch 4 times, most recently from 814a940 to a951c2c Compare December 27, 2022 01:17
@beliefer
Contributor Author

beliefer commented Mar 6, 2023

@beliefer can you please remove the is_observation code path? And take another look at the protocol. Otherwise I think it looks good.

The is_observation code path has been removed.

Contributor

@hvanhovell hvanhovell left a comment

LGTM

@hvanhovell
Contributor

Merging to master/3.4

@hvanhovell hvanhovell closed this in 0ce63f3 Mar 6, 2023
hvanhovell pushed a commit that referenced this pull request Mar 6, 2023
### What changes were proposed in this pull request?
Implement `DataFrame.observe` with a proto message

Implement `DataFrame.observe` for scala API
Implement `DataFrame.observe` for python API

### Why are the changes needed?
for Connect API coverage

### Does this PR introduce _any_ user-facing change?
'No'. New API

### How was this patch tested?
New test cases.

Closes #39091 from beliefer/SPARK-41527.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 0ce63f3)
Signed-off-by: Herman van Hovell <herman@databricks.com>
@beliefer
Contributor Author

beliefer commented Mar 6, 2023

snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023