diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 907c6558f13ad..270173b77b2e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -85,9 +86,9 @@ public abstract class AbstractMetricGroup> impl * For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */ private final String[] scopeComponents; - /** The metrics scope represented by this group, as a concatenated string, lazily computed. + /** Array containing the metrics scope represented by this group for each reporter, as a concatenated string, lazily computed. * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ - private String scopeString; + private final String[] scopeStrings; /** The metrics query service scope represented by this group, lazily computed. */ protected QueryScopeInfo queryServiceScopeInfo; @@ -101,6 +102,7 @@ public AbstractMetricGroup(MetricRegistry registry, String[] scope, A parent) { this.registry = checkNotNull(registry); this.scopeComponents = checkNotNull(scope); this.parent = parent; + this.scopeStrings = new String[registry.getReporters().size()]; } public Map getAllVariables() { @@ -169,19 +171,7 @@ public String getMetricIdentifier(String metricName) { * @return fully qualified metric name */ public String getMetricIdentifier(String metricName, CharacterFilter filter) { - if (scopeString == null) { - if (filter != null) { - scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents); - } else { - scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents); - } - } - - if (filter != null) { - return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName); - } else { - return scopeString + registry.getDelimiter() + metricName; - } + return getMetricIdentifier(metricName, filter, -1); } /** @@ -194,12 +184,29 @@ public String getMetricIdentifier(String metricName, CharacterFilter filter) { * @return fully qualified metric name */ public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex) { + if (scopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) { + if (filter != null) { + String newScopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents); + return newScopeString + registry.getDelimiter() + filter.filterCharacters(metricName); + } else { + String newScopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents); + return newScopeString + registry.getDelimiter() + metricName; + } + } + //we assume that a single reporter will only pass identical filters each times + if (scopeStrings[reporterIndex] == null) { + if (filter != null) { + scopeStrings[reporterIndex] = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents); + return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName); + } else { + scopeStrings[reporterIndex] = ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents); + return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + metricName; + } + } if (filter != null) { - scopeString = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents); - return scopeString + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName); + return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName); } else { - scopeString = ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents); - return scopeString + registry.getDelimiter(reporterIndex) + metricName; + return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + metricName; } } @@ -312,7 +319,7 @@ protected void addMetric(String name, Metric metric) { // we warn here, rather than failing, because metrics are tools that should not fail the // program when used incorrectly LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" + - name + "'. Metric might not get properly reported. (" + scopeString + ')'); + name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents)); } registry.register(metric, name, this); @@ -324,7 +331,7 @@ protected void addMetric(String name, Metric metric) { // we warn here, rather than failing, because metrics are tools that should not fail the // program when used incorrectly LOG.warn("Name collision: Group already contains a Metric with the name '" + - name + "'. Metric will not be reported. (" + scopeString + ')'); + name + "'. Metric will not be reported." + Arrays.toString(scopeComponents)); } } } @@ -348,7 +355,7 @@ public MetricGroup addGroup(String name) { // program when used incorrectly if (metrics.containsKey(name)) { LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" + - name + "'. Metric might not get properly reported. (" + scopeString + ')'); + name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents)); } AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index ca3810a81b85c..892f3ba070fa4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -17,13 +17,21 @@ */ package org.apache.flink.runtime.metrics.groups; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.TestReporter; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class AbstractMetricGroupTest { /** @@ -44,4 +52,138 @@ protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { registry.shutdown(); } + + // for test case: one filter for different reporters with different of scope delimiter + protected static CharacterFilter staticCharacterFilter = new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input.replace("C", "RR"); + } + }; + + @Test + public void filteringForMultipleReporters() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D"); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!"); + + MetricRegistry testRegistry = new TestMetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id"); + tmGroup.counter(1); + assertEquals("MetricReporters list should be contain reporters", 2, testRegistry.getReporters().size()); + for (MetricReporter reporter: testRegistry.getReporters()) { + assertEquals("The number of successful checks in " + reporter.getClass() + " does not match with expected", 2, ((ScopeCheckingTestReporter)reporter).countSuccessChecks ); + } + testRegistry.shutdown(); + } + + @Test + public void filteringForNullReporters() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D"); + TestMetricRegistry testRegistry = new TestMetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + + TaskManagerMetricGroup tmGroupForTestRegistry = new TaskManagerMetricGroup(testRegistry, "host", "id"); + assertEquals("MetricReporters list should be empty", 0, testRegistry.getReporters().size()); + tmGroupForTestRegistry.counter(1); + testRegistry.shutdown(); + } + + public static abstract class ScopeCheckingTestReporter extends TestReporter { + protected int countSuccessChecks = 0; + + public abstract void checkScopes(Metric metric, String metricName, MetricGroup group); + + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + checkScopes(metric, metricName, group); + countSuccessChecks++; + } + } + + public static class TestReporter1 extends ScopeCheckingTestReporter { + @Override + public String filterCharacters(String input) { + return input.replace("A", "RR"); + } + @Override + public void checkScopes(Metric metric, String metricName, MetricGroup group) { + assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName)); + // ignore all next filters for scope - because scopeString cached with only first filter + assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, staticCharacterFilter)); + assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, this)); + assertEquals("A-B-C-D-4", group.getMetricIdentifier(metricName, new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input.replace("B", "RR").replace("1", "4"); + } + })); + + } + } + public static class TestReporter2 extends ScopeCheckingTestReporter { + @Override + public String filterCharacters(String input) { + return input.replace("B", "RR"); + } + @Override + public void checkScopes(Metric metric, String metricName, MetricGroup group) { + assertEquals("A!RR!C!D!1", group.getMetricIdentifier(metricName, this)); + // ignore all next filters - because scopeString cached with only first filter + assertEquals("A!RR!C!D!1", group.getMetricIdentifier(metricName)); + assertEquals("A!RR!C!D!1", group.getMetricIdentifier(metricName, staticCharacterFilter)); + assertEquals("A!RR!C!D!3", group.getMetricIdentifier(metricName, new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return input.replace("A", "RR").replace("1", "3"); + } + })); + } + } + + private class TestMetricRegistry extends MetricRegistry { + public TestMetricRegistry(MetricRegistryConfiguration config) { + super(config); + } + @Override + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + checkApplyFilterForIncorrectReportIndex(metric, metricName, group); + super.register(metric, metricName, group); + } + private void checkApplyFilterForIncorrectReportIndex(Metric metric, String metricName, AbstractMetricGroup group) { + try { + // this filters will be use always because use incorrect of reporterIndex + assertEquals("A.B.RR.D.1", group.getMetricIdentifier(metricName, staticCharacterFilter)); + assertEquals("A.B.RR.D.1", group.getMetricIdentifier(metricName, staticCharacterFilter, getReporters().size() + 2)); + + if (getReporters() != null) { + for (int i = 0; i < getReporters().size(); i++) { + MetricReporter reporter = getReporters().get(i); + if (reporter != null) { + if (reporter instanceof ScopeCheckingTestReporter) { + if (reporter instanceof TestReporter2) { + assertEquals("A.RR.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter)); + assertEquals("A.RR.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter, getReporters().size() + 2)); + assertEquals("A.B.C.D.1", group.getMetricIdentifier(metricName, null, getReporters().size() + 2)); + } else if (reporter instanceof TestReporter1) { + assertEquals("RR.B.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter)); + assertEquals("RR.B.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter, getReporters().size() + 2)); + assertEquals("A.B.C.D.1", group.getMetricIdentifier(metricName, null, getReporters().size() + 2)); + } + ((ScopeCheckingTestReporter) reporter).countSuccessChecks++; + } else { + fail("Unknown reporter class: " + reporter.getClass().getSimpleName()); + } + } + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } + } }