diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java index c02027af3570..faf4c526045b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java @@ -19,17 +19,15 @@ package org.apache.beam.runners.spark.metrics; import com.google.common.base.Function; -import com.google.common.base.Objects; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; -import java.util.Set; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeData; import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricFiltering; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; @@ -88,44 +86,10 @@ private Predicate> matchesFilter(final MetricsFilter filter) { return new Predicate>() { @Override public boolean apply(MetricUpdate metricResult) { - return matches(filter, metricResult.getKey()); + return MetricFiltering.matches(filter, metricResult.getKey()); } }; } - - private boolean matches(MetricsFilter filter, MetricKey key) { - return matchesName(key.metricName(), filter.names()) - && matchesScope(key.stepName(), filter.steps()); - } - - private boolean matchesName(MetricName metricName, Set nameFilters) { - if (nameFilters.isEmpty()) { - return true; - } - - for (MetricNameFilter nameFilter : nameFilters) { - if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) - && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { - return true; - } - } - - return false; - } - - private boolean matchesScope(String actualScope, Set scopes) { - if (scopes.isEmpty() || scopes.contains(actualScope)) { - return true; - } - - for (String scope : scopes) { - if (actualScope.startsWith(scope)) { - return true; - } - } - - return false; - } } private static final Function, MetricResult>