From 43ef4d42d7d224b1997278832ec645bccb945792 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Thu, 5 Oct 2017 11:45:12 +0200 Subject: [PATCH 1/7] [BEAM-3019] Make AvroIOWriteTransformTest more generic make runTestWrite() more generic to be able to use GenericRecord[] as input for writeGenericRecords test in place of AvroGeneratedUser make readAvroFile() generic to be able to read GenericRecords using GenericDatumReader for writeGenericRecords test --- .../beam/sdk/io/AvroIOTransformTest.java | 95 ++++++++++++------- 1 file changed, 59 insertions(+), 36 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index b4f7a7918176..bc0387184213 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; @@ -102,28 +103,28 @@ private static AvroGeneratedUser[] generateAvroObjects() { return new AvroGeneratedUser[] { user1, user2, user3 }; } + private static GenericRecord[] generateAvroGenericRecords() { + final GenericRecord user1 = new GenericData.Record(SCHEMA); + user1.put("name", "Bob"); + user1.put("favorite_number", 256); + + final GenericRecord user2 = new GenericData.Record(SCHEMA); + user2.put("name", "Alice"); + user2.put("favorite_number", 128); + + final GenericRecord user3 = new GenericData.Record(SCHEMA); + user3.put("name", "Ted"); + user3.put("favorite_color", "white"); + + return new GenericRecord[] { user1, user2, user3 }; + } + /** * Tests for AvroIO Read transforms, using classes generated from {@code user.avsc}. */ @RunWith(Parameterized.class) public static class AvroIOReadTransformTest extends AvroIOTransformTest { - private static GenericRecord[] generateAvroGenericRecords() { - final GenericRecord user1 = new GenericData.Record(SCHEMA); - user1.put("name", "Bob"); - user1.put("favorite_number", 256); - - final GenericRecord user2 = new GenericData.Record(SCHEMA); - user2.put("name", "Alice"); - user2.put("favorite_number", 128); - - final GenericRecord user3 = new GenericData.Record(SCHEMA); - user3.put("name", "Ted"); - user3.put("favorite_color", "white"); - - return new GenericRecord[] { user1, user2, user3 }; - } - private void generateAvroFile(final AvroGeneratedUser[] elements, final File avroFile) throws IOException { final DatumWriter userDatumWriter = @@ -247,41 +248,59 @@ public static class AvroIOWriteTransformTest extends AvroIOTransformTest { private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; - private List readAvroFile(final File avroFile) throws IOException { - final DatumReader userDatumReader = - new SpecificDatumReader<>(AvroGeneratedUser.class); - final List users = new ArrayList<>(); - try (DataFileReader dataFileReader = - new DataFileReader<>(avroFile, userDatumReader)) { - while (dataFileReader.hasNext()) { - users.add(dataFileReader.next()); + private List readAvroFile(final File avroFile, boolean generic) throws IOException { + if (!generic) { + final DatumReader datumReader = new SpecificDatumReader<>( + AvroGeneratedUser.class); + final List output = new ArrayList<>(); + try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, + datumReader)) { + while (dataFileReader.hasNext()) { + output.add(dataFileReader.next()); + } } + return (List) output; + } else { + final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); + final List genericRecords = new ArrayList<>(); + try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, + datumReader)) { + while (dataFileReader.hasNext()) { + genericRecords.add(dataFileReader.next()); + } + } + return (List) genericRecords; } - return users; } @Parameterized.Parameters(name = "{0}_with_{1}") public static Iterable data() throws IOException { final String generatedClass = "GeneratedClass"; - final String fromSchema = "SchemaObject"; - final String fromSchemaString = "SchemaString"; + final String genericRecordsfromSchema = "GenericRecordsFromSchemaObject"; + final String genericRecordsfromSchemaString = "GenericRecordsFromSchemaString"; return ImmutableList.builder() .add( new Object[] { AvroIO.write(AvroGeneratedUser.class), - generatedClass + generatedClass, + AvroIOTransformTest.generateAvroObjects(), + false }, new Object[] { AvroIO.writeGenericRecords(SCHEMA), - fromSchema + genericRecordsfromSchema, + AvroIOTransformTest.generateAvroGenericRecords(), + true }, new Object[] { AvroIO.writeGenericRecords(SCHEMA_STRING), - fromSchemaString + genericRecordsfromSchemaString, + AvroIOTransformTest.generateAvroGenericRecords(), + true }) .build(); } @@ -293,29 +312,33 @@ public static Iterable data() throws IOException { @Parameterized.Parameter(1) public String testAlias; - private void runTestWrite(final AvroIO.Write writeBuilder) - throws Exception { + @Parameterized.Parameter(2) + public Object[] inputData; + + @Parameterized.Parameter(3) + public Boolean generic; + private void runTestWrite(final AvroIO.Write writeBuilder, T[] inputData, + boolean generic) throws Exception { final File avroFile = tmpFolder.newFile("file.avro"); - final AvroGeneratedUser[] users = generateAvroObjects(); final AvroIO.Write write = writeBuilder.to(avroFile.getPath()); @SuppressWarnings("unchecked") final PCollection input = - pipeline.apply(Create.of(Arrays.asList((T[]) users)) + pipeline.apply(Create.of(Arrays.asList(inputData)) .withCoder((Coder) AvroCoder.of(AvroGeneratedUser.class))); input.apply(write.withoutSharding()); pipeline.run(); assertEquals(WRITE_TRANSFORM_NAME, write.getName()); - assertThat(readAvroFile(avroFile), containsInAnyOrder(users)); + assertThat(readAvroFile(avroFile, generic), containsInAnyOrder((Object[]) inputData)); } @Test @Category(NeedsRunner.class) public void testWrite() throws Exception { - runTestWrite(writeTransform); + runTestWrite(writeTransform, inputData, generic); } // TODO: for Write only, test withSuffix, withNumShards, From 84074e36085d76f569c89d4a29a647fc40b22531 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Mon, 2 Oct 2017 17:08:55 +0200 Subject: [PATCH 2/7] [BEAM-2993] AvroIO.write without specifying a schema Lazy init (at first write) of the dataFileWriter once we have the value to get the schema from. Make schema optional in ConstantAvroDestination and depreciate write methods that take schema as parameter Cleaning --- .../java/org/apache/beam/sdk/io/AvroIO.java | 40 +++++++++++++----- .../java/org/apache/beam/sdk/io/AvroSink.java | 42 +++++++++++++++---- .../beam/sdk/io/ConstantAvroDestination.java | 17 ++++++-- .../beam/sdk/io/AvroIOTransformTest.java | 18 ++++++++ 4 files changed, 96 insertions(+), 21 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index e2ab9803f421..d42f8ca384fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -181,23 +181,19 @@ * *

Writing specific or generic records

* - *

To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write - * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes - * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a - * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified - * schema. + *

To write specific records, such as Avro-generated classes, use {@link #write()}. To write + * {@link GenericRecord GenericRecords}, use {@link #writeGenericRecords()} * *

For example: * *

{@code
  * // A simple Write to a local file (only runs locally):
  * PCollection records = ...;
- * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
+ * records.apply(AvroIO.write().to("/path/to/file.avro"));
  *
  * // A Write to a sharded GCS file (runs locally and using remote execution):
- * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection records = ...;
- * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords()
  *     .to("gs://my_bucket/path/to/numbers")
  *     .withSuffix(".avro"));
  * }
@@ -355,6 +351,7 @@ public static ParseAll parseAllGenericRecords( * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding * pattern). */ + @Deprecated public static Write write(Class recordClass) { return new Write<>( AvroIO.defaultWriteBuilder() @@ -363,6 +360,18 @@ public static Write write(Class recordClass) { .build()); } + /** + * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding + * pattern). + */ + public static Write write() { + return new Write<>( + AvroIO.defaultWriteBuilder() + .setGenericRecords(false) + .build()); + } + + @Deprecated /** Writes Avro records of the specified schema. */ public static Write writeGenericRecords(Schema schema) { return new Write<>( @@ -372,6 +381,14 @@ public static Write writeGenericRecords(Schema schema) { .build()); } + /** Writes Avro records determining the schema from the input collection. */ + public static Write writeGenericRecords() { + return new Write<>( + AvroIO.defaultWriteBuilder() + .setGenericRecords(true) + .build()); + } + /** * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files * matching a sharding pattern), with each element of the input collection encoded into its own @@ -397,9 +414,8 @@ public static TypedWrite writeCustomType( /** * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is - * {@link GenericRecord}. A schema must be specified either in {@link - * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link - * TypedWrite#withSchema(Schema)}. + * {@link GenericRecord}. A schema can be specified in {@link DynamicAvroDestinations#getSchema}. + * If it is not specified then, it will be determined out of the input elements */ public static TypedWrite writeCustomTypeToGenericRecords() { return AvroIO.defaultWriteBuilder().setGenericRecords(true).build(); @@ -408,6 +424,7 @@ public static TypedWrite writeCustomTypeToGe /** * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string. */ + @Deprecated public static Write writeGenericRecords(String schema) { return writeGenericRecords(new Schema.Parser().parse(schema)); } @@ -1003,6 +1020,7 @@ public TypedWrite to( * Sets the the output schema. Can only be used when the output type is {@link GenericRecord} * and when not using {@link #to(DynamicAvroDestinations)}. */ + @Deprecated public TypedWrite withSchema(Schema schema) { return toBuilder().setSchema(schema).build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index 888db856b782..b012e4aaa8aa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -24,7 +24,9 @@ import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; @@ -78,6 +80,10 @@ private static class AvroWriter extends Writer dataFileWriter; private final DynamicAvroDestinations dynamicDestinations; private final boolean genericRecords; + private WritableByteChannel channel; + private DatumWriter datumWriter; + private boolean dataFileWriterAlreadyLazyInitialized; + private Schema schema; public AvroWriter( WriteOperation writeOperation, @@ -93,13 +99,17 @@ public AvroWriter( protected void prepareWrite(WritableByteChannel channel) throws Exception { DestinationT destination = getDestination(); CodecFactory codec = dynamicDestinations.getCodec(destination); - Schema schema = dynamicDestinations.getSchema(destination); + schema = dynamicDestinations.getSchema(destination); Map metadata = dynamicDestinations.getMetadata(destination); + this.channel = channel; - DatumWriter datumWriter = - genericRecords - ? new GenericDatumWriter(schema) - : new ReflectDatumWriter(schema); + + if (schema != null) { + datumWriter = genericRecords ? new GenericDatumWriter(schema) : new ReflectDatumWriter( + schema); + } else { // lazy init + datumWriter = genericRecords ? new GenericDatumWriter() : new ReflectDatumWriter(); + } dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec); for (Map.Entry entry : metadata.entrySet()) { Object v = entry.getValue(); @@ -115,17 +125,35 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { + v.getClass().getSimpleName()); } } - dataFileWriter.create(schema, Channels.newOutputStream(channel)); + if (schema != null){ //else lazy init + dataFileWriter.create(schema, Channels.newOutputStream(channel)); + } } @Override public void write(OutputT value) throws Exception { + // lazy init dataFileWriter at first write once we have the value to get the schema from + // if the schema was unspecified. Take care to init only once. + if (schema == null && !dataFileWriterAlreadyLazyInitialized) { + //TODO if empty bundle and unspecified schema, + // write will not be called and dataFileWriter will not be initialized + lazyInitDataFileWriter(value); + dataFileWriterAlreadyLazyInitialized = true; + } dataFileWriter.append(value); } + private void lazyInitDataFileWriter(OutputT value) throws java.io.IOException { + schema = genericRecords ? + ((GenericRecord) value).getSchema() : + ReflectData.get().getSchema(value.getClass()); + datumWriter.setSchema(schema); + dataFileWriter.create(schema, Channels.newOutputStream(channel)); + } + @Override protected void finishWrite() throws Exception { - dataFileWriter.flush(); + dataFileWriter.flush(); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java index b006e26c7831..bd8fc5285808 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java @@ -46,6 +46,7 @@ public Schema apply(@Nullable String input) { // This should be a multiple of 4 to not get a partial encoded byte. private static final int METADATA_BYTES_MAX_LENGTH = 40; private final FilenamePolicy filenamePolicy; + @Nullable private final Supplier schema; private final Map metadata; private final SerializableAvroCodecFactory codec; @@ -77,7 +78,11 @@ public ConstantAvroDestination( CodecFactory codec, SerializableFunction formatFunction) { this.filenamePolicy = filenamePolicy; - this.schema = Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString())); + if (schema != null) { + this.schema = Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString())); + } else { + this.schema = null; + } this.metadata = metadata; this.codec = new SerializableAvroCodecFactory(codec); this.formatFunction = formatFunction; @@ -105,7 +110,11 @@ public FilenamePolicy getFilenamePolicy(Void destination) { @Override public Schema getSchema(Void destination) { - return schema.get(); + if (schema != null) { + return schema.get(); + } else { + return null; + } } @Override @@ -121,7 +130,9 @@ public CodecFactory getCodec(Void destination) { @Override public void populateDisplayData(DisplayData.Builder builder) { filenamePolicy.populateDisplayData(builder); - builder.add(DisplayData.item("schema", schema.get().toString()).withLabel("Record Schema")); + if (schema != null) { + builder.add(DisplayData.item("schema", schema.get().toString()).withLabel("Record Schema")); + } builder.addIfNotDefault( DisplayData.item("codec", codec.getCodec().toString()).withLabel("Avro Compression Codec"), AvroIO.TypedWrite.DEFAULT_SERIALIZABLE_CODEC.toString()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index bc0387184213..41bf5b434921 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -41,11 +41,18 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sample; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -273,6 +280,17 @@ private List readAvroFile(final File avroFile, boolean generic) throws IO } } + private List readAvroFileWithGenericRecords(final File avroFile) throws IOException { + final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); + final List genericRecords = new ArrayList<>(); + try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, datumReader)) { + while (dataFileReader.hasNext()) { + genericRecords.add(dataFileReader.next()); + } + } + return genericRecords; + } + @Parameterized.Parameters(name = "{0}_with_{1}") public static Iterable data() throws IOException { From d19c2cb3538e5981e8138522d0c2138b455dec46 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Thu, 5 Oct 2017 14:08:04 +0200 Subject: [PATCH 3/7] Add tests of the schema less write methods Cleaning --- .../java/org/apache/beam/sdk/io/AvroIO.java | 12 +++++-- .../java/org/apache/beam/sdk/io/AvroSink.java | 13 +++++--- .../beam/sdk/io/ConstantAvroDestination.java | 3 +- .../beam/sdk/io/AvroIOTransformTest.java | 31 +++++++------------ 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d42f8ca384fa..19a60928df20 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -350,6 +350,7 @@ public static ParseAll parseAllGenericRecords( /** * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding * pattern). + * @deprecated Use {@link #write()} that will determine the schema out of the elements */ @Deprecated public static Write write(Class recordClass) { @@ -371,8 +372,11 @@ public static Write write() { .build()); } + /** Writes Avro records of the specified schema. + * @deprecated Use {@link #writeGenericRecords()} that will determine the schema + * out of the elements + * */ @Deprecated - /** Writes Avro records of the specified schema. */ public static Write writeGenericRecords(Schema schema) { return new Write<>( AvroIO.defaultWriteBuilder() @@ -423,7 +427,10 @@ public static TypedWrite writeCustomTypeToGe /** * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string. - */ + * @deprecated Use {@link #writeGenericRecords()} that will determine the schema + * out of the elements + * */ + @Deprecated public static Write writeGenericRecords(String schema) { return writeGenericRecords(new Schema.Parser().parse(schema)); @@ -1019,6 +1026,7 @@ public TypedWrite to( /** * Sets the the output schema. Can only be used when the output type is {@link GenericRecord} * and when not using {@link #to(DynamicAvroDestinations)}. + * @deprecated the schema can be determined out of the elements. See {@link #write()} */ @Deprecated public TypedWrite withSchema(Schema schema) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java index b012e4aaa8aa..bcfb39645863 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -105,10 +105,13 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { if (schema != null) { - datumWriter = genericRecords ? new GenericDatumWriter(schema) : new ReflectDatumWriter( - schema); + datumWriter = genericRecords + ? new GenericDatumWriter(schema) : + new ReflectDatumWriter(schema); } else { // lazy init - datumWriter = genericRecords ? new GenericDatumWriter() : new ReflectDatumWriter(); + datumWriter = genericRecords + ? new GenericDatumWriter() : + new ReflectDatumWriter(); } dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec); for (Map.Entry entry : metadata.entrySet()) { @@ -144,8 +147,8 @@ public void write(OutputT value) throws Exception { } private void lazyInitDataFileWriter(OutputT value) throws java.io.IOException { - schema = genericRecords ? - ((GenericRecord) value).getSchema() : + schema = genericRecords + ? ((GenericRecord) value).getSchema() : ReflectData.get().getSchema(value.getClass()); datumWriter.setSchema(schema); dataFileWriter.create(schema, Channels.newOutputStream(channel)); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java index bd8fc5285808..b08eddff68b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java @@ -79,7 +79,8 @@ public ConstantAvroDestination( SerializableFunction formatFunction) { this.filenamePolicy = filenamePolicy; if (schema != null) { - this.schema = Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString())); + this.schema = Suppliers + .compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString())); } else { this.schema = null; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index 41bf5b434921..bc0470767cda 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -41,18 +41,11 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sample; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -280,17 +273,6 @@ private List readAvroFile(final File avroFile, boolean generic) throws IO } } - private List readAvroFileWithGenericRecords(final File avroFile) throws IOException { - final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); - final List genericRecords = new ArrayList<>(); - try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, datumReader)) { - while (dataFileReader.hasNext()) { - genericRecords.add(dataFileReader.next()); - } - } - return genericRecords; - } - @Parameterized.Parameters(name = "{0}_with_{1}") public static Iterable data() throws IOException { @@ -313,12 +295,23 @@ public static Iterable data() throws IOException { AvroIOTransformTest.generateAvroGenericRecords(), true }, - new Object[] { AvroIO.writeGenericRecords(SCHEMA_STRING), genericRecordsfromSchemaString, AvroIOTransformTest.generateAvroGenericRecords(), true + }, + new Object[] { + AvroIO.write(), + generatedClass, + AvroIOTransformTest.generateAvroObjects(), + false + }, + new Object[] { + AvroIO.writeGenericRecords(), + genericRecordsfromSchema, + AvroIOTransformTest.generateAvroGenericRecords(), + true }) .build(); } From da95342353bd191c55d6d7768d4c052c531b8cf1 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Thu, 5 Oct 2017 14:43:29 +0200 Subject: [PATCH 4/7] Fixups --- .../core/src/main/java/org/apache/beam/sdk/io/AvroIO.java | 8 +++++--- .../org/apache/beam/sdk/io/ConstantAvroDestination.java | 6 +----- .../java/org/apache/beam/sdk/io/AvroIOTransformTest.java | 6 ++++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 19a60928df20..0007c46a7662 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -385,7 +385,7 @@ public static Write writeGenericRecords(Schema schema) { .build()); } - /** Writes Avro records determining the schema from the input collection. */ + /** Writes Avro {@link GenericRecord}s. */ public static Write writeGenericRecords() { return new Write<>( AvroIO.defaultWriteBuilder() @@ -418,8 +418,10 @@ public static TypedWrite writeCustomType( /** * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is - * {@link GenericRecord}. A schema can be specified in {@link DynamicAvroDestinations#getSchema}. - * If it is not specified then, it will be determined out of the input elements + * {@link GenericRecord}. A schema can be specified in {@link DynamicAvroDestinations#getSchema} + * for example to write objects with different schemas depending on the destination + * (see Writing data to multiple destinations in {@link AvroIO}. + * If the schema is not specified, then it will be determined out of the input elements */ public static TypedWrite writeCustomTypeToGenericRecords() { return AvroIO.defaultWriteBuilder().setGenericRecords(true).build(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java index b08eddff68b8..fbdae1100433 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java @@ -111,11 +111,7 @@ public FilenamePolicy getFilenamePolicy(Void destination) { @Override public Schema getSchema(Void destination) { - if (schema != null) { - return schema.get(); - } else { - return null; - } + return schema != null ? schema.get() : null; } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index bc0470767cda..6993c435b4ea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -279,6 +279,8 @@ public static Iterable data() throws IOException { final String generatedClass = "GeneratedClass"; final String genericRecordsfromSchema = "GenericRecordsFromSchemaObject"; final String genericRecordsfromSchemaString = "GenericRecordsFromSchemaString"; + final String generatedClassWithoutSchema = "GeneratedClassWithoutSchema"; + final String genericRecordsWithoutSchema = "GenericRecordsWithoutSchema"; return ImmutableList.builder() @@ -303,13 +305,13 @@ public static Iterable data() throws IOException { }, new Object[] { AvroIO.write(), - generatedClass, + generatedClassWithoutSchema, AvroIOTransformTest.generateAvroObjects(), false }, new Object[] { AvroIO.writeGenericRecords(), - genericRecordsfromSchema, + genericRecordsWithoutSchema, AvroIOTransformTest.generateAvroGenericRecords(), true }) From d5e8b273e7e5f3b48eb198d5c307926971cd2924 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Mon, 23 Oct 2017 14:04:49 +0200 Subject: [PATCH 5/7] Add tests with records of heterogeneous schemas for both schemaless and explicit schema write implementations. --- .../beam/sdk/io/AvroIOTransformTest.java | 112 +++++++++++++++--- 1 file changed, 94 insertions(+), 18 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index 6993c435b4ea..f19e245c9fb9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.hamcrest.core.Is.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -30,6 +31,7 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.avro.Schema; +import org.apache.avro.UnresolvedUnionException; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -49,6 +51,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -69,12 +72,16 @@ public class AvroIOTransformTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public final transient TestPipeline pipelineWithNoValidation = TestPipeline.create() + .enableAbandonedNodeEnforcement(false); + @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); private static final Schema.Parser parser = new Schema.Parser(); + private static final Schema.Parser parser2 = new Schema.Parser(); - private static final String SCHEMA_STRING = + private static final String SCHEMA_STRING_1 = "{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"AvroGeneratedUser\",\n" @@ -85,7 +92,19 @@ public class AvroIOTransformTest { + " ]\n" + "}"; - private static final Schema SCHEMA = parser.parse(SCHEMA_STRING); + private static final String SCHEMA_STRING_2 = + "{\"namespace\": \"example.avro\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"AvroGeneratedUser\",\n" + + " \"fields\": [\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"favorite_pet\", \"type\": [\"string\", \"null\"]},\n" + + " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" + + " ]\n" + + "}"; + + private static final Schema SCHEMA_1 = parser.parse(SCHEMA_STRING_1); + private static final Schema SCHEMA_2 = parser2.parse(SCHEMA_STRING_2); private static AvroGeneratedUser[] generateAvroObjects() { final AvroGeneratedUser user1 = new AvroGeneratedUser(); @@ -104,15 +123,32 @@ private static AvroGeneratedUser[] generateAvroObjects() { } private static GenericRecord[] generateAvroGenericRecords() { - final GenericRecord user1 = new GenericData.Record(SCHEMA); + final GenericRecord user1 = new GenericData.Record(SCHEMA_1); user1.put("name", "Bob"); user1.put("favorite_number", 256); - final GenericRecord user2 = new GenericData.Record(SCHEMA); + final GenericRecord user2 = new GenericData.Record(SCHEMA_1); user2.put("name", "Alice"); user2.put("favorite_number", 128); - final GenericRecord user3 = new GenericData.Record(SCHEMA); + final GenericRecord user3 = new GenericData.Record(SCHEMA_1); + user3.put("name", "Ted"); + user3.put("favorite_color", "white"); + + return new GenericRecord[] { user1, user2, user3 }; + } + + private static GenericRecord[] generateAvroGenericRecordsWithHeterogeneousSchemas() { + final GenericRecord user1 = new GenericData.Record(SCHEMA_1); + user1.put("name", "Bob"); + user1.put("favorite_number", 256); + + // one record with different (from the other records) but correct schema + final GenericRecord user2 = new GenericData.Record(SCHEMA_2); + user2.put("name", "Alice"); + user2.put("favorite_pet", "cat"); + + final GenericRecord user3 = new GenericData.Record(SCHEMA_1); user3.put("name", "Ted"); user3.put("favorite_color", "white"); @@ -186,14 +222,14 @@ public static Iterable data() throws IOException { // test read using schema object new Object[] { null, - AvroIO.readGenericRecords(SCHEMA), + AvroIO.readGenericRecords(SCHEMA_1), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchema }, new Object[] { "MyRead", - AvroIO.readGenericRecords(SCHEMA), + AvroIO.readGenericRecords(SCHEMA_1), "MyRead/Read.out", generateAvroGenericRecords(), fromSchema @@ -202,14 +238,14 @@ public static Iterable data() throws IOException { // test read using schema string new Object[] { null, - AvroIO.readGenericRecords(SCHEMA_STRING), + AvroIO.readGenericRecords(SCHEMA_STRING_1), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchemaString }, new Object[] { "MyRead", - AvroIO.readGenericRecords(SCHEMA_STRING), + AvroIO.readGenericRecords(SCHEMA_STRING_1), "MyRead/Read.out", generateAvroGenericRecords(), fromSchemaString @@ -246,6 +282,9 @@ public void testRead() throws Exception { @RunWith(Parameterized.class) public static class AvroIOWriteTransformTest extends AvroIOTransformTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; private List readAvroFile(final File avroFile, boolean generic) throws IOException { @@ -261,7 +300,7 @@ private List readAvroFile(final File avroFile, boolean generic) throws IO } return (List) output; } else { - final DatumReader datumReader = new GenericDatumReader<>(SCHEMA); + final DatumReader datumReader = new GenericDatumReader<>(SCHEMA_1); final List genericRecords = new ArrayList<>(); try (DataFileReader dataFileReader = new DataFileReader<>(avroFile, datumReader)) { @@ -281,6 +320,9 @@ public static Iterable data() throws IOException { final String genericRecordsfromSchemaString = "GenericRecordsFromSchemaString"; final String generatedClassWithoutSchema = "GeneratedClassWithoutSchema"; final String genericRecordsWithoutSchema = "GenericRecordsWithoutSchema"; + final String genericRecordsWithHeterogeneousSchema = "GenericRecordsWithHeterogeneousSchema"; + final String genericRecordsWithHeterogeneousSchemaWithLazySchemaGuess = + "GenericRecordsWithHeterogeneousSchemaWithLazySchemaGuess"; return ImmutableList.builder() @@ -289,30 +331,49 @@ public static Iterable data() throws IOException { AvroIO.write(AvroGeneratedUser.class), generatedClass, AvroIOTransformTest.generateAvroObjects(), + false, false }, new Object[] { - AvroIO.writeGenericRecords(SCHEMA), + AvroIO.writeGenericRecords(SCHEMA_1), genericRecordsfromSchema, AvroIOTransformTest.generateAvroGenericRecords(), - true + true, + false }, new Object[] { - AvroIO.writeGenericRecords(SCHEMA_STRING), + AvroIO.writeGenericRecords(SCHEMA_STRING_1), genericRecordsfromSchemaString, AvroIOTransformTest.generateAvroGenericRecords(), - true + true, + false }, new Object[] { AvroIO.write(), generatedClassWithoutSchema, AvroIOTransformTest.generateAvroObjects(), + false, false }, new Object[] { AvroIO.writeGenericRecords(), genericRecordsWithoutSchema, AvroIOTransformTest.generateAvroGenericRecords(), + true, + false + }, + new Object[] { + AvroIO.writeGenericRecords(SCHEMA_1), + genericRecordsWithHeterogeneousSchema, + AvroIOTransformTest.generateAvroGenericRecordsWithHeterogeneousSchemas(), + true, + true + }, + new Object[] { + AvroIO.writeGenericRecords(), + genericRecordsWithHeterogeneousSchemaWithLazySchemaGuess, + AvroIOTransformTest.generateAvroGenericRecordsWithHeterogeneousSchemas(), + true, true }) .build(); @@ -331,18 +392,33 @@ public static Iterable data() throws IOException { @Parameterized.Parameter(3) public Boolean generic; + @Parameterized.Parameter(4) + public boolean expectAvroException; + private void runTestWrite(final AvroIO.Write writeBuilder, T[] inputData, - boolean generic) throws Exception { + boolean generic, boolean expectAvroException) throws Exception { + + + TestPipeline p = pipeline; final File avroFile = tmpFolder.newFile("file.avro"); final AvroIO.Write write = writeBuilder.to(avroFile.getPath()); + if (expectAvroException) { + expectedException.expect(isA(UnresolvedUnionException.class)); + // UnresolvedUnionException is thrown (because of incompatible schema) when the Create.of() + // is applied. When the exception is expected, then it does not interrupt the flow + // and proceeds to pipeline evaluation with the configured enforcements. + // In our case, we want to stop the pipeline evaluation to avoid + // Create.Values == null exception. So we deactivate enforcements. + p = pipelineWithNoValidation; + } @SuppressWarnings("unchecked") final PCollection input = - pipeline.apply(Create.of(Arrays.asList(inputData)) + p.apply(Create.of(Arrays.asList(inputData)) .withCoder((Coder) AvroCoder.of(AvroGeneratedUser.class))); input.apply(write.withoutSharding()); + p.run(); - pipeline.run(); assertEquals(WRITE_TRANSFORM_NAME, write.getName()); assertThat(readAvroFile(avroFile, generic), containsInAnyOrder((Object[]) inputData)); @@ -351,7 +427,7 @@ private void runTestWrite(final AvroIO.Write writeBuilder, T[] inputData, @Test @Category(NeedsRunner.class) public void testWrite() throws Exception { - runTestWrite(writeTransform, inputData, generic); + runTestWrite(writeTransform, inputData, generic, expectAvroException); } // TODO: for Write only, test withSuffix, withNumShards, From 3eb47b0a1a0aa077ad7df194c6d1d6705831fe30 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Mon, 23 Oct 2017 14:15:35 +0200 Subject: [PATCH 6/7] Add a rulechain so that expectedException could stop the pipeline, in particular the validation --- .../beam/sdk/io/AvroIOTransformTest.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index f19e245c9fb9..4ea87f55e3d1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -69,12 +70,6 @@ public class AvroIOTransformTest { // TODO: Stop requiring local files - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - - @Rule public final transient TestPipeline pipelineWithNoValidation = TestPipeline.create() - .enableAbandonedNodeEnforcement(false); - @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -161,6 +156,9 @@ private static GenericRecord[] generateAvroGenericRecordsWithHeterogeneousSchema @RunWith(Parameterized.class) public static class AvroIOReadTransformTest extends AvroIOTransformTest { + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + private void generateAvroFile(final AvroGeneratedUser[] elements, final File avroFile) throws IOException { final DatumWriter userDatumWriter = @@ -282,9 +280,13 @@ public void testRead() throws Exception { @RunWith(Parameterized.class) public static class AvroIOWriteTransformTest extends AvroIOTransformTest { - @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); public ExpectedException expectedException = ExpectedException.none(); + // the expectedException must stop the pipeline + @Rule + public final transient RuleChain chain = RuleChain.outerRule(expectedException).around(pipeline); + private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; private List readAvroFile(final File avroFile, boolean generic) throws IOException { @@ -405,12 +407,6 @@ private void runTestWrite(final AvroIO.Write writeBuilder, T[] inputData, if (expectAvroException) { expectedException.expect(isA(UnresolvedUnionException.class)); - // UnresolvedUnionException is thrown (because of incompatible schema) when the Create.of() - // is applied. When the exception is expected, then it does not interrupt the flow - // and proceeds to pipeline evaluation with the configured enforcements. - // In our case, we want to stop the pipeline evaluation to avoid - // Create.Values == null exception. So we deactivate enforcements. - p = pipelineWithNoValidation; } @SuppressWarnings("unchecked") final PCollection input = From 5d91d6760ceeac56ac9e11b8685f9d219ae926b3 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Mon, 23 Oct 2017 14:18:15 +0200 Subject: [PATCH 7/7] Cleaning --- .../java/org/apache/beam/sdk/io/AvroIOTransformTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index 4ea87f55e3d1..90339771f8ee 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -281,11 +281,11 @@ public void testRead() throws Exception { public static class AvroIOWriteTransformTest extends AvroIOTransformTest { public final transient TestPipeline pipeline = TestPipeline.create(); - public ExpectedException expectedException = ExpectedException.none(); + public ExpectedException thrown = ExpectedException.none(); - // the expectedException must stop the pipeline + // the ExpectedException must stop the pipeline @Rule - public final transient RuleChain chain = RuleChain.outerRule(expectedException).around(pipeline); + public final transient RuleChain chain = RuleChain.outerRule(thrown).around(pipeline); private static final String WRITE_TRANSFORM_NAME = "AvroIO.Write"; @@ -406,7 +406,7 @@ private void runTestWrite(final AvroIO.Write writeBuilder, T[] inputData, final AvroIO.Write write = writeBuilder.to(avroFile.getPath()); if (expectAvroException) { - expectedException.expect(isA(UnresolvedUnionException.class)); + thrown.expect(isA(UnresolvedUnionException.class)); } @SuppressWarnings("unchecked") final PCollection input =