
[SPARK-47290][SQL] Extend CustomTaskMetric to allow metric values from multiple sources #45505

Closed
parthchandra wants to merge 2 commits into apache:master from parthchandra:SPARK-47290

Conversation

@parthchandra (Contributor)

What changes were proposed in this pull request?

Provides a new interface, CustomFileTaskMetric, that extends CustomTaskMetric and allows its value to be updated.

Why are the changes needed?

The current interface for providing custom metrics does not work for adding file-based metrics in the Parquet reader, where a single FilePartitionReader may need to collect metrics from multiple Parquet file readers.
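
For context, a minimal sketch of what the proposed interface might look like, pieced together from the diff fragments quoted in the review below; the update(long) method name comes from the discussion, and the rest is an assumption rather than the actual patch:

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

/**
 * A custom file task metric whose value can be updated by the owning
 * PartitionReader as per-file readers complete. (Hypothetical sketch.)
 *
 * @since 4.0.0
 */
@Evolving
public interface CustomFileTaskMetric extends CustomTaskMetric {
  // Proposed mutator: sets this metric's current value.
  void update(long value);
}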

Does this PR introduce any user-facing change?

No

How was this patch tested?

This PR only adds the interface. The implementation and tests will be done in a follow-up PR that addresses https://issues.apache.org/jira/browse/SPARK-47291

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot added the SQL label on Mar 13, 2024
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.spark.annotation.Evolving;
Member:
We need a new line before this.

Contributor Author:
Done.

* A custom file task metric. This allows file-based data source V2 implementations
* to use a single PartitionReader with multiple file readers. Each file reader can
* provide its own metric values, and they can be added into the parent PartitionReader.
* @since 4.0.0
Member:
nit. Please add a new empty line before this.

Contributor Author:
Done.

/*
Merge(add) the values of the corresponding CustomTaskMetric from src array into target array
adding a new element if it doesn't already exist. Returns a new array without modifying the
target array
Member:
Indentation?

Contributor Author:
Reformatted.

adding a new element if it doesn't already exist. Returns a new array without modifying the
target array
*/
static List<CustomFileTaskMetric> mergeMetricValues(List<CustomTaskMetric> src,
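
The signature above is truncated in the diff, so the second parameter and the body below are assumptions. Given the doc comment and the ArrayList/List/Optional imports quoted earlier, a sketch consistent with the described behavior might look like this:

static List<CustomFileTaskMetric> mergeMetricValues(
    List<CustomTaskMetric> src,
    List<CustomFileTaskMetric> target) {  // hypothetical second parameter
  // Copy the target list so it is returned unmodified, per the doc comment.
  List<CustomFileTaskMetric> result = new ArrayList<>(target);
  for (CustomTaskMetric s : src) {
    Optional<CustomFileTaskMetric> existing = result.stream()
        .filter(t -> t.name().equals(s.name()))
        .findFirst();
    if (existing.isPresent()) {
      // Add the source value into the metric with the same name.
      existing.get().update(existing.get().value() + s.value());
    } else if (s instanceof CustomFileTaskMetric) {
      // No metric with this name yet: carry the source metric over.
      result.add((CustomFileTaskMetric) s);
    }
  }
  return result;
}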
Member:
Shall we move this static method outside of this interface? I guess you couldn't find a proper existing utility class for this, right?

Contributor Author:
That's right. This is a utility method and there really isn't any other good place to put it. Anyway, I've moved it to org.apache.spark.sql.execution.metric.CustomMetrics. One difference is that the function was in a Java class earlier and is now in a Scala class. In general, that makes it slightly harder for a DSV2 implementation to use, because calling Scala from Java is still not perfect.

@HyukjinKwon (Member):
cc @viirya

Comment on lines +74 to +77
/**
* Merge(add) the values of the corresponding CustomTaskMetric from src array into target array
* adding a new element if it doesn't already exist.
*/
Member:

Suggested change
/**
* Merge(add) the values of the corresponding CustomTaskMetric from src array into target array
* adding a new element if it doesn't already exist.
*/
/**
 * Update the values of the corresponding CustomTaskMetric with the same metric name from the src array into the target array.
 * If the source metric doesn't already exist, add it to the target metric array.
 */

* @since 4.0.0
*/
@Evolving
public interface CustomFileTaskMetric extends CustomTaskMetric {
@viirya (Member) commented Mar 15, 2024:
Hmm, I am wondering why we need this interface? I think we can definitely do the same thing with CustomTaskMetric?

Contributor Author:
I wasn't sure if it is OK to change an existing public interface. If it is acceptable to simply add this to CustomTaskMetric, that would be perfect.

@viirya (Member) commented Mar 15, 2024:
I mean, it seems feasible to use CustomTaskMetric to achieve the same goal without adding a new method.

The new API is the update method. You can definitely update the current value of a CustomTaskMetric without a problem. CustomTaskMetric is simply an interface used by the instances collecting metrics to report task-level metrics.
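
As a hedged illustration of this suggestion (names are invented, not Spark code): the mutator can live on the concrete class, so the public CustomTaskMetric interface stays untouched and the owning reader simply keeps a reference to the concrete type.

import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Hypothetical concrete metric: the owning reader mutates it directly,
// while consumers only ever see the CustomTaskMetric interface.
public class FileReadTimeTaskMetric implements CustomTaskMetric {
  private long value = 0L;

  // Owner-side mutator; deliberately not part of CustomTaskMetric.
  public void add(long delta) {
    value += delta;
  }

  @Override
  public String name() {
    return "fileReadTime";
  }

  @Override
  public long value() {
    return value;
  }
}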

Member:
+1 for @viirya's advice.

Contributor Author:
DSV1 uses SQLMetric, which has a set(v: Long) and an add(v: Long), so a data source can set an initial value for a metric and then keep updating it. DSV2 uses CustomTaskMetric, which really should have an equivalent but doesn't. That's what this interface is trying to provide.

In this case, FilePartitionReader needs to update the value of a CustomTaskMetric every time a file reader completes. So FilePartitionReader can have a metric, say ReadTime, which it updates every time a file reader completes. But only the Parquet file reader provides this metric, so FilePartitionReader can query the file reader and get the metrics that are supported. It has no way to update them, though. FilePartitionReader can of course create its own set of metrics corresponding to the supported ones, keep its own set of values, and update them as needed, but having an interface to update the value that the metric supports seems so much more elegant.

Here's the usage of this interface - https://github.com/parthchandra/spark/blob/SPARK-47291/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala

And the corresponding metrics -
https://github.com/parthchandra/spark/blob/SPARK-47291/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetMetricsV2.scala

If you still think this is not reasonable, I will try to re-work it without this interface.
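
To make the pattern described above concrete, here is a hedged sketch (not the actual FilePartitionReader code; the class and method names are invented) of how a parent reader could fold per-file values into its own metrics, assuming the proposed update(long) method:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;

class FileMetricsAggregator {
  // Parent-side metrics, keyed by metric name; CustomFileTaskMetric and
  // its update(long) method are the hypothetical API proposed in this PR.
  private final Map<String, CustomFileTaskMetric> parentMetrics = new HashMap<>();

  // Called each time a per-file reader completes: add the file reader's
  // reported values into the parent metric with the same name.
  void onFileReaderCompleted(CustomTaskMetric[] fileReaderMetrics) {
    for (CustomTaskMetric m : fileReaderMetrics) {
      CustomFileTaskMetric parent = parentMetrics.get(m.name());
      if (parent != null) {
        parent.update(parent.value() + m.value());
      }
    }
  }
}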

Contributor Author:
Actually, let me rework it without this interface.

@parthchandra (Contributor Author):
Closing this PR. As @viirya pointed out, it is possible to achieve the update to CustomTaskMetric without a new interface.

@viirya (Member) commented Mar 19, 2024:

Thank you @parthchandra
