From f404c345b7756cf6ecc08465c459df80fd50e332 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 24 Jul 2017 16:38:27 -0700 Subject: [PATCH] [BEAM-1820] Source.getDefaultOutputCoder() throws CannotProvideCoderException This is based on https://github.com/apache/beam/pull/3549 by @lgajowy, but it turned out that a lot more needs to be done to fix it properly, and the necessary changes require knowledge of sufficiently dark corners of the SDK and runners that asking an external contributor to do this is unfair, so I did the changes myself. TL;DR: callers of Source.getDefaultOutputCoder shouldn't require it to succeed. Source.getDefaultOutputCoder is only a hint for inferring the Coder of its elements, and it should not be required to succeed. E.g. when a runner is replacing a Read.from(source) transform, and the override needs to know a coder for elements of the source, if the source doesn't provide a coder, the user may have set a coder on the returned PCollection explicitly. In this case, the runner should use that coder. In other cases, when an API uses a Source and needs its coder, it must let the caller provide a Coder explicitly (e.g. SourceTestUtils). --- .../translation/ApexPipelineTranslator.java | 3 +- .../UnboundedReadFromBoundedSource.java | 22 +-- .../UnboundedReadFromBoundedSourceTest.java | 11 +- .../beam/runners/direct/DirectRunnerTest.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 80 ++++++++--- .../beam/runners/spark/TestSparkRunner.java | 12 +- .../runners/spark/io/MicrobatchSource.java | 18 ++- .../beam/runners/spark/io/SourceDStream.java | 5 + .../spark/io/SparkUnboundedSource.java | 14 +- .../spark/stateful/StateSpecFunctions.java | 2 +- .../StreamingTransformTranslator.java | 1 + .../io/BoundedReadFromUnboundedSource.java | 53 +++++-- .../apache/beam/sdk/io/CompressedSource.java | 3 +- .../java/org/apache/beam/sdk/io/Read.java | 14 +- .../java/org/apache/beam/sdk/io/Source.java | 12 +- .../beam/sdk/testing/SourceTestUtils.java | 115 ++++++++++++--- .../java/org/apache/beam/sdk/io/ReadTest.java | 135 ++++++++++++++++++ 17 files changed, 414 insertions(+), 89 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index 02f53eccdc12..82479b28ca85 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +149,7 @@ private static class ReadBoundedTranslator implements TransformTranslator transform, TranslationContext context) { // TODO: adapter is visibleForTesting BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>( - transform.getSource()); + transform.getSource(), context.>getOutput().getCoder()); ApexReadUnboundedInputOperator operator = new ApexReadUnboundedInputOperator<>( unboundedSource, true, context.getPipelineOptions()); context.addOperator(operator, operator.output); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index 24eb38479268..afdb15cae969 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -76,23 +76,27 @@ public class UnboundedReadFromBoundedSource extends PTransform source; + // Coder of the actual PCollection being replaced - potentially set explicitly by the user + // via PCollection.setCoder(). + private final Coder outputCoder; /** * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. */ - public UnboundedReadFromBoundedSource(BoundedSource source) { + public UnboundedReadFromBoundedSource(BoundedSource source, Coder outputCoder) { this.source = source; + this.outputCoder = outputCoder; } @Override public PCollection expand(PBegin input) { return input.getPipeline().apply( - Read.from(new BoundedToUnboundedSourceAdapter<>(source))); + Read.from(new BoundedToUnboundedSourceAdapter<>(source, outputCoder))); } @Override protected Coder getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); + return outputCoder; } @Override @@ -115,10 +119,12 @@ public void populateDisplayData(DisplayData.Builder builder) { public static class BoundedToUnboundedSourceAdapter extends UnboundedSource> { - private BoundedSource boundedSource; + private final BoundedSource boundedSource; + private final Coder outputCoder; - public BoundedToUnboundedSourceAdapter(BoundedSource boundedSource) { + public BoundedToUnboundedSourceAdapter(BoundedSource boundedSource, Coder outputCoder) { this.boundedSource = boundedSource; + this.outputCoder = outputCoder; } @Override @@ -147,7 +153,7 @@ public List> split( new Function, BoundedToUnboundedSourceAdapter>() { @Override public BoundedToUnboundedSourceAdapter apply(BoundedSource input) { - return new BoundedToUnboundedSourceAdapter<>(input); + return new BoundedToUnboundedSourceAdapter<>(input, outputCoder); }}); } catch (Exception e) { LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); @@ -167,13 +173,13 @@ public Reader createReader(PipelineOptions options, Checkpoint checkpoint) @Override public Coder getDefaultOutputCoder() { - return boundedSource.getDefaultOutputCoder(); + return outputCoder; } @SuppressWarnings({"rawtypes", "unchecked"}) @Override public Coder> getCheckpointMarkCoder() { - return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); + return new CheckpointCoder<>(outputCoder); } @VisibleForTesting diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java index 0e48a9dc26b5..ac312563d574 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.FileBasedSource; @@ -106,7 +107,7 @@ public void testBoundedToUnboundedSourceAdapter() throws Exception { long numElements = 100; BoundedSource boundedSource = CountingSource.upTo(numElements); UnboundedSource> unboundedSource = - new BoundedToUnboundedSourceAdapter<>(boundedSource); + new BoundedToUnboundedSourceAdapter<>(boundedSource, VarLongCoder.of()); PCollection output = p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements)); @@ -161,7 +162,7 @@ private void testBoundedToUnboundedSourceAdapterCheckpoint( BoundedSource boundedSource, List expectedElements) throws Exception { BoundedToUnboundedSourceAdapter unboundedSource = - new BoundedToUnboundedSourceAdapter<>(boundedSource); + new BoundedToUnboundedSourceAdapter<>(boundedSource, boundedSource.getDefaultOutputCoder()); PipelineOptions options = PipelineOptionsFactory.create(); BoundedToUnboundedSourceAdapter.Reader reader = @@ -214,7 +215,7 @@ private void testBoundedToUnboundedSourceAdapterCheckpointRestart( BoundedSource boundedSource, List expectedElements) throws Exception { BoundedToUnboundedSourceAdapter unboundedSource = - new BoundedToUnboundedSourceAdapter<>(boundedSource); + new BoundedToUnboundedSourceAdapter<>(boundedSource, boundedSource.getDefaultOutputCoder()); PipelineOptions options = PipelineOptionsFactory.create(); BoundedToUnboundedSourceAdapter.Reader reader = @@ -255,7 +256,7 @@ public void testReadBeforeStart() throws Exception { BoundedSource countingSource = CountingSource.upTo(100); BoundedToUnboundedSourceAdapter unboundedSource = - new BoundedToUnboundedSourceAdapter<>(countingSource); + new BoundedToUnboundedSourceAdapter<>(countingSource, VarLongCoder.of()); PipelineOptions options = PipelineOptionsFactory.create(); unboundedSource.createReader(options, null).getCurrent(); @@ -267,7 +268,7 @@ public void testReadFromCheckpointBeforeStart() throws Exception { BoundedSource countingSource = CountingSource.upTo(100); BoundedToUnboundedSourceAdapter unboundedSource = - new BoundedToUnboundedSourceAdapter<>(countingSource); + new BoundedToUnboundedSourceAdapter<>(countingSource, VarLongCoder.of()); PipelineOptions options = PipelineOptionsFactory.create(); List> elements = diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 943d27c07ad1..8faff7f4352b 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; @@ -573,7 +574,7 @@ public void validate() { } @Override - public Coder getDefaultOutputCoder() { + public Coder getDefaultOutputCoder() throws CannotProvideCoderException { return underlying.getDefaultOutputCoder(); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 762ac9fcd43f..78e0c990b707 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -358,11 +358,11 @@ private List getOverrides(boolean streaming) { // must precede it PTransformOverride.of( PTransformMatchers.classEqualTo(Read.Bounded.class), - new ReflectiveRootOverrideFactory(StreamingBoundedRead.class, this))) + new StreamingBoundedReadOverrideFactory())) .add( PTransformOverride.of( PTransformMatchers.classEqualTo(Read.Unbounded.class), - new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this))) + new StreamingUnboundedReadOverrideFactory())) .add( PTransformOverride.of( PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), @@ -1156,6 +1156,26 @@ public void translate(Impulse transform, TranslationContext context) { } } + private static class StreamingUnboundedReadOverrideFactory + implements PTransformOverrideFactory, Read.Unbounded> { + @Override + public PTransformReplacement> getReplacementTransform( + AppliedPTransform, Read.Unbounded> transform) { + return PTransformReplacement.of( + transform.getPipeline().begin(), + new StreamingUnboundedRead( + transform.getTransform().getSource(), + ((PCollection) Iterables.getOnlyElement(transform.getOutputs().values())) + .getCoder())); + } + + @Override + public Map mapOutputs( + Map, PValue> outputs, PCollection newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } + /** * Specialized implementation for * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the @@ -1166,16 +1186,13 @@ public void translate(Impulse transform, TranslationContext context) { */ private static class StreamingUnboundedRead extends PTransform> { private final UnboundedSource source; + private final Coder outputCoder; /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded transform) { - this.source = transform.getSource(); - } - - @Override - protected Coder getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); + public StreamingUnboundedRead(UnboundedSource source, Coder outputCoder) { + this.source = source; + this.outputCoder = outputCoder; } @Override @@ -1183,10 +1200,10 @@ public final PCollection expand(PBegin input) { source.validate(); if (source.requiresDeduping()) { - return Pipeline.applyTransform(input, new ReadWithIds<>(source)) + return Pipeline.applyTransform(input, new ReadWithIds<>(source, outputCoder)) .apply(new Deduplicate()); } else { - return Pipeline.applyTransform(input, new ReadWithIds<>(source)) + return Pipeline.applyTransform(input, new ReadWithIds<>(source, outputCoder)) .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn())); } } @@ -1198,9 +1215,11 @@ public final PCollection expand(PBegin input) { private static class ReadWithIds extends PTransform>> { private final UnboundedSource source; + private final Coder outputCoder; - private ReadWithIds(UnboundedSource source) { + private ReadWithIds(UnboundedSource source, Coder outputCoder) { this.source = source; + this.outputCoder = outputCoder; } @Override @@ -1211,7 +1230,7 @@ public final PCollection> expand(PInput input) { @Override protected Coder> getDefaultOutputCoder() { - return ValueWithRecordId.ValueWithRecordIdCoder.of(source.getDefaultOutputCoder()); + return ValueWithRecordId.ValueWithRecordIdCoder.of(outputCoder); } @Override @@ -1275,29 +1294,46 @@ public void processElement(ProcessContext c) { } } + private static class StreamingBoundedReadOverrideFactory + implements PTransformOverrideFactory, Read.Bounded> { + @Override + public PTransformReplacement> getReplacementTransform( + AppliedPTransform, Read.Bounded> transform) { + return PTransformReplacement.of( + transform.getPipeline().begin(), + new StreamingBoundedRead( + transform.getTransform().getSource(), + ((PCollection) Iterables.getOnlyElement(transform.getOutputs().values())) + .getCoder())); + } + + @Override + public Map mapOutputs( + Map, PValue> outputs, PCollection newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } + /** * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded} for the * Dataflow runner in streaming mode. */ private static class StreamingBoundedRead extends PTransform> { private final BoundedSource source; + private final Coder outputCoder; /** Builds an instance of this class from the overridden transform. */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingBoundedRead(DataflowRunner runner, Read.Bounded transform) { - this.source = transform.getSource(); - } - - @Override - protected Coder getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); + public StreamingBoundedRead(BoundedSource source, Coder outputCoder) { + this.source = source; + this.outputCoder = outputCoder; } @Override public final PCollection expand(PBegin input) { source.validate(); - return Pipeline.applyTransform(input, new UnboundedReadFromBoundedSource<>(source)) + return Pipeline.applyTransform( + input, new UnboundedReadFromBoundedSource<>(source, outputCoder)) .setIsBoundedInternal(IsBounded.BOUNDED); } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index a13a3b141aa4..2c45cbdec076 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -40,6 +40,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; @@ -197,8 +199,16 @@ private static class AdaptedBoundedAsUnbounded extends PTransform expand(PBegin input) { + Coder> coder; + try { + coder = source.getAdaptedSource().getDefaultOutputCoder(); + } catch (CannotProvideCoderException e) { + // UnboundedToBoundedSourceAdapter.getDefaultOutputCoder doesn't throw. + throw new RuntimeException(e); + } PTransform>> replacingTransform = - new UnboundedReadFromBoundedSource<>(source.getAdaptedSource()); + new UnboundedReadFromBoundedSource<>( + source.getAdaptedSource(), coder); return (PCollection) input.apply(replacingTransform) .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn())); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index 3b48caf3ef85..d28b5af014fd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -54,6 +54,7 @@ public class MicrobatchSource, Source.Reader> readerCache; private final UnboundedSource source; + private final Coder outputCoder; private final Duration maxReadTime; private final int numInitialSplits; private final long maxNumRecords; @@ -66,6 +67,7 @@ public class MicrobatchSource source, + final Coder outputCoder, final Duration maxReadTime, final int numInitialSplits, final long maxNumRecords, @@ -73,6 +75,7 @@ public class MicrobatchSource> split(final PipelineOptions options) throws Exception // for example: Kafka should not add partitions if more then one topic is read. result.add( new MicrobatchSource<>( - splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId, readerCacheInterval)); + splits.get(i), + outputCoder, + maxReadTime, + 1, + numRecords[i], + i, + sourceId, + readerCacheInterval)); } return result; } @@ -139,9 +149,13 @@ public void validate() { source.validate(); } + public Coder getOutputCoder() { + return outputCoder; + } + @Override public Coder getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); + return getOutputCoder(); } public Coder getCheckpointMarkCoder() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index 20aca5f10fdc..a68d43f8ad1c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.spark.api.java.JavaSparkContext$; @@ -58,6 +59,7 @@ class SourceDStream private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class); private final UnboundedSource unboundedSource; + private final Coder outputCoder; private final SparkRuntimeContext runtimeContext; private final Duration boundReadDuration; // Reader cache interval to expire readers if they haven't been accessed in the last microbatch. @@ -81,10 +83,12 @@ class SourceDStream SourceDStream( StreamingContext ssc, UnboundedSource unboundedSource, + Coder outputCoder, SparkRuntimeContext runtimeContext, Long boundMaxRecords) { super(ssc, JavaSparkContext$.MODULE$., CheckpointMarkT>>fakeClassTag()); this.unboundedSource = unboundedSource; + this.outputCoder = outputCoder; this.runtimeContext = runtimeContext; SparkPipelineOptions options = runtimeContext.getPipelineOptions().as( @@ -125,6 +129,7 @@ public scala.Option, CheckpointMarkT>>> compute(Time validT private MicrobatchSource createMicrobatchSource() { return new MicrobatchSource<>(unboundedSource, + outputCoder, boundReadDuration, initialParallelism, computeReadMaxRecords(), diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index 7106c73866a1..7e3cc4118331 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; @@ -82,12 +83,13 @@ public static UnboundedDataset re JavaStreamingContext jssc, SparkRuntimeContext rc, UnboundedSource source, + Coder outputCoder, String stepName) { SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class); Long maxRecordsPerBatch = options.getMaxRecordsPerBatch(); SourceDStream sourceDStream = - new SourceDStream<>(jssc.ssc(), source, rc, maxRecordsPerBatch); + new SourceDStream<>(jssc.ssc(), source, outputCoder, rc, maxRecordsPerBatch); JavaPairInputDStream, CheckpointMarkT> inputDStream = JavaPairInputDStream$.MODULE$.fromInputDStream(sourceDStream, @@ -114,10 +116,12 @@ public static UnboundedDataset re .register(); // output the actual (deserialized) stream. - WindowedValue.FullWindowedValueCoder coder = - WindowedValue.FullWindowedValueCoder.of( - source.getDefaultOutputCoder(), - GlobalWindow.Coder.INSTANCE); + WindowedValue.FullWindowedValueCoder coder= + + WindowedValue.FullWindowedValueCoder.of( + outputCoder, + GlobalWindow.Coder.INSTANCE); + JavaDStream> readUnboundedStream = mapWithStateDStream .flatMap(new Tuple2byteFlatMapFunction()) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 549bd30a11aa..3db065af554d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -161,7 +161,7 @@ public Tuple2, Metadata> apply( final List readValues = new ArrayList<>(); WindowedValue.FullWindowedValueCoder coder = WindowedValue.FullWindowedValueCoder.of( - source.getDefaultOutputCoder(), + microbatchSource.getOutputCoder(), GlobalWindow.Coder.INSTANCE); try { // measure how long a read takes per-partition. diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index cd5bb3ee5df6..00e2dcbb366e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -127,6 +127,7 @@ public void evaluate(Read.Unbounded transform, EvaluationContext context) { context.getStreamingContext(), context.getRuntimeContext(), transform.getSource(), + context.getOutput(transform).getCoder(), stepName)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index c882447d7ccc..276064c9e772 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Distinct; @@ -50,9 +51,9 @@ */ public class BoundedReadFromUnboundedSource extends PTransform> { private final UnboundedSource source; + private final Coder coder; private final long maxNumRecords; private final Duration maxReadTime; - private final BoundedSource> adaptedSource; private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT .withInitialBackoff(Duration.millis(10)) @@ -67,7 +68,7 @@ public class BoundedReadFromUnboundedSource extends PTransform withMaxNumRecords(long maxNumRecords) { - return new BoundedReadFromUnboundedSource(source, maxNumRecords, maxReadTime); + return new BoundedReadFromUnboundedSource(source, coder, maxNumRecords, maxReadTime); } /** @@ -76,20 +77,26 @@ public BoundedReadFromUnboundedSource withMaxNumRecords(long maxNumRecords) { * of time to read for. Each split of the source will read for this much time. */ public BoundedReadFromUnboundedSource withMaxReadTime(Duration maxReadTime) { - return new BoundedReadFromUnboundedSource(source, maxNumRecords, maxReadTime); + return new BoundedReadFromUnboundedSource(source, coder, maxNumRecords, maxReadTime); + } + + /** + * Sets the coder for contents of the source, in case the source itself is unable to provide a + * coder. + */ + public BoundedReadFromUnboundedSource withCoder(Coder coder) { + return new BoundedReadFromUnboundedSource(source, coder, maxNumRecords, maxReadTime); } BoundedReadFromUnboundedSource( - UnboundedSource source, long maxNumRecords, Duration maxReadTime) { + UnboundedSource source, + @Nullable Coder coder, + long maxNumRecords, + Duration maxReadTime) { this.source = source; + this.coder = coder; this.maxNumRecords = maxNumRecords; this.maxReadTime = maxReadTime; - this.adaptedSource = - new AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter - .Builder() - .setSource(source) - .setMaxNumRecords(maxNumRecords) - .setMaxReadTime(maxReadTime).build(); } /** @@ -98,7 +105,25 @@ public BoundedReadFromUnboundedSource withMaxReadTime(Duration maxReadTime) { */ @Experimental public BoundedSource> getAdaptedSource() { - return adaptedSource; + Coder coder = this.coder; + if (coder == null) { + try { + coder = source.getDefaultOutputCoder(); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Source " + + source + + " is unable to provide a coder, " + + "specify a coder explicitly using .withCoder()", + e); + } + } + return new AutoValue_BoundedReadFromUnboundedSource_UnboundedToBoundedSourceAdapter.Builder() + .setSource(source) + .setCoder(coder) + .setMaxNumRecords(maxNumRecords) + .setMaxReadTime(maxReadTime) + .build(); } @Override @@ -119,7 +144,7 @@ public byte[] apply(ValueWithRecordId input) { @Override protected Coder getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); + return coder; } @Override @@ -148,6 +173,7 @@ public void populateDisplayData(DisplayData.Builder builder) { abstract static class UnboundedToBoundedSourceAdapter extends BoundedSource> { @Nullable abstract UnboundedSource getSource(); + @Nullable abstract Coder getCoder(); abstract long getMaxNumRecords(); @Nullable abstract Duration getMaxReadTime(); @@ -156,6 +182,7 @@ abstract static class UnboundedToBoundedSourceAdapter @AutoValue.Builder abstract static class Builder { abstract Builder setSource(UnboundedSource source); + abstract Builder setCoder(Coder coder); abstract Builder setMaxNumRecords(long maxNumRecords); abstract Builder setMaxReadTime(Duration maxReadTime); abstract UnboundedToBoundedSourceAdapter build(); @@ -212,7 +239,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) { @Override public Coder> getDefaultOutputCoder() { - return ValueWithRecordId.ValueWithRecordIdCoder.of(getSource().getDefaultOutputCoder()); + return ValueWithRecordId.ValueWithRecordIdCoder.of(getCoder()); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 4baac367f686..fb121497d72a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -35,6 +35,7 @@ import java.util.zip.ZipInputStream; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; @@ -407,7 +408,7 @@ public void populateDisplayData(DisplayData.Builder builder) { * Returns the delegate source's default output coder. */ @Override - public final Coder getDefaultOutputCoder() { + public final Coder getDefaultOutputCoder() throws CannotProvideCoderException { return sourceDelegate.getDefaultOutputCoder(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index a07fca89457d..d260b75ab7ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -95,7 +96,7 @@ private Bounded(@Nullable String name, BoundedSource source) { } @Override - protected Coder getDefaultOutputCoder() { + protected Coder getDefaultOutputCoder() throws CannotProvideCoderException { return source.getDefaultOutputCoder(); } @@ -103,9 +104,8 @@ protected Coder getDefaultOutputCoder() { public final PCollection expand(PBegin input) { source.validate(); - return PCollection.createPrimitiveOutputInternal(input.getPipeline(), - WindowingStrategy.globalDefault(), IsBounded.BOUNDED) - .setCoder(getDefaultOutputCoder()); + return PCollection.createPrimitiveOutputInternal(input.getPipeline(), + WindowingStrategy.globalDefault(), IsBounded.BOUNDED); } /** @@ -150,7 +150,7 @@ private Unbounded(@Nullable String name, UnboundedSource source) { * records. */ public BoundedReadFromUnboundedSource withMaxNumRecords(long maxNumRecords) { - return new BoundedReadFromUnboundedSource(source, maxNumRecords, null); + return new BoundedReadFromUnboundedSource(source, null, maxNumRecords, null); } /** @@ -159,11 +159,11 @@ public BoundedReadFromUnboundedSource withMaxNumRecords(long maxNumRecords) { * of time to read for. Each split of the source will read for this much time. */ public BoundedReadFromUnboundedSource withMaxReadTime(Duration maxReadTime) { - return new BoundedReadFromUnboundedSource(source, Long.MAX_VALUE, maxReadTime); + return new BoundedReadFromUnboundedSource(source, null, Long.MAX_VALUE, maxReadTime); } @Override - protected Coder getDefaultOutputCoder() { + protected Coder getDefaultOutputCoder() throws CannotProvideCoderException { return source.getDefaultOutputCoder(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java index 542d91ca791e..e94d4e9775d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.NoSuchElementException; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; @@ -63,8 +64,17 @@ public abstract class Source implements Serializable, HasDisplayData { /** * Returns the default {@code Coder} to use for the data read from this source. + * + *

If this throws {@link CannotProvideCoderException}, the user will need to explicitly set a + * coder on the result of {@link Read#from}. */ - public abstract Coder getDefaultOutputCoder(); + public Coder getDefaultOutputCoder() throws CannotProvideCoderException { + throw new CannotProvideCoderException( + "Source " + + this + + " does not implement getDefaultOutputCoder(), " + + "set the coder explicitly via PCollection.setCoder()"); + } /** * {@inheritDoc} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java index cde0b946e966..d26df9668f9b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java @@ -38,6 +38,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; @@ -212,7 +213,20 @@ public static void assertSourcesEqualReferenceSource( List> sources, PipelineOptions options) throws Exception { - Coder coder = referenceSource.getDefaultOutputCoder(); + assertSourcesEqualReferenceSource( + referenceSource, sources, referenceSource.getDefaultOutputCoder(), options); + } + + /** + * Same as {@link #assertSourcesEqualReferenceSource(BoundedSource, List, Coder, + * PipelineOptions)}, but the coder is specified explicitly. + */ + public static void assertSourcesEqualReferenceSource( + BoundedSource referenceSource, + List> sources, + Coder coder, + PipelineOptions options) + throws Exception { List referenceRecords = readFromSource(referenceSource, options); List bundleRecords = new ArrayList<>(); for (BoundedSource source : sources) { @@ -239,7 +253,17 @@ public static void assertSourcesEqualReferenceSource( */ public static void assertUnstartedReaderReadsSameAsItsSource( BoundedSource.BoundedReader reader, PipelineOptions options) throws Exception { - Coder coder = reader.getCurrentSource().getDefaultOutputCoder(); + assertUnstartedReaderReadsSameAsItsSource( + reader, reader.getCurrentSource().getDefaultOutputCoder(), options); + } + + /** + * Same as {@link #assertUnstartedReaderReadsSameAsItsSource(BoundedReader, Coder, + * PipelineOptions)}, but the coder is specified explicitly. + */ + public static void assertUnstartedReaderReadsSameAsItsSource( + BoundedSource.BoundedReader reader, Coder coder, PipelineOptions options) + throws Exception { List expected = readFromUnstartedReader(reader); List actual = readFromSource(reader.getCurrentSource(), options); List> expectedStructural = createStructuralValues(coder, expected); @@ -296,7 +320,29 @@ public static SplitAtFractionResult assertSplitAtFractionBehavior( PipelineOptions options) throws Exception { return assertSplitAtFractionBehaviorImpl( - source, readFromSource(source, options), numItemsToReadBeforeSplit, splitFraction, + source, + source.getDefaultOutputCoder(), + readFromSource(source, options), + numItemsToReadBeforeSplit, + splitFraction, + expectedOutcome, + options); + } + + /** + * Same as {@link #assertSplitAtFractionBehavior(BoundedSource, Coder, int, double, + * ExpectedSplitOutcome, PipelineOptions)}, but the coder is specified explicitly. + */ + public static SplitAtFractionResult assertSplitAtFractionBehavior( + BoundedSource source, + Coder coder, + int numItemsToReadBeforeSplit, + double splitFraction, + ExpectedSplitOutcome expectedOutcome, + PipelineOptions options) + throws Exception { + return assertSplitAtFractionBehaviorImpl( + source, coder, readFromSource(source, options), numItemsToReadBeforeSplit, splitFraction, expectedOutcome, options); } @@ -333,8 +379,13 @@ private static void assertListsEqualInOrder( } private static SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehaviorImpl( - BoundedSource source, List expectedItems, int numItemsToReadBeforeSplit, - double splitFraction, ExpectedSplitOutcome expectedOutcome, PipelineOptions options) + BoundedSource source, + Coder coder, + List expectedItems, + int numItemsToReadBeforeSplit, + double splitFraction, + ExpectedSplitOutcome expectedOutcome, + PipelineOptions options) throws Exception { try (BoundedSource.BoundedReader reader = source.createReader(options)) { BoundedSource originalSource = reader.getCurrentSource(); @@ -381,15 +432,28 @@ private static SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBe currentItems.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplit > 0)); BoundedSource primary = reader.getCurrentSource(); return verifySingleSplitAtFractionResult( - source, expectedItems, currentItems, primary, residual, - numItemsToReadBeforeSplit, splitFraction, options); + source, + coder, + expectedItems, + currentItems, + primary, + residual, + numItemsToReadBeforeSplit, + splitFraction, + options); } } private static SourceTestUtils.SplitAtFractionResult verifySingleSplitAtFractionResult( - BoundedSource source, List expectedItems, List currentItems, - BoundedSource primary, BoundedSource residual, - int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) + BoundedSource source, + Coder coder, + List expectedItems, + List currentItems, + BoundedSource primary, + BoundedSource residual, + int numItemsToReadBeforeSplit, + double splitFraction, + PipelineOptions options) throws Exception { List primaryItems = readFromSource(primary, options); if (residual != null) { @@ -415,7 +479,6 @@ private static SourceTestUtils.SplitAtFractionResult verifySingleSplitAtFrac source, primary, residual); - Coder coder = primary.getDefaultOutputCoder(); List> primaryValues = createStructuralValues(coder, primaryItems); List> currentValues = @@ -488,6 +551,7 @@ private static class SplitFractionStatistics { */ private static void assertSplitAtFractionBinary( BoundedSource source, + Coder coder, List expectedItems, int numItemsToBeReadBeforeSplit, double leftFraction, @@ -507,16 +571,16 @@ private static void assertSplitAtFractionBinary( double middleFraction = (rightFraction + leftFraction) / 2; if (leftResult == null) { leftResult = assertSplitAtFractionBehaviorImpl( - source, expectedItems, numItemsToBeReadBeforeSplit, leftFraction, + source, coder, expectedItems, numItemsToBeReadBeforeSplit, leftFraction, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); } if (rightResult == null) { rightResult = assertSplitAtFractionBehaviorImpl( - source, expectedItems, numItemsToBeReadBeforeSplit, rightFraction, + source, coder, expectedItems, numItemsToBeReadBeforeSplit, rightFraction, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); } SplitAtFractionResult middleResult = assertSplitAtFractionBehaviorImpl( - source, expectedItems, numItemsToBeReadBeforeSplit, middleFraction, + source, coder, expectedItems, numItemsToBeReadBeforeSplit, middleFraction, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); if (middleResult.numResidualItems != -1) { stats.successfulFractions.add(middleFraction); @@ -530,12 +594,12 @@ private static void assertSplitAtFractionBinary( // if middle is not equivalent to left or right. if (leftResult.numPrimaryItems != middleResult.numPrimaryItems) { assertSplitAtFractionBinary( - source, expectedItems, numItemsToBeReadBeforeSplit, + source, coder, expectedItems, numItemsToBeReadBeforeSplit, leftFraction, leftResult, middleFraction, middleResult, options, stats); } if (rightResult.numPrimaryItems != middleResult.numPrimaryItems) { assertSplitAtFractionBinary( - source, expectedItems, numItemsToBeReadBeforeSplit, + source, coder, expectedItems, numItemsToBeReadBeforeSplit, middleFraction, middleResult, rightFraction, rightResult, options, stats); } } @@ -551,6 +615,15 @@ private static void assertSplitAtFractionBinary( */ public static void assertSplitAtFractionExhaustive( BoundedSource source, PipelineOptions options) throws Exception { + assertSplitAtFractionExhaustive(source, source.getDefaultOutputCoder(), options); + } + + /** + * Same as {@link #assertSplitAtFractionExhaustive(BoundedSource, Coder, PipelineOptions)}, but + * the coder is specified explicitly. + */ + public static void assertSplitAtFractionExhaustive( + BoundedSource source, Coder coder, PipelineOptions options) throws Exception { List expectedItems = readFromSource(source, options); assertFalse("Empty source", expectedItems.isEmpty()); assertFalse("Source reads a single item", expectedItems.size() == 1); @@ -560,7 +633,7 @@ public static void assertSplitAtFractionExhaustive( boolean anyNonTrivialFractions = false; for (int i = 0; i < expectedItems.size(); i++) { SplitFractionStatistics stats = new SplitFractionStatistics(); - assertSplitAtFractionBinary(source, expectedItems, i, + assertSplitAtFractionBinary(source, coder, expectedItems, i, 0.0, null, 1.0, null, options, stats); if (!stats.successfulFractions.isEmpty()) { anySuccessfulFractions = true; @@ -607,7 +680,7 @@ public static void assertSplitAtFractionExhaustive( break; } if (assertSplitAtFractionConcurrent( - executor, source, expectedItems, i, minNonTrivialFraction, options)) { + executor, source, coder, expectedItems, i, minNonTrivialFraction, options)) { haveSuccess = true; } else { haveFailure = true; @@ -634,7 +707,7 @@ public static void assertSplitAtFractionExhaustive( } private static boolean assertSplitAtFractionConcurrent( - ExecutorService executor, BoundedSource source, List expectedItems, + ExecutorService executor, BoundedSource source, Coder coder, List expectedItems, final int numItemsToReadBeforeSplitting, final double fraction, PipelineOptions options) throws Exception { @SuppressWarnings("resource") // Closed in readerThread @@ -674,7 +747,7 @@ public KV, BoundedSource> call() throws Exception { return false; } SplitAtFractionResult res = verifySingleSplitAtFractionResult( - source, expectedItems, currentItems, splitSources.getKey(), splitSources.getValue(), + source, coder, expectedItems, currentItems, splitSources.getKey(), splitSources.getValue(), numItemsToReadBeforeSplitting, fraction, options); return (res.numResidualItems > 0); } @@ -728,7 +801,7 @@ public void validate() { } @Override - public Coder getDefaultOutputCoder() { + public Coder getDefaultOutputCoder() throws CannotProvideCoderException { return boundedSource.getDefaultOutputCoder(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java index 74acf18764be..56c4096a83bc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java @@ -24,18 +24,26 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; import java.util.List; +import java.util.NoSuchElementException; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,6 +59,9 @@ public class ReadTest implements Serializable{ @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule + public transient TestPipeline pipeline = TestPipeline.create(); + @Test public void failsWhenCustomBoundedSourceIsNotSerializable() { thrown.expect(IllegalArgumentException.class); @@ -150,6 +161,130 @@ public void populateDisplayData(DisplayData.Builder builder) { assertThat(unboundedDisplayData, hasItem(includesDisplayDataFor("source", unboundedSource))); } + @Test + @Category(ValidatesRunner.class) + public void testReadBoundedSourceWithoutCoder() { + PCollection res = + pipeline.apply(Read.from(new BoundedSourceWithoutCoder())).setCoder(VarLongCoder.of()); + PAssert.that(res).containsInAnyOrder(0L, 1L, 2L, 3L, 4L); + pipeline.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testReadBoundedFromUnboundedSourceWithoutCoder() { + PCollection res = + pipeline.apply( + Read.from(new UnboundedSourceWithoutCoder()) + .withMaxNumRecords(1) + .withCoder(VarLongCoder.of())); + PAssert.that(res).containsInAnyOrder(42L); + pipeline.run(); + } + + @Test + @Category(ValidatesRunner.class) + public void testReadFromUnboundedSourceWithoutCoder() { + PCollection res = + pipeline.apply(Read.from(new UnboundedSourceWithoutCoder())).setCoder(VarLongCoder.of()); + PAssert.that(res).containsInAnyOrder(42L); + pipeline.run(); + } + + private static class BoundedSourceWithoutCoder extends BoundedSource { + @Override + public List split( + long desiredBundleSizeBytes, PipelineOptions options) { + return Arrays.asList(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 0; + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + return CountingSource.createSourceForSubrange(0, 5).createReader(options); + } + + @Override + public void validate() {} + + // Does not implement getDefaultOutputCoder(). + } + + private static class UnboundedSourceWithoutCoder extends UnboundedSource { + @Override + public List split(int desiredNumSplits, PipelineOptions options) + throws Exception { + return Arrays.asList(this); + } + + @Override + public UnboundedReader createReader( + PipelineOptions options, @Nullable CheckpointMark checkpointMark) throws IOException { + return new Reader(this); + } + + @Override + public Coder getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() {} + + // Does not implement getDefaultOutputCoder(). + + private static class Reader extends UnboundedReader { + private final UnboundedSourceWithoutCoder source; + + private Reader(UnboundedSourceWithoutCoder source) { + this.source = source; + } + + @Override + public Long getCurrent() throws NoSuchElementException { + return 42L; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException {} + + @Override + public boolean start() throws IOException { + return true; + } + + @Override + public boolean advance() throws IOException { + return false; + } + + @Override + public Instant getWatermark() { + // Terminate the pipeline immediately. + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public CheckpointMark getCheckpointMark() { + return new NoOpCheckpointMark(); + } + + @Override + public UnboundedSource getCurrentSource() { + return source; + } + } + } + private abstract static class CustomBoundedSource extends BoundedSource { @Override public List> split(