From f09bdfc7e79b390aeea0325cc4aecbc0fa292daf Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Tue, 7 Feb 2017 22:56:17 +0200 Subject: [PATCH 1/4] [BEAM-1433] Remove coder from TextIO --- examples/java/pom.xml | 5 + .../complete/TopWikipediaSessions.java | 31 +- .../FlinkStreamingTransformTranslators.java | 18 +- .../translation/TransformTranslator.java | 12 +- .../java/org/apache/beam/sdk/io/TextIO.java | 357 +++++++----------- .../org/apache/beam/sdk/io/TextIOTest.java | 160 +++----- 6 files changed, 232 insertions(+), 351 deletions(-) diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 9da814b1388d..8c4e73e39a3b 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -493,6 +493,11 @@ true + + com.fasterxml.jackson.core + jackson-databind + + diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 7eb80b769190..c3c2646451d1 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -17,10 +17,12 @@ */ package org.apache.beam.examples.complete; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -29,9 +31,11 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableComparator; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.CalendarWindows; @@ -40,6 +44,7 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -136,6 +141,20 @@ public void processElement(ProcessContext c, BoundedWindow window) { } } + static class ParseTableRowJson implements SerializableFunction { + private final ObjectMapper mapper = + new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); + + @Override + public TableRow apply(String input) { + try { + return mapper.readValue(input, TableRow.class); + } catch (IOException e) { + throw new RuntimeException("Failed parsing table row json", e); + } + } + } + static class ComputeTopSessions extends PTransform, PCollection> { private final double samplingThreshold; @@ -193,11 +212,11 @@ public static void main(String[] args) { double samplingThreshold = 0.1; - p.apply(TextIO.Read - .from(options.getInput()) - .withCoder(TableRowJsonCoder.of())) - .apply(new ComputeTopSessions(samplingThreshold)) - .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput())); + p.apply(TextIO.Read.from(options.getInput())) + .apply(MapElements.via(new ParseTableRowJson()) + .withOutputType(TypeDescriptor.of(TableRow.class))) + .apply(new ComputeTopSessions(samplingThreshold)) + .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput())); p.run().waitUntilFinish(); } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 1195c82ffaf9..b9b50595d53c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -144,19 +144,18 @@ public static FlinkStreamingPipelineTranslator.StreamTransformTranslator getT // Transformation Implementations // -------------------------------------------------------------------------------------------- - private static class TextIOWriteBoundStreamingTranslator - extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - TextIO.Write.Bound> { + private static class TextIOWriteBoundStreamingTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator { private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); @Override public void translateNode( - TextIO.Write.Bound transform, + TextIO.Write.Bound transform, FlinkStreamingTranslationContext context) { PValue input = context.getInput(transform); - DataStream> inputDataStream = context.getInputDataStream(input); + DataStream> inputDataStream = context.getInputDataStream(input); String filenamePrefix = transform.getFilenamePrefix(); String filenameSuffix = transform.getFilenameSuffix(); @@ -176,10 +175,13 @@ public void translateNode( shardNameTemplate); DataStream dataSink = inputDataStream - .flatMap(new FlatMapFunction, String>() { + .flatMap(new FlatMapFunction, String>() { @Override - public void flatMap(WindowedValue value, Collector out) throws Exception { - out.collect(value.getValue().toString()); + public void flatMap( + WindowedValue value, + Collector out) + throws Exception { + out.collect(value.getValue()); } }); DataStreamSink output = diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index fa5ae95bec9a..f0e339a74eca 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -293,10 +293,10 @@ public void evaluate(ParDo.BoundMulti transform, EvaluationCont } - private static TransformEvaluator> readText() { - return new TransformEvaluator>() { + private static TransformEvaluator readText() { + return new TransformEvaluator() { @Override - public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) { + public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) { String pattern = transform.getFilepattern(); JavaRDD> rdd = context.getSparkContext().textFile(pattern) .map(WindowingHelpers.windowFunction()); @@ -305,10 +305,10 @@ public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) }; } - private static TransformEvaluator> writeText() { - return new TransformEvaluator>() { + private static TransformEvaluator writeText() { + return new TransformEvaluator() { @Override - public void evaluate(TextIO.Write.Bound transform, EvaluationContext context) { + public void evaluate(TextIO.Write.Bound transform, EvaluationContext context) { @SuppressWarnings("unchecked") JavaPairRDD last = ((BoundedDataset) context.borrowDataset(transform)).getRDD() diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 16b871e479f8..d78a73b0091a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -65,12 +65,10 @@ * filename or filename pattern of the form * {@code "gs:///"}). * - *

By default, {@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, - * each corresponding to one line of an input UTF-8 text file. To convert directly from the raw - * bytes (split into lines delimited by '\n', '\r', or '\r\n') to another object of type {@code T}, - * supply a {@code Coder} using {@link TextIO.Read#withCoder(Coder)}. + *

{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, + * each corresponding to one line of an input UTF-8 text file. * - *

See the following examples: + *

Example: * *

{@code
  * Pipeline p = ...;
@@ -78,12 +76,6 @@
  * // A simple Read of a local file (only runs locally):
  * PCollection lines =
  *     p.apply(TextIO.Read.from("/local/path/to/file.txt"));
- *
- * // A fully-specified Read from a GCS file:
- * PCollection numbers =
- *     p.apply("ReadNumbers", TextIO.Read
- *         .from("gs://my_bucket/path/to/numbers-*.txt")
- *         .withCoder(TextualIntegerCoder.of()));
  * }
* *

To write a {@link PCollection} to one or more text files, use @@ -91,8 +83,7 @@ * the path of the file to write to (e.g., a local filename or sharded * filename pattern if running locally, or a Google Cloud Storage * filename or sharded filename pattern of the form - * {@code "gs:///"}). You can use {@link TextIO.Write#withCoder(Coder)} - * to specify the {@link Coder} to use to encode the Java values into text lines. + * {@code "gs:///"}). * *

Any existing files with the same names as generated output files * will be overwritten. @@ -103,19 +94,10 @@ * PCollection lines = ...; * lines.apply(TextIO.Write.to("/path/to/file.txt")); * - * // A fully-specified Write to a sharded GCS file: - * PCollection numbers = ...; - * numbers.apply("WriteNumbers", TextIO.Write - * .to("gs://my_bucket/path/to/numbers") - * .withSuffix(".txt") - * .withCoder(TextualIntegerCoder.of())); - * * // Same as above, only with Gzip compression: - * PCollection numbers = ...; - * numbers.apply("WriteNumbers", TextIO.Write - * .to("gs://my_bucket/path/to/numbers") + * PCollection lines = ...; + * lines.apply(TextIO.Write.to("/path/to/file.txt") * .withSuffix(".txt") - * .withCoder(TextualIntegerCoder.of()) * .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)); * } */ @@ -126,9 +108,7 @@ public class TextIO { /** * A {@link PTransform} that reads from a text file (or multiple text * files matching a pattern) and returns a {@link PCollection} containing - * the decoding of each of the lines of the text file(s). The - * default decoding just returns each line as a {@link String}, but you may call - * {@link #withCoder(Coder)} to change the return type. + * the decoding of each of the lines of the text file(s) as a {@link String}. */ public static class Read { @@ -140,30 +120,15 @@ public static class Read { * service). Standard Java Filesystem glob patterns ("*", "?", "[..]") are supported. */ - public static Bound from(String filepattern) { - return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern); + public static Bound from(String filepattern) { + return new Bound().from(filepattern); } /** * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
*/ - public static Bound from(ValueProvider filepattern) { - return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern); - } - - /** - * Returns a transform for reading text files that uses the given - * {@code Coder} to decode each of the lines of the file into a - * value of type {@code T}. - * - *

By default, uses {@link StringUtf8Coder}, which just - * returns the text lines as Java strings. - * - * @param the type of the decoded elements, and the elements - * of the resulting PCollection - */ - public static Bound withCoder(Coder coder) { - return new Bound<>(coder); + public static Bound from(ValueProvider filepattern) { + return new Bound().from(filepattern); } /** @@ -174,8 +139,8 @@ public static Bound withCoder(Coder coder) { * exist at the pipeline creation time, but is expected to be * available at execution time. */ - public static Bound withoutValidation() { - return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation(); + public static Bound withoutValidation() { + return new Bound().withoutValidation(); } /** @@ -187,8 +152,8 @@ public static Bound withoutValidation() { * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other extensions are * uncompressed). */ - public static Bound withCompressionType(TextIO.CompressionType compressionType) { - return new Bound<>(DEFAULT_TEXT_CODER).withCompressionType(compressionType); + public static Bound withCompressionType(TextIO.CompressionType compressionType) { + return new Bound().withCompressionType(compressionType); } // TODO: strippingNewlines, etc. @@ -196,33 +161,27 @@ public static Bound withCompressionType(TextIO.CompressionType compressi /** * A {@link PTransform} that reads from one or more text files and returns a bounded * {@link PCollection} containing one element for each line of the input files. - * - * @param the type of each of the elements of the resulting - * {@link PCollection}. By default, each line is returned as a {@link String}, however you - * may use {@link #withCoder(Coder)} to supply a {@code Coder} to produce a - * {@code PCollection} instead. */ - public static class Bound extends PTransform> { + public static class Bound extends PTransform> { /** The filepattern to read from. 
*/ @Nullable private final ValueProvider filepattern; - /** The Coder to use to decode each line. */ - private final Coder coder; - /** An option to indicate if input validation is desired. Default is true. */ private final boolean validate; /** Option to indicate the input source's compression type. Default is AUTO. */ private final TextIO.CompressionType compressionType; - Bound(Coder coder) { - this(null, null, coder, true, TextIO.CompressionType.AUTO); + private Bound() { + this(null, null, true, TextIO.CompressionType.AUTO); } - private Bound(@Nullable String name, @Nullable ValueProvider filepattern, - Coder coder, boolean validate, TextIO.CompressionType compressionType) { + private Bound( + @Nullable String name, + @Nullable ValueProvider filepattern, + boolean validate, + TextIO.CompressionType compressionType) { super(name); - this.coder = coder; this.filepattern = filepattern; this.validate = validate; this.compressionType = compressionType; @@ -236,32 +195,18 @@ private Bound(@Nullable String name, @Nullable ValueProvider filepattern *

Does not modify this object. */ - public Bound from(String filepattern) { + public Bound from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); - return new Bound<>(name, StaticValueProvider.of(filepattern), coder, validate, + return new Bound(name, StaticValueProvider.of(filepattern), validate, compressionType); } /** * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */ - public Bound from(ValueProvider filepattern) { + public Bound from(ValueProvider filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); - return new Bound<>(name, filepattern, coder, validate, compressionType); - } - - /** - * Returns a new transform for reading from text files that's like this one but - * that uses the given {@link Coder Coder<X>} to decode each of the - * lines of the file into a value of type {@code X}. - * - *

Does not modify this object. - * - * @param the type of the decoded elements, and the - * elements of the resulting PCollection - */ - public Bound withCoder(Coder coder) { - return new Bound<>(name, filepattern, coder, validate, compressionType); + return new Bound(name, filepattern, validate, compressionType); } /** @@ -274,8 +219,8 @@ public Bound withCoder(Coder coder) { * *

Does not modify this object. */ - public Bound withoutValidation() { - return new Bound<>(name, filepattern, coder, false, compressionType); + public Bound withoutValidation() { + return new Bound(name, filepattern, false, compressionType); } /** @@ -287,12 +232,12 @@ public Bound withoutValidation() { * *

Does not modify this object. */ - public Bound withCompressionType(TextIO.CompressionType compressionType) { - return new Bound<>(name, filepattern, coder, validate, compressionType); + public Bound withCompressionType(TextIO.CompressionType compressionType) { + return new Bound(name, filepattern, validate, compressionType); } @Override - public PCollection expand(PBegin input) { + public PCollection expand(PBegin input) { if (filepattern == null) { throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform"); } @@ -310,31 +255,31 @@ public PCollection expand(PBegin input) { } } - final Bounded read = org.apache.beam.sdk.io.Read.from(getSource()); - PCollection pcol = input.getPipeline().apply("Read", read); + final Bounded read = org.apache.beam.sdk.io.Read.from(getSource()); + PCollection pcol = input.getPipeline().apply("Read", read); // Honor the default output coder that would have been used by this PTransform. pcol.setCoder(getDefaultOutputCoder()); return pcol; } // Helper to create a source specific to the requested compression type. 
- protected FileBasedSource getSource() { + protected FileBasedSource getSource() { switch (compressionType) { case UNCOMPRESSED: - return new TextSource(filepattern, coder); + return new TextSource(filepattern); case AUTO: - return CompressedSource.from(new TextSource(filepattern, coder)); + return CompressedSource.from(new TextSource(filepattern)); case BZIP2: return - CompressedSource.from(new TextSource(filepattern, coder)) + CompressedSource.from(new TextSource(filepattern)) .withDecompression(CompressedSource.CompressionMode.BZIP2); case GZIP: return - CompressedSource.from(new TextSource(filepattern, coder)) + CompressedSource.from(new TextSource(filepattern)) .withDecompression(CompressedSource.CompressionMode.GZIP); case ZIP: return - CompressedSource.from(new TextSource(filepattern, coder)) + CompressedSource.from(new TextSource(filepattern)) .withDecompression(CompressedSource.CompressionMode.ZIP); default: throw new IllegalArgumentException("Unknown compression type: " + compressionType); @@ -357,8 +302,8 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Override - protected Coder getDefaultOutputCoder() { - return coder; + protected Coder getDefaultOutputCoder() { + return DEFAULT_TEXT_CODER; } public String getFilepattern() { @@ -399,23 +344,23 @@ public static class Write { * a shard identifier (see {@link Bound#withNumShards(int)}, and end * in a common extension, if given by {@link Bound#withSuffix(String)}. */ - public static Bound to(String prefix) { - return new Bound<>(DEFAULT_TEXT_CODER).to(prefix); + public static Bound to(String prefix) { + return new Bound().to(prefix); } /** * Like {@link #to(String)}, but with a {@link ValueProvider}. 
*/ - public static Bound to(ValueProvider prefix) { - return new Bound<>(DEFAULT_TEXT_CODER).to(prefix); + public static Bound to(ValueProvider prefix) { + return new Bound().to(prefix); } /** * Returns a transform for writing to text files that appends the specified suffix * to the created files. */ - public static Bound withSuffix(String nameExtension) { - return new Bound<>(DEFAULT_TEXT_CODER).withSuffix(nameExtension); + public static Bound withSuffix(String nameExtension) { + return new Bound().withSuffix(nameExtension); } /** @@ -428,8 +373,8 @@ public static Bound withSuffix(String nameExtension) { * @param numShards the number of shards to use, or 0 to let the system * decide. */ - public static Bound withNumShards(int numShards) { - return new Bound<>(DEFAULT_TEXT_CODER).withNumShards(numShards); + public static Bound withNumShards(int numShards) { + return new Bound().withNumShards(numShards); } /** @@ -438,30 +383,16 @@ public static Bound withNumShards(int numShards) { * *

See {@link ShardNameTemplate} for a description of shard templates. */ - public static Bound withShardNameTemplate(String shardTemplate) { - return new Bound<>(DEFAULT_TEXT_CODER).withShardNameTemplate(shardTemplate); + public static Bound withShardNameTemplate(String shardTemplate) { + return new Bound().withShardNameTemplate(shardTemplate); } /** * Returns a transform for writing to text files that forces a single file as * output. */ - public static Bound withoutSharding() { - return new Bound<>(DEFAULT_TEXT_CODER).withoutSharding(); - } - - /** - * Returns a transform for writing to text files that uses the given - * {@link Coder} to encode each of the elements of the input - * {@link PCollection} into an output text line. - * - *

By default, uses {@link StringUtf8Coder}, which writes input - * Java strings directly as output lines. - * - * @param the type of the elements of the input {@link PCollection} - */ - public static Bound withCoder(Coder coder) { - return new Bound<>(coder); + public static Bound withoutSharding() { + return new Bound().withoutSharding(); } /** @@ -472,8 +403,8 @@ public static Bound withCoder(Coder coder) { * not exist at the pipeline creation time, but is expected to be available * at execution time. */ - public static Bound withoutValidation() { - return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation(); + public static Bound withoutValidation() { + return new Bound().withoutValidation(); } /** @@ -484,8 +415,8 @@ public static Bound withoutValidation() { * * @param header the string to be added as file header */ - public static Bound withHeader(@Nullable String header) { - return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header); + public static Bound withHeader(@Nullable String header) { + return new Bound().withHeader(header); } /** @@ -496,8 +427,8 @@ public static Bound withHeader(@Nullable String header) { * * @param footer the string to be added as file footer */ - public static Bound withFooter(@Nullable String footer) { - return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer); + public static Bound withFooter(@Nullable String footer) { + return new Bound().withFooter(footer); } /** @@ -509,10 +440,9 @@ public static Bound withFooter(@Nullable String footer) { * * @param writableByteChannelFactory the factory to be used during output */ - public static Bound withWritableByteChannelFactory( + public static Bound withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { - return new Bound<>(DEFAULT_TEXT_CODER) - .withWritableByteChannelFactory(writableByteChannelFactory); + return new Bound().withWritableByteChannelFactory(writableByteChannelFactory); } // TODO: appendingNewlines, etc. 
@@ -521,10 +451,8 @@ public static Bound withWritableByteChannelFactory( * A PTransform that writes a bounded PCollection to a text file (or * multiple text files matching a sharding pattern), with each * PCollection element being encoded into its own line. - * - * @param the type of the elements of the input PCollection */ - public static class Bound extends PTransform, PDone> { + public static class Bound extends PTransform, PDone> { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The prefix of each file written, combined with suffix and shardTemplate. */ @@ -538,9 +466,6 @@ public static class Bound extends PTransform, PDone> { /** An optional footer to add to each file. */ @Nullable private final String footer; - /** The Coder to use to decode each line. */ - private final Coder coder; - /** Requested number of shards. 0 for automatic. */ private final int numShards; @@ -556,19 +481,18 @@ public static class Bound extends PTransform, PDone> { */ private final WritableByteChannelFactory writableByteChannelFactory; - Bound(Coder coder) { - this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true, + private Bound() { + this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE, true, FileBasedSink.CompressionType.UNCOMPRESSED); } private Bound(String name, ValueProvider filenamePrefix, String filenameSuffix, - @Nullable String header, @Nullable String footer, Coder coder, int numShards, + @Nullable String header, @Nullable String footer, int numShards, String shardTemplate, boolean validate, WritableByteChannelFactory writableByteChannelFactory) { super(name); this.header = header; this.footer = footer; - this.coder = coder; this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; this.numShards = numShards; @@ -586,18 +510,18 @@ private Bound(String name, ValueProvider filenamePrefix, String filename * *

Does not modify this object. */ - public Bound to(String filenamePrefix) { + public Bound to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Bound<>(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, - header, footer, coder, numShards, shardTemplate, validate, + return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, + header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } /** * Like {@link #to(String)}, but with a {@link ValueProvider}. */ - public Bound to(ValueProvider filenamePrefix) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound to(ValueProvider filenamePrefix) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -609,9 +533,9 @@ public Bound to(ValueProvider filenamePrefix) { * * @see ShardNameTemplate */ - public Bound withSuffix(String nameExtension) { + public Bound withSuffix(String nameExtension) { validateOutputComponent(nameExtension); - return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards, + return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -629,9 +553,9 @@ public Bound withSuffix(String nameExtension) { * decide. 
* @see ShardNameTemplate */ - public Bound withNumShards(int numShards) { + public Bound withNumShards(int numShards) { checkArgument(numShards >= 0); - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -643,8 +567,8 @@ public Bound withNumShards(int numShards) { * * @see ShardNameTemplate */ - public Bound withShardNameTemplate(String shardTemplate) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withShardNameTemplate(String shardTemplate) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -661,24 +585,11 @@ public Bound withShardNameTemplate(String shardTemplate) { * *

Does not modify this object. */ - public Bound withoutSharding() { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "", + public Bound withoutSharding() { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "", validate, writableByteChannelFactory); } - /** - * Returns a transform for writing to text files that's like this one - * but that uses the given {@link Coder Coder<X>} to encode each of - * the elements of the input {@link PCollection PCollection<X>} into an - * output text line. Does not modify this object. - * - * @param the type of the elements of the input {@link PCollection} - */ - public Bound withCoder(Coder coder) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, - shardTemplate, validate, writableByteChannelFactory); - } - /** * Returns a transform for writing to text files that's like this one but * that has GCS output path validation on pipeline creation disabled. @@ -689,8 +600,8 @@ public Bound withCoder(Coder coder) { * *

Does not modify this object. */ - public Bound withoutValidation() { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withoutValidation() { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, false, writableByteChannelFactory); } @@ -704,8 +615,8 @@ public Bound withoutValidation() { * * @param header the string to be added as file header */ - public Bound withHeader(@Nullable String header) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withHeader(@Nullable String header) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -719,8 +630,8 @@ public Bound withHeader(@Nullable String header) { * * @param footer the string to be added as file footer */ - public Bound withFooter(@Nullable String footer) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + public Bound withFooter(@Nullable String footer) { + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @@ -735,22 +646,22 @@ public Bound withFooter(@Nullable String footer) { * * @param writableByteChannelFactory the factory to be used during output */ - public Bound withWritableByteChannelFactory( + public Bound withWritableByteChannelFactory( WritableByteChannelFactory writableByteChannelFactory) { - return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards, + return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards, shardTemplate, validate, writableByteChannelFactory); } @Override - public PDone expand(PCollection input) { + public PDone expand(PCollection input) { if (filenamePrefix == null) { throw new IllegalStateException( "need to set the filename prefix of a 
TextIO.Write transform"); } - org.apache.beam.sdk.io.Write.Bound write = + org.apache.beam.sdk.io.Write.Bound write = org.apache.beam.sdk.io.Write.to( - new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate, - coder, writableByteChannelFactory)); + new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate, + writableByteChannelFactory)); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -812,10 +723,6 @@ public String getFilenameSuffix() { return filenameSuffix; } - public Coder getCoder() { - return coder; - } - @Nullable public String getHeader() { return header; @@ -901,40 +808,38 @@ private TextIO() {} * representing the beginning of the first record to be decoded. */ @VisibleForTesting - static class TextSource extends FileBasedSource { + static class TextSource extends FileBasedSource { /** The Coder to use to decode each line. */ - private final Coder coder; - @VisibleForTesting - TextSource(String fileSpec, Coder coder) { + TextSource(String fileSpec) { super(fileSpec, 1L); - this.coder = coder; } @VisibleForTesting - TextSource(ValueProvider fileSpec, Coder coder) { + TextSource(ValueProvider fileSpec) { super(fileSpec, 1L); - this.coder = coder; } - private TextSource(String fileName, long start, long end, Coder coder) { + private TextSource(String fileName, long start, long end) { super(fileName, 1L, start, end); - this.coder = coder; } @Override - protected FileBasedSource createForSubrangeOfFile(String fileName, long start, long end) { - return new TextSource<>(fileName, start, end, coder); + protected FileBasedSource createForSubrangeOfFile( + String fileName, + long start, + long end) { + return new TextSource(fileName, start, end); } @Override - protected FileBasedReader createSingleFileReader(PipelineOptions options) { - return new TextBasedReader<>(this); + protected FileBasedReader createSingleFileReader(PipelineOptions options) { + return new TextBasedReader(this); } @Override - 
public Coder getDefaultOutputCoder() { - return coder; + public Coder getDefaultOutputCoder() { + return DEFAULT_TEXT_CODER; } /** @@ -944,9 +849,8 @@ public Coder getDefaultOutputCoder() { *

See {@link TextSource} for further details. */ @VisibleForTesting - static class TextBasedReader extends FileBasedReader { + static class TextBasedReader extends FileBasedReader { private static final int READ_BUFFER_SIZE = 8192; - private final Coder coder; private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE); private ByteString buffer; private int startOfSeparatorInBuffer; @@ -955,12 +859,11 @@ static class TextBasedReader extends FileBasedReader { private volatile long startOfNextRecord; private volatile boolean eof; private volatile boolean elementIsPresent; - private T currentValue; + private String currentValue; private ReadableByteChannel inChannel; - private TextBasedReader(TextSource source) { + private TextBasedReader(TextSource source) { super(source); - coder = source.coder; buffer = ByteString.EMPTY; } @@ -981,7 +884,7 @@ public long getSplitPointsRemaining() { } @Override - public T getCurrent() throws NoSuchElementException { + public String getCurrent() throws NoSuchElementException { if (!elementIsPresent) { throw new NoSuchElementException(); } @@ -1078,7 +981,7 @@ protected boolean readNextRecord() throws IOException { */ private void decodeCurrentElement() throws IOException { ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer); - currentValue = coder.decode(dataToDecode.newInput(), Context.OUTER); + currentValue = DEFAULT_TEXT_CODER.decode(dataToDecode.newInput(), Context.OUTER); elementIsPresent = true; buffer = buffer.substring(endOfSeparatorInBuffer); } @@ -1107,48 +1010,46 @@ private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOExcep * Each record (including the last) is terminated. 
*/ @VisibleForTesting - static class TextSink extends FileBasedSink { - private final Coder coder; + static class TextSink extends FileBasedSink { @Nullable private final String header; @Nullable private final String footer; @VisibleForTesting TextSink( - ValueProvider baseOutputFilename, String extension, - @Nullable String header, @Nullable String footer, - String fileNameTemplate, Coder coder, + ValueProvider baseOutputFilename, + String extension, + @Nullable String header, + @Nullable String footer, + String fileNameTemplate, WritableByteChannelFactory writableByteChannelFactory) { super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory); - this.coder = coder; this.header = header; this.footer = footer; } @Override - public FileBasedSink.FileBasedWriteOperation createWriteOperation(PipelineOptions options) { - return new TextWriteOperation<>(this, coder, header, footer); + public FileBasedSink.FileBasedWriteOperation createWriteOperation( + PipelineOptions options) { + return new TextWriteOperation(this, header, footer); } /** * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation * FileBasedWriteOperation} for text files. 
*/ - private static class TextWriteOperation extends FileBasedWriteOperation { - private final Coder coder; + private static class TextWriteOperation extends FileBasedWriteOperation { @Nullable private final String header; @Nullable private final String footer; - private TextWriteOperation(TextSink sink, Coder coder, - @Nullable String header, @Nullable String footer) { + private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) { super(sink); - this.coder = coder; this.header = header; this.footer = footer; } @Override - public FileBasedWriter createWriter(PipelineOptions options) throws Exception { - return new TextWriter<>(this, coder, header, footer); + public FileBasedWriter createWriter(PipelineOptions options) throws Exception { + return new TextWriter(this, header, footer); } } @@ -1156,20 +1057,20 @@ public FileBasedWriter createWriter(PipelineOptions options) throws Exception * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} * for text files. 
*/ - private static class TextWriter extends FileBasedWriter { + private static class TextWriter extends FileBasedWriter { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final Coder coder; @Nullable private final String header; @Nullable private final String footer; private OutputStream out; - public TextWriter(FileBasedWriteOperation writeOperation, Coder coder, - @Nullable String header, @Nullable String footer) { + public TextWriter( + FileBasedWriteOperation writeOperation, + @Nullable String header, + @Nullable String footer) { super(writeOperation); this.header = header; this.footer = footer; this.mimeType = MimeTypes.TEXT; - this.coder = coder; } /** @@ -1198,8 +1099,8 @@ protected void writeFooter() throws Exception { } @Override - public void write(T value) throws Exception { - coder.encode(value, out, Context.OUTER); + public void write(String value) throws Exception { + DEFAULT_TEXT_CODER.encode(value, out, Context.OUTER); out.write(NEWLINE); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index b8b28eb3b4b1..b3f4d85ae7c4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.io; -import static org.apache.beam.sdk.TestUtils.INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY; import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; -import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.io.TextIO.CompressionType.AUTO; import static org.apache.beam.sdk.io.TextIO.CompressionType.BZIP2; @@ -79,8 +77,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import 
org.apache.beam.sdk.coders.TextualIntegerCoder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; import org.apache.beam.sdk.io.TextIO.CompressionType; @@ -215,28 +211,21 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx }); } - private void runTestRead(T[] expected, Coder coder) throws Exception { + private void runTestRead(String[] expected) throws Exception { File tmpFile = Files.createTempFile(tempFolder, "file", "txt").toFile(); String filename = tmpFile.getPath(); try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) { - for (T elem : expected) { - byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem); + for (String elem : expected) { + byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); String line = new String(encodedElem); writer.println(line); } } - TextIO.Read.Bound read; - if (coder.equals(StringUtf8Coder.of())) { - TextIO.Read.Bound readStrings = TextIO.Read.from(filename); - // T==String - read = (TextIO.Read.Bound) readStrings; - } else { - read = TextIO.Read.from(filename).withCoder(coder); - } + TextIO.Read.Bound read = TextIO.Read.from(filename); - PCollection output = p.apply(read); + PCollection output = p.apply(read); PAssert.that(output).containsInAnyOrder(expected); p.run(); @@ -245,31 +234,13 @@ private void runTestRead(T[] expected, Coder coder) throws Exception { @Test @Category(NeedsRunner.class) public void testReadStrings() throws Exception { - runTestRead(LINES_ARRAY, StringUtf8Coder.of()); + runTestRead(LINES_ARRAY); } @Test @Category(NeedsRunner.class) public void testReadEmptyStrings() throws Exception { - runTestRead(NO_LINES_ARRAY, StringUtf8Coder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testReadInts() throws Exception { - runTestRead(INTS_ARRAY, TextualIntegerCoder.of()); - } - - @Test - 
@Category(NeedsRunner.class) - public void testReadEmptyInts() throws Exception { - runTestRead(NO_INTS_ARRAY, TextualIntegerCoder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testReadNulls() throws Exception { - runTestRead(new Void[] {null, null, null}, VoidCoder.of()); + runTestRead(NO_LINES_ARRAY); } @Test @@ -286,7 +257,7 @@ public void testReadNamed() throws Exception { @Test public void testReadDisplayData() { - TextIO.Read.Bound read = TextIO.Read + TextIO.Read.Bound read = TextIO.Read .from("foo.*") .withCompressionType(BZIP2) .withoutValidation(); @@ -303,7 +274,7 @@ public void testReadDisplayData() { public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - TextIO.Read.Bound read = TextIO.Read + TextIO.Read.Bound read = TextIO.Read .from("foobar") .withoutValidation(); @@ -312,36 +283,32 @@ public void testPrimitiveReadDisplayData() { displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); } - private void runTestWrite(T[] elems, Coder coder) throws Exception { - runTestWrite(elems, null, null, coder, 1); + private void runTestWrite(String[] elems) throws Exception { + runTestWrite(elems, null, null, 1); } - private void runTestWrite(T[] elems, Coder coder, int numShards) throws Exception { - runTestWrite(elems, null, null, coder, numShards); + private void runTestWrite(String[] elems, int numShards) throws Exception { + runTestWrite(elems, null, null, numShards); } - private void runTestWrite(T[] elems, Coder coder, String header, String footer) + private void runTestWrite(String[] elems, String header, String footer) throws Exception { - runTestWrite(elems, header, footer, coder, 1); + runTestWrite(elems, header, footer, 1); } - private void runTestWrite( - T[] elems, String header, String footer, Coder coder, int numShards) throws Exception { + private void runTestWrite( + String[] elems, String header, String footer, int numShards) throws Exception { String 
outputName = "file.txt"; Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); String baseFilename = baseDir.resolve(outputName).toString(); - PCollection input = p.apply(Create.of(Arrays.asList(elems)).withCoder(coder)); + PCollection input = + p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of())); - TextIO.Write.Bound write; - if (coder.equals(StringUtf8Coder.of())) { - TextIO.Write.Bound writeStrings = TextIO.Write.to(baseFilename); - // T==String - write = (TextIO.Write.Bound) writeStrings; - } else { - write = TextIO.Write.to(baseFilename).withCoder(coder); - } - write = write.withHeader(header).withFooter(footer); + TextIO.Write.Bound write = + TextIO.Write.to(baseFilename) + .withHeader(header) + .withFooter(footer); if (numShards == 1) { write = write.withoutSharding(); @@ -353,15 +320,14 @@ private void runTestWrite( p.run(); - assertOutputFiles(elems, header, footer, coder, numShards, baseDir, outputName, + assertOutputFiles(elems, header, footer, numShards, baseDir, outputName, write.getShardNameTemplate()); } - public static void assertOutputFiles( - T[] elems, + public static void assertOutputFiles( + String[] elems, final String header, final String footer, - Coder coder, int numShards, Path rootLocation, String outputName, @@ -400,8 +366,8 @@ public static void assertOutputFiles( } List expectedElements = new ArrayList<>(elems.length); - for (T elem : elems) { - byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem); + for (String elem : elems) { + byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); String line = new String(encodedElem); expectedElements.add(line); } @@ -453,55 +419,43 @@ public boolean apply(List fileLines) { @Test @Category(NeedsRunner.class) public void testWriteStrings() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of()); + runTestWrite(LINES_ARRAY); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStringsNoSharding() throws 
Exception { - runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of(), 0); + runTestWrite(NO_LINES_ARRAY, 0); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStrings() throws Exception { - runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testWriteInts() throws Exception { - runTestWrite(INTS_ARRAY, TextualIntegerCoder.of()); - } - - @Test - @Category(NeedsRunner.class) - public void testWriteEmptyInts() throws Exception { - runTestWrite(NO_INTS_ARRAY, TextualIntegerCoder.of()); + runTestWrite(NO_LINES_ARRAY); } @Test @Category(NeedsRunner.class) public void testShardedWrite() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5); + runTestWrite(LINES_ARRAY, 5); } @Test @Category(NeedsRunner.class) public void testWriteWithHeader() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, null); + runTestWrite(LINES_ARRAY, MY_HEADER, null); } @Test @Category(NeedsRunner.class) public void testWriteWithFooter() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), null, MY_FOOTER); + runTestWrite(LINES_ARRAY, null, MY_FOOTER); } @Test @Category(NeedsRunner.class) public void testWriteWithHeaderAndFooter() throws Exception { - runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, MY_FOOTER); + runTestWrite(LINES_ARRAY, MY_HEADER, MY_FOOTER); } @Test @@ -515,7 +469,7 @@ public void testWriteWithWritableByteChannelFactory() throws Exception { final WritableByteChannelFactory writableByteChannelFactory = new DrunkWritableByteChannelFactory(); - TextIO.Write.Bound write = TextIO.Write.to(baseDir.resolve(outputName).toString()) + TextIO.Write.Bound write = TextIO.Write.to(baseDir.resolve(outputName).toString()) .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory); DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK")); @@ -529,13 +483,13 @@ 
public void testWriteWithWritableByteChannelFactory() throws Exception { drunkElems.add(elem + elem); drunkElems.add(""); } - assertOutputFiles(drunkElems.toArray(new String[0]), null, null, coder, 1, baseDir, + assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir, outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate()); } @Test public void testWriteDisplayData() { - TextIO.Write.Bound write = TextIO.Write + TextIO.Write.Bound write = TextIO.Write .to("foo") .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") @@ -558,7 +512,7 @@ public void testWriteDisplayData() { @Test public void testWriteDisplayDataValidateThenHeader() { - TextIO.Write.Bound write = TextIO.Write + TextIO.Write.Bound write = TextIO.Write .to("foo") .withHeader("myHeader"); @@ -570,7 +524,7 @@ public void testWriteDisplayDataValidateThenHeader() { @Test public void testWriteDisplayDataValidateThenFooter() { - TextIO.Write.Bound write = TextIO.Write + TextIO.Write.Bound write = TextIO.Write .to("foo") .withFooter("myFooter"); @@ -590,7 +544,7 @@ public void testPrimitiveWriteDisplayData() throws IOException { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - TextIO.Write.Bound write = TextIO.Write.to(outputPath); + TextIO.Write.Bound write = TextIO.Write.to(outputPath); Set displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("TextIO.Write should include the file prefix in its primitive display data", @@ -649,21 +603,21 @@ public void testRuntimeOptionsNotCalledInApply() throws Exception { @Test public void testReadWithoutValidationFlag() throws Exception { - TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/foo*/baz"); + TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/foo*/baz"); assertTrue(read.needsValidation()); assertFalse(read.withoutValidation().needsValidation()); } @Test public void testWriteWithoutValidationFlag() throws Exception { - TextIO.Write.Bound write = 
TextIO.Write.to("gs://bucket/foo/baz"); + TextIO.Write.Bound write = TextIO.Write.to("gs://bucket/foo/baz"); assertTrue(write.needsValidation()); assertFalse(write.withoutValidation().needsValidation()); } @Test public void testCompressionTypeIsSet() throws Exception { - TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/test"); + TextIO.Read.Bound read = TextIO.Read.from("gs://bucket/test"); assertEquals(AUTO, read.getCompressionType()); read = TextIO.Read.from("gs://bucket/test").withCompressionType(GZIP); assertEquals(GZIP, read.getCompressionType()); @@ -688,7 +642,7 @@ private static void writeToStreamAndClose(String[] lines, OutputStream outputStr private void assertReadingCompressedFileMatchesExpected( File file, CompressionType compressionType, String[] expected) { - TextIO.Read.Bound read = + TextIO.Read.Bound read = TextIO.Read.from(file.getPath()).withCompressionType(compressionType); PCollection output = p.apply("Read_" + file + "_" + compressionType.toString(), read); @@ -1064,74 +1018,74 @@ public void testReadFileWithMixedDelimitersAndNonEmptyBytesAtEnd() throws Except } private void runTestReadWithData(byte[] data, List expectedResults) throws Exception { - TextSource source = prepareSource(data); + TextSource source = prepareSource(data); List actual = SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create()); assertThat(actual, containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new String[0]))); } @Test public void testSplittingSourceWithEmptyLines() throws Exception { - TextSource source = prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("\n\n\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithLineFeedDelimiter() throws Exception { - TextSource source = prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8)); + TextSource source = 
prepareSource("asdf\nhjkl\nxyz\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnDelimiter() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\rxyz\r".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiter() throws Exception { - TextSource source = prepareSource( + TextSource source = prepareSource( "asdf\r\nhjkl\r\nxyz\r\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithMixedDelimiters() throws Exception { - TextSource source = prepareSource( + TextSource source = prepareSource( "asdf\rhjkl\r\nxyz\n".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithLineFeedDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\nhjkl\nxyz".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnDelimiterAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\rxyz".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithCarriageReturnAndLineFeedDelimiterAndNonEmptyBytesAtEnd() throws 
Exception { - TextSource source = prepareSource( + TextSource source = prepareSource( "asdf\r\nhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } @Test public void testSplittingSourceWithMixedDelimitersAndNonEmptyBytesAtEnd() throws Exception { - TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); + TextSource source = prepareSource("asdf\rhjkl\r\nxyz".getBytes(StandardCharsets.UTF_8)); SourceTestUtils.assertSplitAtFractionExhaustive(source, PipelineOptionsFactory.create()); } - private TextSource prepareSource(byte[] data) throws IOException { + private TextSource prepareSource(byte[] data) throws IOException { Path path = Files.createTempFile(tempFolder, "tempfile", "ext"); Files.write(path, data); - return new TextSource<>(path.toString(), StringUtf8Coder.of()); + return new TextSource(path.toString()); } @Test From a94dd3c0b8c53fb3afeb07e0dd96006e084fbf8c Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sat, 11 Feb 2017 09:21:30 +0200 Subject: [PATCH 2/4] Changes after review --- examples/java/pom.xml | 5 --- .../complete/TopWikipediaSessions.java | 8 ++--- .../java/org/apache/beam/sdk/io/TextIO.java | 32 ++++++++++++------- .../org/apache/beam/sdk/io/TextIOTest.java | 4 +-- 4 files changed, 24 insertions(+), 25 deletions(-) diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 8c4e73e39a3b..9da814b1388d 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -493,11 +493,6 @@ true - - com.fasterxml.jackson.core - jackson-databind - - diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index c3c2646451d1..2078b217f292 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ 
b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -17,8 +17,6 @@ */ package org.apache.beam.examples.complete; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; import com.google.api.services.bigquery.model.TableRow; import java.io.IOException; import java.util.List; @@ -42,6 +40,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -142,13 +141,10 @@ public void processElement(ProcessContext c, BoundedWindow window) { } static class ParseTableRowJson implements SerializableFunction { - private final ObjectMapper mapper = - new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); - @Override public TableRow apply(String input) { try { - return mapper.readValue(input, TableRow.class); + return Transport.getJsonFactory().fromString(input, TableRow.class); } catch (IOException e) { throw new RuntimeException("Failed parsing table row json", e); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index d78a73b0091a..726411c78260 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -26,7 +26,7 @@ import com.google.protobuf.ByteString; import java.io.IOException; -import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; @@ -39,7 +39,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; -import 
org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; @@ -66,7 +65,8 @@ * {@code "gs:///"}). * *

{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, - * each corresponding to one line of an input UTF-8 text file. + * each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', + * '\r', or '\r\n'). * *

Example: * @@ -981,7 +981,7 @@ protected boolean readNextRecord() throws IOException { */ private void decodeCurrentElement() throws IOException { ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer); - currentValue = DEFAULT_TEXT_CODER.decode(dataToDecode.newInput(), Context.OUTER); + currentValue = dataToDecode.toStringUtf8(); elementIsPresent = true; buffer = buffer.substring(endOfSeparatorInBuffer); } @@ -1058,10 +1058,10 @@ public FileBasedWriter createWriter(PipelineOptions options) throws Exception { * for text files. */ private static class TextWriter extends FileBasedWriter { - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private static final String NEWLINE = "\n"; @Nullable private final String header; @Nullable private final String footer; - private OutputStream out; + private OutputStreamWriter out; public TextWriter( FileBasedWriteOperation writeOperation, @@ -1074,18 +1074,25 @@ public TextWriter( } /** - * Writes {@code value} followed by a newline if {@code value} is not null. + * Writes {@code value} followed by a newline character if {@code value} is not null. */ private void writeIfNotNull(@Nullable String value) throws IOException { if (value != null) { - out.write(value.getBytes(StandardCharsets.UTF_8)); - out.write(NEWLINE); + writeLine(value); } } + /** + * Writes {@code value} followed by newline character. + */ + private void writeLine(String value) throws IOException { + out.write(value); + out.write(NEWLINE); + } + @Override protected void prepareWrite(WritableByteChannel channel) throws Exception { - out = Channels.newOutputStream(channel); + out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8); } @Override @@ -1096,12 +1103,13 @@ protected void writeHeader() throws Exception { @Override protected void writeFooter() throws Exception { writeIfNotNull(footer); + // Flush here because there is currently no other natural place to do this. 
[BEAM-1465] + out.flush(); } @Override public void write(String value) throws Exception { - DEFAULT_TEXT_CODER.encode(value, out, Context.OUTER); - out.write(NEWLINE); + writeLine(value); } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index b3f4d85ae7c4..630460308302 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -480,8 +480,8 @@ public void testWriteWithWritableByteChannelFactory() throws Exception { final List drunkElems = new ArrayList<>(LINES2_ARRAY.length * 2 + 2); for (String elem : LINES2_ARRAY) { - drunkElems.add(elem + elem); - drunkElems.add(""); + drunkElems.add(elem); + drunkElems.add(elem); } assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir, outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate()); From d1f480763775a2ecbbec96e6e5a796e7229b9c44 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 12 Feb 2017 22:01:05 +0200 Subject: [PATCH 3/4] Use SimpleFunction to simplify transformation --- .../apache/beam/examples/complete/TopWikipediaSessions.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 2078b217f292..274cb447eea2 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableComparator; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; import 
org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.CalendarWindows; @@ -140,7 +141,7 @@ public void processElement(ProcessContext c, BoundedWindow window) { } } - static class ParseTableRowJson implements SerializableFunction { + static class ParseTableRowJson extends SimpleFunction { @Override public TableRow apply(String input) { try { @@ -209,8 +210,7 @@ public static void main(String[] args) { double samplingThreshold = 0.1; p.apply(TextIO.Read.from(options.getInput())) - .apply(MapElements.via(new ParseTableRowJson()) - .withOutputType(TypeDescriptor.of(TableRow.class))) + .apply(MapElements.via(new ParseTableRowJson())) .apply(new ComputeTopSessions(samplingThreshold)) .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput())); From 977b0ea0f25c05f7229408d2bcff7f7e77b70c57 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Mon, 13 Feb 2017 07:23:17 +0200 Subject: [PATCH 4/4] Fixed checkstyle violations. 
--- .../apache/beam/examples/complete/TopWikipediaSessions.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 274cb447eea2..4c07ca42e14c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableComparator; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -44,7 +43,6 @@ import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; @@ -141,7 +139,7 @@ public void processElement(ProcessContext c, BoundedWindow window) { } } - static class ParseTableRowJson extends SimpleFunction { + static class ParseTableRowJson extends SimpleFunction { @Override public TableRow apply(String input) { try {