Skip to content

Commit

Permalink
Merge pull request #7323: [BEAM-6181] Implemented msec counters suppo…
Browse files Browse the repository at this point in the history
…rt in FnApi world.
  • Loading branch information
swegner committed Jan 23, 2019
2 parents a843a08 + 38c6367 commit 8b4898e
Show file tree
Hide file tree
Showing 13 changed files with 971 additions and 166 deletions.
Expand Up @@ -19,9 +19,6 @@

import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
Expand All @@ -30,7 +27,6 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoTypeUrns;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Splitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,6 +71,8 @@ public class SimpleMonitoringInfoBuilder {

private MonitoringInfo.Builder builder;

private SpecMonitoringInfoValidator validator = new SpecMonitoringInfoValidator();

static {
for (MonitoringInfoSpecs.Enum val : MonitoringInfoSpecs.Enum.values()) {
// The enum iterator inserts an UNRECOGNIZED = -1 value which isn't explicitly added in
Expand All @@ -96,56 +94,6 @@ public SimpleMonitoringInfoBuilder(boolean validateAndDropInvalid) {
this.validateAndDropInvalid = validateAndDropInvalid;
}

/** @return True if the MonitoringInfo has valid fields set, matching the spec */
private boolean validate() {
String urn = this.builder.getUrn();
if (urn == null || urn.isEmpty()) {
LOG.warn("Dropping MonitoringInfo since no URN was specified.");
return false;
}

MonitoringInfoSpec spec;
// If it's a user counter, and it has this prefix.
if (urn.startsWith(USER_COUNTER_URN_PREFIX)) {
spec = SimpleMonitoringInfoBuilder.specs.get(USER_COUNTER_URN_PREFIX);
List<String> split = Splitter.on(':').splitToList(urn);
if (split.size() != 5) {
LOG.warn(
"Dropping MonitoringInfo for URN {}, UserMetric namespaces and "
+ "name cannot contain ':' characters.",
urn);
return false;
}
} else if (!SimpleMonitoringInfoBuilder.specs.containsKey(urn)) {
// Succeed for unknown URNs, this is an extensible metric.
return true;
} else {
spec = SimpleMonitoringInfoBuilder.specs.get(urn);
}

if (!this.builder.getType().equals(spec.getTypeUrn())) {
LOG.warn(
"Dropping MonitoringInfo since for URN {} with invalid type field. Expected: {}"
+ " Actual: {}",
this.builder.getUrn(),
spec.getTypeUrn(),
this.builder.getType());
return false;
}

Set<String> requiredLabels = new HashSet<String>(spec.getRequiredLabelsList());
if (!this.builder.getLabels().keySet().equals(requiredLabels)) {
LOG.warn(
"Dropping MonitoringInfo since for URN {} with invalid labels. Expected: {}"
+ " Actual: {}",
this.builder.getUrn(),
requiredLabels,
this.builder.getLabels().keySet());
return false;
}
return true;
}

/** @return The metric URN for a user metric, with a proper URN prefix. */
private static String userMetricUrn(String metricNamespace, String metricName) {
String fixedMetricNamespace = metricNamespace.replace(':', '_');
Expand All @@ -163,8 +111,9 @@ private static String userMetricUrn(String metricNamespace, String metricName) {
*
* @param urn The urn of the MonitoringInfo
*/
public void setUrn(String urn) {
public SimpleMonitoringInfoBuilder setUrn(String urn) {
this.builder.setUrn(urn);
return this;
}

/**
Expand All @@ -173,36 +122,42 @@ public void setUrn(String urn) {
* @param namespace
* @param name
*/
public void setUrnForUserMetric(String namespace, String name) {
public SimpleMonitoringInfoBuilder setUrnForUserMetric(String namespace, String name) {
this.builder.setUrn(userMetricUrn(namespace, name));
return this;
}

/** Sets the timestamp of the MonitoringInfo to the current time. */
public void setTimestampToNow() {
public SimpleMonitoringInfoBuilder setTimestampToNow() {
Instant time = Instant.now();
this.builder.getTimestampBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano());
return this;
}

/** Sets the int64Value of the CounterData in the MonitoringInfo, and the appropraite type URN. */
public void setInt64Value(long value) {
public SimpleMonitoringInfoBuilder setInt64Value(long value) {
this.builder.getMetricBuilder().getCounterDataBuilder().setInt64Value(value);
this.builder.setType(SUM_INT64_TYPE_URN);
return this;
}

/** Sets the PTRANSFORM MonitoringInfo label to the given param. */
public void setPTransformLabel(String pTransform) {
public SimpleMonitoringInfoBuilder setPTransformLabel(String pTransform) {
// TODO(ajamato): Add validation that it is a valid pTransform name in the bundle descriptor.
setLabel("PTRANSFORM", pTransform);
return this;
}

/** Sets the PCOLLECTION MonitoringInfo label to the given param. */
public void setPCollectionLabel(String pCollection) {
public SimpleMonitoringInfoBuilder setPCollectionLabel(String pCollection) {
setLabel("PCOLLECTION", pCollection);
return this;
}

/** Sets the MonitoringInfo label to the given name and value. */
public void setLabel(String labelName, String labelValue) {
public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) {
this.builder.putLabels(labelName, labelValue);
return this;
}

/**
Expand All @@ -211,9 +166,12 @@ public void setLabel(String labelName, String labelValue) {
*/
@Nullable
public MonitoringInfo build() {
if (validateAndDropInvalid && !validate()) {
final MonitoringInfo result = this.builder.build();

if (validateAndDropInvalid && this.validator.validate(result).isPresent()) {
return null;
}
return this.builder.build();

return result;
}
}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs;

/** Class implements validation of MonitoringInfos against MonitoringInfoSpecs. */
public class SpecMonitoringInfoValidator {
protected final MonitoringInfoSpec[] specs;

public SpecMonitoringInfoValidator() {
specs =
Arrays.stream(MonitoringInfoSpecs.Enum.values())
// Filtering default value for "unknown" Enums. Coming from proto implementation.
.filter(x -> !x.name().equals("UNRECOGNIZED"))
.map(
x -> x.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec))
.toArray(size -> new MonitoringInfoSpec[size]);
}

/**
* Validates provided {link MonitoringInfo} against relevant {link MonitoringInfoSpecs} if
* present.
*
* @return error string if validation fails.
*/
public Optional<String> validate(MonitoringInfo monitoringInfo) {
MonitoringInfoSpec spec = null;

for (MonitoringInfoSpec specIterator : specs) {
if (monitoringInfo.getUrn().startsWith(specIterator.getUrn())) {
spec = specIterator;
break;
}
}

// Skip checking unknown MonitoringInfos
if (spec == null) {
return Optional.empty();
}

if (!monitoringInfo.getType().equals(spec.getTypeUrn())) {
return Optional.of(
String.format(
"Monitoring info with urn: %s should have type: %s, received %s",
monitoringInfo.getUrn(), spec.getTypeUrn(), monitoringInfo.getType()));
}

Set<String> requiredLabels = new HashSet<>(spec.getRequiredLabelsList());
if (!monitoringInfo.getLabelsMap().keySet().containsAll(requiredLabels)) {
return Optional.of(
String.format(
"MonitoringInfo with urn: %s should have labels: %s, actual: %s",
monitoringInfo.getUrn(), requiredLabels, monitoringInfo.getLabelsMap()));
}

return Optional.empty();
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.junit.Before;
import org.junit.Test;

/** Relevant tests. */
public class SpecMonitoringInfoValidatorTest {

SpecMonitoringInfoValidator testObject = null;

@Before
public void setUp() throws Exception {
testObject = new SpecMonitoringInfoValidator();
}

@Test
public void validateReturnsErrorOnInvalidMonitoringInfoType() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
.setUrn("beam:metric:user:someCounter")
.setType("beam:metrics:bad_value")
.build();
assertTrue(testObject.validate(testInput).isPresent());
}

@Test
public void validateReturnsNoErrorOnValidMonitoringInfo() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
.setUrn(SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX + "someCounter")
.setType(SimpleMonitoringInfoBuilder.SUM_INT64_TYPE_URN)
.putLabels("dummy", "value")
.build();
assertFalse(testObject.validate(testInput).isPresent());

testInput =
MonitoringInfo.newBuilder()
.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN)
.setType(SimpleMonitoringInfoBuilder.SUM_INT64_TYPE_URN)
.putLabels("PTRANSFORM", "value")
.putLabels("PCOLLECTION", "anotherValue")
.build();
assertFalse(testObject.validate(testInput).isPresent());
}

@Test
public void validateReturnsErrorOnInvalidMonitoringInfoLabels() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
.setUrn(SimpleMonitoringInfoBuilder.ELEMENT_COUNT_URN)
.setType(SimpleMonitoringInfoBuilder.SUM_INT64_TYPE_URN)
.putLabels("PCOLLECTION", "anotherValue")
.build();
assertTrue(testObject.validate(testInput).isPresent());
}
}
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.util.TimeUtil;
Expand Down Expand Up @@ -266,27 +267,41 @@ private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) {
return status;
}

// todo(migryz) this method should return List<CounterUpdate> instead of updating member variable
@VisibleForTesting
synchronized void populateCounterUpdates(WorkItemStatus status) {
if (worker == null) {
return;
}

// Checking against boolean, because getCompleted can return null
boolean isFinalUpdate = Boolean.TRUE.equals(status.getCompleted());

ImmutableList.Builder<CounterUpdate> counterUpdatesListBuilder = ImmutableList.builder();
Map<Object, CounterUpdate> counterUpdatesMap = new HashMap<>();

final Consumer<CounterUpdate> appendCounterUpdate =
x ->
counterUpdatesMap.put(
x.getStructuredNameAndMetadata() == null
? x.getNameAndKind()
: x.getStructuredNameAndMetadata(),
x);

// Output counters
counterUpdatesListBuilder.addAll(extractCounters(worker.getOutputCounters()));
extractCounters(worker.getOutputCounters()).forEach(appendCounterUpdate);

// User metrics reported in Worker
counterUpdatesListBuilder.addAll(extractMetrics(isFinalUpdate));
extractMetrics(isFinalUpdate).forEach(appendCounterUpdate);

// MSec counters reported in worker
counterUpdatesListBuilder.addAll(extractMsecCounters(isFinalUpdate));
extractMsecCounters(isFinalUpdate).forEach(appendCounterUpdate);

// Metrics reported in SDK runner.
counterUpdatesListBuilder.addAll(worker.extractMetricUpdates());
// This includes all different kinds of metrics coming from SDK.
// Keep in mind that these metrics might contain different types of counter names:
// i.e. structuredNameAndMetadata and nameAndKind
worker.extractMetricUpdates().forEach(appendCounterUpdate);

ImmutableList<CounterUpdate> counterUpdates = counterUpdatesListBuilder.build();
status.setCounterUpdates(counterUpdates);
status.setCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values()));
}

private synchronized Iterable<CounterUpdate> extractCounters(@Nullable CounterSet counters) {
Expand Down

0 comments on commit 8b4898e

Please sign in to comment.