From 43ef4d42d7d224b1997278832ec645bccb945792 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Thu, 5 Oct 2017 11:45:12 +0200 Subject: [PATCH 1/3] [BEAM-3019] Make AvroIOWriteTransformTest more generic make runTestWrite() more generic to be able to use GenericRecord[] as input for writeGenericRecords test in place of AvroGeneratedUser make readAvroFile() generic to be able to read GenericRecords using GenericDatumReader for writeGenericRecords test --- .../beam/sdk/io/AvroIOTransformTest.java | 95 ++++++++++++------- 1 file changed, 59 insertions(+), 36 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index b4f7a7918176..bc0387184213 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; @@ -102,28 +103,28 @@ private static AvroGeneratedUser[] generateAvroObjects() { return new AvroGeneratedUser[] { user1, user2, user3 }; } + private static GenericRecord[] generateAvroGenericRecords() { + final GenericRecord user1 = new GenericData.Record(SCHEMA); + user1.put("name", "Bob"); + user1.put("favorite_number", 256); + + final GenericRecord user2 = new GenericData.Record(SCHEMA); + user2.put("name", "Alice"); + user2.put("favorite_number", 128); + + final GenericRecord user3 = new GenericData.Record(SCHEMA); + user3.put("name", "Ted"); + user3.put("favorite_color", "white"); + + return new GenericRecord[] { user1, user2, user3 }; + } + /** * Tests for AvroIO Read transforms, using classes generated from {@code user.avsc}. */ @RunWith(Parameterized.class) public static class AvroIOReadTransformTest extends AvroIOTransformTest { - private static GenericRecord[] generateAvroGenericRecords() { - final GenericRecord user1 = new GenericData.Record(SCHEMA); - user1.put("name", "Bob"); - user1.put("favorite_number", 256); - - final GenericRecord user2 = new GenericData.Record(SCHEMA); - user2.put("name", "Alice"); - user2.put("favorite_number", 128); - - final GenericRecord user3 = new GenericData.Record(SCHEMA); - user3.put("name", "Ted"); - user3.put("favorite_color", "white"); - - return new GenericRecord[] { user1, user2, user3 }; - } - private void generateAvroFile(final AvroGeneratedUser[] elements, final File avroFile) throws IOException { final DatumWriter userDatumWriter = @@ -247,41 +248,59 @@ public static class AvroIOWriteTransformTest extends AvroIOTransformTest { private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; - private List readAvroFile(final File avroFile) throws IOException { - final DatumReader userDatumReader = - new SpecificDatumReader<>(AvroGeneratedUser.class); - final List users = new ArrayList<>(); - try (DataFileReader dataFileReader = - new DataFileReader<>(avroFile, userDatumReader)) { - while (dataFileReader.hasNext()) { - users.add(dataFileReader.next()); + private List readAvroFile(final File avroFile, boolean generic) throws IOException { + if (!generic) { + final DatumReader datumReader = new SpecificDatumReader<>( + AvroGeneratedUser.class); + final List output = new ArrayList<>(); + try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, + datumReader)) { + while (dataFileReader.hasNext()) { + output.add(dataFileReader.next()); + } } + return (List) output; + } else { + final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); + final List genericRecords = new ArrayList<>(); + try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, + datumReader)) { + while (dataFileReader.hasNext()) { + genericRecords.add(dataFileReader.next()); + } + } + return (List) genericRecords; } - return users; } @Parameterized.Parameters(name = "{0}_with_{1}") public static Iterable data() throws IOException { final String generatedClass = "GeneratedClass"; - final String fromSchema = "SchemaObject"; - final String fromSchemaString = "SchemaString"; + final String genericRecordsfromSchema = "GenericRecordsFromSchemaObject"; + final String genericRecordsfromSchemaString = "GenericRecordsFromSchemaString"; return ImmutableList.builder() .add( new Object[] { AvroIO.write(AvroGeneratedUser.class), - generatedClass + generatedClass, + AvroIOTransformTest.generateAvroObjects(), + false }, new Object[] { AvroIO.writeGenericRecords(SCHEMA), - fromSchema + genericRecordsfromSchema, + AvroIOTransformTest.generateAvroGenericRecords(), + true }, new Object[] { AvroIO.writeGenericRecords(SCHEMA_STRING), - fromSchemaString + genericRecordsfromSchemaString, + AvroIOTransformTest.generateAvroGenericRecords(), + true }) .build(); } @@ -293,29 +312,33 @@ public static Iterable data() throws IOException { @Parameterized.Parameter(1) public String testAlias; - private void runTestWrite(final AvroIO.Write writeBuilder) - throws Exception { + @Parameterized.Parameter(2) + public Object[] inputData; + + @Parameterized.Parameter(3) + public Boolean generic; + private void runTestWrite(final AvroIO.Write writeBuilder, T[] inputData, + boolean generic) throws Exception { final File avroFile = tmpFolder.newFile("file.avro"); - final AvroGeneratedUser[] users = generateAvroObjects(); final AvroIO.Write write = writeBuilder.to(avroFile.getPath()); @SuppressWarnings("unchecked") final PCollection input = - pipeline.apply(Create.of(Arrays.asList((T[]) users)) + pipeline.apply(Create.of(Arrays.asList(inputData)) .withCoder((Coder) AvroCoder.of(AvroGeneratedUser.class))); input.apply(write.withoutSharding()); pipeline.run(); assertEquals(WRITE_TRANSFORM_NAME, write.getName()); - assertThat(readAvroFile(avroFile), containsInAnyOrder(users)); + assertThat(readAvroFile(avroFile, generic), containsInAnyOrder((Object[]) inputData)); } @Test @Category(NeedsRunner.class) public void testWrite() throws Exception { - runTestWrite(writeTransform); + runTestWrite(writeTransform, inputData, generic); } // TODO: for Write only, test withSuffix, withNumShards, From 667f3512b200e4affa3a7e691ef78d940904428b Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Fri, 6 Oct 2017 14:54:09 +0200 Subject: [PATCH 2/3] Flip condition --- .../beam/sdk/io/AvroIOTransformTest.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index bc0387184213..dfec5e630a6a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -249,7 +249,17 @@ public static class AvroIOWriteTransformTest extends AvroIOTransformTest { private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; private List readAvroFile(final File avroFile, boolean generic) throws IOException { - if (!generic) { + if (generic) { + final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); + final List genericRecords = new ArrayList<>(); + try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, + datumReader)) { + while (dataFileReader.hasNext()) { + genericRecords.add(dataFileReader.next()); + } + } + return (List) genericRecords; + } else { final DatumReader datumReader = new SpecificDatumReader<>( AvroGeneratedUser.class); final List output = new ArrayList<>(); @@ -260,16 +270,6 @@ private List readAvroFile(final File avroFile, boolean generic) throws IO } } return (List) output; - } else { - final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); - final List genericRecords = new ArrayList<>(); - try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, - datumReader)) { - while (dataFileReader.hasNext()) { - genericRecords.add(dataFileReader.next()); - } - } - return (List) genericRecords; } } From f2bfcece00c92d1edfa34023c52f98113912eda2 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Mon, 16 Oct 2017 11:08:01 +0200 Subject: [PATCH 3/3] Deduplication --- .../beam/sdk/io/AvroIOTransformTest.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index dfec5e630a6a..9a4372a2f60a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -251,26 +251,23 @@ public static class AvroIOWriteTransformTest extends AvroIOTransformTest { private List readAvroFile(final File avroFile, boolean generic) throws IOException { if (generic) { final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); - final List genericRecords = new ArrayList<>(); - try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, - datumReader)) { - while (dataFileReader.hasNext()) { - genericRecords.add(dataFileReader.next()); - } - } - return (List) genericRecords; + return (List) fileToList(avroFile, datumReader); } else { final DatumReader datumReader = new SpecificDatumReader<>( AvroGeneratedUser.class); - final List output = new ArrayList<>(); - try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, - datumReader)) { - while (dataFileReader.hasNext()) { - output.add(dataFileReader.next()); - } + return (List) fileToList(avroFile, datumReader); + } + } + + private List fileToList(File avroFile, DatumReader datumReader) + throws IOException { + final List output = new ArrayList<>(); + try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, datumReader)) { + while (dataFileReader.hasNext()) { + output.add(dataFileReader.next()); } - return (List) output; } + return output; } @Parameterized.Parameters(name = "{0}_with_{1}")