From 9d9080a97050712845a31b777d1f739602e09e43 Mon Sep 17 00:00:00 2001 From: Jason White Date: Wed, 17 Aug 2016 01:35:47 +0200 Subject: [PATCH 1/2] add Create#empty --- .../apache/beam/sdk/transforms/Create.java | 15 ++++++++ .../beam/sdk/transforms/CreateTest.java | 35 +++++++++++++------ 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index 4446517f7fce..085fb4c96aa3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -132,6 +132,21 @@ public static Values of(T... elems) { return of(Arrays.asList(elems)); } + /** + * Returns a new {@code Create.Values} transform that produces + * an empty {@link PCollection}. + * + *

The elements will have a timestamp of negative infinity, see + * {@link Create#timestamped} for a way of creating a {@code PCollection} + * with timestamped elements. + * + *

Since there are no elements, the {@code Coder} cannot be automatically determined. + * Instead, the {@code Coder} is provided via the {@code coder} argument. + */ + public static Values empty(Coder coder) { + return new Values<>(new ArrayList(), Optional.of(coder)); + } + /** * Returns a new {@code Create.Values} transform that produces a * {@link PCollection} of {@link KV}s corresponding to the keys and diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index a27ba1a39a86..93260cdfd4e3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -19,7 +19,6 @@ import static org.apache.beam.sdk.TestUtils.LINES; import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; -import static org.apache.beam.sdk.TestUtils.NO_LINES; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -92,22 +91,35 @@ public void testCreate() { @Category(RunnableOnService.class) public void testCreateEmpty() { PCollection output = - p.apply(Create.of(NO_LINES) - .withCoder(StringUtf8Coder.of())); + p.apply(Create.empty(StringUtf8Coder.of())); PAssert.that(output) .containsInAnyOrder(NO_LINES_ARRAY); + + assertEquals(StringUtf8Coder.of(), output.getCoder()); p.run(); } @Test - public void testCreateEmptyInfersCoder() { + public void testCreateEmptyIterableRequiresCoder() { p.enableAbandonedNodeEnforcement(false); - PCollection output = - p.apply(Create.of()); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("default Create Coder"); + thrown.expectMessage("Create.empty(Coder)"); + thrown.expectMessage("withCoder(Coder)"); + p.apply(Create.of(Collections.emptyList())); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateEmptyIterableWithCoder() { + PCollection output = + p.apply(Create.of(Collections.emptyList()).withCoder(VoidCoder.of())); assertEquals(VoidCoder.of(), output.getCoder()); + PAssert.that(output).empty(); + p.run(); } static class Record implements Serializable { @@ -250,13 +262,14 @@ public void testCreateTimestampedEmpty() { } @Test - public void testCreateTimestampedEmptyInfersCoder() { + public void testCreateTimestampedEmptyUnspecifiedCoder() { p.enableAbandonedNodeEnforcement(false); - PCollection output = p - .apply(Create.timestamped()); - - assertEquals(VoidCoder.of(), output.getCoder()); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("default Create Coder"); + thrown.expectMessage("Create.empty(Coder)"); + thrown.expectMessage("withCoder(Coder)"); + p.apply(Create.timestamped(new ArrayList>())); } @Test From 015cee64d6079c1feb7c5f410080f3f1664c2066 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 8 Feb 2017 16:56:49 -0800 Subject: [PATCH 2/2] Replace Create.of(T...) with Create.of(T, T...) This ensures that an empty Create must go through either Create.empty(Coder) or Create.of(Iterable). Enforce the non-emptiness of the elements iterable with no coder. --- .../beam/runners/apex/ApexRunnerTest.java | 5 ++- .../FlattenPCollectionTranslatorTest.java | 4 +- .../direct/CloningBundleFactoryTest.java | 8 ++-- ...ImmutabilityCheckingBundleFactoryTest.java | 2 +- .../beam/runners/flink/WriteSinkITCase.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 7 +++- .../streaming/FlattenStreamingTest.java | 3 +- .../ResumeFromCheckpointStreamingTest.java | 4 +- .../apache/beam/sdk/transforms/Create.java | 17 +++++++-- .../org/apache/beam/sdk/io/AvroIOTest.java | 2 +- .../apache/beam/sdk/testing/PAssertTest.java | 2 +- .../beam/sdk/transforms/CombineTest.java | 4 +- .../beam/sdk/transforms/LatestTest.java | 11 ++---- .../apache/beam/sdk/transforms/ParDoTest.java | 11 ++---- .../beam/sdk/transforms/SampleTest.java | 38 ++++++++++++------- .../apache/beam/sdk/transforms/TopTest.java | 5 +-- .../apache/beam/sdk/transforms/ViewTest.java | 22 ++++++----- .../display/DisplayDataEvaluator.java | 8 +++- .../transforms/windowing/WindowingTest.java | 4 +- .../extensions/joinlibrary/InnerJoinTest.java | 14 ++++++- .../joinlibrary/OuterLeftJoinTest.java | 21 ++++++++-- .../joinlibrary/OuterRightJoinTest.java | 21 ++++++++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 +++++----- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 8 +++- .../apache/beam/sdk/io/jdbc/JdbcIOTest.java | 5 ++- 25 files changed, 159 insertions(+), 91 deletions(-) diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java index 436c9595a30a..0fedc47fd055 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java @@ -22,9 +22,9 @@ import com.datatorrent.stram.engine.OperatorContext; import java.io.File; import java.io.FileOutputStream; -import java.util.Collections; import java.util.Properties; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.junit.Assert; @@ -45,7 +45,8 @@ public void testConfigProperties() throws Exception { // default configuration from class path Pipeline p = Pipeline.create(options); - p.apply(operName, Create.of(Collections.emptyList())); + Create.Values empty = Create.empty(VoidCoder.of()); + p.apply(operName, empty); ApexRunnerResult result = (ApexRunnerResult) p.run(); result.cancel(); diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java index f5abc34a6eda..7e678e8485b4 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.apex.translation; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; @@ -61,7 +62,8 @@ public void test() throws Exception { Set expected = Sets.newHashSet(); List> pcList = new ArrayList>(); for (String[] collection : collections) { - pcList.add(p.apply(Create.of(collection).withCoder(StringUtf8Coder.of()))); + pcList.add( + p.apply(Create.of(ImmutableList.copyOf(collection)).withCoder(StringUtf8Coder.of()))); expected.addAll(Arrays.asList(collection)); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index 505d3a2410e5..3d14a12f7592 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -130,7 +130,7 @@ public void keyedBundleWorkingCoderSucceedsClonesOutput() { @Test public void bundleEncodeFailsAddFails() { - PCollection pc = p.apply(Create.of().withCoder(new RecordNoEncodeCoder())); + PCollection pc = p.apply(Create.empty(new RecordNoEncodeCoder())); UncommittedBundle bundle = factory.createBundle(pc); thrown.expect(UserCodeException.class); @@ -141,7 +141,7 @@ public void bundleEncodeFailsAddFails() { @Test public void bundleDecodeFailsAddFails() { - PCollection pc = p.apply(Create.of().withCoder(new RecordNoDecodeCoder())); + PCollection pc = p.apply(Create.empty(new RecordNoDecodeCoder())); UncommittedBundle bundle = factory.createBundle(pc); thrown.expect(UserCodeException.class); @@ -152,7 +152,7 @@ public void bundleDecodeFailsAddFails() { @Test public void keyedBundleEncodeFailsAddFails() { - PCollection pc = p.apply(Create.of().withCoder(new RecordNoEncodeCoder())); + PCollection pc = p.apply(Create.empty(new RecordNoEncodeCoder())); UncommittedBundle bundle = factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc); @@ -164,7 +164,7 @@ public void keyedBundleEncodeFailsAddFails() { @Test public void keyedBundleDecodeFailsAddFails() { - PCollection pc = p.apply(Create.of().withCoder(new RecordNoDecodeCoder())); + PCollection pc = p.apply(Create.empty(new RecordNoDecodeCoder())); UncommittedBundle bundle = factory.createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), pc); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index eccb3a66de48..838e0bd4b2ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -56,7 +56,7 @@ public class ImmutabilityCheckingBundleFactoryTest { @Before public void setup() { - created = p.apply(Create.of().withCoder(ByteArrayCoder.of())); + created = p.apply(Create.empty(ByteArrayCoder.of())); transformed = created.apply(ParDo.of(new IdentityDoFn())); DirectGraphVisitor visitor = new DirectGraphVisitor(); p.traverseTopologically(visitor); diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index 37eedb220bac..6986663af46c 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; import java.io.PrintWriter; @@ -78,7 +79,7 @@ public void stopCluster() throws Exception { private static void runProgram(String resultPath) { Pipeline p = FlinkTestPipeline.createForBatch(); - p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of()) + p.apply(Create.of(ImmutableList.copyOf(EXPECTED_RESULT))).setCoder(StringUtf8Coder.of()) .apply("CustomSink", Write.to(new MyCustomSink(resultPath))); p.run(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5fdbc83a2028..db6a7d9774d7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -404,7 +404,12 @@ public OutputT apply( return windowed; } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) && ((PCollectionList) input).size() == 0) { - return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of()); + // This can cause downstream coder inference to be screwy. Most of the time, that won't be + // hugely impactful, because there will never be any elements encoded with this coder; + // the issue stems from flattening this with another PCollection. + return (OutputT) + Pipeline.applyTransform( + input.getPipeline().begin(), Create.empty(VoidCoder.of())); } else if (overrides.containsKey(transform.getClass())) { // It is the responsibility of whoever constructs overrides to ensure this is type safe. @SuppressWarnings("unchecked") diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index d36796a5a65f..530721bf373f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.spark.translation.streaming; +import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -91,7 +92,7 @@ public void testFlattenBoundedUnbounded() throws Exception { PCollection windowedW1 = w1.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))); PCollection w2 = - p.apply(Create.of(WORDS_ARRAY_2)).setCoder(StringUtf8Coder.of()); + p.apply(Create.of(ImmutableList.copyOf(WORDS_ARRAY_2)).withCoder(StringUtf8Coder.of())); PCollection windowedW2 = w2.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1)))); PCollectionList list = PCollectionList.of(windowedW1).and(windowedW2); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 721d6178ebdb..7094c8673aff 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; @@ -171,7 +172,8 @@ private static SparkPipelineResult run(SparkPipelineOptions options) { Pipeline p = Pipeline.create(options); - PCollection expectedCol = p.apply(Create.of(EXPECTED).withCoder(StringUtf8Coder.of())); + PCollection expectedCol = + p.apply(Create.of(ImmutableList.copyOf(EXPECTED)).withCoder(StringUtf8Coder.of())); final PCollectionView> expectedView = expectedCol.apply(View.asList()); PCollection formattedKV = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index 085fb4c96aa3..d6839077d51b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -128,8 +128,12 @@ public static Values of(Iterable elems) { * Otherwise, use {@link Create.Values#withCoder} to set the coder explicitly. */ @SafeVarargs - public static Values of(T... elems) { - return of(Arrays.asList(elems)); + public static Values of(T elem, T... elems) { + // This can't be an ImmutableList, as it may accept nulls + List input = new ArrayList<>(elems.length + 1); + input.add(elem); + input.addAll(Arrays.asList(elems)); + return of(input); } /** @@ -195,8 +199,8 @@ public static TimestampedValues timestamped(Iterable> */ @SafeVarargs public static TimestampedValues timestamped( - @SuppressWarnings("unchecked") TimestampedValue... elems) { - return timestamped(Arrays.asList(elems)); + TimestampedValue elem, @SuppressWarnings("unchecked") TimestampedValue... elems) { + return timestamped(ImmutableList.>builder().add(elem).add(elems).build()); } /** @@ -502,6 +506,11 @@ public void processElement(ProcessContext c) { private static Coder getDefaultCreateCoder(CoderRegistry registry, Iterable elems) throws CannotProvideCoderException { + checkArgument( + !Iterables.isEmpty(elems), + "Elements must be provided to construct the default Create Coder. To Create an empty " + + "PCollection, either call Create.empty(Coder), or call 'withCoder(Coder)' on the " + + "result PTransform"); // First try to deduce a coder using the types of the elements. Class elementClazz = Void.class; for (T elem : elems) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index b669968f7f86..c4b55260c8c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -351,7 +351,7 @@ private void runTestWrite(String[] expectedElements, int numShards) throws IOExc } else { write = write.withoutSharding(); } - p.apply(Create.of(expectedElements)).apply(write); + p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write); p.run(); String shardNameTemplate = write.getShardNameTemplate(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 1997bbeef337..e57a254603da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -350,7 +350,7 @@ public void testWindowedContainsInAnyOrder() throws Exception { @Category(RunnableOnService.class) public void testEmpty() { PCollection vals = - pipeline.apply(Create.of().withCoder(VarLongCoder.of())); + pipeline.apply(Create.empty(VarLongCoder.of())); PAssert.that(vals).empty(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 890a36e35cdf..5b18384d6b5d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -443,7 +443,7 @@ public void testSessionsCombineWithContext() { @Category(RunnableOnService.class) public void testWindowedCombineEmpty() { PCollection mean = pipeline - .apply(Create.of().withCoder(BigEndianIntegerCoder.of())) + .apply(Create.empty(BigEndianIntegerCoder.of())) .apply(Window.into(FixedWindows.of(Duration.millis(1)))) .apply(Combine.globally(new MeanInts()).withoutDefaults()); @@ -610,7 +610,7 @@ public Integer apply(Integer left, Integer right) { @Category(RunnableOnService.class) public void testCombineGloballyAsSingletonView() { final PCollectionView view = pipeline - .apply("CreateEmptySideInput", Create.of().withCoder(BigEndianIntegerCoder.of())) + .apply("CreateEmptySideInput", Create.empty(BigEndianIntegerCoder.of())) .apply(Sum.integersGlobally().asSingletonView()); PCollection output = pipeline diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java index f71b8138b2a8..db8ade569cdf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java @@ -23,7 +23,6 @@ import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; - import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -37,7 +36,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; - import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -88,9 +86,7 @@ public void testGloballyOutputCoder() { @Test @Category(NeedsRunner.class) public void testGloballyEmptyCollection() { - PCollection emptyInput = p.apply(Create.of() - // Explicitly set coder such that then runner enforces encodability. - .withCoder(VarLongCoder.of())); + PCollection emptyInput = p.apply(Create.empty(VarLongCoder.of())); PCollection output = emptyInput.apply(Latest.globally()); PAssert.that(output).containsInAnyOrder((Long) null); @@ -130,9 +126,8 @@ public void testPerKeyOutputCoder() { @Category(NeedsRunner.class) public void testPerKeyEmptyCollection() { PCollection> output = - p.apply(Create.>of().withCoder(KvCoder.of( - StringUtf8Coder.of(), StringUtf8Coder.of()))) - .apply(Latest.perKey()); + p.apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))) + .apply(Latest.perKey()); PAssert.that(output).empty(); p.run(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 46cbd3225102..75c39ccd5eba 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -417,9 +417,6 @@ public void testParDoWithSideOutputs() { @Test @Category(RunnableOnService.class) public void testParDoEmptyWithSideOutputs() { - - List inputs = Arrays.asList(); - TupleTag mainOutputTag = new TupleTag("main"){}; TupleTag sideOutputTag1 = new TupleTag("side1"){}; TupleTag sideOutputTag2 = new TupleTag("side2"){}; @@ -427,7 +424,7 @@ public void testParDoEmptyWithSideOutputs() { TupleTag sideOutputTagUnwritten = new TupleTag("sideUnwritten"){}; PCollectionTuple outputs = pipeline - .apply(Create.of(inputs)) + .apply(Create.empty(VarIntCoder.of())) .apply(ParDo .of(new TestDoFn( Arrays.>asList(), @@ -437,6 +434,7 @@ public void testParDoEmptyWithSideOutputs() { TupleTagList.of(sideOutputTag3).and(sideOutputTag1) .and(sideOutputTagUnwritten).and(sideOutputTag2))); + List inputs = Collections.emptyList(); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); @@ -457,15 +455,12 @@ public void testParDoEmptyWithSideOutputs() { @Test @Category(RunnableOnService.class) public void testParDoWithEmptySideOutputs() { - - List inputs = Arrays.asList(); - TupleTag mainOutputTag = new TupleTag("main"){}; TupleTag sideOutputTag1 = new TupleTag("side1"){}; TupleTag sideOutputTag2 = new TupleTag("side2"){}; PCollectionTuple outputs = pipeline - .apply(Create.of(inputs)) + .apply(Create.empty(VarIntCoder.of())) .apply(ParDo .of(new TestNoOutputDoFn()) .withOutputTags( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java index e21e07baf900..88b0145eac48 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -197,6 +198,15 @@ public static class VerifyCorrectSample this.expectedSize = expectedSize; } + /** + * expectedSize is the number of elements that the Sample should contain. expected is the set + * of elements that the sample may contain. + */ + VerifyCorrectSample(int expectedSize, Collection expected) { + this.expectedValues = (T[]) expected.toArray(); + this.expectedSize = expectedSize; + } + @Override @SuppressWarnings("unchecked") public Void apply(Iterable in) { @@ -228,10 +238,10 @@ public Void apply(Iterable in) { @Category(RunnableOnService.class) public void testSample() { - PCollection input = pipeline.apply(Create.of(DATA) - .withCoder(BigEndianIntegerCoder.of())); - PCollection> output = input.apply( - Sample.fixedSizeGlobally(3)); + PCollection input = + pipeline.apply( + Create.of(ImmutableList.copyOf(DATA)).withCoder(BigEndianIntegerCoder.of())); + PCollection> output = input.apply(Sample.fixedSizeGlobally(3)); PAssert.thatSingletonIterable(output) .satisfies(new VerifyCorrectSample<>(3, DATA)); @@ -242,8 +252,7 @@ public void testSample() { @Category(RunnableOnService.class) public void testSampleEmpty() { - PCollection input = pipeline.apply(Create.of(EMPTY) - .withCoder(BigEndianIntegerCoder.of())); + PCollection input = pipeline.apply(Create.empty(BigEndianIntegerCoder.of())); PCollection> output = input.apply( Sample.fixedSizeGlobally(3)); @@ -256,7 +265,7 @@ public void testSampleEmpty() { @Category(RunnableOnService.class) public void testSampleZero() { - PCollection input = pipeline.apply(Create.of(DATA) + PCollection input = pipeline.apply(Create.of(ImmutableList.copyOf(DATA)) .withCoder(BigEndianIntegerCoder.of())); PCollection> output = input.apply( Sample.fixedSizeGlobally(0)); @@ -270,8 +279,9 @@ public void testSampleZero() { @Category(RunnableOnService.class) public void testSampleInsufficientElements() { - PCollection input = pipeline.apply(Create.of(DATA) - .withCoder(BigEndianIntegerCoder.of())); + PCollection input = + pipeline.apply( + Create.of(ImmutableList.copyOf(DATA)).withCoder(BigEndianIntegerCoder.of())); PCollection> output = input.apply( Sample.fixedSizeGlobally(10)); @@ -284,8 +294,9 @@ public void testSampleInsufficientElements() { public void testSampleNegative() { pipeline.enableAbandonedNodeEnforcement(false); - PCollection input = pipeline.apply(Create.of(DATA) - .withCoder(BigEndianIntegerCoder.of())); + PCollection input = + pipeline.apply( + Create.of(ImmutableList.copyOf(DATA)).withCoder(BigEndianIntegerCoder.of())); input.apply(Sample.fixedSizeGlobally(-1)); } @@ -293,8 +304,9 @@ public void testSampleNegative() { @Category(RunnableOnService.class) public void testSampleMultiplicity() { - PCollection input = pipeline.apply(Create.of(REPEATED_DATA) - .withCoder(BigEndianIntegerCoder.of())); + PCollection input = + pipeline.apply( + Create.of(ImmutableList.copyOf(REPEATED_DATA)).withCoder(BigEndianIntegerCoder.of())); // At least one value must be selected with multiplicity. PCollection> output = input.apply( Sample.fixedSizeGlobally(6)); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java index 89e0076877f6..e1ac54fc6f9c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java @@ -23,7 +23,6 @@ import java.io.Serializable; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.List; import org.apache.beam.sdk.Pipeline; @@ -155,9 +154,7 @@ public void testTopEmptyWithIncompatibleWindows() { p.enableAbandonedNodeEnforcement(false); Bound windowingFn = Window.into(FixedWindows.of(Duration.standardDays(10L))); - PCollection input = - p.apply(Create.timestamped(Collections.emptyList(), Collections.emptyList())) - .apply(windowingFn); + PCollection input = p.apply(Create.empty(StringUtf8Coder.of())).apply(windowingFn); expectedEx.expect(IllegalStateException.class); expectedEx.expectMessage("Top"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 0249ac98e71b..ee8bf407f902 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -146,7 +146,7 @@ public void processElement(ProcessContext c) { public void testEmptySingletonSideInput() throws Exception { final PCollectionView view = - pipeline.apply("CreateEmptyIntegers", Create.of().withCoder(VarIntCoder.of())) + pipeline.apply("CreateEmptyIntegers", Create.empty(VarIntCoder.of())) .apply(View.asSingleton()); pipeline.apply("Create123", Create.of(1, 2, 3)) @@ -261,7 +261,7 @@ public void processElement(ProcessContext c) { public void testEmptyListSideInput() throws Exception { final PCollectionView> view = - pipeline.apply("CreateEmptyView", Create.of().withCoder(VarIntCoder.of())) + pipeline.apply("CreateEmptyView", Create.empty(VarIntCoder.of())) .apply(View.asList()); PCollection results = @@ -396,7 +396,7 @@ public void processElement(ProcessContext c) { public void testEmptyIterableSideInput() throws Exception { final PCollectionView> view = - pipeline.apply("CreateEmptyView", Create.of().withCoder(VarIntCoder.of())) + pipeline.apply("CreateEmptyView", Create.empty(VarIntCoder.of())) .apply(View.asIterable()); PCollection results = @@ -678,7 +678,7 @@ public void processElement(ProcessContext c) { public void testEmptyMultimapSideInput() throws Exception { final PCollectionView>> view = - pipeline.apply("CreateEmptyView", Create.>of().withCoder( + pipeline.apply("CreateEmptyView", Create.empty( KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))) .apply(View.asMultimap()); @@ -706,9 +706,10 @@ public void processElement(ProcessContext c) { public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception { final PCollectionView>> view = - pipeline.apply("CreateEmptyView", - Create.>of().withCoder( - KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of()))) + pipeline + .apply( + "CreateEmptyView", + Create.empty(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of()))) .apply(View.asMultimap()); PCollection results = @@ -982,8 +983,9 @@ public void processElement(ProcessContext c) { public void testEmptyMapSideInput() throws Exception { final PCollectionView> view = - pipeline.apply("CreateEmptyView", Create.>of().withCoder( - KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))) + pipeline + .apply( + "CreateEmptyView", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))) .apply(View.asMap()); PCollection results = @@ -1010,7 +1012,7 @@ public void processElement(ProcessContext c) { public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception { final PCollectionView> view = - pipeline.apply("CreateEmptyView", Create.>of().withCoder( + pipeline.apply("CreateEmptyView", Create.empty( KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of()))) .apply(View.asMap()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java index 31ac91303a96..8f57f45b0ac4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -89,9 +90,12 @@ public Set displayDataForPrimitiveTransforms( final PTransform, ? extends POutput> root, Coder inputCoder) { - Create.Values input = Create.of(); + Create.Values input; if (inputCoder != null) { - input = input.withCoder(inputCoder); + input = Create.empty(inputCoder); + } else { + // These types don't actually work, but the pipeline will never be run + input = (Create.Values) Create.empty(VoidCoder.of()); } Pipeline pipeline = Pipeline.create(options); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index f7ae5d8d74b4..b0976e432720 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -189,9 +189,7 @@ public void testWindowPreservation() { @Test @Category(NeedsRunner.class) public void testEmptyInput() { - PCollection input = - p.apply(Create.timestamped() - .withCoder(StringUtf8Coder.of())); + PCollection input = p.apply(Create.empty(StringUtf8Coder.of())); PCollection output = input diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java index 1c120c21faa7..006f57c6031d 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -133,12 +136,19 @@ public void testJoinNoneToNoneMapping() { @Test(expected = NullPointerException.class) public void testJoinLeftCollectionNull() { p.enableAbandonedNodeEnforcement(false); - Join.innerJoin(null, p.apply(Create.of(listRightOfKv))); + Join.innerJoin( + null, + p.apply( + Create.of(listRightOfKv) + .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))); } @Test(expected = NullPointerException.class) public void testJoinRightCollectionNull() { p.enableAbandonedNodeEnforcement(false); - Join.innerJoin(p.apply(Create.of(leftListOfKv)), null); + Join.innerJoin( + p.apply( + Create.of(leftListOfKv).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))), + null); } } diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java index 81f4fa3231fc..692f47100932 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -135,21 +138,31 @@ public void testJoinOneToNoneMapping() { @Test(expected = NullPointerException.class) public void testJoinLeftCollectionNull() { p.enableAbandonedNodeEnforcement(false); - Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); + Join.leftOuterJoin( + null, + p.apply( + Create.of(listRightOfKv) + .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), + ""); } @Test(expected = NullPointerException.class) public void testJoinRightCollectionNull() { p.enableAbandonedNodeEnforcement(false); - Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, ""); + Join.leftOuterJoin( + p.apply( + Create.of(leftListOfKv).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))), + null, + ""); } @Test(expected = NullPointerException.class) public void testJoinNullValueIsNull() { p.enableAbandonedNodeEnforcement(false); Join.leftOuterJoin( - p.apply("CreateLeft", Create.of(leftListOfKv)), - p.apply("CreateRight", Create.of(listRightOfKv)), + p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))), + p.apply( + "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), null); } } diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java index 249cea32e2e2..4805a9b80c49 100644 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -135,21 +138,31 @@ public void testJoinNoneToOneMapping() { @Test(expected = NullPointerException.class) public void testJoinLeftCollectionNull() { p.enableAbandonedNodeEnforcement(false); - Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); + Join.rightOuterJoin( + null, + p.apply( + Create.of(listRightOfKv) + .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), + ""); } @Test(expected = NullPointerException.class) public void testJoinRightCollectionNull() { p.enableAbandonedNodeEnforcement(false); - Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L); + Join.rightOuterJoin( + p.apply( + Create.of(leftListOfKv).withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))), + null, + -1L); } @Test(expected = NullPointerException.class) public void testJoinNullValueIsNull() { p.enableAbandonedNodeEnforcement(false); Join.rightOuterJoin( - p.apply("CreateLeft", Create.of(leftListOfKv)), - p.apply("CreateRight", Create.of(listRightOfKv)), + p.apply("CreateLeft", Create.empty(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))), + p.apply( + "CreateRight", Create.empty(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))), null); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 25704d1bde49..3aa90cfedfb1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -98,6 +98,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingInput; @@ -1430,8 +1431,7 @@ public void testBuildWriteWithoutTable() { thrown.expect(IllegalStateException.class); thrown.expectMessage("must set the table reference"); - p.apply(Create.of().withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.withoutValidation()); + p.apply(Create.empty(TableRowJsonCoder.of())).apply(BigQueryIO.Write.withoutValidation()); } @Test @@ -1571,7 +1571,8 @@ public TableRow apply(Long input) { })) .setCoder(TableRowJsonCoder.of()); } else { - tableRows = p.apply(Create.of().withCoder(TableRowJsonCoder.of())); + tableRows = p + .apply(Create.empty(TableRowJsonCoder.of())); } thrown.expect(RuntimeException.class); @@ -1710,7 +1711,7 @@ public void testWriteValidateFailsCreateNoSchema() { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("no schema was provided"); p - .apply(Create.of()) + .apply(Create.empty(TableRowJsonCoder.of())) .apply(BigQueryIO.Write .to("dataset.table") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)); @@ -1723,7 +1724,7 @@ public void testWriteValidateFailsTableAndTableSpec() { thrown.expect(IllegalStateException.class); thrown.expectMessage("Cannot set both a table reference and a table function"); p - .apply(Create.of()) + .apply(Create.empty(TableRowJsonCoder.of())) .apply(BigQueryIO.Write .to("dataset.table") .to(new SerializableFunction() { @@ -1741,7 +1742,7 @@ public void testWriteValidateFailsNoTableAndNoTableSpec() { thrown.expect(IllegalStateException.class); thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform"); p - .apply(Create.of()) + .apply(Create.empty(TableRowJsonCoder.of())) .apply("name", BigQueryIO.Write.withoutValidation()); } @@ -2084,7 +2085,7 @@ void cleanup(PipelineOptions options) throws Exception { @Category(NeedsRunner.class) public void testPassThroughThenCleanupExecuted() throws Exception { - p.apply(Create.of()) + p.apply(Create.empty(VarIntCoder.of())) .apply(new PassThroughThenCleanup(new CleanupOperation() { @Override void cleanup(PipelineOptions options) throws Exception { @@ -2388,7 +2389,7 @@ public void testRuntimeOptionsNotCalledInApplyOutput() { options.getOutputSchema(), new JsonSchemaToTableSchema())) .withoutValidation(); pipeline - .apply(Create.of()) + .apply(Create.empty(TableRowJsonCoder.of())) .apply(write); // Test that this doesn't throw. DisplayData.from(write); @@ -2490,10 +2491,10 @@ public void testUniqueStepIdWrite() { options.getOutputSchema(), new JsonSchemaToTableSchema())) .withoutValidation(); pipeline - .apply(Create.of()) + .apply(Create.empty(TableRowJsonCoder.of())) .apply(write1); pipeline - .apply(Create.of()) + .apply(Create.empty(TableRowJsonCoder.of())) .apply(write2); assertNotEquals(write1.stepUuid, write2.stepUuid); assertNotEquals(write1.jobUuid.get(), write2.jobUuid.get()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index bc71b8c884a7..878785b111d1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -74,7 +74,11 @@ import java.util.TreeMap; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.coders.ByteStringCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.io.range.ByteKey; @@ -612,7 +616,9 @@ public void testWritingFailsTableDoesNotExist() throws Exception { final String table = "TEST-TABLE"; PCollection>> emptyInput = - p.apply(Create.>>of()); + p.apply( + Create.empty( + KvCoder.of(ByteStringCoder.of(), IterableCoder.of(ProtoCoder.of(Mutation.class))))); // Exception will be thrown by write.validate() when write is applied. thrown.expect(IllegalArgumentException.class); diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index eec7cb856cb9..3dd4df4fd0b6 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -30,10 +30,10 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; - import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -318,7 +318,8 @@ public void setParameters(KV element, PreparedStatement stateme @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws Exception { - pipeline.apply(Create.of(new ArrayList>())) + pipeline + .apply(Create.empty(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))) .apply(JdbcIO.>write() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create( "org.apache.derby.jdbc.ClientDriver",