From df472abc6ee1b3c2ea021f6069beabd6a4439907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Gajowy?= Date: Mon, 20 Nov 2017 17:00:54 +0100 Subject: [PATCH 1/2] [BEAM-3060] Add Compressed TextIOIT --- .../org/apache/beam/sdk/io/text/TextIOIT.java | 104 ++++++++++++++---- 1 file changed, 84 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index ecab1d864971c..348174d75a267 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -17,8 +17,14 @@ */ package org.apache.beam.sdk.io.text; +import static org.apache.beam.sdk.io.Compression.AUTO; +import static org.apache.beam.sdk.io.Compression.BZIP2; +import static org.apache.beam.sdk.io.Compression.DEFLATE; +import static org.apache.beam.sdk.io.Compression.GZIP; + import com.google.common.base.Function; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.io.IOException; @@ -27,6 +33,7 @@ import java.util.Collections; import java.util.Date; import java.util.Map; +import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TextIO; @@ -46,11 +53,14 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; + /** - * An integration test for {@link org.apache.beam.sdk.io.TextIO}. + * Integration tests for {@link org.apache.beam.sdk.io.TextIO}. * *

Run this test using the command below. Pass in connection information via PipelineOptions: *

@@ -60,15 +70,12 @@
  *  ]'
  * 
* */ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) public class TextIOIT { private static String filenamePrefix; private static Long numberOfTextLines; - @Rule - public TestPipeline pipeline = TestPipeline.create(); - @BeforeClass public static void setup() throws ParseException { PipelineOptionsFactory.register(IOTestPipelineOptions.class); @@ -83,25 +90,82 @@ private static String appendTimestamp(String filenamePrefix) { return String.format("%s_%s", filenamePrefix, new Date().getTime()); } - @Test - public void writeThenReadAll() { - PCollection testFilenames = pipeline - .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) - .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) - .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames()) - .getPerDestinationOutputFilenames().apply(Values.create()); + /** IO IT with no compression. */ + @RunWith(JUnit4.class) + public static class UncompressedTextIOIT { + + @Rule + public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void writeThenReadAll() { + PCollection testFilenames = pipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) + .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) + .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames()) + .getPerDestinationOutputFilenames().apply(Values.create()); + + PCollection consolidatedHashcode = testFilenames + .apply("Read all files", TextIO.readAll()) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); + + String expectedHash = getExpectedHashForLineCount(numberOfTextLines); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); + + testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) + .withSideInputs(consolidatedHashcode.apply(View.asSingleton()))); + + pipeline.run().waitUntilFinish(); + } + } + + /** IO IT with various compression types. */ + @RunWith(Parameterized.class) + public static class CompressedTextIOIT { + + @Rule + public TestPipeline pipeline = TestPipeline.create(); + + @Parameterized.Parameters() + public static Iterable data() { + return ImmutableList.builder() + .add(GZIP) + .add(DEFLATE) + .add(BZIP2) + .build(); + } + + @Parameterized.Parameter() + public Compression compression; + + @Test + public void writeThenReadAllWithCompression() { + TextIO.TypedWrite write = TextIO + .write() + .to(filenamePrefix) + .withOutputFilenames() + .withCompression(compression); + + TextIO.ReadAll read = TextIO.readAll().withCompression(AUTO); - PCollection consolidatedHashcode = testFilenames - .apply("Read all files", TextIO.readAll()) - .apply("Calculate hashcode", Combine.globally(new HashingFn())); + PCollection testFilenames = pipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) + .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) + .apply("Write content to files", write) + .getPerDestinationOutputFilenames().apply(Values.create()); - String expectedHash = getExpectedHashForLineCount(numberOfTextLines); - PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); + PCollection consolidatedHashcode = testFilenames + .apply("Read all files", read) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); - testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) - .withSideInputs(consolidatedHashcode.apply(View.asSingleton()))); + String expectedHash = getExpectedHashForLineCount(numberOfTextLines); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); - pipeline.run().waitUntilFinish(); + testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) + .withSideInputs(consolidatedHashcode.apply(View.asSingleton()))); + + pipeline.run().waitUntilFinish(); + } } private static String getExpectedHashForLineCount(Long lineCount) { From cfc2873e7c4108918e5fd8fccd2b34ee677d6ec7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Gajowy?= Date: Fri, 24 Nov 2017 17:18:38 +0100 Subject: [PATCH 2/2] add post-review changes --- .../sdk/io/common/IOTestPipelineOptions.java | 6 + .../org/apache/beam/sdk/io/text/TextIOIT.java | 128 +++++++----------- 2 files changed, 53 insertions(+), 81 deletions(-) diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 91b3aa6d3446d..5a29d4f8126e2 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -100,4 +100,10 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { String getFilenamePrefix(); void setFilenamePrefix(String prefix); + + @Description("File compression type for writing and reading test files") + @Default.String("UNCOMPRESSED") + String getCompressionType(); + + void setCompressionType(String compressionType); } diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index 348174d75a267..1b9b385a1ff51 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -15,24 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.beam.sdk.io.text; import static org.apache.beam.sdk.io.Compression.AUTO; -import static org.apache.beam.sdk.io.Compression.BZIP2; -import static org.apache.beam.sdk.io.Compression.DEFLATE; -import static org.apache.beam.sdk.io.Compression.GZIP; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; + import java.io.IOException; import java.text.ParseException; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.Map; + import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.GenerateSequence; @@ -53,28 +52,32 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.junit.runners.Parameterized; - /** * Integration tests for {@link org.apache.beam.sdk.io.TextIO}. * *

Run this test using the command below. Pass in connection information via PipelineOptions: *

- *  mvn -e -Pio-it verify -pl sdks/java/io/text -DintegrationTestPipelineOptions='[
+ *  mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests
+ *  -Dit.test=org.apache.beam.sdk.io.text.TextIOIT
+ *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
  *  "--filenamePrefix=TEXTIOIT"
+ *  "--compressionType=GZIP"
  *  ]'
  * 
* */ -@RunWith(Enclosed.class) +@RunWith(JUnit4.class) public class TextIOIT { private static String filenamePrefix; private static Long numberOfTextLines; + private static Compression compressionType; + + @Rule + public TestPipeline pipeline = TestPipeline.create(); @BeforeClass public static void setup() throws ParseException { @@ -84,94 +87,54 @@ public static void setup() throws ParseException { numberOfTextLines = options.getNumberOfRecords(); filenamePrefix = appendTimestamp(options.getFilenamePrefix()); + compressionType = parseCompressionType(options.getCompressionType()); } - private static String appendTimestamp(String filenamePrefix) { - return String.format("%s_%s", filenamePrefix, new Date().getTime()); - } - - /** IO IT with no compression. */ - @RunWith(JUnit4.class) - public static class UncompressedTextIOIT { - - @Rule - public TestPipeline pipeline = TestPipeline.create(); - - @Test - public void writeThenReadAll() { - PCollection testFilenames = pipeline - .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) - .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) - .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames()) - .getPerDestinationOutputFilenames().apply(Values.create()); - - PCollection consolidatedHashcode = testFilenames - .apply("Read all files", TextIO.readAll()) - .apply("Calculate hashcode", Combine.globally(new HashingFn())); - - String expectedHash = getExpectedHashForLineCount(numberOfTextLines); - PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); - - testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) - .withSideInputs(consolidatedHashcode.apply(View.asSingleton()))); - - pipeline.run().waitUntilFinish(); + private static Compression parseCompressionType(String compressionType) { + try { + return Compression.valueOf(compressionType.toUpperCase()); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException( + String.format("Unsupported compression type: %s", compressionType)); } } - /** IO IT with various compression types. */ - @RunWith(Parameterized.class) - public static class CompressedTextIOIT { - - @Rule - public TestPipeline pipeline = TestPipeline.create(); - - @Parameterized.Parameters() - public static Iterable data() { - return ImmutableList.builder() - .add(GZIP) - .add(DEFLATE) - .add(BZIP2) - .build(); - } - - @Parameterized.Parameter() - public Compression compression; - - @Test - public void writeThenReadAllWithCompression() { - TextIO.TypedWrite write = TextIO - .write() - .to(filenamePrefix) - .withOutputFilenames() - .withCompression(compression); + private static String appendTimestamp(String filenamePrefix) { + return String.format("%s_%s", filenamePrefix, new Date().getTime()); + } - TextIO.ReadAll read = TextIO.readAll().withCompression(AUTO); + @Test + public void writeThenReadAll() { + TextIO.TypedWrite write = TextIO + .write() + .to(filenamePrefix) + .withOutputFilenames() + .withCompression(compressionType); - PCollection testFilenames = pipeline - .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) - .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) - .apply("Write content to files", write) - .getPerDestinationOutputFilenames().apply(Values.create()); + PCollection testFilenames = pipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) + .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) + .apply("Write content to files", write) + .getPerDestinationOutputFilenames().apply(Values.create()); - PCollection consolidatedHashcode = testFilenames - .apply("Read all files", read) - .apply("Calculate hashcode", Combine.globally(new HashingFn())); + PCollection consolidatedHashcode = testFilenames + .apply("Read all files", TextIO.readAll().withCompression(AUTO)) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); - String expectedHash = getExpectedHashForLineCount(numberOfTextLines); - PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); + String expectedHash = getExpectedHashForLineCount(numberOfTextLines); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); - testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) - .withSideInputs(consolidatedHashcode.apply(View.asSingleton()))); + testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn()) + .withSideInputs(consolidatedHashcode.apply(View.asSingleton()))); - pipeline.run().waitUntilFinish(); - } + pipeline.run().waitUntilFinish(); } private static String getExpectedHashForLineCount(Long lineCount) { Map expectedHashes = ImmutableMap.of( 100_000L, "4c8bb3b99dcc59459b20fefba400d446", - 1_000_000L, "9796db06e7a7960f974d5a91164afff1" + 1_000_000L, "9796db06e7a7960f974d5a91164afff1", + 100_000_000L, "6ce05f456e2fdc846ded2abd0ec1de95" ); String hash = expectedHashes.get(lineCount); @@ -183,6 +146,7 @@ private static String getExpectedHashForLineCount(Long lineCount) { } private static class DeterministicallyConstructTestTextLineFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) { c.output(String.format("IO IT Test line of text. Line seed: %s", c.element())); @@ -190,6 +154,7 @@ public void processElement(ProcessContext c) { } private static class DeleteFileFn extends DoFn { + @ProcessElement public void processElement(ProcessContext c) throws IOException { MatchResult match = Iterables @@ -200,6 +165,7 @@ public void processElement(ProcessContext c) throws IOException { private Collection toResourceIds(MatchResult match) throws IOException { return FluentIterable.from(match.metadata()) .transform(new Function() { + @Override public ResourceId apply(MatchResult.Metadata metadata) { return metadata.resourceId();