Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ private CatalogProperties() {}
public static final String VIEW_OVERRIDE_PREFIX = "view-override.";
public static final String METRICS_REPORTER_IMPL = "metrics-reporter-impl";

/**
* Java regex applied to {@code tableName()} of {@link org.apache.iceberg.metrics.ScanReport} and
* {@link org.apache.iceberg.metrics.CommitReport}. When set, only reports whose table name
* matches the pattern are forwarded to the configured {@link
* org.apache.iceberg.metrics.MetricsReporter}. Empty values are treated as not set.
*
* <p>The pattern must match the entire table name (the semantics of {@link
* java.util.regex.Matcher#matches()}), not merely a substring of it. A value that is not a valid
* regex results in an {@link IllegalArgumentException} when the filter is created.
*/
public static final String METRICS_REPORTER_TABLE_NAME_INCLUDE =
"metrics-reporter.table-name.include";

/**
* Java regex applied to {@code tableName()} of {@link org.apache.iceberg.metrics.ScanReport} and
* {@link org.apache.iceberg.metrics.CommitReport}. When set, reports whose table name matches the
* pattern are dropped before reaching the configured {@link
* org.apache.iceberg.metrics.MetricsReporter}. When both include and exclude are set, exclude
* wins. Empty values are treated as not set.
*
* <p>The pattern must match the entire table name (the semantics of {@link
* java.util.regex.Matcher#matches()}), not merely a substring of it. A value that is not a valid
* regex results in an {@link IllegalArgumentException} when the filter is created.
*/
public static final String METRICS_REPORTER_TABLE_NAME_EXCLUDE =
"metrics-reporter.table-name.exclude";

/**
* Controls whether the catalog will cache table entries upon load.
*
Expand Down
54 changes: 28 additions & 26 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsStorageCredentials;
import org.apache.iceberg.metrics.FilteringMetricsReporter;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -514,37 +515,38 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
*/
public static MetricsReporter loadMetricsReporter(Map<String, String> properties) {
String impl = properties.get(CatalogProperties.METRICS_REPORTER_IMPL);
MetricsReporter reporter;
if (impl == null) {
return LoggingMetricsReporter.instance();
}
reporter = LoggingMetricsReporter.instance();
} else {
LOG.info("Loading custom MetricsReporter implementation: {}", impl);
DynConstructors.Ctor<MetricsReporter> ctor;
try {
ctor =
DynConstructors.builder(MetricsReporter.class)
.loader(CatalogUtil.class.getClassLoader())
.impl(impl)
.buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl),
e);
}

LOG.info("Loading custom MetricsReporter implementation: {}", impl);
DynConstructors.Ctor<MetricsReporter> ctor;
try {
ctor =
DynConstructors.builder(MetricsReporter.class)
.loader(CatalogUtil.class.getClassLoader())
.impl(impl)
.buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl),
e);
}
try {
reporter = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl),
e);
}

MetricsReporter reporter;
try {
reporter = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl),
e);
reporter.initialize(properties);
}

reporter.initialize(properties);

return reporter;
return FilteringMetricsReporter.wrap(reporter, properties);
}

public static String fullTableName(String catalogName, TableIdentifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.metrics;

import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.iceberg.CatalogProperties;

/**
 * {@link MetricsReporter} decorator that filters {@link ScanReport} and {@link CommitReport}
 * instances by their {@code tableName()} before handing them to a delegate reporter.
 *
 * <p>Filtering is driven by two optional Java regular expressions taken from the catalog
 * properties {@link CatalogProperties#METRICS_REPORTER_TABLE_NAME_INCLUDE} and {@link
 * CatalogProperties#METRICS_REPORTER_TABLE_NAME_EXCLUDE}. Each pattern has to match the whole
 * table name. The exclude pattern is evaluated first, so a table name matched by both patterns is
 * dropped. If neither property is set, {@link #wrap(MetricsReporter, Map)} hands back the delegate
 * itself and no wrapper is created.
 *
 * <p>Every other {@link MetricsReport} subtype, as well as a scan or commit report whose table
 * name is {@code null}, bypasses the filter and is always forwarded.
 */
public class FilteringMetricsReporter implements MetricsReporter {

  private final MetricsReporter delegate;
  private final Pattern includePattern;
  private final Pattern excludePattern;

  private FilteringMetricsReporter(
      MetricsReporter delegate, Pattern includePattern, Pattern excludePattern) {
    this.delegate = delegate;
    this.includePattern = includePattern;
    this.excludePattern = excludePattern;
  }

  /**
   * Builds the reporter that should receive metrics for a catalog configured with {@code
   * properties}.
   *
   * <p>A {@code FilteringMetricsReporter} is created only when at least one of the include /
   * exclude properties holds a non-empty regex; otherwise the delegate is returned as-is.
   *
   * @param delegate the reporter that ultimately receives the reports that pass the filter
   * @param properties catalog properties from which the include / exclude regexes are read
   * @return the delegate itself, or a filtering wrapper around it
   * @throws IllegalArgumentException if a configured regex cannot be compiled
   */
  public static MetricsReporter wrap(MetricsReporter delegate, Map<String, String> properties) {
    Pattern includePattern =
        patternFor(properties, CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE);
    Pattern excludePattern =
        patternFor(properties, CatalogProperties.METRICS_REPORTER_TABLE_NAME_EXCLUDE);

    boolean filterConfigured = includePattern != null || excludePattern != null;
    return filterConfigured
        ? new FilteringMetricsReporter(delegate, includePattern, excludePattern)
        : delegate;
  }

  /**
   * Compiles the regex stored under {@code propertyName}, or returns {@code null} when the
   * property is missing or empty.
   */
  private static Pattern patternFor(Map<String, String> properties, String propertyName) {
    String regex = properties.get(propertyName);
    if (regex != null && !regex.isEmpty()) {
      try {
        return Pattern.compile(regex);
      } catch (PatternSyntaxException e) {
        throw new IllegalArgumentException(
            String.format("Invalid regex for %s: %s", propertyName, regex), e);
      }
    }

    return null;
  }

  /** Forwards {@code report} to the delegate unless the table-name filter rejects it. */
  @Override
  public void report(MetricsReport report) {
    if (passesFilter(report)) {
      delegate.report(report);
    }
  }

  /** Decides whether {@code report} is allowed through to the delegate. */
  private boolean passesFilter(MetricsReport report) {
    String name = tableNameOf(report);
    if (name == null) {
      // Nothing to filter on, so the report is let through.
      return true;
    }

    // Exclude is consulted before include so that it takes precedence when both match.
    boolean excluded = excludePattern != null && excludePattern.matcher(name).matches();
    if (excluded) {
      return false;
    }

    return includePattern == null || includePattern.matcher(name).matches();
  }

  /** Returns the table name carried by a scan or commit report, or {@code null} otherwise. */
  private static String tableNameOf(MetricsReport report) {
    final String name;
    if (report instanceof ScanReport) {
      name = ((ScanReport) report).tableName();
    } else if (report instanceof CommitReport) {
      name = ((CommitReport) report).tableName();
    } else {
      name = null;
    }

    return name;
  }

  /** Closes the delegate reporter. */
  @Override
  public void close() {
    delegate.close();
  }
}
23 changes: 23 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsStorageCredentials;
import org.apache.iceberg.metrics.FilteringMetricsReporter;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -266,6 +267,28 @@ public void loadCustomMetricsReporter_badClass() {
.hasMessageContaining("does not implement MetricsReporter");
}

// A configured table-name include regex must cause the loaded reporter to be wrapped in a
// FilteringMetricsReporter.
@Test
public void loadMetricsReporter_wrappedWhenTableNameFilterPresent() {
  String reporterImpl = TestMetricsReporterDefault.class.getName();
  String includeRegex = "prod\\..*";

  MetricsReporter loaded =
      CatalogUtil.loadMetricsReporter(
          ImmutableMap.of(
              CatalogProperties.METRICS_REPORTER_IMPL,
              reporterImpl,
              CatalogProperties.METRICS_REPORTER_TABLE_NAME_INCLUDE,
              includeRegex));

  assertThat(loaded).isInstanceOf(FilteringMetricsReporter.class);
}

// Without any table-name filter property the custom reporter itself must be returned, not a
// filtering wrapper.
@Test
public void loadMetricsReporter_notWrappedWhenFilterAbsent() {
  String reporterImpl = TestMetricsReporterDefault.class.getName();

  MetricsReporter loaded =
      CatalogUtil.loadMetricsReporter(
          ImmutableMap.of(CatalogProperties.METRICS_REPORTER_IMPL, reporterImpl));

  assertThat(loaded).isInstanceOf(TestMetricsReporterDefault.class);
}

@Test
public void fullTableNameWithDifferentValues() {
String uriTypeCatalogName = "thrift://host:port/db.table";
Expand Down
Loading