Skip to content

Commit

Permalink
Implemented msec counters support in FnApi world.
Browse files Browse the repository at this point in the history
Minor refactoring to generalize validation and
counter transformation.
  • Loading branch information
Mikhail Gryzykhin committed Dec 19, 2018
1 parent bd68ef6 commit 92b3642
Show file tree
Hide file tree
Showing 15 changed files with 940 additions and 167 deletions.
1 change: 1 addition & 0 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Expand Up @@ -682,6 +682,7 @@ message Metrics {

// User defined metrics
message User {

// A key for identifying a metric at the most granular level.
message MetricName {
// (Required): The namespace of this metric.
Expand Down
Expand Up @@ -17,12 +17,8 @@
*/
package org.apache.beam.runners.core.metrics;

import com.google.common.base.Splitter;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
Expand Down Expand Up @@ -75,6 +71,8 @@ public class SimpleMonitoringInfoBuilder {

private MonitoringInfo.Builder builder;

private SpecMonitoringInfoValidator validator = new SpecMonitoringInfoValidator();

static {
for (MonitoringInfoSpecs.Enum val : MonitoringInfoSpecs.Enum.values()) {
// The enum iterator inserts an UNRECOGNIZED = -1 value which isn't explicitly added in
Expand All @@ -96,56 +94,6 @@ public SimpleMonitoringInfoBuilder(boolean validateAndDropInvalid) {
this.validateAndDropInvalid = validateAndDropInvalid;
}

/** @return True if the MonitoringInfo has valid fields set, matching the spec */
private boolean validate() {
String urn = this.builder.getUrn();
if (urn == null || urn.isEmpty()) {
LOG.warn("Dropping MonitoringInfo since no URN was specified.");
return false;
}

MonitoringInfoSpec spec;
// If it's a user counter, and it has this prefix.
if (urn.startsWith(USER_COUNTER_URN_PREFIX)) {
spec = SimpleMonitoringInfoBuilder.specs.get(USER_COUNTER_URN_PREFIX);
List<String> split = Splitter.on(':').splitToList(urn);
if (split.size() != 5) {
LOG.warn(
"Dropping MonitoringInfo for URN {}, UserMetric namespaces and "
+ "name cannot contain ':' characters.",
urn);
return false;
}
} else if (!SimpleMonitoringInfoBuilder.specs.containsKey(urn)) {
// Succeed for unknown URNs, this is an extensible metric.
return true;
} else {
spec = SimpleMonitoringInfoBuilder.specs.get(urn);
}

if (!this.builder.getType().equals(spec.getTypeUrn())) {
LOG.warn(
"Dropping MonitoringInfo since for URN {} with invalid type field. Expected: {}"
+ " Actual: {}",
this.builder.getUrn(),
spec.getTypeUrn(),
this.builder.getType());
return false;
}

Set<String> requiredLabels = new HashSet<String>(spec.getRequiredLabelsList());
if (!this.builder.getLabels().keySet().equals(requiredLabels)) {
LOG.warn(
"Dropping MonitoringInfo since for URN {} with invalid labels. Expected: {}"
+ " Actual: {}",
this.builder.getUrn(),
requiredLabels,
this.builder.getLabels().keySet());
return false;
}
return true;
}

/** @return The metric URN for a user metric, with a proper URN prefix. */
private static String userMetricUrn(String metricNamespace, String metricName) {
String fixedMetricNamespace = metricNamespace.replace(':', '_');
Expand All @@ -163,8 +111,9 @@ private static String userMetricUrn(String metricNamespace, String metricName) {
*
* @param urn The urn of the MonitoringInfo
*/
public void setUrn(String urn) {
public SimpleMonitoringInfoBuilder setUrn(String urn) {
this.builder.setUrn(urn);
return this;
}

/**
Expand All @@ -173,36 +122,42 @@ public void setUrn(String urn) {
* @param namespace
* @param name
*/
public void setUrnForUserMetric(String namespace, String name) {
public SimpleMonitoringInfoBuilder setUrnForUserMetric(String namespace, String name) {
this.builder.setUrn(userMetricUrn(namespace, name));
return this;
}

/** Sets the timestamp of the MonitoringInfo to the current time. */
public void setTimestampToNow() {
public SimpleMonitoringInfoBuilder setTimestampToNow() {
Instant time = Instant.now();
this.builder.getTimestampBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano());
return this;
}

/** Sets the int64Value of the CounterData in the MonitoringInfo, and the appropraite type URN. */
public void setInt64Value(long value) {
public SimpleMonitoringInfoBuilder setInt64Value(long value) {
this.builder.getMetricBuilder().getCounterDataBuilder().setInt64Value(value);
this.builder.setType(SUM_INT64_TYPE_URN);
return this;
}

/** Sets the PTRANSFORM MonitoringInfo label to the given param. */
public void setPTransformLabel(String pTransform) {
public SimpleMonitoringInfoBuilder setPTransformLabel(String pTransform) {
// TODO(ajamato): Add validation that it is a valid pTransform name in the bundle descriptor.
setLabel("PTRANSFORM", pTransform);
return this;
}

/** Sets the PCOLLECTION MonitoringInfo label to the given param. */
public void setPCollectionLabel(String pCollection) {
public SimpleMonitoringInfoBuilder setPCollectionLabel(String pCollection) {
setLabel("PCOLLECTION", pCollection);
return this;
}

/** Sets the MonitoringInfo label to the given name and value. */
public void setLabel(String labelName, String labelValue) {
public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) {
this.builder.putLabels(labelName, labelValue);
return this;
}

/**
Expand All @@ -211,9 +166,12 @@ public void setLabel(String labelName, String labelValue) {
*/
@Nullable
public MonitoringInfo build() {
if (validateAndDropInvalid && !validate()) {
final MonitoringInfo result = this.builder.build();

if (validateAndDropInvalid && this.validator.validate(result).isPresent()) {
return null;
}
return this.builder.build();

return result;
}
}
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpec;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoSpecs;

/** Class implements validation of MonitoringInfos against MonitoringInfoSpecs. */
public class SpecMonitoringInfoValidator {
protected final MonitoringInfoSpec[] specs;

public SpecMonitoringInfoValidator() {
specs =
Arrays.stream(MonitoringInfoSpecs.Enum.values())
.filter(x -> !(x).name().equals("UNRECOGNIZED"))
.map(
x -> x.getValueDescriptor().getOptions().getExtension(BeamFnApi.monitoringInfoSpec))
.toArray(size -> new MonitoringInfoSpec[size]);
}

/**
* Validates provided {link MonitoringInfo} against relevant {link MonitoringInfoSpecs} if
* present.
*
* @return error string if validation fails.
*/
public Optional<String> validate(MonitoringInfo monitoringInfo) {
MonitoringInfoSpec spec = null;

for (MonitoringInfoSpec specIterator : specs) {
if (monitoringInfo.getUrn().startsWith(specIterator.getUrn())) {
spec = specIterator;
break;
}
}

// Skip checking unknown MonitoringInfos
if (spec == null) {
return Optional.empty();
}

if (!monitoringInfo.getType().equals(spec.getTypeUrn())) {
return Optional.of(
String.format(
"Monitoring info with urn: %s should have type: %s, received %s",
monitoringInfo.getUrn(), spec.getTypeUrn(), monitoringInfo.getType()));
}

Set<String> requiredLabels = new HashSet<>(spec.getRequiredLabelsList());
if (!monitoringInfo.getLabelsMap().keySet().containsAll(requiredLabels)) {
return Optional.of(
String.format(
"MonitoringInfo with urn: %s should have labels: %s, actual: %s",
monitoringInfo.getUrn(), requiredLabels, monitoringInfo.getLabelsMap()));
}

return Optional.empty();
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
import org.junit.Before;
import org.junit.Test;

/** Relevant tests. */
public class SpecMonitoringInfoValidatorTest {

SpecMonitoringInfoValidator testObject = null;

@Before
public void setUp() throws Exception {
testObject = new SpecMonitoringInfoValidator();
}

@Test
public void validateReturnsErrorOnInvalidMonitoringInfoType() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
.setUrn("beam:metric:user:someCounter")
.setType("beam:metrics:bad_value")
.build();
assertTrue(testObject.validate(testInput).isPresent());
}

@Test
public void validateReturnsNoErrorOnValidMonitoringInfo() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
.setUrn("beam:metric:user:someCounter")
.setType("beam:metrics:sum_int_64")
.putLabels("dummy", "value")
.build();
assertFalse(testObject.validate(testInput).isPresent());

testInput =
MonitoringInfo.newBuilder()
.setUrn("beam:metric:element_count:v1")
.setType("beam:metrics:sum_int_64")
.putLabels("PTRANSFORM", "value")
.putLabels("PCOLLECTION", "anotherValue")
.build();
assertFalse(testObject.validate(testInput).isPresent());
}

@Test
public void validateReturnsErrorOnInvalidMonitoringInfoLabels() {
MonitoringInfo testInput =
MonitoringInfo.newBuilder()
.setUrn("beam:metric:element_count:v1")
.setType("beam:metrics:sum_int_64")
.putLabels("PCOLLECTION", "anotherValue")
.build();
assertTrue(testObject.validate(testInput).isPresent());
}
}
Expand Up @@ -266,27 +266,41 @@ private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) {
return status;
}

// todo(migryz) this method should return List<CounterUpdate> instead of updating member variable
@VisibleForTesting
synchronized void populateCounterUpdates(WorkItemStatus status) {
if (worker == null) {
return;
}

// Checking against boolean, because getCompleted can return null?
boolean isFinalUpdate = Boolean.TRUE.equals(status.getCompleted());

ImmutableList.Builder<CounterUpdate> counterUpdatesListBuilder = ImmutableList.builder();
Map<Object, CounterUpdate> counterUpdatesMap = new HashMap<>();
// Output counters
counterUpdatesListBuilder.addAll(extractCounters(worker.getOutputCounters()));
extractCounters(worker.getOutputCounters()).forEach(x -> counterUpdatesMap.put(
x.getStructuredNameAndMetadata() == null ? x.getNameAndKind()
: x.getStructuredNameAndMetadata(), x));

// User metrics reported in Worker
counterUpdatesListBuilder.addAll(extractMetrics(isFinalUpdate));
extractMetrics(isFinalUpdate).forEach(x -> counterUpdatesMap.put(
x.getStructuredNameAndMetadata() == null ? x.getNameAndKind()
: x.getStructuredNameAndMetadata(), x));

// counterUpdatesListBuilder.addAll(extractMetrics(isFinalUpdate));
// MSec counters reported in worker
counterUpdatesListBuilder.addAll(extractMsecCounters(isFinalUpdate));
// Metrics reported in SDK runner.
counterUpdatesListBuilder.addAll(worker.extractMetricUpdates());
extractMsecCounters(isFinalUpdate).forEach(x -> counterUpdatesMap.put(
x.getStructuredNameAndMetadata() == null ? x.getNameAndKind()
: x.getStructuredNameAndMetadata(), x));

ImmutableList<CounterUpdate> counterUpdates = counterUpdatesListBuilder.build();
status.setCounterUpdates(counterUpdates);
// Metrics reported in SDK runner.
// This includes all different kinds of metrics coming from SDK.
// Keep in mind that these metrics might contain different types of counter names:
// i.e. structuredNameAndMetadata and nameAndKind
worker.extractMetricUpdates().forEach(x -> counterUpdatesMap.put(
x.getStructuredNameAndMetadata() == null ? x.getNameAndKind()
: x.getStructuredNameAndMetadata(), x));

status.setCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values()));
}

private synchronized Iterable<CounterUpdate> extractCounters(@Nullable CounterSet counters) {
Expand Down

0 comments on commit 92b3642

Please sign in to comment.