Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ message Annotation {

// Populated MonitoringInfoSpecs for specific URNs.
// Indicating the required fields to be set.
// SDKS and RunnerHarnesses can load these instances into memory and write a
// SDKs and RunnerHarnesses can load these instances into memory and write a
// validator or code generator to assist with populating and validating
// MonitoringInfo protos.
message MonitoringInfoSpecs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class SimpleMonitoringInfoBuilder {
for (MonitoringInfoSpecs.Enum val : MonitoringInfoSpecs.Enum.values()) {
// The enum iterator inserts an UNRECOGNIZED = -1 value which isn't explicitly added in
// the proto files.
if (!((Enum) val).name().equals("UNRECOGNIZED")) {
if (!val.name().equals("UNRECOGNIZED")) {
MonitoringInfoSpec spec =
val.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec);
SimpleMonitoringInfoBuilder.specs.put(spec.getUrn(), spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.flink.metrics;

import static org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum.USER_COUNTER_URN_PREFIX;
import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX;

import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -98,8 +98,8 @@ public MetricsContainer getMetricsContainer(String stepName) {
* <p>TODO: not flink-specific; where should it live?
*/
public static MetricName parseUrn(String urn) {
if (urn.startsWith(USER_COUNTER_URN_PREFIX.toString())) {
urn = urn.substring(USER_COUNTER_URN_PREFIX.toString().length());
if (urn.startsWith(USER_COUNTER_URN_PREFIX)) {
urn = urn.substring(USER_COUNTER_URN_PREFIX.length());
}
// If it is not a user counter, just use the first part of the URN, i.e. 'beam'
String[] pieces = urn.split(":", 2);
Expand All @@ -119,16 +119,17 @@ public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monito
BeamFnApi.Metric metric = monitoringInfo.getMetric();
if (metric.hasCounterData()) {
BeamFnApi.CounterData counterData = metric.getCounterData();
org.apache.beam.sdk.metrics.Counter counter = metricsContainer.getCounter(metricName);
if (counterData.getValueCase() == BeamFnApi.CounterData.ValueCase.INT64_VALUE) {
org.apache.beam.sdk.metrics.Counter counter =
metricsContainer.getCounter(metricName);
counter.inc(counterData.getInt64Value());
} else {
throw new IllegalArgumentException("Unsupported CounterData type: " + counterData);
LOG.warn("Unsupported CounterData type: {}", counterData);
}
} else if (metric.hasDistributionData()) {
BeamFnApi.DistributionData distributionData = metric.getDistributionData();
Distribution distribution = metricsContainer.getDistribution(metricName);
if (distributionData.hasIntDistributionData()) {
Distribution distribution = metricsContainer.getDistribution(metricName);
BeamFnApi.IntDistributionData intDistributionData =
distributionData.getIntDistributionData();
distribution.update(
Expand All @@ -137,12 +138,11 @@ public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monito
intDistributionData.getMin(),
intDistributionData.getMax());
} else {
throw new IllegalArgumentException(
"Unsupported DistributionData type: " + distributionData);
LOG.warn("Unsupported DistributionData type: {}", distributionData);
}
} else if (metric.hasExtremaData()) {
BeamFnApi.ExtremaData extremaData = metric.getExtremaData();
throw new IllegalArgumentException("Extrema metric unsupported: " + extremaData);
LOG.warn("Extrema metric unsupported: {}", extremaData);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,32 @@
*/
package org.apache.beam.runners.flink.metrics;

import static org.apache.beam.model.fnexecution.v1.BeamFnApi.labelProps;
import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN;
import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DoubleDistributionData;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo.MonitoringInfoLabels;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.DistributionCell;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer.FlinkDistributionGauge;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.DistributionResult;
Expand All @@ -34,11 +51,13 @@
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
Expand All @@ -49,6 +68,13 @@ public class FlinkMetricContainerTest {
@Mock private RuntimeContext runtimeContext;
@Mock private MetricGroup metricGroup;

static final String PTRANSFORM_LABEL =
MonitoringInfoLabels.forNumber(MonitoringInfoLabels.TRANSFORM_VALUE)
.getValueDescriptor()
.getOptions()
.getExtension(labelProps)
.getName();

@Before
public void beforeTest() {
MockitoAnnotations.initMocks(this);
Expand Down Expand Up @@ -106,6 +132,134 @@ public void testGauge() {
assertThat(flinkGauge.getValue().getValue(), is(42L));
}

@Test
public void testMonitoringInfoUpdate() {
FlinkMetricContainer container = new FlinkMetricContainer(runtimeContext);
MetricsContainer step = container.getMetricsContainer("step");

SimpleCounter userCounter = new SimpleCounter();
when(metricGroup.counter("ns1.metric1")).thenReturn(userCounter);

SimpleCounter elemCounter = new SimpleCounter();
when(metricGroup.counter("beam.metric:element_count:v1")).thenReturn(elemCounter);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current namespace parsing logic above just takes the everything after the first : as the "name", so this name is metric:element_count:v1.

I don't see any reason that having additional :s in the "name" is a problem, but I saw a note about it (only related to user metrics, though) in SimpleMonitoringInfoBuilder, so open to whether there's a better way to think about it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose user-defined counters should not be able to change the namespace via colons.


SimpleMonitoringInfoBuilder userCountBuilder = new SimpleMonitoringInfoBuilder();
userCountBuilder.setUrnForUserMetric("ns1", "metric1");
userCountBuilder.setInt64Value(111);
MonitoringInfo userCountMonitoringInfo = userCountBuilder.build();
assertNotNull(userCountMonitoringInfo);

SimpleMonitoringInfoBuilder elemCountBuilder = new SimpleMonitoringInfoBuilder();
elemCountBuilder.setUrn(ELEMENT_COUNT_URN);
elemCountBuilder.setInt64Value(222);
elemCountBuilder.setPTransformLabel("step");
elemCountBuilder.setPCollectionLabel("pcoll");
MonitoringInfo elemCountMonitoringInfo = elemCountBuilder.build();
assertNotNull(elemCountMonitoringInfo);

assertThat(userCounter.getCount(), is(0L));
assertThat(elemCounter.getCount(), is(0L));
container.updateMetrics(
"step", ImmutableList.of(userCountMonitoringInfo, elemCountMonitoringInfo));
assertThat(userCounter.getCount(), is(111L));
assertThat(elemCounter.getCount(), is(222L));
}

@Test
public void testDropUnexpectedMonitoringInfoTypes() {
FlinkMetricContainer flinkContainer = new FlinkMetricContainer(runtimeContext);
MetricsContainer step = flinkContainer.getMetricsContainer("step");

MonitoringInfo intCounter =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns1:int_counter")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder().setCounterData(CounterData.newBuilder().setInt64Value(111)))
.build();

MonitoringInfo doubleCounter =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns2:double_counter")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder().setCounterData(CounterData.newBuilder().setDoubleValue(222)))
.build();

MonitoringInfo intDistribution =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns3:int_distribution")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder()
.setDistributionData(
BeamFnApi.DistributionData.newBuilder()
.setIntDistributionData(
IntDistributionData.newBuilder()
.setSum(30)
.setCount(10)
.setMin(1)
.setMax(5))))
.build();

MonitoringInfo doubleDistribution =
MonitoringInfo.newBuilder()
.setUrn(USER_COUNTER_URN_PREFIX + "ns4:double_distribution")
.putLabels(PTRANSFORM_LABEL, "step")
.setMetric(
Metric.newBuilder()
.setDistributionData(
BeamFnApi.DistributionData.newBuilder()
.setDoubleDistributionData(
DoubleDistributionData.newBuilder()
.setSum(30)
.setCount(10)
.setMin(1)
.setMax(5))))
.build();

// Mock out the counter that Flink returns; the distribution gets created by
// FlinkMetricContainer, not by Flink itself, so we verify it in a different way below
SimpleCounter counter = new SimpleCounter();
when(metricGroup.counter("ns1.int_counter")).thenReturn(counter);

flinkContainer.updateMetrics(
"step", ImmutableList.of(intCounter, doubleCounter, intDistribution, doubleDistribution));

// Flink's MetricGroup should only have asked for one counter (the integer-typed one) to be
// created (the double-typed one is dropped currently)
verify(metricGroup).counter(eq("ns1.int_counter"));

// Verify that the counter injected into flink has the right value
assertThat(counter.getCount(), is(111L));

// Verify the counter in the java SDK MetricsContainer
long count =
((CounterCell) step.getCounter(MetricName.named("ns1", "int_counter"))).getCumulative();
assertThat(count, is(111L));

// The one Flink distribution that gets created is a FlinkDistributionGauge; here we verify its
// initial (and in this test, final) value
verify(metricGroup)
.gauge(
eq("ns3.int_distribution"),
argThat(
new ArgumentMatcher<FlinkDistributionGauge>() {
@Override
public boolean matches(Object argument) {
DistributionResult actual = ((FlinkDistributionGauge) argument).getValue();
DistributionResult expected = DistributionResult.create(30, 10, 1, 5);
return actual.equals(expected);
}
}));

// Verify that the Java SDK MetricsContainer holds the same information
DistributionData distributionData =
((DistributionCell) step.getDistribution(MetricName.named("ns3", "int_distribution")))
.getCumulative();
assertThat(distributionData, is(DistributionData.create(30, 10, 1, 5)));
}

@Test
public void testDistribution() {
FlinkMetricContainer.FlinkDistributionGauge flinkGauge =
Expand Down