From ca24044449950850f6e5e5aebce5382b8f50446f Mon Sep 17 00:00:00 2001 From: bchambers Date: Thu, 15 Dec 2016 17:04:59 -0800 Subject: [PATCH 1/7] Add UsesMetrics interface and exclude from runners --- runners/apex/pom.xml | 3 ++- runners/google-cloud-dataflow-java/pom.xml | 3 ++- runners/spark/pom.xml | 3 ++- .../apache/beam/sdk/testing/UsesMetrics.java | 24 +++++++++++++++++++ 4 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index f71637c66fab..d03964daaa7e 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -186,7 +186,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics none true diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 77187d68cd4f..dac0b3fd38c9 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -80,7 +80,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics org.apache.beam.sdk.transforms.FlattenTest diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 5a2fe87ab4cb..309e1ffbef3f 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -75,7 +75,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics 1 false diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java new file mode 100644 index 000000000000..261354c240d3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMetrics.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.Metrics}. + */ +public interface UsesMetrics {} From de5b3135c289868c2fea937350e52cd971b1b550 Mon Sep 17 00:00:00 2001 From: bchambers Date: Thu, 15 Dec 2016 17:04:21 -0800 Subject: [PATCH 2/7] squash! Add Serializability as needed for Metrics --- .../main/java/org/apache/beam/sdk/metrics/MetricName.java | 3 ++- .../src/main/java/org/apache/beam/sdk/metrics/Metrics.java | 5 +++-- .../test/java/org/apache/beam/sdk/metrics/MetricsTest.java | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java index 843a88575012..3c7704336ed8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -28,7 +29,7 @@ */ @Experimental(Kind.METRICS) @AutoValue -public abstract class MetricName { +public abstract class MetricName implements Serializable { /** The namespace associated with this metric. */ public abstract String namespace(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index b72a0b244a17..045e076b2a78 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.metrics; +import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -58,7 +59,7 @@ public static Distribution distribution(Class namespace, String name) { } /** Implementation of {@link Counter} that delegates to the instance for the current context. */ - private static class DelegatingCounter implements Counter { + private static class DelegatingCounter implements Counter, Serializable { private final MetricName name; private DelegatingCounter(MetricName name) { @@ -92,7 +93,7 @@ private DelegatingCounter(MetricName name) { /** * Implementation of {@link Distribution} that delegates to the instance for the current context. */ - private static class DelegatingDistribution implements Distribution { + private static class DelegatingDistribution implements Distribution, Serializable { private final MetricName name; private DelegatingDistribution(MetricName name) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 732cb346d20a..f402f500e448 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import java.io.Serializable; import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Test; @@ -29,7 +30,7 @@ /** * Tests for {@link Metrics}. */ -public class MetricsTest { +public class MetricsTest implements Serializable { private static final String NS = "test"; private static final String NAME = "name"; From 526c27b0479af9e7ba703de826a4ac964792dfd3 Mon Sep 17 00:00:00 2001 From: bchambers Date: Thu, 15 Dec 2016 17:05:16 -0800 Subject: [PATCH 3/7] squash! Add runnable on service test for Metrics --- .../apache/beam/sdk/metrics/MetricsTest.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index f402f500e448..e622d0c43dcd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -18,14 +18,25 @@ package org.apache.beam.sdk.metrics; +import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import java.io.Serializable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesMetrics; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Tests for {@link Metrics}. @@ -96,4 +107,54 @@ public void counterToCell() { counter.dec(); assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L)); } + + @Category({RunnableOnService.class, UsesMetrics.class}) + @Test + public void metricsE2E() { + final Counter count = Metrics.counter(MetricsTest.class, "count"); + Pipeline pipeline = TestPipeline.create(); + pipeline + .apply(Create.of(5, 8, 13)) + .apply("MyStep1", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + Distribution values = Metrics.distribution(MetricsTest.class, "input"); + count.inc(); + values.update(c.element()); + + c.output(c.element()); + c.output(c.element()); + } + })) + .apply("MyStep2", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + Distribution values = Metrics.distribution(MetricsTest.class, "input"); + count.inc(); + values.update(c.element()); + } + })); + PipelineResult result = pipeline.run(); + + result.waitUntilFinish(); + + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) + .build()); + final String step1Name = "MyStep1/AnonymousParDo/AnonymousParMultiDo"; + assertThat(metrics.counters(), hasItem( + metricResult(MetricsTest.class.getName(), "count", step1Name, 3L, 3L))); + assertThat(metrics.distributions(), hasItem( + metricResult(MetricsTest.class.getName(), "input", step1Name, + DistributionResult.create(26L, 3L, 5L, 13L), + DistributionResult.create(26L, 3L, 5L, 13L)))); + + final String step2Name = "MyStep2/AnonymousParDo/AnonymousParMultiDo"; + assertThat(metrics.counters(), hasItem( + metricResult(MetricsTest.class.getName(), "count", step2Name, 6L, 6L))); + assertThat(metrics.distributions(), hasItem( + metricResult(MetricsTest.class.getName(), "input", step2Name, + DistributionResult.create(52L, 6L, 5L, 13L), + DistributionResult.create(52L, 6L, 5L, 13L)))); + } } From 304e42036174abbd459091b3c509a9bbb9ba0185 Mon Sep 17 00:00:00 2001 From: bchambers Date: Thu, 15 Dec 2016 17:48:23 -0800 Subject: [PATCH 4/7] squash! Remove test from DirectRunnerTest --- .../beam/runners/direct/DirectRunnerTest.java | 39 ------------------- 1 file changed, 39 deletions(-) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index eb0f344dabe5..eafb788d9c96 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; -import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isA; @@ -37,7 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -48,13 +45,6 @@ import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Distribution; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; @@ -467,35 +457,6 @@ public Long decode(InputStream inStream, Context context) throws IOException { } } - @Test - public void testMetrics() throws Exception { - Pipeline pipeline = getPipeline(); - pipeline - .apply(Create.of(5, 8, 13)) - .apply("MyStep", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - Counter count = Metrics.counter(DirectRunnerTest.class, "count"); - Distribution values = Metrics.distribution(DirectRunnerTest.class, "input"); - - count.inc(); - values.update(c.element()); - } - })); - PipelineResult result = pipeline.run(); - MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(DirectRunnerTest.class)) - .build()); - - final String stepName = "MyStep/AnonymousParDo/AnonymousParMultiDo"; - assertThat(metrics.counters(), contains( - metricResult(DirectRunnerTest.class.getName(), "count", stepName, 3L, 3L))); - assertThat(metrics.distributions(), contains( - metricResult(DirectRunnerTest.class.getName(), "input", stepName, - DistributionResult.create(26L, 3L, 5L, 13L), - DistributionResult.create(26L, 3L, 5L, 13L)))); - } - private static class MustSplitSource extends BoundedSource{ public static BoundedSource of(BoundedSource underlying) { return new MustSplitSource<>(underlying); From 97455c69bc330e8f9ce01b30501bc8052537f774 Mon Sep 17 00:00:00 2001 From: bchambers Date: Fri, 16 Dec 2016 11:25:58 -0800 Subject: [PATCH 5/7] fixup! Exclude Metrics tests from flink --- runners/flink/runner/pom.xml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 09773e106e6e..7f49372aff17 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -56,7 +56,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics none true @@ -86,7 +87,8 @@ org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, - org.apache.beam.sdk.testing.UsesSplittableParDo + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesMetrics none true From 57520947c7496d3a67f270ec056e20653918ac96 Mon Sep 17 00:00:00 2001 From: bchambers Date: Fri, 16 Dec 2016 12:00:42 -0800 Subject: [PATCH 6/7] fixup! Review comments --- .../org/apache/beam/sdk/metrics/MetricMatchers.java | 4 ++-- .../java/org/apache/beam/sdk/metrics/MetricsTest.java | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java index 6cd4c5245eca..798d9d41fd30 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -78,7 +78,7 @@ public static Matcher> metricResult( protected boolean matchesSafely(MetricResult item) { return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) - && Objects.equals(step, item.step()) + && item.step().contains(step) && Objects.equals(committed, item.committed()) && Objects.equals(attempted, item.attempted()); } @@ -109,7 +109,7 @@ protected void describeMismatchSafely(MetricResult item, Description mismatch .appendText(" != ").appendValue(item.name().name()); } - if (!Objects.equals(step, item.step())) { + if (!item.step().contains(step)) { mismatchDescription .appendText("step: ").appendValue(step) .appendText(" != ").appendValue(item.step()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index e622d0c43dcd..3128a0e4e121 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -141,19 +141,18 @@ public void processElement(ProcessContext c) { MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() .addNameFilter(MetricNameFilter.inNamespace(MetricsTest.class)) .build()); - final String step1Name = "MyStep1/AnonymousParDo/AnonymousParMultiDo"; + // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. assertThat(metrics.counters(), hasItem( - metricResult(MetricsTest.class.getName(), "count", step1Name, 3L, 3L))); + metricResult(MetricsTest.class.getName(), "count", "MyStep1", 3L, 3L))); assertThat(metrics.distributions(), hasItem( - metricResult(MetricsTest.class.getName(), "input", step1Name, + metricResult(MetricsTest.class.getName(), "input", "MyStep1", DistributionResult.create(26L, 3L, 5L, 13L), DistributionResult.create(26L, 3L, 5L, 13L)))); - final String step2Name = "MyStep2/AnonymousParDo/AnonymousParMultiDo"; assertThat(metrics.counters(), hasItem( - metricResult(MetricsTest.class.getName(), "count", step2Name, 6L, 6L))); + metricResult(MetricsTest.class.getName(), "count", "MyStep2", 6L, 6L))); assertThat(metrics.distributions(), hasItem( - metricResult(MetricsTest.class.getName(), "input", step2Name, + metricResult(MetricsTest.class.getName(), "input", "MyStep2", DistributionResult.create(52L, 6L, 5L, 13L), DistributionResult.create(52L, 6L, 5L, 13L)))); } From 79d76e96b41015a5d97fd3e3305554ceb43f103a Mon Sep 17 00:00:00 2001 From: bchambers Date: Fri, 16 Dec 2016 14:45:30 -0800 Subject: [PATCH 7/7] fixup! Rename test --- .../src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 3128a0e4e121..075df19b168c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -110,7 +110,7 @@ public void counterToCell() { @Category({RunnableOnService.class, UsesMetrics.class}) @Test - public void metricsE2E() { + public void metricsReportToQuery() { final Counter count = Metrics.counter(MetricsTest.class, "count"); Pipeline pipeline = TestPipeline.create(); pipeline