From 4faa3d4f6a437b61f5973b9c839204ae8b5a0ff9 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Wed, 27 May 2026 10:24:22 +0900 Subject: [PATCH] Core: Add table-name filter for MetricsReporter Add an optional filtering layer above any MetricsReporter implementation that drops ScanReports and CommitReports whose tableName() does not pass the configured include / exclude regex. Two new catalog properties control the filter: metrics-reporter.table-name.include and metrics-reporter.table-name.exclude. Both are Java regex patterns matched against the table name; when both are set, exclude wins over include. When neither property is set, CatalogUtil.loadMetricsReporter returns the underlying reporter unchanged, so the default code path incurs no runtime overhead. Empty values are treated as not set to avoid accidentally silencing all metrics on misconfiguration. Invalid regex values fail fast at catalog initialization with a clear error pointing at the offending property. The filter applies uniformly across all reporter implementations (LoggingMetricsReporter, RESTMetricsReporter, and custom user-supplied ones). Reports whose subtype does not expose a table name are forwarded without filtering. Closes #16573 --- .../org/apache/iceberg/CatalogProperties.java | 19 ++ .../java/org/apache/iceberg/CatalogUtil.java | 54 +++--- .../metrics/FilteringMetricsReporter.java | 118 +++++++++++ .../org/apache/iceberg/TestCatalogUtil.java | 23 +++ .../metrics/TestFilteringMetricsReporter.java | 183 ++++++++++++++++++ docs/docs/metrics-reporting.md | 21 ++ 6 files changed, 392 insertions(+), 26 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/metrics/FilteringMetricsReporter.java create mode 100644 core/src/test/java/org/apache/iceberg/metrics/TestFilteringMetricsReporter.java diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index 6b85ccbc87bc..79ea4525d6cc 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -33,6 +33,25 @@ private CatalogProperties() {} public static final String VIEW_OVERRIDE_PREFIX = "view-override."; public static final String METRICS_REPORTER_IMPL = "metrics-reporter-impl"; + /** + * Java regex applied to {@code tableName()} of {@link org.apache.iceberg.metrics.ScanReport} and + * {@link org.apache.iceberg.metrics.CommitReport}. When set, only reports whose table name + * matches the pattern are forwarded to the configured {@link + * org.apache.iceberg.metrics.MetricsReporter}. Empty values are treated as not set. + */ + public static final String METRICS_REPORTER_TABLE_NAME_INCLUDE = + "metrics-reporter.table-name.include"; + + /** + * Java regex applied to {@code tableName()} of {@link org.apache.iceberg.metrics.ScanReport} and + * {@link org.apache.iceberg.metrics.CommitReport}. When set, reports whose table name matches the + * pattern are dropped before reaching the configured {@link + * org.apache.iceberg.metrics.MetricsReporter}. When both include and exclude are set, exclude + * wins. Empty values are treated as not set. + */ + public static final String METRICS_REPORTER_TABLE_NAME_EXCLUDE = + "metrics-reporter.table-name.exclude"; + /** * Controls whether the catalog will cache table entries upon load. * diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index 2b400ccebc8b..1032494c0a18 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -40,6 +40,7 @@ import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.io.SupportsStorageCredentials; +import org.apache.iceberg.metrics.FilteringMetricsReporter; import org.apache.iceberg.metrics.LoggingMetricsReporter; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -514,37 +515,38 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) { */ public static MetricsReporter loadMetricsReporter(Map properties) { String impl = properties.get(CatalogProperties.METRICS_REPORTER_IMPL); + MetricsReporter reporter; if (impl == null) { - return LoggingMetricsReporter.instance(); - } + reporter = LoggingMetricsReporter.instance(); + } else { + LOG.info("Loading custom MetricsReporter implementation: {}", impl); + DynConstructors.Ctor ctor; + try { + ctor = + DynConstructors.builder(MetricsReporter.class) + .loader(CatalogUtil.class.getClassLoader()) + .impl(impl) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl), + e); + } - LOG.info("Loading custom MetricsReporter implementation: {}", impl); - DynConstructors.Ctor ctor; - try { - ctor = - DynConstructors.builder(MetricsReporter.class) - .loader(CatalogUtil.class.getClassLoader()) - .impl(impl) - .buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl), - e); - } + try { + reporter = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl), + e); + } - MetricsReporter reporter; - try { - reporter = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format( - "Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl), - e); + reporter.initialize(properties); } - reporter.initialize(properties); - - return reporter; + return FilteringMetricsReporter.wrap(reporter, properties); } public static String fullTableName(String catalogName, TableIdentifier identifier) { diff --git a/core/src/main/java/org/apache/iceberg/metrics/FilteringMetricsReporter.java b/core/src/main/java/org/apache/iceberg/metrics/FilteringMetricsReporter.java new file mode 100644 index 000000000000..c5c14feca5f7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/metrics/FilteringMetricsReporter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.metrics; + +import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import org.apache.iceberg.CatalogProperties; + +/** + * A {@link MetricsReporter} wrapper that drops {@link ScanReport} and {@link CommitReport} + * instances whose {@code tableName()} does not pass the configured include / exclude patterns + * before forwarding to a delegate reporter. + * + *

The patterns are Java regular expressions sourced from the catalog properties {@link + * CatalogProperties#METRICS_REPORTER_TABLE_NAME_INCLUDE} and {@link + * CatalogProperties#METRICS_REPORTER_TABLE_NAME_EXCLUDE}. When both are set, {@code exclude} wins + * over {@code include} (an explicit deny overrides an include). When neither is set, {@link + * #wrap(MetricsReporter, Map)} returns the delegate unchanged. + * + *

{@link MetricsReport} subtypes other than {@link ScanReport} and {@link CommitReport} are + * forwarded to the delegate without filtering, since they do not expose a {@code tableName()}. + */ +public class FilteringMetricsReporter implements MetricsReporter { + + private final MetricsReporter delegate; + private final Pattern include; + private final Pattern exclude; + + private FilteringMetricsReporter(MetricsReporter delegate, Pattern include, Pattern exclude) { + this.delegate = delegate; + this.include = include; + this.exclude = exclude; + } + + /** + * Wraps the given delegate in a {@code FilteringMetricsReporter} when either include or exclude + * is configured in {@code properties}; otherwise returns the delegate unchanged so the default + * case incurs no runtime overhead. + * + * @param delegate the underlying reporter that receives forwarded reports + * @param properties catalog properties; consulted for the table-name include / exclude regex + * @return either the delegate unchanged, or a new filtering wrapper around it + */ + public static MetricsReporter wrap(MetricsReporter delegate, Map properties) { + Pattern include = + compileIfPresent( + properties.get(CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE), + CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE); + Pattern exclude = + compileIfPresent( + properties.get(CatalogProperties.METRICS_REPORTER_TABLE_NAME_EXCLUDE), + CatalogProperties.METRICS_REPORTER_TABLE_NAME_EXCLUDE); + if (include == null && exclude == null) { + return delegate; + } + return new FilteringMetricsReporter(delegate, include, exclude); + } + + private static Pattern compileIfPresent(String value, String propertyName) { + if (value == null || value.isEmpty()) { + return null; + } + try { + return Pattern.compile(value); + } catch (PatternSyntaxException e) { + throw new IllegalArgumentException( + String.format("Invalid regex for %s: %s", propertyName, value), e); + } + } + + @Override + public void report(MetricsReport report) { + String tableName = extractTableName(report); + if (tableName == null) { + delegate.report(report); + return; + } + if (exclude != null && exclude.matcher(tableName).matches()) { + return; + } + if (include != null && !include.matcher(tableName).matches()) { + return; + } + delegate.report(report); + } + + private static String extractTableName(MetricsReport report) { + if (report instanceof ScanReport) { + return ((ScanReport) report).tableName(); + } + if (report instanceof CommitReport) { + return ((CommitReport) report).tableName(); + } + return null; + } + + @Override + public void close() { + delegate.close(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java index 84e79e35c9b5..6a5a8dc52d99 100644 --- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java +++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java @@ -43,6 +43,7 @@ import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.io.SupportsStorageCredentials; +import org.apache.iceberg.metrics.FilteringMetricsReporter; import org.apache.iceberg.metrics.MetricsReport; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -266,6 +267,28 @@ public void loadCustomMetricsReporter_badClass() { .hasMessageContaining("does not implement MetricsReporter"); } + @Test + public void loadMetricsReporter_wrappedWhenTableNameFilterPresent() { + MetricsReporter metricsReporter = + CatalogUtil.loadMetricsReporter( + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_IMPL, + TestMetricsReporterDefault.class.getName(), + CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE, + "prod\\..*")); + assertThat(metricsReporter).isInstanceOf(FilteringMetricsReporter.class); + } + + @Test + public void loadMetricsReporter_notWrappedWhenFilterAbsent() { + MetricsReporter metricsReporter = + CatalogUtil.loadMetricsReporter( + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_IMPL, + TestMetricsReporterDefault.class.getName())); + assertThat(metricsReporter).isInstanceOf(TestMetricsReporterDefault.class); + } + @Test public void fullTableNameWithDifferentValues() { String uriTypeCatalogName = "thrift://host:port/db.table"; diff --git a/core/src/test/java/org/apache/iceberg/metrics/TestFilteringMetricsReporter.java b/core/src/test/java/org/apache/iceberg/metrics/TestFilteringMetricsReporter.java new file mode 100644 index 000000000000..8259315d3207 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/metrics/TestFilteringMetricsReporter.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; +import java.util.regex.PatternSyntaxException; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.Test; + +public class TestFilteringMetricsReporter { + + private static final ScanReport SCAN_PROD = newScanReport("prod_db.orders"); + private static final ScanReport SCAN_TMP = newScanReport("prod_db.tmp_staging"); + private static final ScanReport SCAN_DEV = newScanReport("dev_db.orders"); + private static final CommitReport COMMIT_PROD = newCommitReport("prod_db.orders"); + + @Test + public void wrapReturnsDelegateWhenNoPropertiesSet() { + CapturingMetricsReporter delegate = new CapturingMetricsReporter(); + MetricsReporter wrapped = FilteringMetricsReporter.wrap(delegate, ImmutableMap.of()); + assertThat(wrapped).isSameAs(delegate); + } + + @Test + public void wrapReturnsDelegateWhenPropertiesAreEmpty() { + CapturingMetricsReporter delegate = new CapturingMetricsReporter(); + MetricsReporter wrapped = + FilteringMetricsReporter.wrap( + delegate, + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE, "", + CatalogProperties.METRICS_REPORTER_TABLE_NAME_EXCLUDE, "")); + assertThat(wrapped).isSameAs(delegate); + } + + @Test + public void includeOnlyForwardsMatchingTableNames() { + CapturingMetricsReporter delegate = new CapturingMetricsReporter(); + MetricsReporter wrapped = + FilteringMetricsReporter.wrap( + delegate, + ImmutableMap.of(CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE, "prod_db\\..*")); + + wrapped.report(SCAN_PROD); + wrapped.report(SCAN_DEV); + wrapped.report(COMMIT_PROD); + + assertThat(delegate.reports).containsExactly(SCAN_PROD, COMMIT_PROD); + } + + @Test + public void excludeOnlyDropsMatchingTableNames() { + CapturingMetricsReporter delegate = new CapturingMetricsReporter(); + MetricsReporter wrapped = + FilteringMetricsReporter.wrap( + delegate, + ImmutableMap.of(CatalogProperties.METRICS_REPORTER_TABLE_NAME_EXCLUDE, ".*\\.tmp_.*")); + + wrapped.report(SCAN_PROD); + wrapped.report(SCAN_TMP); + + assertThat(delegate.reports).containsExactly(SCAN_PROD); + } + + @Test + public void excludeWinsOverInclude() { + CapturingMetricsReporter delegate = new CapturingMetricsReporter(); + MetricsReporter wrapped = + FilteringMetricsReporter.wrap( + delegate, + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE, "prod_db\\..*", + CatalogProperties.METRICS_REPORTER_TABLE_NAME_EXCLUDE, ".*\\.tmp_.*")); + + wrapped.report(SCAN_PROD); + wrapped.report(SCAN_TMP); + wrapped.report(SCAN_DEV); + + assertThat(delegate.reports).containsExactly(SCAN_PROD); + } + + @Test + public void unknownReportSubtypeIsForwardedWithoutFiltering() { + CapturingMetricsReporter delegate = new CapturingMetricsReporter(); + MetricsReporter wrapped = + FilteringMetricsReporter.wrap( + delegate, + ImmutableMap.of(CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE, "no_such\\..*")); + + MetricsReport unknown = new MetricsReport() {}; + wrapped.report(unknown); + + assertThat(delegate.reports).containsExactly(unknown); + } + + @Test + public void wrapThrowsClearErrorForInvalidRegex() { + assertThatThrownBy( + () -> + FilteringMetricsReporter.wrap( + new CapturingMetricsReporter(), + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE, "[invalid"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining(CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE) + .hasMessageContaining("[invalid") + .hasCauseInstanceOf(PatternSyntaxException.class); + } + + @Test + public void closeIsDelegated() { + CapturingMetricsReporter delegate = new CapturingMetricsReporter(); + MetricsReporter wrapped = + FilteringMetricsReporter.wrap( + delegate, ImmutableMap.of(CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE, ".*")); + + wrapped.close(); + + assertThat(delegate.closed).isTrue(); + } + + private static ScanReport newScanReport(String tableName) { + return ImmutableScanReport.builder() + .tableName(tableName) + .snapshotId(1L) + .filter(Expressions.alwaysTrue()) + .schemaId(1) + .projectedFieldIds(ImmutableList.of()) + .projectedFieldNames(ImmutableList.of()) + .scanMetrics(ImmutableScanMetricsResult.builder().build()) + .metadata(ImmutableMap.of()) + .build(); + } + + private static CommitReport newCommitReport(String tableName) { + return ImmutableCommitReport.builder() + .tableName(tableName) + .snapshotId(1L) + .sequenceNumber(1L) + .operation("append") + .commitMetrics(ImmutableCommitMetricsResult.builder().build()) + .metadata(ImmutableMap.of()) + .build(); + } + + private static class CapturingMetricsReporter implements MetricsReporter { + private final List reports = Lists.newArrayList(); + private boolean closed = false; + + @Override + public void report(MetricsReport report) { + reports.add(report); + } + + @Override + public void close() { + this.closed = true; + } + } +} diff --git a/docs/docs/metrics-reporting.md b/docs/docs/metrics-reporting.md index 4ca452b0d503..7e1ac1dfc402 100644 --- a/docs/docs/metrics-reporting.md +++ b/docs/docs/metrics-reporting.md @@ -147,6 +147,27 @@ public class InMemoryMetricsReporter implements MetricsReporter { The [catalog property](catalog-properties.md) `metrics-reporter-impl` allows registering a given [`MetricsReporter`](https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java) by specifying its fully-qualified class name, e.g. `metrics-reporter-impl=org.apache.iceberg.metrics.InMemoryMetricsReporter`. +### Table-name filtering + +Reports forwarded to the configured `MetricsReporter` can be filtered by table name using two additional catalog properties. Both accept Java regular expressions matched against `ScanReport.tableName()` and `CommitReport.tableName()`: + +| Property | Effect | +|---|---| +| `metrics-reporter.table-name.include` | Forward only reports whose table name matches; drop the rest. | +| `metrics-reporter.table-name.exclude` | Drop reports whose table name matches; forward the rest. | + +When both are set, `exclude` wins over `include` (an explicit deny overrides an include). When neither is set, behavior is identical to today (every report is forwarded, with no runtime overhead). Empty values are treated as not set to avoid accidentally silencing all metrics on misconfiguration. + +For example, to forward metrics only for tables in the `prod_db` namespace while still dropping any temporary tables under it: + +``` +metrics-reporter-impl=org.apache.iceberg.metrics.LoggingMetricsReporter +metrics-reporter.table-name.include=prod_db\..* +metrics-reporter.table-name.exclude=.*\.tmp_.* +``` + +The filter applies uniformly to all `MetricsReporter` implementations (`LoggingMetricsReporter`, `RESTMetricsReporter`, and custom user-supplied ones). Reports whose subtype does not expose a table name (i.e. anything other than `ScanReport` and `CommitReport`) are forwarded without filtering. + ### Via the Java API during Scan planning Independently of the [`MetricsReporter`](https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java) being registered at the catalog level via the `metrics-reporter-impl` property, it is also possible to supply additional reporters during scan planning as shown below: