From 91f8b632731ba638940281fafb63a685f46e74a3 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 17 Apr 2017 17:21:34 -0700 Subject: [PATCH 1/5] Gets rid of TFRecordIO.Read.Bound --- .../org/apache/beam/sdk/io/TFRecordIO.java | 261 +++++++----------- .../apache/beam/sdk/io/TFRecordIOTest.java | 10 +- 2 files changed, 107 insertions(+), 164 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 055223605333..b4fd93cd954e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -18,13 +18,11 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -32,9 +30,7 @@ import java.nio.channels.WritableByteChannel; import java.util.NoSuchElementException; import java.util.regex.Pattern; - import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; @@ -62,7 +58,20 @@ public class TFRecordIO { * files matching a pattern) and returns a {@link PCollection} containing * the decoding of each of the records of the TFRecord file(s) as a byte array. */ - public static class Read { + public static Read read() { + return new Read(); + } + + /** Implementation of {@link #read}. */ + public static class Read extends PTransform> { + /** The filepattern to read from. 
*/ + @Nullable private final ValueProvider filepattern; + + /** An option to indicate if input validation is desired. Default is true. */ + private final boolean validate; + + /** Option to indicate the input source's compression type. Default is AUTO. */ + private final TFRecordIO.CompressionType compressionType; /** * Returns a transform for reading TFRecord files that reads from the file(s) @@ -72,16 +81,17 @@ public static class Read { * execution). Standard Java Filesystem glob patterns ("*", "?", "[..]") are supported. */ - public static Bound from(String filepattern) { - return new Bound().from(filepattern); + public Read from(String filepattern) { + return new Read().from(filepattern); } /** * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */ - public static Bound from(ValueProvider filepattern) { - return new Bound().from(filepattern); + public Read from(ValueProvider filepattern) { + return new Read().from(filepattern); } + /** * Returns a transform for reading TFRecord files that has GCS path validation on * pipeline creation disabled. @@ -90,8 +100,8 @@ public static Bound from(ValueProvider filepattern) { * exist at the pipeline creation time, but is expected to be * available at execution time. */ - public static Bound withoutValidation() { - return new Bound().withoutValidation(); + public Read withoutValidation() { + return new Read().withoutValidation(); } /** @@ -104,170 +114,103 @@ public static Bound withoutValidation() { * (e.g., {@code *.gz} is gzipped, {@code *.zlib} is zlib compressed, and all other * extensions are uncompressed). 
*/ - public static Bound withCompressionType(TFRecordIO.CompressionType compressionType) { - return new Bound().withCompressionType(compressionType); + public Read withCompressionType(TFRecordIO.CompressionType compressionType) { + return new Read().withCompressionType(compressionType); } - /** - * A {@link PTransform} that reads from one or more TFRecord files and returns a bounded - * {@link PCollection} containing one element for each record of the input files. - */ - public static class Bound extends PTransform> { - /** The filepattern to read from. */ - @Nullable private final ValueProvider filepattern; - - /** An option to indicate if input validation is desired. Default is true. */ - private final boolean validate; - - /** Option to indicate the input source's compression type. Default is AUTO. */ - private final TFRecordIO.CompressionType compressionType; - - private Bound() { - this(null, null, true, TFRecordIO.CompressionType.AUTO); - } - - private Bound( - @Nullable String name, - @Nullable ValueProvider filepattern, - boolean validate, - TFRecordIO.CompressionType compressionType) { - super(name); - this.filepattern = filepattern; - this.validate = validate; - this.compressionType = compressionType; - } - - /** - * Returns a new transform for reading from TFRecord files that's like this one but that - * reads from the file(s) with the given name or pattern. See {@link TFRecordIO.Read#from} - * for a description of filepatterns. - * - *

Does not modify this object. - - */ - public Bound from(String filepattern) { - checkNotNull(filepattern, "Filepattern cannot be empty."); - return new Bound(name, StaticValueProvider.of(filepattern), validate, compressionType); - } - - /** - * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. - */ - public Bound from(ValueProvider filepattern) { - checkNotNull(filepattern, "Filepattern cannot be empty."); - return new Bound(name, filepattern, validate, compressionType); - } - - /** - * Returns a new transform for reading from TFRecord files that's like this one but - * that has GCS path validation on pipeline creation disabled. - * - *

This can be useful in the case where the GCS input does not - * exist at the pipeline creation time, but is expected to be - * available at execution time. - * - *

Does not modify this object. - */ - public Bound withoutValidation() { - return new Bound(name, filepattern, false, compressionType); - } + private Read() { + this(null, null, true, TFRecordIO.CompressionType.AUTO); + } - /** - * Returns a new transform for reading from TFRecord files that's like this one but - * reads from input sources using the specified compression type. - * - *

If no compression type is specified, the default is - * {@link TFRecordIO.CompressionType#AUTO}. - * See {@link TFRecordIO.Read#withCompressionType} for more details. - * - *

Does not modify this object. - */ - public Bound withCompressionType(TFRecordIO.CompressionType compressionType) { - return new Bound(name, filepattern, validate, compressionType); - } + private Read( + @Nullable String name, + @Nullable ValueProvider filepattern, + boolean validate, + TFRecordIO.CompressionType compressionType) { + super(name); + this.filepattern = filepattern; + this.validate = validate; + this.compressionType = compressionType; + } - @Override - public PCollection expand(PBegin input) { - if (filepattern == null) { + @Override + public PCollection expand(PBegin input) { + if (filepattern == null) { + throw new IllegalStateException( + "Need to set the filepattern of a TFRecordIO.Read transform"); + } + + if (validate) { + checkState(filepattern.isAccessible(), "Cannot validate with a RVP."); + try { + checkState( + !IOChannelUtils.getFactory(filepattern.get()).match(filepattern.get()).isEmpty(), + "Unable to find any files matching %s", + filepattern); + } catch (IOException e) { throw new IllegalStateException( - "Need to set the filepattern of a TFRecordIO.Read transform"); - } - - if (validate) { - checkState(filepattern.isAccessible(), "Cannot validate with a RVP."); - try { - checkState( - !IOChannelUtils.getFactory(filepattern.get()).match(filepattern.get()).isEmpty(), - "Unable to find any files matching %s", - filepattern); - } catch (IOException e) { - throw new IllegalStateException( - String.format("Failed to validate %s", filepattern.get()), e); - } - } - - final Bounded read = org.apache.beam.sdk.io.Read.from(getSource()); - PCollection pcol = input.getPipeline().apply("Read", read); - // Honor the default output coder that would have been used by this PTransform. - pcol.setCoder(getDefaultOutputCoder()); - return pcol; - } - - // Helper to create a source specific to the requested compression type. 
- protected FileBasedSource getSource() { - switch (compressionType) { - case NONE: - return new TFRecordSource(filepattern); - case AUTO: - return CompressedSource.from(new TFRecordSource(filepattern)); - case GZIP: - return - CompressedSource.from(new TFRecordSource(filepattern)) - .withDecompression(CompressedSource.CompressionMode.GZIP); - case ZLIB: - return - CompressedSource.from(new TFRecordSource(filepattern)) - .withDecompression(CompressedSource.CompressionMode.DEFLATE); - default: - throw new IllegalArgumentException("Unknown compression type: " + compressionType); + String.format("Failed to validate %s", filepattern.get()), e); } } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); + final Bounded read = org.apache.beam.sdk.io.Read.from(getSource()); + PCollection pcol = input.getPipeline().apply("Read", read); + // Honor the default output coder that would have been used by this PTransform. + pcol.setCoder(getDefaultOutputCoder()); + return pcol; + } - String filepatternDisplay = filepattern.isAccessible() - ? filepattern.get() : filepattern.toString(); - builder - .add(DisplayData.item("compressionType", compressionType.toString()) - .withLabel("Compression Type")) - .addIfNotDefault(DisplayData.item("validation", validate) - .withLabel("Validation Enabled"), true) - .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) - .withLabel("File Pattern")); + // Helper to create a source specific to the requested compression type. 
+ protected FileBasedSource getSource() { + switch (compressionType) { + case NONE: + return new TFRecordSource(filepattern); + case AUTO: + return CompressedSource.from(new TFRecordSource(filepattern)); + case GZIP: + return + CompressedSource.from(new TFRecordSource(filepattern)) + .withDecompression(CompressedSource.CompressionMode.GZIP); + case ZLIB: + return + CompressedSource.from(new TFRecordSource(filepattern)) + .withDecompression(CompressedSource.CompressionMode.DEFLATE); + default: + throw new IllegalArgumentException("Unknown compression type: " + compressionType); } + } - @Override - protected Coder getDefaultOutputCoder() { - return DEFAULT_BYTE_ARRAY_CODER; - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - public String getFilepattern() { - return filepattern.get(); - } + String filepatternDisplay = filepattern.isAccessible() + ? filepattern.get() : filepattern.toString(); + builder + .add(DisplayData.item("compressionType", compressionType.toString()) + .withLabel("Compression Type")) + .addIfNotDefault(DisplayData.item("validation", validate) + .withLabel("Validation Enabled"), true) + .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) + .withLabel("File Pattern")); + } - public boolean needsValidation() { - return validate; - } + @Override + protected Coder getDefaultOutputCoder() { + return DEFAULT_BYTE_ARRAY_CODER; + } - public TFRecordIO.CompressionType getCompressionType() { - return compressionType; - } + public String getFilepattern() { + return filepattern.get(); } - /** Disallow construction of utility classes. 
*/ - private Read() {} + public boolean needsValidation() { + return validate; + } + + public TFRecordIO.CompressionType getCompressionType() { + return compressionType; + } } ///////////////////////////////////////////////////////////////////////////// diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java index 94530ef0776f..2a455d16d3a3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java @@ -136,15 +136,15 @@ public void testReadNamed() { assertEquals( "TFRecordIO.Read/Read.out", - p.apply(TFRecordIO.Read.withoutValidation().from("foo.*")).getName()); + p.apply(TFRecordIO.read().from("foo.*").withoutValidation()).getName()); assertEquals( "MyRead/Read.out", - p.apply("MyRead", TFRecordIO.Read.withoutValidation().from("foo.*")).getName()); + p.apply("MyRead", TFRecordIO.read().from("foo.*").withoutValidation()).getName()); } @Test public void testReadDisplayData() { - TFRecordIO.Read.Bound read = TFRecordIO.Read + TFRecordIO.Read read = TFRecordIO.read() .from("foo.*") .withCompressionType(GZIP) .withoutValidation(); @@ -241,7 +241,7 @@ private void runTestRead(byte[] data, String[] expected) throws IOException { fos.write(data); fos.close(); - TFRecordIO.Read.Bound read = TFRecordIO.Read.from(filename); + TFRecordIO.Read read = TFRecordIO.read().from(filename); PCollection output = p.apply(read).apply(ParDo.of(new ByteArrayToString())); PAssert.that(output).containsInAnyOrder(expected); @@ -338,7 +338,7 @@ private void runTestRoundTrip(Iterable elems, .apply(write); p.run(); - TFRecordIO.Read.Bound read = TFRecordIO.Read.from(baseFilename + "*") + TFRecordIO.Read read = TFRecordIO.read().from(baseFilename + "*") .withCompressionType(readCompressionType); PCollection output = p2.apply(read).apply(ParDo.of(new ByteArrayToString())); From 
1d695f04b9b1386154dc5b9844ca110693ed1d83 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 17 Apr 2017 17:29:06 -0700 Subject: [PATCH 2/5] Converts TFRecordIO.Read to AutoValue --- .../org/apache/beam/sdk/io/TFRecordIO.java | 99 +++++++++---------- 1 file changed, 44 insertions(+), 55 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index b4fd93cd954e..fb4ff5bdfa65 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -59,19 +60,32 @@ public class TFRecordIO { * the decoding of each of the records of the TFRecord file(s) as a byte array. */ public static Read read() { - return new Read(); + return new AutoValue_TFRecordIO_Read.Builder() + .setValidate(true) + .setCompressionType(CompressionType.AUTO) + .build(); } /** Implementation of {@link #read}. */ - public static class Read extends PTransform> { - /** The filepattern to read from. */ - @Nullable private final ValueProvider filepattern; + @AutoValue + public abstract static class Read extends PTransform> { + @Nullable + abstract ValueProvider getFilepattern(); - /** An option to indicate if input validation is desired. Default is true. */ - private final boolean validate; + abstract boolean getValidate(); - /** Option to indicate the input source's compression type. Default is AUTO. 
*/ - private final TFRecordIO.CompressionType compressionType; + abstract CompressionType getCompressionType(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFilepattern(ValueProvider filepattern); + abstract Builder setValidate(boolean validate); + abstract Builder setCompressionType(CompressionType compressionType); + + abstract Read build(); + } /** * Returns a transform for reading TFRecord files that reads from the file(s) @@ -82,14 +96,14 @@ public static class Read extends PTransform> { * >Java Filesystem glob patterns ("*", "?", "[..]") are supported. */ public Read from(String filepattern) { - return new Read().from(filepattern); + return from(StaticValueProvider.of(filepattern)); } /** * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */ public Read from(ValueProvider filepattern) { - return new Read().from(filepattern); + return toBuilder().setFilepattern(filepattern).build(); } /** @@ -101,7 +115,7 @@ public Read from(ValueProvider filepattern) { * available at execution time. */ public Read withoutValidation() { - return new Read().withoutValidation(); + return toBuilder().setValidate(false).build(); } /** @@ -115,41 +129,28 @@ public Read withoutValidation() { * extensions are uncompressed). 
*/ public Read withCompressionType(TFRecordIO.CompressionType compressionType) { - return new Read().withCompressionType(compressionType); - } - - private Read() { - this(null, null, true, TFRecordIO.CompressionType.AUTO); - } - - private Read( - @Nullable String name, - @Nullable ValueProvider filepattern, - boolean validate, - TFRecordIO.CompressionType compressionType) { - super(name); - this.filepattern = filepattern; - this.validate = validate; - this.compressionType = compressionType; + return toBuilder().setCompressionType(compressionType).build(); } @Override public PCollection expand(PBegin input) { - if (filepattern == null) { + if (getFilepattern() == null) { throw new IllegalStateException( "Need to set the filepattern of a TFRecordIO.Read transform"); } - if (validate) { - checkState(filepattern.isAccessible(), "Cannot validate with a RVP."); + if (getValidate()) { + checkState(getFilepattern().isAccessible(), "Cannot validate with a RVP."); try { checkState( - !IOChannelUtils.getFactory(filepattern.get()).match(filepattern.get()).isEmpty(), + !IOChannelUtils.getFactory(getFilepattern().get()) + .match(getFilepattern().get()) + .isEmpty(), "Unable to find any files matching %s", - filepattern); + getFilepattern()); } catch (IOException e) { throw new IllegalStateException( - String.format("Failed to validate %s", filepattern.get()), e); + String.format("Failed to validate %s", getFilepattern().get()), e); } } @@ -162,21 +163,21 @@ public PCollection expand(PBegin input) { // Helper to create a source specific to the requested compression type. 
protected FileBasedSource getSource() { - switch (compressionType) { + switch (getCompressionType()) { case NONE: - return new TFRecordSource(filepattern); + return new TFRecordSource(getFilepattern()); case AUTO: - return CompressedSource.from(new TFRecordSource(filepattern)); + return CompressedSource.from(new TFRecordSource(getFilepattern())); case GZIP: return - CompressedSource.from(new TFRecordSource(filepattern)) + CompressedSource.from(new TFRecordSource(getFilepattern())) .withDecompression(CompressedSource.CompressionMode.GZIP); case ZLIB: return - CompressedSource.from(new TFRecordSource(filepattern)) + CompressedSource.from(new TFRecordSource(getFilepattern())) .withDecompression(CompressedSource.CompressionMode.DEFLATE); default: - throw new IllegalArgumentException("Unknown compression type: " + compressionType); + throw new IllegalArgumentException("Unknown compression type: " + getCompressionType()); } } @@ -184,12 +185,12 @@ protected FileBasedSource getSource() { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String filepatternDisplay = filepattern.isAccessible() - ? filepattern.get() : filepattern.toString(); + String filepatternDisplay = getFilepattern().isAccessible() + ? 
getFilepattern().get() : getFilepattern().toString(); builder - .add(DisplayData.item("compressionType", compressionType.toString()) + .add(DisplayData.item("compressionType", getCompressionType().toString()) .withLabel("Compression Type")) - .addIfNotDefault(DisplayData.item("validation", validate) + .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true) .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) .withLabel("File Pattern")); @@ -199,18 +200,6 @@ public void populateDisplayData(DisplayData.Builder builder) { protected Coder getDefaultOutputCoder() { return DEFAULT_BYTE_ARRAY_CODER; } - - public String getFilepattern() { - return filepattern.get(); - } - - public boolean needsValidation() { - return validate; - } - - public TFRecordIO.CompressionType getCompressionType() { - return compressionType; - } } ///////////////////////////////////////////////////////////////////////////// From 5b43df9e12db8a467e60b76ab92c53f0ed3e7a5c Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 17 Apr 2017 17:38:08 -0700 Subject: [PATCH 3/5] Gets rid of TFRecordIO.Write.Bound --- .../org/apache/beam/sdk/io/TFRecordIO.java | 407 +++++++----------- .../apache/beam/sdk/io/TFRecordIOTest.java | 6 +- 2 files changed, 154 insertions(+), 259 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index fb4ff5bdfa65..13fd4d1a5a31 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -66,6 +66,15 @@ public static Read read() { .build(); } + /** + * A {@link PTransform} that writes a {@link PCollection} to TFRecord file (or + * multiple TFRecord files matching a sharding pattern), with each + * element of the input collection encoded into its own record. 
+ */ + public static Write write() { + return new Write(); + } + /** Implementation of {@link #read}. */ @AutoValue public abstract static class Read extends PTransform> { @@ -204,45 +213,80 @@ protected Coder getDefaultOutputCoder() { ///////////////////////////////////////////////////////////////////////////// - /** - * A {@link PTransform} that writes a {@link PCollection} to TFRecord file (or - * multiple TFRecord files matching a sharding pattern), with each - * element of the input collection encoded into its own record. - */ - public static class Write { + /** Implementation of {@link #write}. */ + public static class Write extends PTransform, PDone> { + private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; + + /** The prefix of each file written, combined with suffix and shardTemplate. */ + private final ValueProvider filenamePrefix; + /** The suffix of each file written, combined with prefix and shardTemplate. */ + private final String filenameSuffix; + + /** Requested number of shards. 0 for automatic. */ + private final int numShards; + + /** The shard template of each file written, combined with prefix and suffix. */ + private final String shardTemplate; + + /** An option to indicate if output validation is desired. Default is true. */ + private final boolean validate; + + /** Option to indicate the output sink's compression type. Default is NONE. 
*/ + private final TFRecordIO.CompressionType compressionType; + + private Write() { + this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE); + } + + private Write(String name, ValueProvider filenamePrefix, String filenameSuffix, + int numShards, String shardTemplate, boolean validate, + CompressionType compressionType) { + super(name); + this.filenamePrefix = filenamePrefix; + this.filenameSuffix = filenameSuffix; + this.numShards = numShards; + this.shardTemplate = shardTemplate; + this.validate = validate; + this.compressionType = compressionType; + } /** - * Returns a transform for writing to TFRecord files that writes to the file(s) - * with the given prefix. This can be a local filename + * Writes to TFRecord file(s) with the given prefix. This can be a local filename * (if running locally), or a Google Cloud Storage filename of * the form {@code "gs:///"} * (if running locally or using remote execution). * *

The files written will begin with this prefix, followed by - * a shard identifier (see {@link TFRecordIO.Write.Bound#withNumShards(int)}, and end - * in a common extension, if given by {@link TFRecordIO.Write.Bound#withSuffix(String)}. + * a shard identifier (see {@link #withNumShards(int)}, and end + * in a common extension, if given by {@link #withSuffix(String)}. */ - public static Bound to(String prefix) { - return new Bound().to(prefix); + public Write to(String filenamePrefix) { + validateOutputComponent(filenamePrefix); + return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards, + shardTemplate, validate, compressionType); } /** * Like {@link #to(String)}, but with a {@link ValueProvider}. */ - public static Bound to(ValueProvider prefix) { - return new Bound().to(prefix); + public Write to(ValueProvider filenamePrefix) { + return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, + compressionType); } /** - * Returns a transform for writing to TFRecord files that appends the specified suffix - * to the created files. + * Writes to the file(s) with the given filename suffix. + * + * @see ShardNameTemplate */ - public static Bound withSuffix(String nameExtension) { - return new Bound().withSuffix(nameExtension); + public Write withSuffix(String nameExtension) { + validateOutputComponent(nameExtension); + return new Write(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate, + compressionType); } /** - * Returns a transform for writing to TFRecord files that uses the provided shard count. + * Writes to the provided number of shards. * *

Constraining the number of shards is likely to reduce * the performance of a pipeline. Setting this value is not recommended @@ -250,280 +294,131 @@ public static Bound withSuffix(String nameExtension) { * * @param numShards the number of shards to use, or 0 to let the system * decide. + * @see ShardNameTemplate */ - public static Bound withNumShards(int numShards) { - return new Bound().withNumShards(numShards); + public Write withNumShards(int numShards) { + checkArgument(numShards >= 0); + return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, + compressionType); } /** - * Returns a transform for writing to TFRecord files that uses the given shard name - * template. + * Uses the given shard name template. * - *

See {@link ShardNameTemplate} for a description of shard templates. + * @see ShardNameTemplate */ - public static Bound withShardNameTemplate(String shardTemplate) { - return new Bound().withShardNameTemplate(shardTemplate); + public Write withShardNameTemplate(String shardTemplate) { + return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, + compressionType); } /** - * Returns a transform for writing to TFRecord files that forces a single file as - * output. + * Forces a single file as output. + * + *

Constraining the number of shards is likely to reduce + * the performance of a pipeline. Using this setting is not recommended + * unless you truly require a single output file. + * + *

This is a shortcut for + * {@code .withNumShards(1).withShardNameTemplate("")} */ - public static Bound withoutSharding() { - return new Bound().withoutSharding(); + public Write withoutSharding() { + return new Write(name, filenamePrefix, filenameSuffix, 1, "", + validate, compressionType); } /** - * Returns a transform for writing to text files that has GCS path validation on - * pipeline creation disabled. + * Disables GCS output path validation on pipeline creation. * *

This can be useful in the case where the GCS output location does - * not exist at the pipeline creation time, but is expected to be available - * at execution time. + * not exist at the pipeline creation time, but is expected to be + * available at execution time. */ - public static Bound withoutValidation() { - return new Bound().withoutValidation(); + public Write withoutValidation() { + return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false, + compressionType); } /** - * Returns a transform for writing to TFRecord files like this one but writes to output files - * using the specified compression type. + * Writes to output files using the specified compression type. * *

If no compression type is specified, the default is * {@link TFRecordIO.CompressionType#NONE}. * See {@link TFRecordIO.Read#withCompressionType} for more details. */ - public static Bound withCompressionType(CompressionType compressionType) { - return new Bound().withCompressionType(compressionType); + public Write withCompressionType(CompressionType compressionType) { + return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, + compressionType); } - /** - * A PTransform that writes a bounded PCollection to a TFRecord file (or - * multiple TFRecord files matching a sharding pattern), with each - * PCollection element being encoded into its own record. - */ - public static class Bound extends PTransform, PDone> { - private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; - - /** The prefix of each file written, combined with suffix and shardTemplate. */ - private final ValueProvider filenamePrefix; - /** The suffix of each file written, combined with prefix and shardTemplate. */ - private final String filenameSuffix; - - /** Requested number of shards. 0 for automatic. */ - private final int numShards; - - /** The shard template of each file written, combined with prefix and suffix. */ - private final String shardTemplate; - - /** An option to indicate if output validation is desired. Default is true. */ - private final boolean validate; - - /** Option to indicate the output sink's compression type. Default is NONE. 
*/ - private final TFRecordIO.CompressionType compressionType; - - private Bound() { - this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE); - } - - private Bound(String name, ValueProvider filenamePrefix, String filenameSuffix, - int numShards, String shardTemplate, boolean validate, - CompressionType compressionType) { - super(name); - this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - this.numShards = numShards; - this.shardTemplate = shardTemplate; - this.validate = validate; - this.compressionType = compressionType; - } - - /** - * Returns a transform for writing to TFRecord files that's like this one but - * that writes to the file(s) with the given filename prefix. - * - *

See {@link TFRecordIO.Write#to(String) Write.to(String)} for more information. - * - *

Does not modify this object. - */ - public Bound to(String filenamePrefix) { - validateOutputComponent(filenamePrefix); - return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards, - shardTemplate, validate, compressionType); - } - - /** - * Like {@link #to(String)}, but with a {@link ValueProvider}. - */ - public Bound to(ValueProvider filenamePrefix) { - return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); - } - - /** - * Returns a transform for writing to TFRecord files that that's like this one but - * that writes to the file(s) with the given filename suffix. - * - *

Does not modify this object. - * - * @see ShardNameTemplate - */ - public Bound withSuffix(String nameExtension) { - validateOutputComponent(nameExtension); - return new Bound(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate, - compressionType); - } - - /** - * Returns a transform for writing to TFRecord files that's like this one but - * that uses the provided shard count. - * - *

Constraining the number of shards is likely to reduce - * the performance of a pipeline. Setting this value is not recommended - * unless you require a specific number of output files. - * - *

Does not modify this object. - * - * @param numShards the number of shards to use, or 0 to let the system - * decide. - * @see ShardNameTemplate - */ - public Bound withNumShards(int numShards) { - checkArgument(numShards >= 0); - return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); - } - - /** - * Returns a transform for writing to TFRecord files that's like this one but - * that uses the given shard name template. - * - *
<p>
Does not modify this object. - * - * @see ShardNameTemplate - */ - public Bound withShardNameTemplate(String shardTemplate) { - return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); - } - - /** - * Returns a transform for writing to TFRecord files that's like this one but - * that forces a single file as output. - * - *
<p>
Constraining the number of shards is likely to reduce - * the performance of a pipeline. Using this setting is not recommended - * unless you truly require a single output file. - * - *
<p>
This is a shortcut for - * {@code .withNumShards(1).withShardNameTemplate("")} - * - *
<p>
Does not modify this object. - */ - public Bound withoutSharding() { - return new Bound(name, filenamePrefix, filenameSuffix, 1, "", - validate, compressionType); - } - - /** - * Returns a transform for writing to TFRecord files that's like this one but - * that has GCS output path validation on pipeline creation disabled. - * - *
<p>
This can be useful in the case where the GCS output location does - * not exist at the pipeline creation time, but is expected to be - * available at execution time. - * - *
<p>
Does not modify this object. - */ - public Bound withoutValidation() { - return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false, - compressionType); + @Override + public PDone expand(PCollection input) { + if (filenamePrefix == null) { + throw new IllegalStateException( + "need to set the filename prefix of a TFRecordIO.Write transform"); } - - /** - * Returns a transform for writing to TFRecord files like this one but writes to output files - * using the specified compression type. - * - *
<p>
If no compression type is specified, the default is - * {@link TFRecordIO.CompressionType#NONE}. - * See {@link TFRecordIO.Read#withCompressionType} for more details. - * - *
<p>
Does not modify this object. - */ - public Bound withCompressionType(CompressionType compressionType) { - return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); + org.apache.beam.sdk.io.Write write = + org.apache.beam.sdk.io.Write.to( + new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType)); + if (getNumShards() > 0) { + write = write.withNumShards(getNumShards()); } + return input.apply("Write", write); + } - @Override - public PDone expand(PCollection input) { - if (filenamePrefix == null) { - throw new IllegalStateException( - "need to set the filename prefix of a TFRecordIO.Write transform"); - } - org.apache.beam.sdk.io.Write write = - org.apache.beam.sdk.io.Write.to( - new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType)); - if (getNumShards() > 0) { - write = write.withNumShards(getNumShards()); - } - return input.apply("Write", write); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - String prefixString = filenamePrefix.isAccessible() - ? 
filenamePrefix.get() : filenamePrefix.toString(); - builder - .addIfNotNull(DisplayData.item("filePrefix", prefixString) - .withLabel("Output File Prefix")) - .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix) - .withLabel("Output File Suffix"), "") - .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate) - .withLabel("Output Shard Name Template"), - DEFAULT_SHARD_TEMPLATE) - .addIfNotDefault(DisplayData.item("validation", validate) - .withLabel("Validation Enabled"), true) - .addIfNotDefault(DisplayData.item("numShards", numShards) - .withLabel("Maximum Output Shards"), 0) - .add(DisplayData - .item("compressionType", compressionType.toString()) - .withLabel("Compression Type")); - } + String prefixString = filenamePrefix.isAccessible() + ? filenamePrefix.get() : filenamePrefix.toString(); + builder + .addIfNotNull(DisplayData.item("filePrefix", prefixString) + .withLabel("Output File Prefix")) + .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix) + .withLabel("Output File Suffix"), "") + .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate) + .withLabel("Output Shard Name Template"), + DEFAULT_SHARD_TEMPLATE) + .addIfNotDefault(DisplayData.item("validation", validate) + .withLabel("Validation Enabled"), true) + .addIfNotDefault(DisplayData.item("numShards", numShards) + .withLabel("Maximum Output Shards"), 0) + .add(DisplayData + .item("compressionType", compressionType.toString()) + .withLabel("Compression Type")); + } - /** - * Returns the current shard name template string. - */ - public String getShardNameTemplate() { - return shardTemplate; - } + /** + * Returns the current shard name template string. 
+ */ + public String getShardNameTemplate() { + return shardTemplate; + } - @Override - protected Coder getDefaultOutputCoder() { - return VoidCoder.of(); - } + @Override + protected Coder getDefaultOutputCoder() { + return VoidCoder.of(); + } - public String getFilenamePrefix() { - return filenamePrefix.get(); - } + public String getFilenamePrefix() { + return filenamePrefix.get(); + } - public String getShardTemplate() { - return shardTemplate; - } + public String getShardTemplate() { + return shardTemplate; + } - public int getNumShards() { - return numShards; - } + public int getNumShards() { + return numShards; + } - public String getFilenameSuffix() { - return filenameSuffix; - } + public String getFilenameSuffix() { + return filenameSuffix; + } - public boolean needsValidation() { - return validate; - } + public boolean needsValidation() { + return validate; } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java index 2a455d16d3a3..9511c2a7315a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java @@ -158,7 +158,7 @@ public void testReadDisplayData() { @Test public void testWriteDisplayData() { - TFRecordIO.Write.Bound write = TFRecordIO.Write + TFRecordIO.Write write = TFRecordIO.write() .to("foo") .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") @@ -255,7 +255,7 @@ private void runTestWrite(String[] elems, String ...base64) throws IOException { PCollection input = p.apply(Create.of(Arrays.asList(elems))) .apply(ParDo.of(new StringToByteArray())); - TFRecordIO.Write.Bound write = TFRecordIO.Write.to(filename).withoutSharding(); + TFRecordIO.Write write = TFRecordIO.write().to(filename).withoutSharding(); input.apply(write); p.run(); @@ -329,7 +329,7 @@ private void runTestRoundTrip(Iterable elems, Path baseDir = 
Files.createTempDirectory(tempFolder, "test-rt"); String baseFilename = baseDir.resolve(outputName).toString(); - TFRecordIO.Write.Bound write = TFRecordIO.Write.to(baseFilename) + TFRecordIO.Write write = TFRecordIO.write().to(baseFilename) .withNumShards(numShards) .withSuffix(suffix) .withCompressionType(writeCompressionType); From 7d89bb4661907a3e129c78d29fb8f7e68db12a24 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Mon, 17 Apr 2017 17:50:24 -0700 Subject: [PATCH 4/5] Converts TFRecordIO.Write to AutoValue --- .../org/apache/beam/sdk/io/TFRecordIO.java | 127 ++++++++---------- 1 file changed, 54 insertions(+), 73 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 13fd4d1a5a31..2e01a80bc8be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -72,7 +72,13 @@ public static Read read() { * element of the input collection encoded into its own record. */ public static Write write() { - return new Write(); + return new AutoValue_TFRecordIO_Write.Builder() + .setFilenameSuffix("") + .setNumShards(0) + .setValidate(true) + .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE) + .setCompressionType(CompressionType.NONE) + .build(); } /** Implementation of {@link #read}. */ @@ -214,40 +220,46 @@ protected Coder getDefaultOutputCoder() { ///////////////////////////////////////////////////////////////////////////// /** Implementation of {@link #write}. */ - public static class Write extends PTransform, PDone> { + @AutoValue + public abstract static class Write extends PTransform, PDone> { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; /** The prefix of each file written, combined with suffix and shardTemplate. 
*/ - private final ValueProvider filenamePrefix; + @Nullable + abstract ValueProvider getFilenamePrefix(); + /** The suffix of each file written, combined with prefix and shardTemplate. */ - private final String filenameSuffix; + abstract String getFilenameSuffix(); /** Requested number of shards. 0 for automatic. */ - private final int numShards; + abstract int getNumShards(); /** The shard template of each file written, combined with prefix and suffix. */ - private final String shardTemplate; + abstract String getShardTemplate(); /** An option to indicate if output validation is desired. Default is true. */ - private final boolean validate; + abstract boolean getValidate(); /** Option to indicate the output sink's compression type. Default is NONE. */ - private final TFRecordIO.CompressionType compressionType; + abstract CompressionType getCompressionType(); - private Write() { - this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE); - } + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFilenamePrefix(ValueProvider filenamePrefix); + + abstract Builder setFilenameSuffix(String filenameSuffix); + + abstract Builder setNumShards(int numShards); + + abstract Builder setShardTemplate(String shardTemplate); + + abstract Builder setValidate(boolean validate); + + abstract Builder setCompressionType(CompressionType compressionType); - private Write(String name, ValueProvider filenamePrefix, String filenameSuffix, - int numShards, String shardTemplate, boolean validate, - CompressionType compressionType) { - super(name); - this.filenamePrefix = filenamePrefix; - this.filenameSuffix = filenameSuffix; - this.numShards = numShards; - this.shardTemplate = shardTemplate; - this.validate = validate; - this.compressionType = compressionType; + abstract Write build(); } /** @@ -262,16 +274,14 @@ private Write(String name, ValueProvider filenamePrefix, String filename */ public Write 
to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards, - shardTemplate, validate, compressionType); + return to(StaticValueProvider.of(filenamePrefix)); } /** * Like {@link #to(String)}, but with a {@link ValueProvider}. */ public Write to(ValueProvider filenamePrefix) { - return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); + return toBuilder().setFilenamePrefix(filenamePrefix).build(); } /** @@ -281,8 +291,7 @@ public Write to(ValueProvider filenamePrefix) { */ public Write withSuffix(String nameExtension) { validateOutputComponent(nameExtension); - return new Write(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate, - compressionType); + return toBuilder().setFilenameSuffix(nameExtension).build(); } /** @@ -298,8 +307,7 @@ public Write withSuffix(String nameExtension) { */ public Write withNumShards(int numShards) { checkArgument(numShards >= 0); - return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); + return toBuilder().setNumShards(numShards).build(); } /** @@ -308,8 +316,7 @@ public Write withNumShards(int numShards) { * @see ShardNameTemplate */ public Write withShardNameTemplate(String shardTemplate) { - return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); + return toBuilder().setShardTemplate(shardTemplate).build(); } /** @@ -323,8 +330,7 @@ public Write withShardNameTemplate(String shardTemplate) { * {@code .withNumShards(1).withShardNameTemplate("")} */ public Write withoutSharding() { - return new Write(name, filenamePrefix, filenameSuffix, 1, "", - validate, compressionType); + return withNumShards(1).withShardNameTemplate(""); } /** @@ -335,8 +341,7 @@ public Write withoutSharding() { * available at execution time. 
*/ public Write withoutValidation() { - return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false, - compressionType); + return toBuilder().setValidate(false).build(); } /** @@ -347,19 +352,22 @@ public Write withoutValidation() { * See {@link TFRecordIO.Read#withCompressionType} for more details. */ public Write withCompressionType(CompressionType compressionType) { - return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate, - compressionType); + return toBuilder().setCompressionType(compressionType).build(); } @Override public PDone expand(PCollection input) { - if (filenamePrefix == null) { + if (getFilenamePrefix() == null) { throw new IllegalStateException( "need to set the filename prefix of a TFRecordIO.Write transform"); } org.apache.beam.sdk.io.Write write = org.apache.beam.sdk.io.Write.to( - new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType)); + new TFRecordSink( + getFilenamePrefix(), + getFilenameSuffix(), + getShardTemplate(), + getCompressionType())); if (getNumShards() > 0) { write = write.withNumShards(getNumShards()); } @@ -370,56 +378,29 @@ public PDone expand(PCollection input) { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String prefixString = filenamePrefix.isAccessible() - ? filenamePrefix.get() : filenamePrefix.toString(); + String prefixString = getFilenamePrefix().isAccessible() + ? 
getFilenamePrefix().get() : getFilenamePrefix().toString(); builder .addIfNotNull(DisplayData.item("filePrefix", prefixString) .withLabel("Output File Prefix")) - .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix) + .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix()) .withLabel("Output File Suffix"), "") - .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate) + .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate()) .withLabel("Output Shard Name Template"), DEFAULT_SHARD_TEMPLATE) - .addIfNotDefault(DisplayData.item("validation", validate) + .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true) - .addIfNotDefault(DisplayData.item("numShards", numShards) + .addIfNotDefault(DisplayData.item("numShards", getNumShards()) .withLabel("Maximum Output Shards"), 0) .add(DisplayData - .item("compressionType", compressionType.toString()) + .item("compressionType", getCompressionType().toString()) .withLabel("Compression Type")); } - /** - * Returns the current shard name template string. 
- */ - public String getShardNameTemplate() { - return shardTemplate; - } - @Override protected Coder getDefaultOutputCoder() { return VoidCoder.of(); } - - public String getFilenamePrefix() { - return filenamePrefix.get(); - } - - public String getShardTemplate() { - return shardTemplate; - } - - public int getNumShards() { - return numShards; - } - - public String getFilenameSuffix() { - return filenameSuffix; - } - - public boolean needsValidation() { - return validate; - } } /** From db8a4f3996ed3c50f0dfd34add6c3fd66710a5b5 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Tue, 18 Apr 2017 14:03:13 -0700 Subject: [PATCH 5/5] Removes unused validation parameter --- .../org/apache/beam/sdk/io/TFRecordIO.java | 19 ------------------- .../apache/beam/sdk/io/TFRecordIOTest.java | 4 +--- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index 2e01a80bc8be..748086dd6750 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -75,7 +75,6 @@ public static Write write() { return new AutoValue_TFRecordIO_Write.Builder() .setFilenameSuffix("") .setNumShards(0) - .setValidate(true) .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE) .setCompressionType(CompressionType.NONE) .build(); @@ -237,9 +236,6 @@ public abstract static class Write extends PTransform, PDone /** The shard template of each file written, combined with prefix and suffix. */ abstract String getShardTemplate(); - /** An option to indicate if output validation is desired. Default is true. */ - abstract boolean getValidate(); - /** Option to indicate the output sink's compression type. Default is NONE. 
*/ abstract CompressionType getCompressionType(); @@ -255,8 +251,6 @@ abstract static class Builder { abstract Builder setShardTemplate(String shardTemplate); - abstract Builder setValidate(boolean validate); - abstract Builder setCompressionType(CompressionType compressionType); abstract Write build(); @@ -333,17 +327,6 @@ public Write withoutSharding() { return withNumShards(1).withShardNameTemplate(""); } - /** - * Disables GCS output path validation on pipeline creation. - * - *
<p>
This can be useful in the case where the GCS output location does - * not exist at the pipeline creation time, but is expected to be - * available at execution time. - */ - public Write withoutValidation() { - return toBuilder().setValidate(false).build(); - } - /** * Writes to output files using the specified compression type. * @@ -388,8 +371,6 @@ public void populateDisplayData(DisplayData.Builder builder) { .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate()) .withLabel("Output Shard Name Template"), DEFAULT_SHARD_TEMPLATE) - .addIfNotDefault(DisplayData.item("validation", getValidate()) - .withLabel("Validation Enabled"), true) .addIfNotDefault(DisplayData.item("numShards", getNumShards()) .withLabel("Maximum Output Shards"), 0) .add(DisplayData diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java index 9511c2a7315a..ae3a50da6e51 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java @@ -163,8 +163,7 @@ public void testWriteDisplayData() { .withSuffix("bar") .withShardNameTemplate("-SS-of-NN-") .withNumShards(100) - .withCompressionType(GZIP) - .withoutValidation(); + .withCompressionType(GZIP); DisplayData displayData = DisplayData.from(write); @@ -173,7 +172,6 @@ public void testWriteDisplayData() { assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-")); assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("compressionType", GZIP.toString())); - assertThat(displayData, hasDisplayItem("validation", false)); } @Test