[SPARK-34366][SQL] Add interface for DS v2 metrics #31476

Closed
wants to merge 8 commits

Conversation

@viirya
Member

viirya commented Feb 4, 2021

What changes were proposed in this pull request?

This patch proposes a few public API changes to DS v2 so that DS v2 scans can report metrics to Spark.

Two public interfaces are added.

  • CustomMetric: metric interface at the driver side. It basically defines how Spark aggregates task metrics with the same metric name.
  • CustomTaskMetric: task metric reported at executors. It includes a name and a long value. Spark will collect these metric values and update its internal metrics.

Two public methods are added to existing public interfaces; they are optional for DS v2 implementations. A rough sketch of the overall API shape follows the list below.

  • PartitionReader.currentMetricsValues(): returns an array of CustomTaskMetric. This is where the actual metric values are collected. Empty array by default.
  • Scan.supportedCustomMetrics(): returns an array of supported custom metrics (CustomMetric). Empty array by default.
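
For orientation, here is a rough Java sketch of the API shape described above. It is illustrative only: the exact members evolved during review (in particular, how aggregation is specified), so the names and defaults below should not be read as the merged signatures.

// Illustrative sketch only (each interface would live in its own file under the DS v2
// connector packages; exact members evolved during review).
import org.apache.spark.annotation.Evolving;

// Driver-side metric: a named metric whose per-task values Spark aggregates into the
// value shown in the UI.
@Evolving
public interface CustomMetric {
  String name();

  String description();
}

// Executor-side metric value: a name plus a long value, collected by Spark and matched
// to the CustomMetric with the same name.
@Evolving
public interface CustomTaskMetric {
  String name();

  long value();
}

// Optional hooks on the existing interfaces (empty arrays by default, so existing
// implementations are unaffected):
//   Scan:               default CustomMetric[] supportedCustomMetrics() { return new CustomMetric[0]; }
//   PartitionReader<T>: default CustomTaskMetric[] currentMetricsValues() { return new CustomTaskMetric[0]; }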

Why are the changes needed?

In order to report custom metrics, we need public API changes in DS v2; this PR adds them.

Does this PR introduce any user-facing change?

No

How was this patch tested?

This PR only adds interfaces. Tests will be added in the follow-up PRs that add the implementation. See #31451 and #31398 for details and manual tests.

@viirya
Member Author

viirya commented Feb 4, 2021

cc @rdblue @Ngone51 @cloud-fan @sunchao @dongjoon-hyun this is separated from #31451 and only includes interface changes.

* @since 3.2.0
*/
@Evolving
public interface CustomMetric {
Member Author

The comment at #31451 (comment) suggested naming it LongMetric, but I think we may need to keep flexibility in the base interface.

So I left it as general as possible and added a LongMetric that reports a long value.

@SparkQA

SparkQA commented Feb 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39474/

@SparkQA

SparkQA commented Feb 4, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39474/

import org.apache.spark.annotation.Evolving;

/**
* A general custom metric.
Member

Shall we remove "general"?

@SparkQA

SparkQA commented Feb 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39478/

@SparkQA

SparkQA commented Feb 4, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39478/

@SparkQA

SparkQA commented Feb 5, 2021

Test build #134886 has finished for PR 31476 at commit a0f738d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 5, 2021

Test build #134891 has finished for PR 31476 at commit 623f193.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

import org.apache.spark.annotation.Evolving;

/**
* A custom metric.
Member

Since this is a public API, it would be great to add more information explaining how to use these APIs as a developer. For example:

  • If I define my own CustomMetric, how does Spark use it?
  • If I define a metric type that doesn't support sum, for example one measuring the executor JVM heap size, how does Spark handle it?
  • If my PartitionReaderWithMetrics returns metrics for a partition, will Spark combine them for partitions of a Spark job?
  • In the streaming case, how does Spark combine metrics from different micro-batches?
  • If I would like to report metrics in the driver, how do I do it?

I feel you might need to build an interface similar to the private SQLMetric.

Member Author

I will add more information to the comment.

Basically I will add a few metric classes based on CustomMetric. They correspond to the sum, size, and timing metrics in SQLMetric.
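
For illustration only, a sum-flavored convenience class of that kind might look roughly like this; the class names and exact shape are hypothetical, not the classes added in the follow-up work.

// Hypothetical sketch (each class in its own file): a convenience base class that fixes
// the aggregation semantics, plus a data source declaring a concrete metric with it.
public abstract class SumMetric implements CustomMetric {
  // Intended semantics: Spark sums the per-task values to produce the final result.
}

public class BytesReadMetric extends SumMetric {
  @Override
  public String name() { return "bytesRead"; }

  @Override
  public String description() { return "total bytes read by this scan"; }
}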

@SparkQA

SparkQA commented Feb 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39482/

@SparkQA

SparkQA commented Feb 5, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39482/

@SparkQA

SparkQA commented Feb 5, 2021

Test build #134896 has finished for PR 31476 at commit 38fb966.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@dongjoon-hyun left a comment

@cloud-fan, in the first commit, the following functions were added to Scan and PartitionReader directly.

Scan.supportedCustomMetrics()
PartitionReader<T>.getCustomMetrics()

I thought we could do something like SupportsReportStatistics; that's the reasoning behind my suggestion.

public interface SupportsReportStatistics extends Scan {
  Statistics estimateStatistics();
}

Now, @viirya added the following.

public interface SupportsReportMetrics {
  default CustomMetric[] supportedCustomMetrics() { return new CustomMetric[0]; }
}

In terms of simplicity, we may of course include it in Scan directly, as we did in the first commit. And thank you for being open to both approaches, @rdblue. I still think separation is better, but I won't push back on this PR further, direction-wise.

I'll leave this to you, the author @viirya, and the other reviewers. Please choose whatever you want. I trust you.

* SUM: Spark sums up metrics from partitions as the final result.
*/
enum MetricType {
SUM
Member Author

I only use the sum metric for now, for the Kafka scan purpose. So I leave the other possible metric types (size, timing, average) out for now to keep things simpler at the beginning.

Member

Any reason why we do not allow users to define the combine method instead? The current API means that if a user needs different combine behavior, they have to submit a PR to Spark to add it and wait for a new Spark release.
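
To make that suggestion concrete, a hypothetical shape (not the API in the PR at this point) could let the metric itself define the combine step:

// Hypothetical: the metric defines how task values are combined, instead of choosing
// from a fixed enum of merge types.
public interface CombinableMetric extends CustomMetric {
  // Combine the values reported by tasks into the string shown in the UI.
  String aggregateTaskMetrics(long[] taskMetrics);
}

The discussion below converges on essentially this shape.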

Member

By the way, regarding the naming, I would prefer MergeType to make it clear, if we go in this direction.

Member Author

@viirya Feb 22, 2021

Because these metrics are a logical representation and are collected into SQL metrics internally. They are used by DS v2 implementations to report SQL metrics easily without dealing with the internal SQLMetric, so the SQL metrics define how the values are combined/aggregated. A similar case is the public expression API for predicate pushdown: those expressions are converted from Catalyst expressions, so they are matched to Catalyst expressions.

Member

I'm wondering if we can design a better API. This will be a public API and hard to change in the future. We don't have to build it around what SQLMetric can do today. When I added SQLMetric to Spark SQL, we didn't pay much attention to this since it was an internal API. But adding a new public API is a different story, and we need to think about how it will be used from the user's perspective.

Member Author

Hmm, these metrics are not general metrics that could be defined and used by end users. It sounds like we already have Accumulator for that purpose. Defining a public API of general metrics for end users overlaps with Accumulator, IMHO.

The metrics here are used by DS v2 implementations to report metrics to Spark SQL. That being said, they are not exposed to end users as general metrics. I think the purpose of the metric API is clear.

Member

Defining a public API of general metrics for end users overlaps with Accumulator, IMHO.

Totally agree. There would be three types of metrics APIs (including the internal SQLMetric) if we added this. That's really confusing to users. Is it possible to make SQLMetric support general Accumulators instead, so that we don't need to re-invent a new metric API?

That being said, they are not exposed to end users as general metrics.

Developers building Spark data sources are also critical to the whole Spark ecosystem, and these APIs are designed for them. IMHO, we should try our best to build a better API if possible.

Member Author

@viirya Feb 23, 2021

As you can see, the metric API here is a logical representation of metrics from DS v2; we are not going to re-invent a whole metric API. SQLMetric is internal to Spark and is not exposed to end users or data source developers, so that doesn't worry me too much.

I'm not saying we should not build a good API for DS v2 developers. Some points in the comments above seem to come from an end-user perspective, so I'd like to point out that this is a different scenario.

As this is for DS v2, it is about SQL metrics, and internally it is converted to SQL metrics. Making SQLMetric support Accumulator and letting DS v2 report Accumulators doesn't sound like a bad idea to me, but I doubt it is worth it.

One point raised is defining arbitrary combine behavior. If we make SQLMetric support Accumulator, does that mean we can use arbitrary Accumulators? No; SQLMetric basically allows certain types of metrics, so we would still need to change SQLMetric to support new ones.

So the only benefit I see is not having another metric API, and I don't think that is a serious issue for this case at the beginning.

This API is pretty simple, as it is just a logical representation, and we only need a small internal change to convert the metrics collected from DS v2 into SQL metrics.

I just read through the code paths that would need to change in order to make SQLMetric support Accumulator. It seems to involve more changes, not only in DS v2 but maybe also in other parts of sql/core, etc.

Although I doubt it is worth it, I'm open to the suggested Accumulator approach. Let's gather more thoughts from others?

cc @cloud-fan @dongjoon-hyun @rdblue @Ngone51 @sunchao WDYT?

@viirya
Member Author

viirya commented Feb 22, 2021

@cloud-fan @rdblue @Ngone51 @sunchao @zsxwing If you have more comments, please let me know. Thanks.

@Ngone51
Member

Ngone51 commented Feb 22, 2021

LGTM

@cloud-fan
Contributor

cloud-fan commented Feb 24, 2021

I think it's too much work to support arbitrary accumulators in SQLMetrics. Over the years it has proven sufficient to support various metrics (size, time, average, etc.) with simple long values, so I agree with the design here that the v2 metrics API should leverage SQLMetrics under the hood.

However, I share @zsxwing's concern that using an enum to indicate the metrics merging logic is too limited. I don't think it's hard to design an API that merges a bunch of long values.

First, let's look at the task side. The read/write task needs to update the value of the executor-side SQLMetrics, and Spark will send the metrics update back to the driver via the heartbeat event or the task-complete event. The current PR adds a new method to PartitionReader to report the current metrics values, which looks good. Spark needs to call the new method at the end of the task (at the end of each epoch for continuous mode), get the metric values, find the corresponding SQLMetrics, and update their values. We can polish the API from the current PR a little bit:

interface CustomTaskMetrics {
  def name: String
  def value: Long
}

interface PartitionReader {
  ...
  def currentMetricsValues: Array[CustomTaskMetrics] = Nil
}

Then, let's look at the driver side. The driver needs to aggregate the task metrics and produce a string that will be displayed in the UI. The current PR adds a new API, CustomMetrics, which is returned by Scan. This looks good; we just need to replace the enum MetricType with a real aggregate method:

interface CustomMetrics {
  def name: String
  def description: String
  def initialValue: Long = 0
  def aggregateTaskMetrics(taskMetrics: Array[Long]): String
}

Internally, we can add a new V2_CUSTOM metrics type in SQLMetrics. For this metrics type, we delegate the aggregating logic to the corresponding CustomMetrics.

What do you think?
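
To make the task-side contract above concrete, here is a hypothetical Java reader that counts the rows it produces and reports the count through the proposed hook. The wrapper class, the metric name, and the exact hook name follow the sketches in this thread and are illustrative only.

// Hypothetical task-side usage (names are made up; CustomTaskMetric is the task-metric
// interface sketched earlier in this thread and is assumed to be in scope).
import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;

public class CountingPartitionReader implements PartitionReader<InternalRow> {
  private final PartitionReader<InternalRow> delegate;
  private long rowsRead = 0L;

  public CountingPartitionReader(PartitionReader<InternalRow> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean next() throws IOException {
    // Count every row handed out by the underlying reader.
    boolean hasNext = delegate.next();
    if (hasNext) {
      rowsRead++;
    }
    return hasNext;
  }

  @Override
  public InternalRow get() {
    return delegate.get();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }

  // Assumed hook from this PR: Spark calls this (e.g. at the end of the task), matches
  // "rowsRead" to the CustomMetric of the same name declared by the Scan, and updates
  // the corresponding SQLMetric.
  public CustomTaskMetric[] currentMetricsValues() {
    CustomTaskMetric rows = new CustomTaskMetric() {
      @Override
      public String name() {
        return "rowsRead";
      }

      @Override
      public long value() {
        return rowsRead;
      }
    };
    return new CustomTaskMetric[] { rows };
  }
}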

@viirya
Member Author

viirya commented Feb 25, 2021

I share the direction of having custom merging behavior, so I agree with the polished idea from the API perspective.

I don't have questions about the task side; it is basically the same except for the enum metric type.

My only question is how to delegate the aggregating logic to the aggregate method, since we cannot access CustomMetrics while processing execution live information. I think we need to change SQLMetric to include the aggregate method if it is the V2_CUSTOM metrics type, e.g.

class SQLMetric(
    val metricType: String,
    initValue: Long = 0L,
    customAggregateMethod: Option[Array[Long] => String]) extends AccumulatorV2[Long, Long] {
  ...
}

When we aggregate the metrics, for the V2_CUSTOM metric type, we look up the original SQLMetric from AccumulatorContext and call its aggregate method.

Does it make sense?

@cloud-fan
Contributor

Yes, SQLMetrics is the one that defines the aggregating logic. Previously it was decided by metricsType; now we can probably replace metricsType with a general aggregating function: aggregateMethod: (Array[Long], Array[Long]) => String. For built-in metrics it's SQLMetrics.stringValue(metricsType, _, _); for v2 metrics we ignore the second parameter. Seems we don't need V2_CUSTOM :)

It's the SQL UI component that collects the task metrics and aggregates them, so we should let the SQL UI component know the custom aggregating logic. We can propagate it through SQLMetricInfo, SQLPlanMetric, etc.

@viirya
Member Author

viirya commented Feb 25, 2021

I think we have a couple of choices.

  • When we aggregate the metrics, for the V2_CUSTOM metric type, we access the SQLMetric from AccumulatorContext and call its aggregate method. For built-in metrics, we just call SQLMetrics.stringValue as usual.
  • Replace metricsType with a general aggregating function and propagate it through SQLMetricInfo, SQLPlanMetric, etc. The SQL UI component then calls the aggregating function.

But these are implementation details; I think the choice doesn't affect the public API here.

Do you prefer to make the choice here and put it into the same PR, or should we polish only the public API in this PR and then create another PR for the implementation?

@cloud-fan
Contributor

cloud-fan commented Feb 26, 2021

Let's make this PR API only, as we know it's implementable. We can discuss the implementation details in the follow-up PR.

I'd avoid using AccumulatorContext as it uses weak references and is not robust. That's probably why we propagate the metrics type through UI objects, not AccumulatorContext.

@dongjoon-hyun
Member

Hi, @viirya. What is the next step for this PR?

@viirya
Member Author

viirya commented Mar 20, 2021

@dongjoon-hyun Sorry for the delay. I will update this based on the discussion above with @cloud-fan and @zsxwing. Thanks.

@SparkQA

SparkQA commented Mar 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40953/

@SparkQA

SparkQA commented Mar 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40954/

@SparkQA

SparkQA commented Mar 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40953/

@SparkQA

SparkQA commented Mar 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40954/

@SparkQA

SparkQA commented Mar 23, 2021

Test build #136369 has finished for PR 31476 at commit a35c056.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 23, 2021

Test build #136370 has finished for PR 31476 at commit f46b733.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* The metrics will be gathered during query execution and sent back to the driver, then combined.
* How the task metrics are combined is defined by the corresponding {@link CustomMetric} with the
* same metric name. The final result will be shown in the physical operator in the Spark UI.
Contributor

physical operator -> data source scan operator

@SparkQA

SparkQA commented Mar 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40979/

@SparkQA

SparkQA commented Mar 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40979/

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan closed this in 115ed89 on Mar 23, 2021
@dongjoon-hyun
Member

Thank you, @viirya and all!

@SparkQA

SparkQA commented Mar 23, 2021

Test build #136396 has finished for PR 31476 at commit eb9d94a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
