Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -85,9 +86,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
* For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
private final String[] scopeComponents;

/** The metrics scope represented by this group, as a concatenated string, lazily computed.
/** Array containing the metrics scope represented by this group for each reporter, as a concatenated string, lazily computed.
* For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
private String scopeString;
private final String[] scopeStrings;

/** The metrics query service scope represented by this group, lazily computed. */
protected QueryScopeInfo queryServiceScopeInfo;
Expand All @@ -101,6 +102,7 @@ public AbstractMetricGroup(MetricRegistry registry, String[] scope, A parent) {
this.registry = checkNotNull(registry);
this.scopeComponents = checkNotNull(scope);
this.parent = parent;
this.scopeStrings = new String[registry.getReporters().size()];
}

public Map<String, String> getAllVariables() {
Expand Down Expand Up @@ -169,19 +171,7 @@ public String getMetricIdentifier(String metricName) {
* @return fully qualified metric name
*/
public String getMetricIdentifier(String metricName, CharacterFilter filter) {
if (scopeString == null) {
if (filter != null) {
scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents);
} else {
scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
}
}

if (filter != null) {
return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName);
} else {
return scopeString + registry.getDelimiter() + metricName;
}
return getMetricIdentifier(metricName, filter, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling this method will now always log a warning even though there is no actual problem.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain, please? What do you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you call getMetricIdentifier(...) the reporter index will be passed to MetricRegistry#getDelimiter(). For a negative index this function logs a warning, assuming that some error has occurred that caused the index to be negative. Since you pass -1 as the reporter index you will thus always trigger a warning.

Copy link
Author

@ex00 ex00 Oct 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if reporter index not correct for array then in this block not use reporterIndex and call MetricRegistry#getDelimiter()
I will edit to

if (scopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) {
            if (filter != null) {
                String newScopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents);
                return newScopeString + registry.getDelimiter() + filter.filterCharacters(metricName);
            } else {
                String newScopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
                return newScopeString + registry.getDelimiter() + metricName;
            }
        }

and warning message will not show

}

/**
Expand All @@ -194,12 +184,29 @@ public String getMetricIdentifier(String metricName, CharacterFilter filter) {
* @return fully qualified metric name
*/
public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex) {
if (scopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) {
if (filter != null) {
String newScopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents);
return newScopeString + registry.getDelimiter() + filter.filterCharacters(metricName);
} else {
String newScopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
return newScopeString + registry.getDelimiter() + metricName;
}
}
//we assume that a single reporter will only pass identical filters each times
if (scopeStrings[reporterIndex] == null) {
if (filter != null) {
scopeStrings[reporterIndex] = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents);
return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName);
} else {
scopeStrings[reporterIndex] = ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents);
return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + metricName;
}
}
if (filter != null) {
scopeString = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents);
return scopeString + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName);
return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName);
} else {
scopeString = ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents);
return scopeString + registry.getDelimiter(reporterIndex) + metricName;
return scopeStrings[reporterIndex] + registry.getDelimiter(reporterIndex) + metricName;
}
}

Expand Down Expand Up @@ -312,7 +319,7 @@ protected void addMetric(String name, Metric metric) {
// we warn here, rather than failing, because metrics are tools that should not fail the
// program when used incorrectly
LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
name + "'. Metric might not get properly reported. (" + scopeString + ')');
name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
}

registry.register(metric, name, this);
Expand All @@ -324,7 +331,7 @@ protected void addMetric(String name, Metric metric) {
// we warn here, rather than failing, because metrics are tools that should not fail the
// program when used incorrectly
LOG.warn("Name collision: Group already contains a Metric with the name '" +
name + "'. Metric will not be reported. (" + scopeString + ')');
name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
}
}
}
Expand All @@ -348,7 +355,7 @@ public MetricGroup addGroup(String name) {
// program when used incorrectly
if (metrics.containsKey(name)) {
LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
name + "'. Metric might not get properly reported. (" + scopeString + ')');
name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
}

AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@
*/
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class AbstractMetricGroupTest {
/**
Expand All @@ -44,4 +52,138 @@ protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {

registry.shutdown();
}

// for test case: one filter for different reporters with different of scope delimiter
protected static CharacterFilter staticCharacterFilter = new CharacterFilter() {
@Override
public String filterCharacters(String input) {
return input.replace("C", "RR");
}
};

@Test
public void filteringForMultipleReporters() {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");

MetricRegistry testRegistry = new TestMetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
tmGroup.counter(1);
assertEquals("MetricReporters list should be contain reporters", 2, testRegistry.getReporters().size());
for (MetricReporter reporter: testRegistry.getReporters()) {
assertEquals("The number of successful checks in " + reporter.getClass() + " does not match with expected", 2, ((ScopeCheckingTestReporter)reporter).countSuccessChecks );
}
testRegistry.shutdown();
}

@Test
public void filteringForNullReporters() {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D");
TestMetricRegistry testRegistry = new TestMetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));

TaskManagerMetricGroup tmGroupForTestRegistry = new TaskManagerMetricGroup(testRegistry, "host", "id");
assertEquals("MetricReporters list should be empty", 0, testRegistry.getReporters().size());
tmGroupForTestRegistry.counter(1);
testRegistry.shutdown();
}

public static abstract class ScopeCheckingTestReporter extends TestReporter {
protected int countSuccessChecks = 0;

public abstract void checkScopes(Metric metric, String metricName, MetricGroup group);

public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
checkScopes(metric, metricName, group);
countSuccessChecks++;
}
}

public static class TestReporter1 extends ScopeCheckingTestReporter {
@Override
public String filterCharacters(String input) {
return input.replace("A", "RR");
}
@Override
public void checkScopes(Metric metric, String metricName, MetricGroup group) {
assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName));
// ignore all next filters for scope - because scopeString cached with only first filter
assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, staticCharacterFilter));
assertEquals("A-B-C-D-1", group.getMetricIdentifier(metricName, this));
assertEquals("A-B-C-D-4", group.getMetricIdentifier(metricName, new CharacterFilter() {
@Override
public String filterCharacters(String input) {
return input.replace("B", "RR").replace("1", "4");
}
}));

}
}
public static class TestReporter2 extends ScopeCheckingTestReporter {
@Override
public String filterCharacters(String input) {
return input.replace("B", "RR");
}
@Override
public void checkScopes(Metric metric, String metricName, MetricGroup group) {
assertEquals("A!RR!C!D!1", group.getMetricIdentifier(metricName, this));
// ignore all next filters - because scopeString cached with only first filter
assertEquals("A!RR!C!D!1", group.getMetricIdentifier(metricName));
assertEquals("A!RR!C!D!1", group.getMetricIdentifier(metricName, staticCharacterFilter));
assertEquals("A!RR!C!D!3", group.getMetricIdentifier(metricName, new CharacterFilter() {
@Override
public String filterCharacters(String input) {
return input.replace("A", "RR").replace("1", "3");
}
}));
}
}

private class TestMetricRegistry extends MetricRegistry {
public TestMetricRegistry(MetricRegistryConfiguration config) {
super(config);
}
@Override
public void register(Metric metric, String metricName, AbstractMetricGroup group) {
checkApplyFilterForIncorrectReportIndex(metric, metricName, group);
super.register(metric, metricName, group);
}
private void checkApplyFilterForIncorrectReportIndex(Metric metric, String metricName, AbstractMetricGroup group) {
try {
// this filters will be use always because use incorrect of reporterIndex
assertEquals("A.B.RR.D.1", group.getMetricIdentifier(metricName, staticCharacterFilter));
assertEquals("A.B.RR.D.1", group.getMetricIdentifier(metricName, staticCharacterFilter, getReporters().size() + 2));

if (getReporters() != null) {
for (int i = 0; i < getReporters().size(); i++) {
MetricReporter reporter = getReporters().get(i);
if (reporter != null) {
if (reporter instanceof ScopeCheckingTestReporter) {
if (reporter instanceof TestReporter2) {
assertEquals("A.RR.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter));
assertEquals("A.RR.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter, getReporters().size() + 2));
assertEquals("A.B.C.D.1", group.getMetricIdentifier(metricName, null, getReporters().size() + 2));
} else if (reporter instanceof TestReporter1) {
assertEquals("RR.B.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter));
assertEquals("RR.B.C.D.1", group.getMetricIdentifier(metricName, (CharacterFilter) reporter, getReporters().size() + 2));
assertEquals("A.B.C.D.1", group.getMetricIdentifier(metricName, null, getReporters().size() + 2));
}
((ScopeCheckingTestReporter) reporter).countSuccessChecks++;
} else {
fail("Unknown reporter class: " + reporter.getClass().getSimpleName());
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
}
}