From c02d39b2e6f29b0012938d696536f7d110d78a9a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 26 Apr 2017 22:22:49 -0700 Subject: [PATCH 1/2] Add a test for metrics reporting in Spark --- .../metrics/sink/SparkMetricsTest.java | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsTest.java diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsTest.java new file mode 100644 index 000000000000..f67d43a08a01 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.spark.aggregators.metrics.sink; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import org.apache.beam.runners.spark.PipelineRule; +import org.apache.beam.runners.spark.examples.WordCount; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExternalResource; + + +/** + * A test for the metrics reporting in Spark. + */ +public class SparkMetricsTest { + + @Rule + public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); + + @Rule + public InMemoryMetricsSinkRule clearInMemoryMetricsSink = new InMemoryMetricsSinkRule(); + + @Rule + public final PipelineRule pipelineRule = PipelineRule.batch(); + + private Pipeline createSparkPipeline() { + pipelineRule.getOptions().setEnableSparkMetricSinks(true); + return pipelineRule.createPipeline(); + } + + private void runPipeline() { + + final List words = + Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"); + + final Set expectedCounts = + ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + + final Pipeline pipeline = createSparkPipeline(); + + final PCollection output = + pipeline + .apply(Create.of(words).withCoder(StringUtf8Coder.of())) + .apply(new WordCount.CountWords()) + .apply(MapElements.via(new WordCount.FormatAsTextFn())); + + PAssert.that(output).containsInAnyOrder(expectedCounts); + + pipeline.run(); + } + + @Test + public void testNamedMetric() throws Exception { + assertThat(InMemoryMetrics.valueOf("emptyLines"), 
is(nullValue())); + + runPipeline(); + + assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d)); + } + + @Test + public void testNonExistingMetricName() throws Exception { + runPipeline(); + + final Long valueOf = InMemoryMetrics.valueOf("myMissingAggregator"); + + assertThat(valueOf, is(nullValue())); + } +} From d498d84145085a017fa7a1bba2e2ef6e3abd9348 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 28 Apr 2017 12:19:50 -0700 Subject: [PATCH 2/2] CR feedback (remove unnecessary test and rule) s/SparkMetricsTest/SparkMetricsSinkTest/ --- ...kMetricsTest.java => SparkMetricsSinkTest.java} | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) rename runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/{SparkMetricsTest.java => SparkMetricsSinkTest.java} (88%) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java similarity index 88% rename from runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsTest.java rename to runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java index f67d43a08a01..95008f069910 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -42,14 +42,11 @@ /** * A test for the metrics reporting in Spark. 
*/ -public class SparkMetricsTest { +public class SparkMetricsSinkTest { @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); - @Rule - public InMemoryMetricsSinkRule clearInMemoryMetricsSink = new InMemoryMetricsSinkRule(); - @Rule public final PipelineRule pipelineRule = PipelineRule.batch(); @@ -87,13 +84,4 @@ public void testNamedMetric() throws Exception { assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d)); } - - @Test - public void testNonExistingMetricName() throws Exception { - runPipeline(); - - final Long valueOf = InMemoryMetrics.valueOf("myMissingAggregator"); - - assertThat(valueOf, is(nullValue())); - } }