From 0d30860483a2c8f3d3efde9a45a162e5b8ece476 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 7 Jul 2016 13:49:30 -0700 Subject: [PATCH 1/2] Mark primitive display data tests RunnableOnService --- .../dataflow/io/DataflowAvroIOTest.java | 69 -------------- .../dataflow/io/DataflowBigQueryIOTest.java | 94 ------------------- .../dataflow/io/DataflowDatastoreIOTest.java | 66 ------------- .../dataflow/io/DataflowPubsubIOTest.java | 63 ------------- .../dataflow/io/DataflowTextIOTest.java | 76 --------------- .../transforms/DataflowCombineTest.java | 58 ------------ .../DataflowDisplayDataEvaluator.java | 72 -------------- .../transforms/DataflowMapElementsTest.java | 55 ----------- .../org/apache/beam/sdk/io/AvroIOTest.java | 34 +++++++ .../apache/beam/sdk/io/BigQueryIOTest.java | 61 ++++++++++++ .../org/apache/beam/sdk/io/PubsubIOTest.java | 30 ++++++ .../org/apache/beam/sdk/io/TextIOTest.java | 32 +++++++ .../beam/sdk/io/datastore/V1Beta3Test.java | 33 +++++++ .../beam/sdk/transforms/CombineTest.java | 19 ++++ .../beam/sdk/transforms/MapElementsTest.java | 22 +++++ .../display/DisplayDataEvaluator.java | 12 ++- 16 files changed, 241 insertions(+), 555 deletions(-) delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java delete mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java deleted file mode 100644 index 006daa996fae..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import org.apache.avro.Schema; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Set; - -/** - * {@link DataflowRunner} specific tests for {@link AvroIO} transforms. - */ -@RunWith(JUnit4.class) -public class DataflowAvroIOTest { - @Test - public void testPrimitiveWriteDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - AvroIO.Write.Bound write = AvroIO.Write - .to("foo") - .withSchema(Schema.create(Schema.Type.STRING)) - .withoutValidation(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("AvroIO.Write should include the file pattern in its primitive transform", - displayData, hasItem(hasDisplayItem("fileNamePattern"))); - } - - @Test - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - AvroIO.Read.Bound read = AvroIO.Read.from("foo.*") - .withSchema(Schema.create(Schema.Type.STRING)) - .withoutValidation(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("AvroIO.Read should include the file pattern in its primitive transform", - displayData, hasItem(hasDisplayItem("filePattern"))); - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java deleted file mode 100644 index 2b13b9c5cac6..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.BigQueryIO; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import com.google.api.services.bigquery.model.TableSchema; - -import org.junit.Test; - -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link BigQueryIO} transforms. - */ -public class DataflowBigQueryIOTest { - @Test - public void testTableSourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - BigQueryIO.Read.Bound read = BigQueryIO.Read - .from("project:dataset.tableId") - .withoutValidation(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("BigQueryIO.Read should include the table spec in its primitive display data", - displayData, hasItem(hasDisplayItem("table"))); - } - - @Test - public void testQuerySourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - BigQueryIO.Read.Bound read = BigQueryIO.Read - .fromQuery("foobar") - .withoutValidation(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("BigQueryIO.Read should include the query in its primitive display data", - displayData, hasItem(hasDisplayItem("query"))); - } - - @Test - public void testBatchSinkPrimitiveDisplayData() { - DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions(); - options.setStreaming(false); - testSinkPrimitiveDisplayData(options); - } - - @Test - public void testStreamingSinkPrimitiveDisplayData() { - DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions(); - options.setStreaming(true); - testSinkPrimitiveDisplayData(options); - } - - private void testSinkPrimitiveDisplayData(DataflowPipelineOptions options) { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(options); - - BigQueryIO.Write.Bound write = BigQueryIO.Write - .to("project:dataset.table") - .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) - .withoutValidation(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("BigQueryIO.Write should include the table spec in its primitive display data", - displayData, hasItem(hasDisplayItem("tableSpec"))); - - assertThat("BigQueryIO.Write should include the table schema in its primitive display data", - displayData, hasItem(hasDisplayItem("schema"))); - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java deleted file mode 100644 index 8cdf611b1887..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.datastore.DatastoreIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; - -import com.google.datastore.v1beta3.Entity; -import com.google.datastore.v1beta3.Query; - -import org.junit.Test; - -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link DatastoreIO} transforms. - */ -public class DataflowDatastoreIOTest { - @Test - public void testSourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PTransform read = DatastoreIO.v1beta3().read().withProjectId( - "myProject").withQuery(Query.newBuilder().build()); - - Set displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat("DatastoreIO read should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - } - - @Test - public void testSinkPrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PTransform, ?> write = - DatastoreIO.v1beta3().write().withProjectId("myProject"); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("DatastoreIO write should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java deleted file mode 100644 index 27bc2d9290e5..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.PubsubIO; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Set; - -/** - * {@link DataflowRunner} specific tests for {@link PubsubIO} transforms. - */ -@RunWith(JUnit4.class) -public class DataflowPubsubIOTest { - @Test - public void testPrimitiveWriteDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PubsubIO.Write.Bound write = PubsubIO.Write.topic("projects/project/topics/topic"); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("PubsubIO.Write should include the topic in its primitive display data", - displayData, hasItem(hasDisplayItem("topic"))); - } - - @Test - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PubsubIO.Read.Bound read = - PubsubIO.Read.subscription("projects/project/subscriptions/subscription") - .maxNumRecords(1); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("PubsubIO.Read should include the subscription in its primitive display data", - displayData, hasItem(hasDisplayItem("subscription"))); - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java deleted file mode 100644 index 727ffdccab98..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.startsWith; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.util.TestCredential; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Set; - -/** - * {@link DataflowRunner} specific tests for TextIO Read and Write transforms. - */ -@RunWith(JUnit4.class) -public class DataflowTextIOTest { - private TestDataflowPipelineOptions buildTestPipelineOptions() { - TestDataflowPipelineOptions options = - TestPipeline.testingPipelineOptions().as(TestDataflowPipelineOptions.class); - options.setGcpCredential(new TestCredential()); - return options; - } - - @Test - public void testPrimitiveWriteDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - TextIO.Write.Bound write = TextIO.Write.to("foobar"); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("TextIO.Write should include the file prefix in its primitive display data", - displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); - } - - @Test - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - TextIO.Read.Bound read = TextIO.Read - .from("foobar") - .withoutValidation(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("TextIO.Read should include the file prefix in its primitive display data", - displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java deleted file mode 100644 index 3af0caec9528..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.transforms; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.CombineTest; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; - -import org.junit.Test; - -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link Combine} transforms. - */ -public class DataflowCombineTest { - @Test - public void testCombinePerKeyPrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); - PTransform>, ? extends POutput> combine = - Combine.perKey(combineFn); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, - KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); - - assertThat("Combine.perKey should include the combineFn in its primitive transform", - displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java deleted file mode 100644 index d809cc6ad92a..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.transforms; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.util.NoopCredentialFactory; -import org.apache.beam.sdk.util.NoopPathValidator; - -import com.google.common.collect.Lists; - -/** - * Factory methods for creating {@link DisplayDataEvaluator} instances against the - * {@link DataflowRunner}. - */ -public final class DataflowDisplayDataEvaluator { - /** Do not instantiate. */ - private DataflowDisplayDataEvaluator() {} - - /** - * Retrieve a set of default {@link DataflowPipelineOptions} which can be used to build - * dataflow pipelines for evaluating display data. - */ - public static DataflowPipelineOptions getDefaultOptions() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - - options.setRunner(DataflowRunner.class); - options.setProject("foobar"); - options.setTempLocation("gs://bucket/tmpLocation"); - options.setFilesToStage(Lists.newArrayList()); - - options.as(DataflowPipelineDebugOptions.class).setPathValidatorClass(NoopPathValidator.class); - options.as(GcpOptions.class).setCredentialFactoryClass(NoopCredentialFactory.class); - - return options; - } - - /** - * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against - * the {@link DataflowRunner}. - */ - public static DisplayDataEvaluator create() { - return create(getDefaultOptions()); - } - - /** - * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against - * the {@link DataflowRunner} with the specified {@code options}. - */ - public static DisplayDataEvaluator create(DataflowPipelineOptions options) { - return DisplayDataEvaluator.create(options); - } -} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java deleted file mode 100644 index 8a5e67d5cad5..000000000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.transforms; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import org.junit.Test; - -import java.io.Serializable; -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link MapElements} transforms. - */ -public class DataflowMapElementsTest implements Serializable { - @Test - public void testPrimitiveDisplayData() { - SimpleFunction mapFn = new SimpleFunction() { - @Override - public Integer apply(Integer input) { - return input; - } - }; - - MapElements map = MapElements.via(mapFn); - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - Set displayData = evaluator.displayDataForPrimitiveTransforms(map); - assertThat("MapElements should include the mapFn in its primitive display data", - displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass()))); - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 8625b1050101..047e7d07e307 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -30,9 +31,11 @@ import org.apache.beam.sdk.io.AvroIO.Write.Bound; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.PCollection; @@ -40,6 +43,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; @@ -55,6 +59,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; /** * Tests for AvroIO Read and Write transforms. @@ -272,6 +277,20 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("validation", false)); } + @Test + @Category(RunnableOnService.class) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + AvroIO.Read.Bound read = AvroIO.Read.from("foo.*") + .withSchema(Schema.create(Schema.Type.STRING)) + .withoutValidation(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("AvroIO.Read should include the file pattern in its primitive transform", + displayData, hasItem(hasDisplayItem("filePattern"))); + } + @Test public void testWriteDisplayData() { AvroIO.Write.Bound write = AvroIO.Write @@ -291,4 +310,19 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("validation", false)); } + + @Test + @Category(RunnableOnService.class) + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + AvroIO.Write.Bound write = AvroIO.Write + .to("foo") + .withSchema(Schema.create(Schema.Type.STRING)) + .withoutValidation(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("AvroIO.Write should include the file pattern in its primitive transform", + displayData, hasItem(hasDisplayItem("fileNamePattern"))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 43bf314b5356..3c00a085f83d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -45,6 +46,7 @@ import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -57,6 +59,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BigQueryServices; import org.apache.beam.sdk.util.BigQueryServices.DatasetService; @@ -109,6 +112,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import javax.annotation.Nullable; @@ -636,6 +640,33 @@ public void testBuildSourceDisplayData() { assertThat(displayData, hasDisplayItem("validation", false)); } + @Test + @Category(RunnableOnService.class) + public void testTableSourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + BigQueryIO.Read.Bound read = BigQueryIO.Read + .from("project:dataset.tableId") + .withoutValidation(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("BigQueryIO.Read should include the table spec in its primitive display data", + displayData, hasItem(hasDisplayItem("table"))); + } + + @Test + @Category(RunnableOnService.class) + public void testQuerySourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + BigQueryIO.Read.Bound read = BigQueryIO.Read + .fromQuery("foobar") + .withoutValidation(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("BigQueryIO.Read should include the query in its primitive display data", + displayData, hasItem(hasDisplayItem("query"))); + } + + @Test public void testBuildSink() { BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); @@ -644,6 +675,36 @@ public void testBuildSink() { null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); } + @Test + @Category(RunnableOnService.class) + public void testBatchSinkPrimitiveDisplayData() { + testSinkPrimitiveDisplayData(/* streaming: */ false); + } + + @Test + @Category(RunnableOnService.class) + public void testStreamingSinkPrimitiveDisplayData() { + testSinkPrimitiveDisplayData(/* streaming: */ true); + } + + private void testSinkPrimitiveDisplayData(boolean streaming) { + PipelineOptions options = DisplayDataEvaluator.getDefaultOptions(); + options.as(StreamingOptions.class).setStreaming(streaming); + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); + + BigQueryIO.Write.Bound write = BigQueryIO.Write + .to("project:dataset.table") + .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) + .withoutValidation(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("BigQueryIO.Write should include the table spec in its primitive display data", + displayData, hasItem(hasDisplayItem("tableSpec"))); + + assertThat("BigQueryIO.Write should include the table schema in its primitive display data", + displayData, hasItem(hasDisplayItem("schema"))); + } + @Test public void testBuildSinkwithoutValidation() { // This test just checks that using withoutValidation will not trigger object diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index efa1cd2717c9..1e9ebf2d1529 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -19,18 +19,24 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.util.Set; + /** * Tests for PubsubIO Read and Write transforms. */ @@ -100,6 +106,19 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); } + @Test + @Category(RunnableOnService.class) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PubsubIO.Read.Bound read = + PubsubIO.Read.subscription("projects/project/subscriptions/subscription") + .maxNumRecords(1); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("PubsubIO.Read should include the subscription in its primitive display data", + displayData, hasItem(hasDisplayItem("subscription"))); + } + @Test public void testWriteDisplayData() { String topic = "projects/project/topics/topic"; @@ -114,4 +133,15 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); assertThat(displayData, hasDisplayItem("idLabel", "myId")); } + + @Test + @Category(RunnableOnService.class) + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PubsubIO.Write.Bound write = PubsubIO.Write.topic("projects/project/topics/topic"); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("PubsubIO.Write should include the topic in its primitive display data", + displayData, hasItem(hasDisplayItem("topic"))); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index df598c8606c7..28e9ea4c0a39 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -22,8 +22,11 @@ import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -42,10 +45,12 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.IOChannelUtils; @@ -79,6 +84,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -185,6 +191,20 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("validation", false)); } + @Test + @Category(RunnableOnService.class) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + TextIO.Read.Bound read = TextIO.Read + .from("foobar") + .withoutValidation(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("TextIO.Read should include the file prefix in its primitive display data", + displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); + } + void runTestWrite(T[] elems, Coder coder) throws Exception { runTestWrite(elems, coder, 1); } @@ -314,6 +334,18 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("validation", false)); } + @Test + @Category(RunnableOnService.class) + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + TextIO.Write.Bound write = TextIO.Write.to("foobar"); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("TextIO.Write should include the file prefix in its primitive display data", + displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); + } + @Test public void testUnsupportedFilePattern() throws IOException { File outFolder = tmpFolder.newFolder(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java index 9a87ed3bc4ff..dd222890d51e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -45,8 +46,14 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.TestCredential; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; import com.google.common.collect.Lists; import com.google.datastore.v1beta3.Entity; @@ -68,6 +75,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -80,6 +88,7 @@ import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; +import java.util.Set; /** * Tests for {@link V1Beta3}. @@ -198,6 +207,18 @@ public void testReadDisplayData() { assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); } + @Test + @Category(RunnableOnService.class) + public void testSourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform read = DatastoreIO.v1beta3().read().withProjectId( + "myProject").withQuery(Query.newBuilder().build()); + + Set displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat("DatastoreIO read should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + } + @Test public void testWriteDoesNotAllowNullProject() throws Exception { thrown.expect(NullPointerException.class); @@ -232,6 +253,18 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); } + @Test + @Category(RunnableOnService.class) + public void testSinkPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform, ?> write = + DatastoreIO.v1beta3().write().withProjectId("myProject"); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + } + @Test public void testQuerySplitBasic() throws Exception { KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 6f6c4a194611..b45308991178 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -50,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -63,6 +65,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.POutput; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; @@ -711,6 +714,22 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat(displayData, hasDisplayItem(hasNamespace(combineFn.getClass()))); } + @Test + @Category(RunnableOnService.class) + public void testCombinePerKeyPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform>, ? extends POutput> combine = + Combine.perKey(combineFn); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, + KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + + assertThat("Combine.perKey should include the combineFn in its primitive transform", + displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + } + //////////////////////////////////////////////////////////////////////////// // Test classes, for different kinds of combining fns. diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index e6694d224fb7..f18504cf9936 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -19,13 +19,16 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -38,6 +41,7 @@ import org.junit.runners.JUnit4; import java.io.Serializable; +import java.util.Set; /** * Tests for {@link MapElements}. @@ -155,6 +159,24 @@ public Integer apply(Integer input) { assertThat(DisplayData.from(simpleMap), hasDisplayItem("mapFn", simpleFn.getClass())); } + @Test + @Category(RunnableOnService.class) + public void testPrimitiveDisplayData() { + SimpleFunction mapFn = new SimpleFunction() { + @Override + public Integer apply(Integer input) { + return input; + } + }; + + MapElements map = MapElements.via(mapFn); + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(map); + assertThat("MapElements should include the mapFn in its primitive display data", + displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass()))); + } + static class VoidValues extends PTransform>, PCollection>> { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java index a78a4ad0bc63..dc8c1e9fe90c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java @@ -41,10 +41,11 @@ public class DisplayDataEvaluator { private final PipelineOptions options; /** - * Create a new {@link DisplayDataEvaluator} using {@link TestPipeline#testingPipelineOptions()}. + * Create a new {@link DisplayDataEvaluator} using options returned from + * {@link #getDefaultOptions()}. */ public static DisplayDataEvaluator create() { - return create(TestPipeline.testingPipelineOptions()); + return create(getDefaultOptions()); } /** @@ -54,6 +55,13 @@ public static DisplayDataEvaluator create(PipelineOptions pipelineOptions) { return new DisplayDataEvaluator(pipelineOptions); } + /** + * The default {@link PipelineOptions} which will be used by {@link #create()}. + */ + public static PipelineOptions getDefaultOptions() { + return TestPipeline.testingPipelineOptions(); + } + private DisplayDataEvaluator(PipelineOptions options) { this.options = options; } From 6e5551c30c4a35696dae54ab1494905c6a60fdb1 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 8 Jul 2016 11:08:26 -0700 Subject: [PATCH 2/2] fixup! Adapt changes to migrate tests to DirectRunner / RunnableOnService --- runners/google-cloud-dataflow-java/pom.xml | 10 ------ .../apache/beam/sdk/transforms/Combine.java | 5 +++ .../apache/beam/sdk/io/BigQueryIOTest.java | 34 ++++++++++++------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 76e5f80f4d59..9cd1fb43d889 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -289,11 +289,6 @@ runtime - - org.apache.avro - avro - - com.google.api-client google-api-client @@ -330,11 +325,6 @@ google-api-services-clouddebugger - - com.google.apis - google-api-services-bigquery - - com.google.cloud.bigdataoss util diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 5faf4e3e1ca6..9a87b36c9380 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2295,6 +2295,11 @@ public void processElement(ProcessContext c) { c.output(KV.of(key, combineFnRunner.apply(key, c.element().getValue(), c))); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Combine.GroupedValues.this.populateDisplayData(builder); + } }).withSideInputs(sideInputs)); try { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 3c00a085f83d..78d950e8b18c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -333,9 +333,10 @@ public JobStatistics dryRunQuery(String projectId, String query) @Rule public transient ExpectedException thrown = ExpectedException.none(); @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); - @Mock public transient BigQueryServices.JobService mockJobService; + @Mock(extraInterfaces = Serializable.class) + public transient BigQueryServices.JobService mockJobService; @Mock private transient IOChannelFactory mockIOChannelFactory; - @Mock private transient DatasetService mockDatasetService; + @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService; private transient BigQueryOptions bqOptions; @@ -642,10 +643,13 @@ public void testBuildSourceDisplayData() { @Test @Category(RunnableOnService.class) - public void testTableSourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions); BigQueryIO.Read.Bound read = BigQueryIO.Read .from("project:dataset.tableId") + .withTestServices(new FakeBigQueryServices() + .withDatasetService(mockDatasetService) + .withJobService(mockJobService)) .withoutValidation(); Set displayData = evaluator.displayDataForPrimitiveTransforms(read); @@ -655,10 +659,13 @@ public void testTableSourcePrimitiveDisplayData() { @Test @Category(RunnableOnService.class) - public void testQuerySourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions); BigQueryIO.Read.Bound read = BigQueryIO.Read .fromQuery("foobar") + .withTestServices(new FakeBigQueryServices() + .withDatasetService(mockDatasetService) + .withJobService(mockJobService)) .withoutValidation(); Set displayData = evaluator.displayDataForPrimitiveTransforms(read); @@ -677,24 +684,27 @@ public void testBuildSink() { @Test @Category(RunnableOnService.class) - public void testBatchSinkPrimitiveDisplayData() { + public void testBatchSinkPrimitiveDisplayData() throws IOException, InterruptedException { testSinkPrimitiveDisplayData(/* streaming: */ false); } @Test @Category(RunnableOnService.class) - public void testStreamingSinkPrimitiveDisplayData() { + public void testStreamingSinkPrimitiveDisplayData() throws IOException, InterruptedException { testSinkPrimitiveDisplayData(/* streaming: */ true); } - private void testSinkPrimitiveDisplayData(boolean streaming) { - PipelineOptions options = DisplayDataEvaluator.getDefaultOptions(); - options.as(StreamingOptions.class).setStreaming(streaming); - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options); + private void testSinkPrimitiveDisplayData(boolean streaming) throws IOException, + InterruptedException { + bqOptions.as(StreamingOptions.class).setStreaming(streaming); + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions); BigQueryIO.Write.Bound write = BigQueryIO.Write .to("project:dataset.table") .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) + .withTestServices(new FakeBigQueryServices() + .withDatasetService(mockDatasetService) + .withJobService(mockJobService)) .withoutValidation(); Set displayData = evaluator.displayDataForPrimitiveTransforms(write);