diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto index 948d07805d831..e089e03763501 100644 --- a/model/fn-execution/src/main/proto/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/beam_fn_api.proto @@ -682,6 +682,7 @@ message Metrics { // User defined metrics message User { + // A key for identifying a metric at the most granular level. message MetricName { // (Required): The namespace of this metric. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java index c8e7390ea20a3..cdebefcf6fa3a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java @@ -17,12 +17,8 @@ */ package org.apache.beam.runners.core.metrics; -import com.google.common.base.Splitter; import java.time.Instant; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; @@ -75,6 +71,8 @@ public class SimpleMonitoringInfoBuilder { private MonitoringInfo.Builder builder; + private SpecMonitoringInfoValidator validator = new SpecMonitoringInfoValidator(); + static { for (MonitoringInfoSpecs.Enum val : MonitoringInfoSpecs.Enum.values()) { // The enum iterator inserts an UNRECOGNIZED = -1 value which isn't explicitly added in @@ -96,56 +94,6 @@ public SimpleMonitoringInfoBuilder(boolean validateAndDropInvalid) { this.validateAndDropInvalid = validateAndDropInvalid; } - /** @return True if the MonitoringInfo has valid fields set, matching the spec */ - private boolean validate() { - String urn = 
this.builder.getUrn(); - if (urn == null || urn.isEmpty()) { - LOG.warn("Dropping MonitoringInfo since no URN was specified."); - return false; - } - - MonitoringInfoSpec spec; - // If it's a user counter, and it has this prefix. - if (urn.startsWith(USER_COUNTER_URN_PREFIX)) { - spec = SimpleMonitoringInfoBuilder.specs.get(USER_COUNTER_URN_PREFIX); - List split = Splitter.on(':').splitToList(urn); - if (split.size() != 5) { - LOG.warn( - "Dropping MonitoringInfo for URN {}, UserMetric namespaces and " - + "name cannot contain ':' characters.", - urn); - return false; - } - } else if (!SimpleMonitoringInfoBuilder.specs.containsKey(urn)) { - // Succeed for unknown URNs, this is an extensible metric. - return true; - } else { - spec = SimpleMonitoringInfoBuilder.specs.get(urn); - } - - if (!this.builder.getType().equals(spec.getTypeUrn())) { - LOG.warn( - "Dropping MonitoringInfo since for URN {} with invalid type field. Expected: {}" - + " Actual: {}", - this.builder.getUrn(), - spec.getTypeUrn(), - this.builder.getType()); - return false; - } - - Set requiredLabels = new HashSet(spec.getRequiredLabelsList()); - if (!this.builder.getLabels().keySet().equals(requiredLabels)) { - LOG.warn( - "Dropping MonitoringInfo since for URN {} with invalid labels. Expected: {}" - + " Actual: {}", - this.builder.getUrn(), - requiredLabels, - this.builder.getLabels().keySet()); - return false; - } - return true; - } - /** @return The metric URN for a user metric, with a proper URN prefix. 
*/ private static String userMetricUrn(String metricNamespace, String metricName) { String fixedMetricNamespace = metricNamespace.replace(':', '_'); @@ -163,8 +111,9 @@ private static String userMetricUrn(String metricNamespace, String metricName) { * * @param urn The urn of the MonitoringInfo */ - public void setUrn(String urn) { + public SimpleMonitoringInfoBuilder setUrn(String urn) { this.builder.setUrn(urn); + return this; } /** @@ -173,36 +122,42 @@ public void setUrn(String urn) { * @param namespace * @param name */ - public void setUrnForUserMetric(String namespace, String name) { + public SimpleMonitoringInfoBuilder setUrnForUserMetric(String namespace, String name) { this.builder.setUrn(userMetricUrn(namespace, name)); + return this; } /** Sets the timestamp of the MonitoringInfo to the current time. */ - public void setTimestampToNow() { + public SimpleMonitoringInfoBuilder setTimestampToNow() { Instant time = Instant.now(); this.builder.getTimestampBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()); + return this; } /** Sets the int64Value of the CounterData in the MonitoringInfo, and the appropraite type URN. */ - public void setInt64Value(long value) { + public SimpleMonitoringInfoBuilder setInt64Value(long value) { this.builder.getMetricBuilder().getCounterDataBuilder().setInt64Value(value); this.builder.setType(SUM_INT64_TYPE_URN); + return this; } /** Sets the PTRANSFORM MonitoringInfo label to the given param. */ - public void setPTransformLabel(String pTransform) { + public SimpleMonitoringInfoBuilder setPTransformLabel(String pTransform) { // TODO(ajamato): Add validation that it is a valid pTransform name in the bundle descriptor. setLabel("PTRANSFORM", pTransform); + return this; } /** Sets the PCOLLECTION MonitoringInfo label to the given param. 
*/ - public void setPCollectionLabel(String pCollection) { + public SimpleMonitoringInfoBuilder setPCollectionLabel(String pCollection) { setLabel("PCOLLECTION", pCollection); + return this; } /** Sets the MonitoringInfo label to the given name and value. */ - public void setLabel(String labelName, String labelValue) { + public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) { this.builder.putLabels(labelName, labelValue); + return this; } /** @@ -211,9 +166,12 @@ public void setLabel(String labelName, String labelValue) { */ @Nullable public MonitoringInfo build() { - if (validateAndDropInvalid && !validate()) { + final MonitoringInfo result = this.builder.build(); + + if (validateAndDropInvalid && this.validator.validate(result).isPresent()) { return null; } - return this.builder.build(); + + return result; } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java new file mode 100644 index 0000000000000..8f990a1bc5223 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidator.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs; + +/** Class implements validation of MonitoringInfos against MonitoringInfoSpecs. */ +public class SpecMonitoringInfoValidator { + protected final MonitoringInfoSpec[] specs; + + public SpecMonitoringInfoValidator() { + specs = + Arrays.stream(MonitoringInfoSpecs.Enum.values()) + .filter(x -> !(x).name().equals("UNRECOGNIZED")) + .map( + x -> x.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec)) + .toArray(size -> new MonitoringInfoSpec[size]); + } + + /** + * Validates provided {@link MonitoringInfo} against relevant {@link MonitoringInfoSpecs} if + * present. + * + * @return an error string if validation fails, or an empty Optional otherwise. 
+ */ + public Optional validate(MonitoringInfo monitoringInfo) { + MonitoringInfoSpec spec = null; + + for (MonitoringInfoSpec specIterator : specs) { + if (monitoringInfo.getUrn().startsWith(specIterator.getUrn())) { + spec = specIterator; + break; + } + } + + // Skip checking unknown MonitoringInfos + if (spec == null) { + return Optional.empty(); + } + + if (!monitoringInfo.getType().equals(spec.getTypeUrn())) { + return Optional.of( + String.format( + "Monitoring info with urn: %s should have type: %s, received %s", + monitoringInfo.getUrn(), spec.getTypeUrn(), monitoringInfo.getType())); + } + + Set requiredLabels = new HashSet<>(spec.getRequiredLabelsList()); + if (!monitoringInfo.getLabelsMap().keySet().containsAll(requiredLabels)) { + return Optional.of( + String.format( + "MonitoringInfo with urn: %s should have labels: %s, actual: %s", + monitoringInfo.getUrn(), requiredLabels, monitoringInfo.getLabelsMap())); + } + + return Optional.empty(); + } +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java new file mode 100644 index 0000000000000..4bceb571cb3c0 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/SpecMonitoringInfoValidatorTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; +import org.junit.Before; +import org.junit.Test; + +/** Tests for {@link SpecMonitoringInfoValidator}. */ +public class SpecMonitoringInfoValidatorTest { + + SpecMonitoringInfoValidator testObject = null; + + @Before + public void setUp() throws Exception { + testObject = new SpecMonitoringInfoValidator(); + } + + @Test + public void validateReturnsErrorOnInvalidMonitoringInfoType() { + MonitoringInfo testInput = + MonitoringInfo.newBuilder() + .setUrn("beam:metric:user:someCounter") + .setType("beam:metrics:bad_value") + .build(); + assertTrue(testObject.validate(testInput).isPresent()); + } + + @Test + public void validateReturnsNoErrorOnValidMonitoringInfo() { + MonitoringInfo testInput = + MonitoringInfo.newBuilder() + .setUrn("beam:metric:user:someCounter") + .setType("beam:metrics:sum_int_64") + .putLabels("dummy", "value") + .build(); + assertFalse(testObject.validate(testInput).isPresent()); + + testInput = + MonitoringInfo.newBuilder() + .setUrn("beam:metric:element_count:v1") + .setType("beam:metrics:sum_int_64") + .putLabels("PTRANSFORM", "value") + .putLabels("PCOLLECTION", "anotherValue") + .build(); + assertFalse(testObject.validate(testInput).isPresent()); + } + + @Test + public void validateReturnsErrorOnInvalidMonitoringInfoLabels() { + MonitoringInfo testInput = + MonitoringInfo.newBuilder() + .setUrn("beam:metric:element_count:v1") 
.setType("beam:metrics:sum_int_64") + .putLabels("PCOLLECTION", "anotherValue") + .build(); + assertTrue(testObject.validate(testInput).isPresent()); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java index 00b203c884719..d574c2744b4b0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java @@ -266,27 +266,41 @@ private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) { return status; } - // todo(migryz) this method should return List instead of updating member variable @VisibleForTesting synchronized void populateCounterUpdates(WorkItemStatus status) { if (worker == null) { return; } + // Checking against boolean, because getCompleted can return null? boolean isFinalUpdate = Boolean.TRUE.equals(status.getCompleted()); - ImmutableList.Builder counterUpdatesListBuilder = ImmutableList.builder(); + Map counterUpdatesMap = new HashMap<>(); // Output counters - counterUpdatesListBuilder.addAll(extractCounters(worker.getOutputCounters())); + extractCounters(worker.getOutputCounters()).forEach(x -> counterUpdatesMap.put( + x.getStructuredNameAndMetadata() == null ? x.getNameAndKind() + : x.getStructuredNameAndMetadata(), x)); + // User metrics reported in Worker - counterUpdatesListBuilder.addAll(extractMetrics(isFinalUpdate)); + extractMetrics(isFinalUpdate).forEach(x -> counterUpdatesMap.put( + x.getStructuredNameAndMetadata() == null ? 
x.getNameAndKind() + : x.getStructuredNameAndMetadata(), x)); + + // counterUpdatesListBuilder.addAll(extractMetrics(isFinalUpdate)); // MSec counters reported in worker - counterUpdatesListBuilder.addAll(extractMsecCounters(isFinalUpdate)); - // Metrics reported in SDK runner. - counterUpdatesListBuilder.addAll(worker.extractMetricUpdates()); + extractMsecCounters(isFinalUpdate).forEach(x -> counterUpdatesMap.put( + x.getStructuredNameAndMetadata() == null ? x.getNameAndKind() + : x.getStructuredNameAndMetadata(), x)); - ImmutableList counterUpdates = counterUpdatesListBuilder.build(); - status.setCounterUpdates(counterUpdates); + // Metrics reported in SDK runner. + // This includes all different kinds of metrics coming from SDK. + // Keep in mind that these metrics might contain different types of counter names: + // i.e. structuredNameAndMetadata and nameAndKind + worker.extractMetricUpdates().forEach(x -> counterUpdatesMap.put( + x.getStructuredNameAndMetadata() == null ? x.getNameAndKind() + : x.getStructuredNameAndMetadata(), x)); + + status.setCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values())); } private synchronized Iterable extractCounters(@Nullable CounterSet counters) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java index 1127a17242a5b..ea2eef9af414c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java @@ -17,9 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.fn.control; -import com.google.api.services.dataflow.model.CounterMetadata; -import 
com.google.api.services.dataflow.model.CounterStructuredName; -import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; import com.google.api.services.dataflow.model.CounterUpdate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -57,12 +54,9 @@ import org.apache.beam.runners.core.metrics.MetricUpdates; import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.runners.core.metrics.MetricsTranslation; -import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter; -import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; -import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation; import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitRequest; @@ -74,6 +68,7 @@ import org.apache.beam.sdk.util.MoreFutures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * A {@link WorkExecutor} that processes a list of {@link Operation}s. 
* @@ -143,6 +138,12 @@ public Progress getWorkerProgress() throws Exception { return progressTracker.getWorkerProgress(); } + private static String counterUpdateListToString(Iterable src) { + StringBuilder sb = new StringBuilder(); + src.forEach(x -> sb.append(x.toString() + "\n")); + return sb.toString(); + } + /** * {@inheritDoc} * @@ -151,6 +152,7 @@ public Progress getWorkerProgress() throws Exception { @Override public Iterable extractMetricUpdates() { List result = progressTracker.extractCounterUpdates(); + LOG.error("migryz extractMetricsUpdates\n" + counterUpdateListToString(result)); if ((result != null) && (result.size() > 0)) { return result; } @@ -173,6 +175,8 @@ public Iterable extractMetricUpdates() { update.getKey(), true, update.getUpdate())) .collect(Collectors.toList())); + LOG.error("migryz extractMetricUpdates falling back to metrics\n" + counterUpdateListToString( + deprecatedMetrics)); return deprecatedMetrics; } @@ -356,6 +360,7 @@ void updateProgress() { updateMetricsDeprecated(metrics); // todo(migryz): utilize monitoringInfos here. + // Requires Count metrics to be implemented to test. double elementsConsumed = bundleProcessOperation.getInputElementsConsumed(metrics); grpcWriteOperationElementsProcessed.accept((int) elementsConsumed); @@ -396,102 +401,21 @@ void updateProgress() { } } - // Will extract to separate file and generalize when more counter types are added. - // todo(migryz): define counter transformer factory - // that can provide respective counter transformer for different type of counters. - // (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) - private static class MonitoringInfoToCounterUpdateTransformer { - - private final Map transformIdMapping; - - public MonitoringInfoToCounterUpdateTransformer( - final Map transformIdMapping) { - this.transformIdMapping = transformIdMapping; - } - - // todo: search code for "beam:metrics"... 
and replace them with relevant enums from - // proto after rebasing above https://github.com/apache/beam/pull/6799 that - // introduces relevant proto entries. - final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; - - private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { - long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); - String urn = monitoringInfo.getUrn(); - - String type = monitoringInfo.getType(); - - // todo(migryz): run MonitoringInfo through Proto validation process. - // Requires https://github.com/apache/beam/pull/6799 to be merged. - if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { - if (!type.equals("beam:metrics:sum_int_64")) { - LOG.info( - "Encountered user-counter MonitoringInfo with unexpected type." - + "Expected: beam:metrics:sum_int_64. Received: " - + monitoringInfo.toString()); - return null; - } - - final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); - if (ptransform == null) { - LOG.info( - "Encountered user-counter MonitoringInfo with missing ptransformId: " - + monitoringInfo.toString()); - return null; - } - - DataflowStepContext stepContext = transformIdMapping.get(ptransform); - if (stepContext == null) { - LOG.info( - "Encountered user-counter MonitoringInfo with unknown ptransformId: " - + monitoringInfo.toString()); - return null; - } - - CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); - - String nameWithNamespace = - monitoringInfo - .getUrn() - .substring(BEAM_METRICS_USER_PREFIX.length()) - .replace("^:", ""); - - final int lastColonIndex = nameWithNamespace.lastIndexOf(':'); - String counterName = nameWithNamespace.substring(lastColonIndex + 1); - String counterNamespace = nameWithNamespace.substring(0, lastColonIndex); - - name.setName( - new CounterStructuredName() - .setOrigin(Origin.USER.toString()) - // Workaround for bug in python sdk that missed colon after ...metric:user. 
- .setName(counterName) - .setOriginalStepName(stepContext.getNameContext().originalName()) - .setExecutionStepName(stepContext.getNameContext().systemName()) - .setOriginNamespace(counterNamespace)) - .setMetadata(new CounterMetadata().setKind("SUM")); - - return new CounterUpdate() - .setStructuredNameAndMetadata(name) - .setCumulative(false) - .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value)); - } - return null; - } - } - /** * Updates internal metrics state from provided monitoringInfos list. * * @param monitoringInfos Usually received from FnApi. */ private void updateMetrics(List monitoringInfos) { + LOG.error("migryz updateMetrics start"); final MonitoringInfoToCounterUpdateTransformer monitoringInfoToCounterUpdateTransformer = - new MonitoringInfoToCounterUpdateTransformer( + new FnApiMonitoringInfoToCounterUpdateTransformer( bundleProcessOperation.getPtransformIdToUserStepContext()); counterUpdates = monitoringInfos .stream() - .map(monitoringInfoToCounterUpdateTransformer::monitoringInfoToCounterUpdate) + .map(monitoringInfoToCounterUpdateTransformer::transform) .filter(Objects::nonNull) .collect(Collectors.toList()); } @@ -589,6 +513,8 @@ public void stop() { // Set final metrics to precisely the values in this update. This should overwrite, not // be combined with, all prior updates. 
+ counterUpdates.clear(); + deprecatedCounterUpdates.clear(); deprecatedDistributionUpdates.clear(); deprecatedGaugeUpdates.clear(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java new file mode 100644 index 0000000000000..aeb790b36c9aa --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.fn.control; + +import com.google.api.services.dataflow.model.CounterUpdate; +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; +import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; + +public class FnApiMonitoringInfoToCounterUpdateTransformer + implements MonitoringInfoToCounterUpdateTransformer { + + final UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer; + final Map counterTransformers = new HashMap<>(); + + public FnApiMonitoringInfoToCounterUpdateTransformer( + Map stepContextMap) { + SpecMonitoringInfoValidator specValidator = new SpecMonitoringInfoValidator(); + this.userCounterTransformer = + new UserMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap); + + MSecMonitoringInfoToCounterUpdateTransformer msecTransformer = + new MSecMonitoringInfoToCounterUpdateTransformer(specValidator, stepContextMap); + for (String urn : msecTransformer.getSupportedUrns()) { + this.counterTransformers.put(urn, msecTransformer); + } + } + + /** + * Allows for injection of user and generic counter transformers for more convenient testing. 
+ */ + @VisibleForTesting + public FnApiMonitoringInfoToCounterUpdateTransformer( + UserMonitoringInfoToCounterUpdateTransformer userCounterTransformer, + Map counterTransformers) { + this.userCounterTransformer = userCounterTransformer; + this.counterTransformers.putAll(counterTransformers); + } + + @Override + public CounterUpdate transform(MonitoringInfo src) { + String urn = src.getUrn(); + if (urn.startsWith(userCounterTransformer.getSupportedUrnPrefix())) { + return userCounterTransformer.transform(src); + } + + MonitoringInfoToCounterUpdateTransformer transformer = counterTransformers.get(urn); + return transformer == null ? null : transformer.transform(src); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java new file mode 100644 index 0000000000000..5d138486cdd8e --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformer.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.fn.control; + +import com.google.api.services.dataflow.model.CounterMetadata; +import com.google.api.services.dataflow.model.CounterStructuredName; +import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; +import com.google.api.services.dataflow.model.CounterUpdate; +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; +import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; +import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MSecMonitoringInfoToCounterUpdateTransformer + implements MonitoringInfoToCounterUpdateTransformer { + + private static final Logger LOG = LoggerFactory.getLogger(BeamFnMapTaskExecutor.class); + + private SpecMonitoringInfoValidator specValidator; + private Map transformIdMapping; + private Map urnToCounterNameMapping; + + /** + * @param specValidator SpecMonitoringInfoValidator to utilize for default validation. + * @param transformIdMapping Mapping of PTransform ID string to DataflowStepContext. + */ + public MSecMonitoringInfoToCounterUpdateTransformer( + SpecMonitoringInfoValidator specValidator, + Map transformIdMapping) { + this.specValidator = specValidator; + this.transformIdMapping = transformIdMapping; + urnToCounterNameMapping = createKnownUrnToCounterNameMapping(); + } + + /** + * Allows to inject urnToCounterNameMapping for cleaner testing. 
+ * + * @param specValidator + * @param transformIdMapping + * @param urnToCounterNameMapping + */ + @VisibleForTesting + protected MSecMonitoringInfoToCounterUpdateTransformer( + SpecMonitoringInfoValidator specValidator, + Map transformIdMapping, + Map urnToCounterNameMapping) { + this.specValidator = specValidator; + this.transformIdMapping = transformIdMapping; + this.urnToCounterNameMapping = urnToCounterNameMapping; + } + + @VisibleForTesting + protected Map createKnownUrnToCounterNameMapping() { + Map result = new HashMap<>(); + result.put("beam:metric:pardo_execution_time:start_bundle_msecs:v1", "start-msecs"); + result.put("beam:metric:pardo_execution_time:process_bundle_msecs:v1", "process-msecs"); + result.put("beam:metric:pardo_execution_time:finish_bundle_msecs:v1", "finish-msecs"); + return result; + } + + /** + * Validates the provided monitoring info against specs and common safety checks. + * @param monitoringInfo the monitoring info to validate. + * @return Optional.empty() if all validation checks pass; otherwise an Optional with the error text. + * @throws RuntimeException if an unexpected urn is received. 
+ */ + protected Optional validate(MonitoringInfo monitoringInfo) { + Optional validatorResult = specValidator.validate(monitoringInfo); + if (validatorResult.isPresent()) { + return validatorResult; + } + + String urn = monitoringInfo.getUrn(); + if (!urnToCounterNameMapping.keySet().contains(urn)) { + throw new RuntimeException(String.format("Received unexpected counter urn: %s", urn)); + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + if (stepContext == null) { + return Optional.of( + "Encountered MSec MonitoringInfo with unknown ptransformId: " + + monitoringInfo.toString()); + } + + return Optional.empty(); + } + + @Override + public CounterUpdate transform(MonitoringInfo monitoringInfo) { + Optional validationResult = validate(monitoringInfo); + if (validationResult.isPresent()) { + LOG.info(validationResult.get()); + return null; + } + + long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); + String urn = monitoringInfo.getUrn(); + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + String counterName = urnToCounterNameMapping.get(urn); + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + name.setName( + new CounterStructuredName() + .setOrigin("SYSTEM") + .setName(counterName) + .setOriginalStepName(stepContext.getNameContext().originalName()) + .setExecutionStepName(stepContext.getNameContext().stageName())) + .setMetadata(new CounterMetadata().setKind("SUM")); + + return new CounterUpdate() + .setStructuredNameAndMetadata(name) + .setCumulative(true) + .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value)); + } + + /** + * @return iterable of Urns that this transformer can convert to CounterUpdates. 
+   */
+  public Iterable<String> getSupportedUrns() {
+    return this.urnToCounterNameMapping.keySet();
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
new file mode 100644
index 0000000000000..974fc9637ee17
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/MonitoringInfoToCounterUpdateTransformer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+
+interface MonitoringInfoToCounterUpdateTransformer {
+
+  /**
+   * Method should transform MonitoringInfo to relevant CounterUpdate class as required for
+   * DataflowRunner.
+   *
+   * @param src MonitoringInfo to convert.
+   * @return CounterUpdate or null if MonitoringInfo is invalid/unsupported.
+   */
+  CounterUpdate transform(MonitoringInfo src);
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
new file mode 100644
index 0000000000000..f909cbf288c42
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.fn.control; + +import com.google.api.services.dataflow.model.CounterMetadata; +import com.google.api.services.dataflow.model.CounterStructuredName; +import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; +import com.google.api.services.dataflow.model.CounterUpdate; +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import java.util.Optional; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs.Enum; +import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; +import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin; +import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class for transforming MonitoringInfo's containing User counter values, to relevant CounterUpdate + * proto. 
+ */
+class UserMonitoringInfoToCounterUpdateTransformer
+    implements MonitoringInfoToCounterUpdateTransformer {
+
+  // Log under this class's own name (was BeamFnMapTaskExecutor, a copy-paste leftover).
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UserMonitoringInfoToCounterUpdateTransformer.class);
+
+  // Step contexts keyed by the value of the PTRANSFORM label.
+  private final Map<String, DataflowStepContext> transformIdMapping;
+
+  private final SpecMonitoringInfoValidator specValidator;
+
+  /**
+   * @param specMonitoringInfoValidator validator applied to each MonitoringInfo before conversion.
+   * @param transformIdMapping step contexts keyed by the value of the PTRANSFORM label.
+   */
+  public UserMonitoringInfoToCounterUpdateTransformer(
+      final SpecMonitoringInfoValidator specMonitoringInfoValidator,
+      final Map<String, DataflowStepContext> transformIdMapping) {
+    this.transformIdMapping = transformIdMapping;
+    this.specValidator = specMonitoringInfoValidator;
+  }
+
+  // Urn declared for USER_COUNTER in the MonitoringInfoSpecs enum; user counter urns start with it.
+  static final String BEAM_METRICS_USER_PREFIX =
+      Enum.USER_COUNTER
+          .getValueDescriptor()
+          .getOptions()
+          .getExtension(BeamFnApi.monitoringInfoSpec)
+          .getUrn();
+
+  /**
+   * Validates provided monitoring info against specs and the known PTRANSFORM ids.
+   *
+   * @param monitoringInfo to validate.
+   * @return Optional.empty() if all validation checks pass; an Optional with error text otherwise.
+   * @throws RuntimeException if the urn does not start with the user counter prefix.
+   */
+  private Optional<String> validate(MonitoringInfo monitoringInfo) {
+    Optional<String> validatorResult = specValidator.validate(monitoringInfo);
+    if (validatorResult.isPresent()) {
+      return validatorResult;
+    }
+
+    String urn = monitoringInfo.getUrn();
+    if (!urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+      throw new RuntimeException(
+          String.format(
+              "Received unexpected counter urn. Expected urn starting with: %s, received: %s",
+              BEAM_METRICS_USER_PREFIX, urn));
+    }
+
+    final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM");
+    DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+    if (stepContext == null) {
+      return Optional.of(
+          "Encountered user-counter MonitoringInfo with unknown ptransformId: "
+              + monitoringInfo.toString());
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Transforms user counter MonitoringInfo to relevant CounterUpdate.
+   *
+   * @return Relevant CounterUpdate or null if transformation failed.
+   */
+  @Override
+  public CounterUpdate transform(MonitoringInfo monitoringInfo) {
+    Optional<String> validationResult = validate(monitoringInfo);
+    if (validationResult.isPresent()) {
+      LOG.info(validationResult.get());
+      return null;
+    }
+
+    long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+    String urn = monitoringInfo.getUrn();
+
+    final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM");
+
+    CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata();
+
+    // Workaround for bug in python sdk that missed colon after ...metric:user.
+    // Drop a single leading ':' left over after removing the prefix. The previous
+    // String.replace("^:", "") matched the literal two characters "^:" (replace() is not
+    // regex based), so it never stripped anything.
+    String nameWithNamespace = urn.substring(BEAM_METRICS_USER_PREFIX.length());
+    if (nameWithNamespace.startsWith(":")) {
+      nameWithNamespace = nameWithNamespace.substring(1);
+    }
+
+    // Everything before the last ':' is the namespace, everything after it is the counter name.
+    final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+    String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+    String counterNamespace =
+        lastColonIndex == -1 ? "" : nameWithNamespace.substring(0, lastColonIndex);
+
+    // validate() has already rejected MonitoringInfos whose PTRANSFORM label has no step context.
+    DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+    name.setName(
+            new CounterStructuredName()
+                .setOrigin(Origin.USER.toString())
+                .setName(counterName)
+                .setOriginalStepName(stepContext.getNameContext().originalName())
+                // Original implementation didn't have Execution name, so skipping this one.
+                // .setExecutionStepName(stepContext.getNameContext().stageName())
+                .setOriginNamespace(counterNamespace))
+        .setMetadata(new CounterMetadata().setKind("SUM"));
+
+    return new CounterUpdate()
+        .setStructuredNameAndMetadata(name)
+        .setCumulative(true)
+        .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value));
+  }
+
+  /**
+   * @return MonitoringInfo urns prefix that this transformer can convert to CounterUpdates.
+ */ + public String getSupportedUrnPrefix() { + return BEAM_METRICS_USER_PREFIX; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java index 1c52a5a2c727f..c62565d0c210c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java @@ -19,7 +19,9 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; @@ -434,7 +436,7 @@ public void close() {} contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue))); } - @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10) + @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 60) public void testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent() throws Exception { final String stepName = "fakeStepNameWithUserMetrics"; @@ -554,11 +556,13 @@ public void close() {} metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates(); assertThat(Iterables.size(metricsCounterUpdates), equalTo(1)); + CounterUpdate resultCounter = metricsCounterUpdates.iterator().next(); - assertThat( - metricsCounterUpdates, - contains( - new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(expectedCounterValue))); + assertTrue( + new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(expectedCounterValue) + .matches(resultCounter)); + 
assertEquals(
+        "ExpectedCounter", resultCounter.getStructuredNameAndMetadata().getName().getName());
   }
 
   /**
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
new file mode 100644
index 0000000000000..fb66ff284221e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/FnApiMonitoringInfoToCounterUpdateTransformerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+
+/** Tests for {@link FnApiMonitoringInfoToCounterUpdateTransformer}. */
+public class FnApiMonitoringInfoToCounterUpdateTransformerTest {
+
+  @Mock
+  private UserMonitoringInfoToCounterUpdateTransformer mockUserCounterTransformer;
+
+  @Mock
+  private UserMonitoringInfoToCounterUpdateTransformer mockGenericTransformer1;
+
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testTransformUtilizesUserCounterTransformerForUserCounters() {
+    Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers =
+        Collections.emptyMap();
+    FnApiMonitoringInfoToCounterUpdateTransformer testObject =
+        new FnApiMonitoringInfoToCounterUpdateTransformer(mockUserCounterTransformer,
+            genericTransformers);
+
+    CounterUpdate expectedResult = new CounterUpdate();
+    when(mockUserCounterTransformer.transform(any())).thenReturn(expectedResult);
+    when(mockUserCounterTransformer.getSupportedUrnPrefix()).thenReturn("user:prefix:");
+
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("user:prefix:anyNamespace:anyName").putLabels("PTRANSFORM", "anyValue")
+        .build();
+
+    CounterUpdate result = testObject.transform(monitoringInfo);
+
+    assertSame(expectedResult, result);
+  }
+
+  @Test
+  public void testTransformUtilizesRelevantCounterTransformerForNonUserCounters() {
+    Map<String, MonitoringInfoToCounterUpdateTransformer> genericTransformers = new HashMap<>();
+    final String validUrn = "urn1";
+    genericTransformers.put(validUrn, mockGenericTransformer1);
+
+    when(mockUserCounterTransformer.getSupportedUrnPrefix()).thenReturn("invalid:prefix:");
+
+    FnApiMonitoringInfoToCounterUpdateTransformer testObject =
+        new FnApiMonitoringInfoToCounterUpdateTransformer(mockUserCounterTransformer,
+            genericTransformers);
+
+    CounterUpdate expectedResult = new CounterUpdate();
+    when(mockGenericTransformer1.transform(any())).thenReturn(expectedResult);
+
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn(validUrn).putLabels("PTRANSFORM", "anyValue")
+        .build();
+
+    CounterUpdate result = testObject.transform(monitoringInfo);
+
+    assertSame(expectedResult, result);
+  }
+}
\ No newline at end of file
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
new file mode 100644
index 0000000000000..5f44b6b5ebbde
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/MSecMonitoringInfoToCounterUpdateTransformerTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link MSecMonitoringInfoToCounterUpdateTransformer}. */
+public class MSecMonitoringInfoToCounterUpdateTransformerTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Mock
+  private SpecMonitoringInfoValidator mockSpecValidator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfSpecValidationFails() {
+    Map<String, String> counterNameMapping = new HashMap<>();
+    counterNameMapping.put("beam:counter:supported", "supportedCounter");
+
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+
+    MSecMonitoringInfoToCounterUpdateTransformer testObject =
+        new MSecMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping,
+            counterNameMapping);
+
+    Optional<String> error = Optional.of("Error text");
+    when(mockSpecValidator.validate(any())).thenReturn(error);
+
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("beam:metric:pardo_execution_time:start_bundle_msecs:v1:invalid").build();
+    assertEquals(null, testObject.transform(monitoringInfo));
+  }
+
+  @Test
+  public void testTransformThrowsIfMonitoringInfoWithUnknownUrnReceived() {
+    Map<String, String> counterNameMapping = new HashMap<>();
+    counterNameMapping.put("beam:counter:supported", "supportedCounter");
+
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("beam:metric:pardo_execution_time:start_bundle_msecs:v1:invalid").build();
+
+    MSecMonitoringInfoToCounterUpdateTransformer testObject =
+        new MSecMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping,
+            counterNameMapping);
+
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    exception.expect(RuntimeException.class);
+    testObject.transform(monitoringInfo);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfMonitoringInfoWithUnknownPTransformLabelPresent() {
+    Map<String, String> counterNameMapping = new HashMap<>();
+    counterNameMapping.put("beam:counter:supported", "supportedCounter");
+
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+
+    MSecMonitoringInfoToCounterUpdateTransformer testObject =
+        new MSecMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping,
+            counterNameMapping);
+
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("beam:counter:supported")
+        .putLabels("PTRANSFORM", "anyValue")
+        .build();
+
+    // The urn is supported, so only the unmapped PTRANSFORM label is exercised here; the
+    // transformer reports that case as a validation error and returns null instead of throwing.
+    assertEquals(null, testObject.transform(monitoringInfo));
+  }
+
+  @Test
+  public void testTransformReturnsValidCounterUpdateWhenValidMSecMonitoringInfoReceived() {
+    // Setup
+    Map<String, String> counterNameMapping = new HashMap<>();
+    counterNameMapping.put("beam:counter:supported", "supportedCounter");
+
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+    NameContext nc = NameContext
+        .create("anyStageName", "anyOriginalName", "anySystemName", "anyUserName");
+    DataflowStepContext dsc = mock(DataflowStepContext.class);
+    when(dsc.getNameContext()).thenReturn(nc);
+    stepContextMapping.put("anyValue", dsc);
+
+    MSecMonitoringInfoToCounterUpdateTransformer testObject =
+        new MSecMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping,
+            counterNameMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    // Execute
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("beam:counter:supported").putLabels("PTRANSFORM", "anyValue")
+        .build();
+
+    CounterUpdate result = testObject.transform(monitoringInfo);
+
+    // Validate
+    assertNotEquals(null, result);
+
+    assertEquals(
+        "{cumulative=true, integer={highBits=0, lowBits=0}, "
+            + "structuredNameAndMetadata={metadata={kind=SUM}, "
+            + "name={executionStepName=anyStageName, name=supportedCounter, origin=SYSTEM, "
+            + "originalStepName=anyOriginalName}}}",
+        result.toString());
+  }
+
+  @Test
+  public void testCreateKnownUrnToCounterNameMappingReturnsExpectedValues() {
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+    MSecMonitoringInfoToCounterUpdateTransformer testObject =
+        new MSecMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping);
+    Map<String, String> result = testObject.createKnownUrnToCounterNameMapping();
+    assertEquals("process-msecs",
+        result.get("beam:metric:pardo_execution_time:process_bundle_msecs:v1"));
+    assertEquals("finish-msecs",
+        result.get("beam:metric:pardo_execution_time:finish_bundle_msecs:v1"));
+    assertEquals("start-msecs",
+        result.get("beam:metric:pardo_execution_time:start_bundle_msecs:v1"));
+  }
+}
\ No newline at end of file
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 303cb51eb1281..1f1ffc1dc43c8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java @@ -22,22 +22,30 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.services.dataflow.model.CounterMetadata; +import com.google.api.services.dataflow.model.CounterStructuredName; +import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; +import com.google.api.services.dataflow.model.NameAndKind; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableTable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java new file mode 100644 index 0000000000000..80182acebf022 --- 
/dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/UserMonitoringInfoToCounterUpdateTransformerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.SpecMonitoringInfoValidator;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link UserMonitoringInfoToCounterUpdateTransformer}. */
+public class UserMonitoringInfoToCounterUpdateTransformerTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Mock
+  private SpecMonitoringInfoValidator mockSpecValidator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfSpecValidationFails() {
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+    UserMonitoringInfoToCounterUpdateTransformer testObject =
+        new UserMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping);
+    Optional<String> error = Optional.of("Error text");
+    when(mockSpecValidator.validate(any())).thenReturn(error);
+    assertEquals(null, testObject.transform(null));
+  }
+
+  @Test
+  public void testTransformThrowsIfMonitoringInfoWithWrongUrnPrefixReceived() {
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("beam:metric:element_count:v1").build();
+    UserMonitoringInfoToCounterUpdateTransformer testObject =
+        new UserMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    exception.expect(RuntimeException.class);
+    testObject.transform(monitoringInfo);
+  }
+
+  @Test
+  public void testTransformReturnsNullIfMonitoringInfoWithUnknownPTransformLabelPresent() {
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("beam:metric:user:anyNamespace:anyName").putLabels("PTRANSFORM", "anyValue")
+        .build();
+    UserMonitoringInfoToCounterUpdateTransformer testObject =
+        new UserMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+    assertEquals(null, testObject.transform(monitoringInfo));
+  }
+
+  @Test
+  public void testTransformReturnsValidCounterUpdateWhenValidUserMonitoringInfoReceived() {
+    Map<String, DataflowStepContext> stepContextMapping = new HashMap<>();
+    NameContext nc = NameContext
+        .create("anyStageName", "anyOriginalName", "anySystemName", "anyUserName");
+    DataflowStepContext dsc = mock(DataflowStepContext.class);
+    when(dsc.getNameContext()).thenReturn(nc);
+    stepContextMapping.put("anyValue", dsc);
+
+    MonitoringInfo monitoringInfo = MonitoringInfo.newBuilder()
+        .setUrn("beam:metric:user:anyNamespace:anyName").putLabels("PTRANSFORM", "anyValue")
+        .build();
+    UserMonitoringInfoToCounterUpdateTransformer testObject =
+        new UserMonitoringInfoToCounterUpdateTransformer(mockSpecValidator, stepContextMapping);
+    when(mockSpecValidator.validate(any())).thenReturn(Optional.empty());
+
+    CounterUpdate result = testObject.transform(monitoringInfo);
+    assertNotEquals(null, result);
+
+    assertEquals(
+        "{cumulative=true, integer={highBits=0, lowBits=0}, "
+            + "structuredNameAndMetadata={metadata={kind=SUM}, "
+            + "name={name=anyName, origin=USER, originNamespace=anyNamespace, "
+            + "originalStepName=anyOriginalName}}}",
+        result.toString());
+  }
+}
\ No newline at end of file