From 40eefc14797039e872fdba3f0565aab8a729a394 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Fri, 21 Apr 2017 10:03:50 -0700 Subject: [PATCH 1/5] Make it possible to test runners that don't support all metrics Runners may only partially support metrics. This partial support runs along axes: - attempted & committed - counters & distributions & gauges & ... Prior to this PR, we have categories for the first axis, but not the second. This means that a runner that supports only part of the second axis has to blacklist tests on the first axis. Adding categories for both axes lets runners build a matrix of supported features. I also renamed the exiting categories so that they are grouped alphabetically. Happy to undo if this is not a good change. This also lets the DataflowRunner run more tests by accurately identifying its test matrix. --- runners/google-cloud-dataflow-java/pom.xml | 3 +-- .../beam/sdk/testing/UsesCounterMetrics.java | 25 ++++++++++++++++++ .../sdk/testing/UsesDistributionMetrics.java | 26 +++++++++++++++++++ .../beam/sdk/testing/UsesGaugeMetrics.java | 25 ++++++++++++++++++ .../apache/beam/sdk/metrics/MetricsTest.java | 13 +++++++--- 5 files changed, 86 insertions(+), 6 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 64dc71e8a618..49395561681b 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -122,8 +122,7 @@ validates-runner-tests - org.apache.beam.sdk.testing.UsesAttemptedMetrics, - org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.UsesDistributionMetrics, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, org.apache.beam.sdk.testing.UsesTimersInParDo, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java new file mode 100644 index 000000000000..0d0ed6fe2fe6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCounterMetrics.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Counter}. + * Tests tagged with {@link UsesCounterMetrics} should be run for runners which support counters. + */ +public class UsesCounterMetrics {} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java new file mode 100644 index 000000000000..642202444fa1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesDistributionMetrics.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Distribution}. + * Tests tagged with {@link UsesDistributionMetrics} should be run for runners which support + * distributions. + */ +public class UsesDistributionMetrics {} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java new file mode 100644 index 000000000000..9d6455efd746 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesGaugeMetrics.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Gauge}. + * Tests tagged with {@link UsesGaugeMetrics} should be run for runners which support gauges. + */ +public class UsesGaugeMetrics {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index c7068e1ded95..7d214f6d7651 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -33,6 +33,9 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesAttemptedMetrics; import org.apache.beam.sdk.testing.UsesCommittedMetrics; +import org.apache.beam.sdk.testing.UsesCounterMetrics; +import org.apache.beam.sdk.testing.UsesDistributionMetrics; +import org.apache.beam.sdk.testing.UsesGaugeMetrics; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -122,7 +125,8 @@ public void counterToCell() { assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L)); } - @Category({ValidatesRunner.class, UsesCommittedMetrics.class}) + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class, + UsesDistributionMetrics.class, UsesGaugeMetrics.class}) @Test public void committedMetricsReportToQuery() { PipelineResult result = runPipelineWithMetrics(); @@ -151,7 +155,8 @@ public void committedMetricsReportToQuery() { } - @Category({ValidatesRunner.class, UsesAttemptedMetrics.class}) + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class, + UsesDistributionMetrics.class, UsesGaugeMetrics.class}) @Test public void attemptedMetricsReportToQuery() { PipelineResult result = runPipelineWithMetrics(); @@ -233,7 +238,7 @@ public void processElement(ProcessContext c) { } @Test - @Category({ValidatesRunner.class, UsesAttemptedMetrics.class}) + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testBoundedSourceMetrics() { long numElements = 1000; @@ -259,7 +264,7 @@ public void testBoundedSourceMetrics() { } @Test - @Category({ValidatesRunner.class, UsesAttemptedMetrics.class}) + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testUnboundedSourceMetrics() { long numElements = 1000; From 4591ff15436026da04224c0eba0013281550cb19 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 24 Apr 2017 11:50:29 -0700 Subject: [PATCH 2/5] MetricsMatchers: refactor matchers for committed/attempted code reuse --- .../beam/sdk/metrics/MetricMatchers.java | 144 ++++++------------ .../apache/beam/sdk/metrics/MetricsTest.java | 77 ++++------ 2 files changed, 73 insertions(+), 148 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java index 2251c820d688..a0dd1195ec01 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -81,61 +81,39 @@ public void describeTo(Description description) { } /** - * Matches a {@link MetricResult} with the given namespace, name and step, and whose attempted - * value equals the given value. + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for attempted metrics. */ public static Matcher> attemptedMetricsResult( - final String namespace, final String name, final String step, final T attempted) { - return new TypeSafeMatcher>() { - @Override - protected boolean matchesSafely(MetricResult item) { - return Objects.equals(namespace, item.name().namespace()) - && Objects.equals(name, item.name().name()) - && item.step().contains(step) - && metricResultsEqual(attempted, item.attempted()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricResult{inNamespace=").appendValue(namespace) - .appendText(", name=").appendValue(name) - .appendText(", step=").appendValue(step) - .appendText(", attempted=").appendValue(attempted) - .appendText("}"); - } - - @Override - protected void describeMismatchSafely(MetricResult item, Description mismatchDescription) { - mismatchDescription.appendText("MetricResult{"); - - describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - - if (!Objects.equals(attempted, item.attempted())) { - mismatchDescription - .appendText("attempted: ").appendValue(attempted) - .appendText(" != ").appendValue(item.attempted()); - } - - mismatchDescription.appendText("}"); - } - }; + final String namespace, final String name, final String step, final T value) { + return metricsResult(namespace, name, step, value, false); } /** - * Matches a {@link MetricResult} with the given namespace, name and step, and whose committed - * value equals the given value. + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for committed metrics. */ public static Matcher> committedMetricsResult( - final String namespace, final String name, final String step, - final T committed) { + final String namespace, final String name, final String step, final T value) { + return metricsResult(namespace, name, step, value, true); + } + + /** + * Matches a {@link MetricResult} with the given namespace, name and step, and whose value equals + * the given value for either committed or attempted (based on {@code isCommitted}) metrics. + */ + public static Matcher> metricsResult( + final String namespace, final String name, final String step, final T value, + final boolean isCommitted) { + final String metricState = isCommitted ? "committed" : "attempted"; return new TypeSafeMatcher>() { @Override protected boolean matchesSafely(MetricResult item) { + final T metricValue = isCommitted ? item.committed() : item.attempted(); return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) && item.step().contains(step) - && metricResultsEqual(committed, item.committed()); + && metricResultsEqual(value, metricValue); } @Override @@ -144,20 +122,21 @@ public void describeTo(Description description) { .appendText("MetricResult{inNamespace=").appendValue(namespace) .appendText(", name=").appendValue(name) .appendText(", step=").appendValue(step) - .appendText(", committed=").appendValue(committed) + .appendText(String.format(", %s=", metricState)).appendValue(value) .appendText("}"); } @Override protected void describeMismatchSafely(MetricResult item, Description mismatchDescription) { mismatchDescription.appendText("MetricResult{"); + final T metricValue = isCommitted ? item.committed() : item.attempted(); describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - if (!Objects.equals(committed, item.committed())) { + if (!Objects.equals(value, metricValue)) { mismatchDescription - .appendText("committed: ").appendValue(committed) - .appendText(" != ").appendValue(item.committed()); + .appendText(String.format("%s: ", metricState)).appendValue(value) + .appendText(" != ").appendValue(metricValue); } mismatchDescription.appendText("}"); @@ -176,62 +155,28 @@ private static boolean metricResultsEqual(T result1, T result2) { static Matcher> distributionAttemptedMinMax( final String namespace, final String name, final String step, final Long attemptedMin, final Long attemptedMax) { - return new TypeSafeMatcher>() { - @Override - protected boolean matchesSafely(MetricResult item) { - return Objects.equals(namespace, item.name().namespace()) - && Objects.equals(name, item.name().name()) - && item.step().contains(step) - && Objects.equals(attemptedMin, item.attempted().min()) - && Objects.equals(attemptedMax, item.attempted().max()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("MetricResult{inNamespace=").appendValue(namespace) - .appendText(", name=").appendValue(name) - .appendText(", step=").appendValue(step) - .appendText(", attemptedMin=").appendValue(attemptedMin) - .appendText(", attemptedMax=").appendValue(attemptedMax) - .appendText("}"); - } - - @Override - protected void describeMismatchSafely(MetricResult item, - Description mismatchDescription) { - mismatchDescription.appendText("MetricResult{"); - - describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); - - if (!Objects.equals(attemptedMin, item.attempted())) { - mismatchDescription - .appendText("attemptedMin: ").appendValue(attemptedMin) - .appendText(" != ").appendValue(item.attempted()); - } - - if (!Objects.equals(attemptedMax, item.attempted())) { - mismatchDescription - .appendText("attemptedMax: ").appendValue(attemptedMax) - .appendText(" != ").appendValue(item.attempted()); - } - - mismatchDescription.appendText("}"); - } - }; + return distributionMinMax(namespace, name, step, attemptedMin, attemptedMax, false); } static Matcher> distributionCommittedMinMax( final String namespace, final String name, final String step, final Long committedMin, final Long committedMax) { + return distributionMinMax(namespace, name, step, committedMin, committedMax, true); + } + + static Matcher> distributionMinMax( + final String namespace, final String name, final String step, + final Long min, final Long max, final boolean isCommitted) { + final String metricState = isCommitted ? "committed" : "attempted"; return new TypeSafeMatcher>() { @Override protected boolean matchesSafely(MetricResult item) { + DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) && item.step().contains(step) - && Objects.equals(committedMin, item.committed().min()) - && Objects.equals(committedMax, item.committed().max()); + && Objects.equals(min, metricValue.min()) + && Objects.equals(max, metricValue.max()); } @Override @@ -240,8 +185,8 @@ public void describeTo(Description description) { .appendText("MetricResult{inNamespace=").appendValue(namespace) .appendText(", name=").appendValue(name) .appendText(", step=").appendValue(step) - .appendText(", committedMin=").appendValue(committedMin) - .appendText(", committedMax=").appendValue(committedMax) + .appendText(String.format(", %sMin=", metricState)).appendValue(min) + .appendText(String.format(", %sMax=", metricState)).appendValue(max) .appendText("}"); } @@ -251,17 +196,18 @@ protected void describeMismatchSafely(MetricResult item, mismatchDescription.appendText("MetricResult{"); describeMetricsResultMembersMismatch(item, mismatchDescription, namespace, name, step); + DistributionResult metricValue = isCommitted ? item.committed() : item.attempted(); - if (!Objects.equals(committedMin, item.committed())) { + if (!Objects.equals(min, metricValue.min())) { mismatchDescription - .appendText("committedMin: ").appendValue(committedMin) - .appendText(" != ").appendValue(item.committed()); + .appendText(String.format("%sMin: ", metricState)).appendValue(min) + .appendText(" != ").appendValue(metricValue.min()); } - if (!Objects.equals(committedMax, item.committed())) { + if (!Objects.equals(max, metricValue.max())) { mismatchDescription - .appendText("committedMax: ").appendValue(committedMax) - .appendText(" != ").appendValue(item.committed()); + .appendText(String.format("%sMax: ", metricState)).appendValue(max) + .appendText(" != ").appendValue(metricValue.max()); } mismatchDescription.appendText("}"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 7d214f6d7651..b87b5e9e02e2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -19,9 +19,8 @@ package org.apache.beam.sdk.metrics; import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; -import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; -import static org.apache.beam.sdk.metrics.MetricMatchers.distributionAttemptedMinMax; -import static org.apache.beam.sdk.metrics.MetricMatchers.distributionCommittedMinMax; +import static org.apache.beam.sdk.metrics.MetricMatchers.distributionMinMax; +import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNull; @@ -130,59 +129,16 @@ public void counterToCell() { @Test public void committedMetricsReportToQuery() { PipelineResult result = runPipelineWithMetrics(); - - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - - assertThat(metrics.counters(), hasItem( - committedMetricsResult(NAMESPACE, "count", "MyStep1", 3L))); - assertThat(metrics.distributions(), hasItem( - committedMetricsResult(NAMESPACE, "input", "MyStep1", - DistributionResult.create(26L, 3L, 5L, 13L)))); - - assertThat(metrics.counters(), hasItem( - committedMetricsResult(NAMESPACE, "count", "MyStep2", 6L))); - assertThat(metrics.distributions(), hasItem( - committedMetricsResult(NAMESPACE, "input", "MyStep2", - DistributionResult.create(52L, 6L, 5L, 13L)))); - assertThat(metrics.gauges(), hasItem( - committedMetricsResult(NAMESPACE, "my-gauge", "MyStep2", - GaugeResult.create(12L, Instant.now())))); - - assertThat(metrics.distributions(), hasItem( - distributionCommittedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L))); + testAllMetrics(result, true); } - @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class, UsesDistributionMetrics.class, UsesGaugeMetrics.class}) @Test public void attemptedMetricsReportToQuery() { PipelineResult result = runPipelineWithMetrics(); - - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. - assertThat(metrics.counters(), hasItem( - attemptedMetricsResult(NAMESPACE, "count", "MyStep1", 3L))); - assertThat(metrics.distributions(), hasItem( - attemptedMetricsResult(NAMESPACE, "input", "MyStep1", - DistributionResult.create(26L, 3L, 5L, 13L)))); - - assertThat(metrics.counters(), hasItem( - attemptedMetricsResult(NAMESPACE, "count", "MyStep2", 6L))); - assertThat(metrics.distributions(), hasItem( - attemptedMetricsResult(NAMESPACE, "input", "MyStep2", - DistributionResult.create(52L, 6L, 5L, 13L)))); - assertThat(metrics.gauges(), hasItem( - attemptedMetricsResult(NAMESPACE, "my-gauge", "MyStep2", - GaugeResult.create(12L, Instant.now())))); - - assertThat(metrics.distributions(), hasItem( - distributionAttemptedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L))); + testAllMetrics(result, false); } private PipelineResult runPipelineWithMetrics() { @@ -237,6 +193,30 @@ public void processElement(ProcessContext c) { return result; } + private static void testAllMetrics(PipelineResult result, boolean isCommitted) { + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + + assertThat(metrics.counters(), hasItem( + metricsResult(NAMESPACE, "count", "MyStep1", 3L, isCommitted))); + assertThat(metrics.distributions(), hasItem( + metricsResult(NAMESPACE, "input", "MyStep1", + DistributionResult.create(26L, 3L, 5L, 13L), isCommitted))); + + assertThat(metrics.counters(), hasItem( + metricsResult(NAMESPACE, "count", "MyStep2", 6L, isCommitted))); + assertThat(metrics.distributions(), hasItem( + metricsResult(NAMESPACE, "input", "MyStep2", + DistributionResult.create(52L, 6L, 5L, 13L), isCommitted))); + assertThat(metrics.gauges(), hasItem( + metricsResult(NAMESPACE, "my-gauge", "MyStep2", + GaugeResult.create(12L, Instant.now()), isCommitted))); + + assertThat(metrics.distributions(), hasItem( + distributionMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L, isCommitted))); + } + @Test @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testBoundedSourceMetrics() { @@ -268,7 +248,6 @@ public void testBoundedSourceMetrics() { public void testUnboundedSourceMetrics() { long numElements = 1000; - // Use withMaxReadTime to force unbounded mode. pipeline.apply( GenerateSequence.from(0).to(numElements).withMaxReadTime(Duration.standardDays(1))); From 51b7a14e8fe64dca8047873723009b27d02b7f6d Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 24 Apr 2017 12:11:42 -0700 Subject: [PATCH 3/5] Add single-metric-type variants --- .../apache/beam/sdk/metrics/MetricsTest.java | 105 +++++++++++++++--- 1 file changed, 90 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index b87b5e9e02e2..e2318c3edc8e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -127,18 +127,86 @@ public void counterToCell() { @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class, UsesDistributionMetrics.class, UsesGaugeMetrics.class}) @Test - public void committedMetricsReportToQuery() { + public void testAllCommittedMetrics() { PipelineResult result = runPipelineWithMetrics(); - testAllMetrics(result, true); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + + testAllMetrics(metrics, true); } @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class, UsesDistributionMetrics.class, UsesGaugeMetrics.class}) @Test - public void attemptedMetricsReportToQuery() { + public void testAllAttemptedMetrics() { PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. - testAllMetrics(result, false); + testAllMetrics(metrics, false); + } + + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class}) + @Test + public void testCommittedCounterMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + testCounterMetrics(metrics, true); + } + + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) + @Test + public void testAttemptedCounterMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + testCounterMetrics(metrics, false); + } + + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesDistributionMetrics.class}) + @Test + public void testCommittedDistributionMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + testDistributionMetrics(metrics, true); + } + + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesDistributionMetrics.class}) + @Test + public void testAttemptedDistributionMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + testDistributionMetrics(metrics, false); + } + + @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesGaugeMetrics.class}) + @Test + public void testCommittedGaugeMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + testGaugeMetrics(metrics, true); + } + + @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesGaugeMetrics.class}) + @Test + public void testAttemptedGaugeMetrics() { + PipelineResult result = runPipelineWithMetrics(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + testGaugeMetrics(metrics, false); } private PipelineResult runPipelineWithMetrics() { @@ -193,30 +261,37 @@ public void processElement(ProcessContext c) { return result; } - private static void testAllMetrics(PipelineResult result, boolean isCommitted) { - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - + private static void testCounterMetrics(MetricQueryResults metrics, boolean isCommitted) { assertThat(metrics.counters(), hasItem( metricsResult(NAMESPACE, "count", "MyStep1", 3L, isCommitted))); + assertThat(metrics.counters(), hasItem( + metricsResult(NAMESPACE, "count", "MyStep2", 6L, isCommitted))); + } + + private static void testGaugeMetrics(MetricQueryResults metrics, boolean isCommitted) { + assertThat(metrics.gauges(), hasItem( + metricsResult(NAMESPACE, "my-gauge", "MyStep2", + GaugeResult.create(12L, Instant.now()), isCommitted))); + } + + private static void testDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) { assertThat(metrics.distributions(), hasItem( metricsResult(NAMESPACE, "input", "MyStep1", DistributionResult.create(26L, 3L, 5L, 13L), isCommitted))); - assertThat(metrics.counters(), hasItem( - metricsResult(NAMESPACE, "count", "MyStep2", 6L, isCommitted))); assertThat(metrics.distributions(), hasItem( metricsResult(NAMESPACE, "input", "MyStep2", DistributionResult.create(52L, 6L, 5L, 13L), isCommitted))); - assertThat(metrics.gauges(), hasItem( - metricsResult(NAMESPACE, "my-gauge", "MyStep2", - GaugeResult.create(12L, Instant.now()), isCommitted))); - assertThat(metrics.distributions(), hasItem( distributionMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L, isCommitted))); } + private static void testAllMetrics(MetricQueryResults metrics, boolean isCommitted) { + testCounterMetrics(metrics, isCommitted); + testDistributionMetrics(metrics, isCommitted); + testGaugeMetrics(metrics, isCommitted); + } + @Test @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) public void testBoundedSourceMetrics() { From f42c876da5a3df7924467019c832694daa4d019e Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 24 Apr 2017 12:12:32 -0700 Subject: [PATCH 4/5] Update DataflowRunner coverage --- runners/google-cloud-dataflow-java/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 49395561681b..75aac43febce 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -123,6 +123,7 @@ org.apache.beam.sdk.testing.UsesDistributionMetrics, + org.apache.beam.sdk.testing.UsesGaugeMetrics, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, org.apache.beam.sdk.testing.UsesTimersInParDo, From eb94ddb9c266688761eac8865af82e211aabd8c1 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 26 Apr 2017 13:41:13 -0700 Subject: [PATCH 5/5] respond to review --- .../apache/beam/sdk/metrics/MetricsTest.java | 77 ++++++++----------- 1 file changed, 34 insertions(+), 43 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index e2318c3edc8e..8077c27cd7b2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -60,6 +60,13 @@ public class MetricsTest implements Serializable { private static final String NAMESPACE = MetricsTest.class.getName(); private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName(); + private static MetricQueryResults queryTestMetrics(PipelineResult result) { + return result.metrics().queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + } + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -69,14 +76,14 @@ public void tearDown() { } @Test - public void distributionWithoutContainer() { + public void testDistributionWithoutContainer() { assertNull(MetricsEnvironment.getCurrentContainer()); // Should not fail even though there is no metrics container. Metrics.distribution(NS, NAME).update(5L); } @Test - public void counterWithoutContainer() { + public void testCounterWithoutContainer() { assertNull(MetricsEnvironment.getCurrentContainer()); // Should not fail even though there is no metrics container. Counter counter = Metrics.counter(NS, NAME); @@ -87,7 +94,7 @@ public void counterWithoutContainer() { } @Test - public void distributionToCell() { + public void testDistributionToCell() { MetricsContainer container = new MetricsContainer("step"); MetricsEnvironment.setCurrentContainer(container); @@ -106,7 +113,7 @@ public void distributionToCell() { } @Test - public void counterToCell() { + public void testCounterToCell() { MetricsContainer container = new MetricsContainer("step"); MetricsEnvironment.setCurrentContainer(container); Counter counter = Metrics.counter(NS, NAME); @@ -129,11 +136,9 @@ public void counterToCell() { @Test public void testAllCommittedMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); + MetricQueryResults metrics = queryTestMetrics(result); - testAllMetrics(metrics, true); + assertAllMetrics(metrics, true); } @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class, @@ -141,72 +146,58 @@ public void testAllCommittedMetrics() { @Test public void testAllAttemptedMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); + MetricQueryResults metrics = queryTestMetrics(result); // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. - testAllMetrics(metrics, false); + assertAllMetrics(metrics, false); } @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class}) @Test public void testCommittedCounterMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - testCounterMetrics(metrics, true); + MetricQueryResults metrics = queryTestMetrics(result); + assertCounterMetrics(metrics, true); } @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class}) @Test public void testAttemptedCounterMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - testCounterMetrics(metrics, false); + MetricQueryResults metrics = queryTestMetrics(result); + assertCounterMetrics(metrics, false); } @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesDistributionMetrics.class}) @Test public void testCommittedDistributionMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - testDistributionMetrics(metrics, true); + MetricQueryResults metrics = queryTestMetrics(result); + assertDistributionMetrics(metrics, true); } @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesDistributionMetrics.class}) @Test public void testAttemptedDistributionMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - testDistributionMetrics(metrics, false); + MetricQueryResults metrics = queryTestMetrics(result); + assertDistributionMetrics(metrics, false); } @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesGaugeMetrics.class}) @Test public void testCommittedGaugeMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - testGaugeMetrics(metrics, true); + MetricQueryResults metrics = queryTestMetrics(result); + assertGaugeMetrics(metrics, true); } @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesGaugeMetrics.class}) @Test public void testAttemptedGaugeMetrics() { PipelineResult result = runPipelineWithMetrics(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) - .build()); - testGaugeMetrics(metrics, false); + MetricQueryResults metrics = queryTestMetrics(result); + assertGaugeMetrics(metrics, false); } private PipelineResult runPipelineWithMetrics() { @@ -261,20 +252,20 @@ public void processElement(ProcessContext c) { return result; } - private static void testCounterMetrics(MetricQueryResults metrics, boolean isCommitted) { + private static void assertCounterMetrics(MetricQueryResults metrics, boolean isCommitted) { assertThat(metrics.counters(), hasItem( metricsResult(NAMESPACE, "count", "MyStep1", 3L, isCommitted))); assertThat(metrics.counters(), hasItem( metricsResult(NAMESPACE, "count", "MyStep2", 6L, isCommitted))); } - private static void testGaugeMetrics(MetricQueryResults metrics, boolean isCommitted) { + private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCommitted) { assertThat(metrics.gauges(), hasItem( metricsResult(NAMESPACE, "my-gauge", "MyStep2", GaugeResult.create(12L, Instant.now()), isCommitted))); } - private static void testDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) { + private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) { assertThat(metrics.distributions(), hasItem( metricsResult(NAMESPACE, "input", "MyStep1", DistributionResult.create(26L, 3L, 5L, 13L), isCommitted))); @@ -286,10 +277,10 @@ private static void testDistributionMetrics(MetricQueryResults metrics, boolean distributionMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L, isCommitted))); } - private static void testAllMetrics(MetricQueryResults metrics, boolean isCommitted) { - testCounterMetrics(metrics, isCommitted); - testDistributionMetrics(metrics, isCommitted); - testGaugeMetrics(metrics, isCommitted); + private static void assertAllMetrics(MetricQueryResults metrics, boolean isCommitted) { + assertCounterMetrics(metrics, isCommitted); + assertDistributionMetrics(metrics, isCommitted); + assertGaugeMetrics(metrics, isCommitted); } @Test