Skip to content

Commit

Permalink
Merge pull request #14473: [BEAM-12127] Remove delegation for counter…
Browse files Browse the repository at this point in the history
…s in PCollectionConsumerRegistry where the unbound container is known at counter construction.
  • Loading branch information
kennknowles committed Apr 12, 2021
2 parents f717477 + 5974518 commit 1d5f887
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.LabeledMetrics;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
Expand Down Expand Up @@ -201,9 +200,8 @@ private class MetricTrackingFnDataReceiver<T> implements FnDataReceiver<Windowed
private final FnDataReceiver<WindowedValue<T>> delegate;
private final String pTransformId;
private final SimpleExecutionState state;
private final Counter elementCountCounter;
private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
private final MetricsContainer unboundMetricContainer;
private final Counter unboundedElementCountCounter;
private final SampleByteSizeDistribution<T> unboundSampledByteSizeDistribution;
private final Coder<T> coder;

public MetricTrackingFnDataReceiver(
Expand All @@ -213,38 +211,39 @@ public MetricTrackingFnDataReceiver(
this.pTransformId = consumerAndMetadata.getPTransformId();
HashMap<String, String> labels = new HashMap<String, String>();
labels.put(Labels.PCOLLECTION, pCollectionId);

// Collect the metric in a metric container which is not bound to the step name.
// This is required to count elements from impulse steps, which will produce elements outside
// of a pTransform context.
MetricsContainer unboundMetricContainer = metricsContainerRegistry.getUnboundContainer();

MonitoringInfoMetricName elementCountMetricName =
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
this.elementCountCounter = LabeledMetrics.counter(elementCountMetricName);
this.unboundedElementCountCounter = unboundMetricContainer.getCounter(elementCountMetricName);

MonitoringInfoMetricName sampledByteSizeMetricName =
MonitoringInfoMetricName.named(Urns.SAMPLED_BYTE_SIZE, labels);
this.sampledByteSizeDistribution =
new SampleByteSizeDistribution<>(LabeledMetrics.distribution(sampledByteSizeMetricName));
this.coder = consumerAndMetadata.getValueCoder();
this.unboundSampledByteSizeDistribution =
new SampleByteSizeDistribution<>(
unboundMetricContainer.getDistribution(sampledByteSizeMetricName));

// Collect the metric in a metric container which is not bound to the step name.
// This is required to count elements from impulse steps, which will produce elements outside
// of a pTransform context.
this.unboundMetricContainer = metricsContainerRegistry.getUnboundContainer();
this.coder = consumerAndMetadata.getValueCoder();
}

@Override
public void accept(WindowedValue<T> input) throws Exception {
try (Closeable close =
MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
// Increment the counter for each window the element occurs in.
this.elementCountCounter.inc(input.getWindows().size());
// TODO(BEAM-11879): Consider updating size per window when we have window optimization.
this.sampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
// Wrap the consumer with extra logic to set the metric container with the appropriate
// PTransform context. This ensures that user metrics obtain the pTransform ID when they are
// created. Also use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric.
MetricsContainerImpl container = metricsContainerRegistry.getContainer(pTransformId);
try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(container)) {
try (Closeable trackerCloseable = stateTracker.enterState(state)) {
this.delegate.accept(input);
}
// Increment the counter for each window the element occurs in.
this.unboundedElementCountCounter.inc(input.getWindows().size());
// TODO(BEAM-11879): Consider updating size per window when we have window optimization.
this.unboundSampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
// Wrap the consumer with extra logic to set the metric container with the appropriate
// PTransform context. This ensures that user metrics obtain the pTransform ID when they are
// created. Also use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric.
MetricsContainerImpl container = metricsContainerRegistry.getContainer(pTransformId);
try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(container)) {
try (Closeable trackerCloseable = stateTracker.enterState(state)) {
this.delegate.accept(input);
}
}
}
Expand All @@ -260,52 +259,51 @@ public void accept(WindowedValue<T> input) throws Exception {
private class MultiplexingMetricTrackingFnDataReceiver<T>
implements FnDataReceiver<WindowedValue<T>> {
private final List<ConsumerAndMetadata> consumerAndMetadatas;
private final Counter elementCountCounter;
private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
private final MetricsContainer unboundMetricContainer;
private final Counter unboundedElementCountCounter;
private final SampleByteSizeDistribution<T> unboundedSampledByteSizeDistribution;

public MultiplexingMetricTrackingFnDataReceiver(
String pCollectionId, List<ConsumerAndMetadata> consumerAndMetadatas) {
this.consumerAndMetadatas = consumerAndMetadatas;
HashMap<String, String> labels = new HashMap<String, String>();
labels.put(Labels.PCOLLECTION, pCollectionId);

// Collect the metric in a metric container which is not bound to the step name.
// This is required to count elements from impulse steps, which will produce elements outside
// of a pTransform context.
MetricsContainer unboundMetricContainer = metricsContainerRegistry.getUnboundContainer();
MonitoringInfoMetricName elementCountMetricName =
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
this.elementCountCounter = LabeledMetrics.counter(elementCountMetricName);
this.unboundedElementCountCounter = unboundMetricContainer.getCounter(elementCountMetricName);

MonitoringInfoMetricName sampledByteSizeMetricName =
MonitoringInfoMetricName.named(Urns.SAMPLED_BYTE_SIZE, labels);
this.sampledByteSizeDistribution =
new SampleByteSizeDistribution<>(LabeledMetrics.distribution(sampledByteSizeMetricName));
// Collect the metric in a metric container which is not bound to the step name.
// This is required to count elements from impulse steps, which will produce elements outside
// of a pTransform context.
this.unboundMetricContainer = metricsContainerRegistry.getUnboundContainer();
this.unboundedSampledByteSizeDistribution =
new SampleByteSizeDistribution<>(
unboundMetricContainer.getDistribution(sampledByteSizeMetricName));
}

@Override
public void accept(WindowedValue<T> input) throws Exception {
try (Closeable close =
MetricsEnvironment.scopedMetricsContainer(this.unboundMetricContainer)) {
// Increment the counter for each window the element occurs in.
this.elementCountCounter.inc(input.getWindows().size());
// Wrap the consumer with extra logic to set the metric container with the appropriate
// PTransform context. This ensures that user metrics obtain the pTransform ID when they are
// created. Also use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric.
for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {

if (consumerAndMetadata.getValueCoder() != null) {
// TODO(BEAM-11879): Consider updating size per window when we have window optimization.
this.sampledByteSizeDistribution.tryUpdate(
input.getValue(), consumerAndMetadata.getValueCoder());
}
MetricsContainerImpl container =
metricsContainerRegistry.getContainer(consumerAndMetadata.getPTransformId());
try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(container)) {
try (Closeable trackerCloseable =
stateTracker.enterState(consumerAndMetadata.getExecutionState())) {
consumerAndMetadata.getConsumer().accept(input);
}
// Increment the counter for each window the element occurs in.
this.unboundedElementCountCounter.inc(input.getWindows().size());
// Wrap the consumer with extra logic to set the metric container with the appropriate
// PTransform context. This ensures that user metrics obtain the pTransform ID when they are
// created. Also use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric.
for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {

if (consumerAndMetadata.getValueCoder() != null) {
// TODO(BEAM-11879): Consider updating size per window when we have window optimization.
this.unboundedSampledByteSizeDistribution.tryUpdate(
input.getValue(), consumerAndMetadata.getValueCoder());
}
MetricsContainerImpl container =
metricsContainerRegistry.getContainer(consumerAndMetadata.getPTransformId());
try (Closeable closeable = MetricsEnvironment.scopedMetricsContainer(container)) {
try (Closeable trackerCloseable =
stateTracker.enterState(consumerAndMetadata.getExecutionState())) {
consumerAndMetadata.getConsumer().accept(input);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,13 @@ public void setUp() {
public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception {
String bundleId = "57L";

PCollectionConsumerRegistry consumers =
new PCollectionConsumerRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class));
MetricsContainerStepMap metricMap = new MetricsContainerStepMap();
ExecutionStateTracker tracker = mock(ExecutionStateTracker.class);
PCollectionConsumerRegistry consumers = new PCollectionConsumerRegistry(metricMap, tracker);
PTransformFunctionRegistry startFunctionRegistry =
new PTransformFunctionRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
new PTransformFunctionRegistry(metricMap, tracker, "start");
PTransformFunctionRegistry finishFunctionRegistry =
new PTransformFunctionRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
new PTransformFunctionRegistry(metricMap, tracker, "finish");
List<ThrowingRunnable> teardownFunctions = new ArrayList<>();

String localInputId = "inputPC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,18 @@ public void testFullWindowedValueMapping() throws Exception {
@Test
public void testFullWindowedValueMappingWithCompressedWindow() throws Exception {
List<WindowedValue<?>> outputConsumer = new ArrayList<>();
MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
PCollectionConsumerRegistry consumers =
new PCollectionConsumerRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class));
metricsContainerRegistry, mock(ExecutionStateTracker.class));
consumers.register("outputPC", "pTransformId", outputConsumer::add, StringUtf8Coder.of());

PTransformFunctionRegistry startFunctionRegistry =
new PTransformFunctionRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "start");
metricsContainerRegistry, mock(ExecutionStateTracker.class), "start");
PTransformFunctionRegistry finishFunctionRegistry =
new PTransformFunctionRegistry(
mock(MetricsContainerStepMap.class), mock(ExecutionStateTracker.class), "finish");
metricsContainerRegistry, mock(ExecutionStateTracker.class), "finish");
List<ThrowingRunnable> teardownFunctions = new ArrayList<>();

MapFnRunners.forWindowedValueMapFnFactory(this::createMapFunctionForPTransform)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
Expand All @@ -41,6 +42,7 @@
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Labels;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
Expand Down Expand Up @@ -268,7 +270,7 @@ public void testScopedMetricContainerInvokedUponAcceptingElement() throws Except
}

@Test
public void testScopedMetricContainerInvokedUponAccept() throws Exception {
public void testUnboundedCountersUponAccept() throws Exception {
mockStatic(MetricsEnvironment.class, withSettings().verboseLogging());
final String pCollectionA = "pCollectionA";
final String pTransformIdA = "pTransformIdA";
Expand All @@ -291,9 +293,14 @@ public void testScopedMetricContainerInvokedUponAccept() throws Exception {

verify(consumer, times(1)).accept(element);

// Verify that static scopedMetricsContainer is called with unbound container.
PowerMockito.verifyStatic(MetricsEnvironment.class, times(1));
MetricsEnvironment.scopedMetricsContainer(metricsContainerRegistry.getUnboundContainer());
HashMap<String, String> labels = new HashMap<String, String>();
labels.put(Labels.PCOLLECTION, "pCollectionA");
MonitoringInfoMetricName counterName =
MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
assertEquals(
1L,
(long)
metricsContainerRegistry.getUnboundContainer().getCounter(counterName).getCumulative());
}

@Test
Expand Down

0 comments on commit 1d5f887

Please sign in to comment.