Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to add multiple metrics reporters to scan #6919

Merged
merged 12 commits into from
Mar 29, 2023

Conversation

karuppayya
Copy link
Contributor

@karuppayya karuppayya commented Feb 23, 2023

Adds ability to add multiple reporter to the Scan

@karuppayya
Copy link
Contributor Author

Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be exposing ScanMetrics but rather introduce a custom MetricsReporter that receives a ScanReport once a scan is complete. This custom reporter can then add the results from the scan to the Spark UI.

api/src/main/java/org/apache/iceberg/BatchScanAdapter.java Outdated Show resolved Hide resolved
core/src/main/java/org/apache/iceberg/SnapshotScan.java Outdated Show resolved Hide resolved
@github-actions github-actions bot added the spark label Mar 13, 2023
@@ -171,4 +172,7 @@ default ThisT select(String... columns) {

/** Returns the split open file cost for this scan. */
long splitOpenFileCost();

/** Create a new scan that will report the scan metrics to the {@code reporter} */
ThisT withMetricsReporter(Collection<MetricsReporter> reporter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we use with prefixes throughout the Scan API. Also, we should accept a single reporter rather than a collection given the method name. Under the hood, it would be a list and we should be able to call this method more than once to add different reporters.

scan
    .filter(cold)
    .metricsReporter(reporter1)
    .metricsReporter(reporter2)
    .planFiles()

We may add a new method that accepts an iterable in the future if we have a use case for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to that suggestion

}

MetricsReporter metricsReporter() {
return metricsReporter;
Collection<MetricsReporter> metricsReporter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the method name change too?

}

TableScanContext reportWith(MetricsReporter reporter) {
metricsReporter().add(reporter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly, this class is immutable and we create a new instance on every call. We should probably follow what is done for updating properties in withOption, where we copy existing options into a new list before adding.

TableScanContext reportWith(MetricsReporter reporter) {
  ImmutableList.Builder<MetricsReporter> builder = ImmutableList.builder();
  builder.addAll(metricsReporters);
  builder.add(reporter);
  List<MetricsReporter> newMetricsReporters = builder.build();
  return new TableScanContext(
      snapshotId,
      rowFilter,
      ignoreResiduals,
      caseSensitive,
      colStats,
      projectedSchema,
      selectedColumns,
      options,
      fromSnapshotId,
      toSnapshotId,
      planExecutor,
      fromSnapshotInclusive,
      newMetricsReporters);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this seems like the better approach

metricsReporter());
}

TableScanContext reportWith(Collection<MetricsReporter> reporters) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is OK to just have a method that accepts one reporter for now.

@@ -385,10 +388,6 @@ acceptedBreaks:
old: "method void org.apache.iceberg.SnapshotProducer<ThisT>::validate(org.apache.iceberg.TableMetadata)\
\ @ org.apache.iceberg.StreamingDelete"
justification: "Removing deprecations for 1.2.0"
- code: "java.method.returnTypeChangedCovariantly"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changed?

@@ -66,6 +66,9 @@ acceptedBreaks:
old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
justification: "Removing deprecated method"
"1.1.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
justification: "Add metricsreporter to Scan"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of modifying checks, we should throw UnsupportedOperationException in the default implementation. Just like we do in some methods in TableScan today, which were added later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to that

@@ -144,7 +144,9 @@ public CloseableIterable<T> planFiles() {
.scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
.metadata(metadata)
.build();
context().metricsReporter().report(scanReport);
context()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: What about an explicit for each in this case? I am not sure such statements are easy to read since they are split on multiple lines.

for (MetricsReporter reporter : context().metricsReporters()) {
  reporter.report(scanReport);
}

this.metricsReport = report;
}

MetricsReport getMetricsReport() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We don't use getXXX prefixes for getters, just metricsReport.

I also don't think this reporter is specific to Spark. We may think of a good name and put it in core. I believe it would be a common use case to intercept scan reports.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe @karuppayya meant to report from here directly to Spark metrics. @karuppayya were you planning to add that functionality as part of this PR?

Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karuppayya it's a bit unclear to me whether you're planning to do the metrics reporting to Spark as part of this PR or not. Could you clarify please?
Also it would be good to add some tests that make sure reporting via multiple metrics reporters properly works

}

TableScanContext reportWith(MetricsReporter reporter) {
metricsReporter().add(reporter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this seems like the better approach

@@ -66,6 +66,9 @@ acceptedBreaks:
old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
justification: "Removing deprecated method"
"1.1.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
justification: "Add metricsreporter to Scan"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to that

@@ -171,4 +172,7 @@ default ThisT select(String... columns) {

/** Returns the split open file cost for this scan. */
long splitOpenFileCost();

/** Create a new scan that will report the scan metrics to the {@code reporter} */
ThisT withMetricsReporter(Collection<MetricsReporter> reporter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to that suggestion

this.metricsReport = report;
}

MetricsReport getMetricsReport() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe @karuppayya meant to report from here directly to Spark metrics. @karuppayya were you planning to add that functionality as part of this PR?

@karuppayya
Copy link
Contributor Author

I believe @karuppayya meant to report from here directly to Spark metrics. @karuppayya were you planning to add that functionality as part of this PR?

Yes, that was the idea.To access the metrics in SparkScanBuilder to use here

@karuppayya
Copy link
Contributor Author

I am not sure of the reason for workflow needing approval to start. @RussellSpitzer @aokolnychyi @nastra @rdblue any idea why this would happen, any recent change?

@nastra
Copy link
Contributor

nastra commented Mar 22, 2023

I believe @karuppayya meant to report from here directly to Spark metrics. @karuppayya were you planning to add that functionality as part of this PR?

Yes, that was the idea.To access the metrics in SparkScanBuilder to use here

Ok I thought that the SparkMetricsReporter would have some custom code to report to Spark directly, but if that's not required, then I don't think we need that class. In that case you could just have a lambda similar to

report -> reportMetrics(tableIdentifier, report, session::headers));

Something like

private void reportMetricsToSpark(MetricsReport report) {
    // report to spark
  }
BatchScan scan =
        table
            .newBatchScan()
            .caseSensitive(caseSensitive)
            .filter(filterExpression())
            .project(expectedSchema)
            .withMetricsReporter(this::reportMetricsToSpark);

@aokolnychyi
Copy link
Contributor

Since we don't know how our metrics reporting will look like until we support Spark 3.4, what about focusing only on the core logic for adding custom reporters?

/** Create a new scan that will report the scan metrics to the {@code reporter} */
default ThisT metricsReporter(MetricsReporter reporter) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement metricReporter");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in the method name inside the comment (should be metricsReporter).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this comment was missed. The exception message should include the correct method name.

@@ -171,4 +172,10 @@ default ThisT select(String... columns) {

/** Returns the split open file cost for this scan. */
long splitOpenFileCost();

/** Create a new scan that will report the scan metrics to the {@code reporter} */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc should indicate that this adds a reporter to the list of existing reporters, not overrides it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@@ -67,45 +67,9 @@ acceptedBreaks:
justification: "Removing deprecated method"
"1.1.0":
org.apache.iceberg:iceberg-core:
- code: "java.class.noLongerImplementsInterface"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these changes needed?

@@ -420,12 +421,14 @@ private Scan buildBatchScan() {
private Scan buildBatchScan(Long snapshotId, Long asOfTimestamp, String branch, String tag) {
Schema expectedSchema = schemaWithMetadataColumns();

SparkMetricsReporter reporter = new SparkMetricsReporter();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is any value in this class at this point. Let's remove Spark classes from this PR and add them when we know how the reporting logic will look like.

Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

almost there, just a few small things to address and then I think we can get this merged

@@ -171,4 +172,10 @@ default ThisT select(String... columns) {

/** Returns the split open file cost for this scan. */
long splitOpenFileCost();

/** Create a new scan that will report the scan metrics to the {@code reporter} */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@@ -42,6 +43,33 @@ public TestScanPlanningAndReporting() {
super(2);
}

@Test
public void scanningWithMutipleReporters() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void scanningWithMutipleReporters() throws IOException {
public void scanningWithMultipleReporters() throws IOException {

Comment on lines 59 to 66
.metricsReporter(
(MetricsReporter) -> {
reportedCount.getAndIncrement();
})
.metricsReporter(
(MetricsReporter) -> {
reportedCount.getAndIncrement();
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.metricsReporter(
(MetricsReporter) -> {
reportedCount.getAndIncrement();
})
.metricsReporter(
(MetricsReporter) -> {
reportedCount.getAndIncrement();
});
.metricsReporter((MetricsReporter) -> reportedCount.getAndIncrement())
.metricsReporter((MetricsReporter) -> reportedCount.getAndIncrement());

Comment on lines 67 to 70
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
fileScanTasks.forEach(task -> {});
}
assertThat(reportedCount.get()).isEqualTo(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
fileScanTasks.forEach(task -> {});
}
assertThat(reportedCount.get()).isEqualTo(2);
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
fileScanTasks.forEach(task -> {});
}
assertThat(reportedCount.get()).isEqualTo(2);
// make sure default metrics reporter is still reporting
ScanReport scanReport = reporter.lastReport();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
assertThat(scanReport.snapshotId()).isEqualTo(1L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(1);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is also to make sure the default metrics reporter still reports

@@ -171,4 +172,13 @@ default ThisT select(String... columns) {

/** Returns the split open file cost for this scan. */
long splitOpenFileCost();

/**
* Create a new scan that will report the scan metrics to the {@code reporter} {@code reporter} is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Duplicate {@code reporter}?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd actually consider adapting the message as below.

Create a new scan that will report scan metrics to the provided reporter in addition to reporters maintained by the scan.

Copy link
Contributor

@aokolnychyi aokolnychyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two minor nits and should be good to go.

Copy link
Contributor

@nastra nastra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM once the two nits are fixed. Also we should probably adjust the PR title / commit msg when merging to better reflect the scope of this work

@karuppayya karuppayya changed the title Add HasScan metrics interface Ability to add multiple metrics reporters to scan Mar 29, 2023
@aokolnychyi aokolnychyi merged commit f536c84 into apache:master Mar 29, 2023
33 checks passed
@aokolnychyi
Copy link
Contributor

Thank you, @karuppayya! Thanks for reviewing, @nastra!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants