From 75e167ec34d6e4b6c120fe45e1abd02d2c7afaa4 Mon Sep 17 00:00:00 2001 From: anantd Date: Wed, 16 Dec 2020 04:20:09 +0800 Subject: [PATCH 1/6] [BEAM-11460] Implement reading unknown schema files for ParquetIO --- .../apache/beam/sdk/io/parquet/ParquetIO.java | 171 +++++++++++++++--- .../beam/sdk/io/parquet/ParquetIOTest.java | 105 ++++++++++- 2 files changed, 245 insertions(+), 31 deletions(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 5dd9a11d76d79..8195d64b97792 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.parquet; import static java.lang.String.format; +import static org.apache.parquet.Preconditions.checkArgument; import static org.apache.parquet.Preconditions.checkNotNull; import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; @@ -38,6 +39,9 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.fs.ResourceId; @@ -48,11 +52,14 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; @@ -152,6 +159,34 @@ * * * } * + *

Reading records of an unknown schema

+ * + *

To read records from files whose schema is unknown at pipeline construction time or differs + * between files, use {@link #parseFilesGenericRecords(SerializableFunction)} - in this case, you + * will need to specify a parsing function for converting each {@link GenericRecord} into a value of + * your custom type. + * + *

It expects a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO} + * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}. + * + *

For example: + * + *

{@code
+ * Pipeline p = ...;
+ *
+ * PCollection filepatterns = p.apply(...);
+ *
+ * PCollection records =
+ *     filepatterns.apply(FileIO.matchAll())
+ *     .apply(FileIO.readMatches())
+ *     .apply(ParquetIO.parseGenericRecords(new SerializableFunction() {
+ *       public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *       }
+ *     }));
+ * }
+ * *

Writing Parquet files

* *

{@link ParquetIO.Sink} allows you to write a {@link PCollection} of {@link GenericRecord} into @@ -200,10 +235,18 @@ public static Read read(Schema schema) { * Like {@link #read(Schema)}, but reads each file in a {@link PCollection} of {@link * org.apache.beam.sdk.io.FileIO.ReadableFile}, which allows more flexible usage. */ - public static ReadFiles readFiles(Schema schema) { - return new AutoValue_ParquetIO_ReadFiles.Builder() + public static ReadFiles readFiles(Schema schema) { + return parseFilesGenericRecords(GenericRecordPassthroughFn.create()) + .toBuilder() .setSchema(schema) + .build(); + } + + public static ReadFiles parseFilesGenericRecords( + SerializableFunction parseFn) { + return new AutoValue_ParquetIO_ReadFiles.Builder() .setSplittable(false) + .setParseFn(parseFn) .build(); } @@ -302,8 +345,8 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Implementation of {@link #readFiles(Schema)}. */ @AutoValue - public abstract static class ReadFiles - extends PTransform, PCollection> { + public abstract static class ReadFiles + extends PTransform, PCollection> { abstract @Nullable Schema getSchema(); @@ -315,31 +358,35 @@ public abstract static class ReadFiles abstract boolean isSplittable(); - abstract Builder toBuilder(); + abstract SerializableFunction getParseFn(); + + abstract Builder toBuilder(); @AutoValue.Builder - abstract static class Builder { - abstract Builder setSchema(Schema schema); + abstract static class Builder { + abstract Builder setSchema(Schema schema); - abstract Builder setAvroDataModel(GenericData model); + abstract Builder setAvroDataModel(GenericData model); - abstract Builder setEncoderSchema(Schema schema); + abstract Builder setEncoderSchema(Schema schema); - abstract Builder setProjectionSchema(Schema schema); + abstract Builder setProjectionSchema(Schema schema); - abstract Builder setSplittable(boolean split); + abstract Builder setSplittable(boolean split); - abstract ReadFiles build(); + abstract Builder setParseFn(SerializableFunction parseFn); + + abstract ReadFiles build(); } /** * Define the Avro data model; see {@link AvroParquetReader.Builder#withDataModel(GenericData)}. */ - public ReadFiles withAvroDataModel(GenericData model) { + public ReadFiles withAvroDataModel(GenericData model) { return toBuilder().setAvroDataModel(model).build(); } - public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) { + public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) { return toBuilder() .setProjectionSchema(projectionSchema) .setEncoderSchema(encoderSchema) @@ -347,36 +394,81 @@ public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) { .build(); } /** Enable the Splittable reading. */ - public ReadFiles withSplit() { + public ReadFiles withSplit() { return toBuilder().setSplittable(true).build(); } @Override - public PCollection expand(PCollection input) { - checkNotNull(getSchema(), "Schema can not be null"); + public PCollection expand(PCollection input) { + checkArgument( + !isGenericRecordOutput() || getSchema() != null, + "Schema is required for GenericRecord output."); + CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry(); + if (isSplittable()) { - Schema coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema(); return input - .apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjectionSchema()))) - .setCoder(AvroCoder.of(coderSchema)); + .apply( + ParDo.of( + new SplitReadFn<>(getAvroDataModel(), getProjectionSchema(), getParseFn()))) + .setCoder(inferCoder(coderRegistry)); } return input - .apply(ParDo.of(new ReadFn(getAvroDataModel()))) - .setCoder(AvroCoder.of(getSchema())); + .apply(ParDo.of(new ReadFn<>(getAvroDataModel(), getParseFn()))) + .setCoder(inferCoder(coderRegistry)); + } + + /** Returns true if expected output is {@code PCollection}. */ + private boolean isGenericRecordOutput() { + String outputType = TypeDescriptors.outputOf(getParseFn()).getType().getTypeName(); + return outputType.equals(GenericRecord.class.getTypeName()); + } + + /** + * Identifies the {@code Coder} to be used for the output PCollection. + * + *

Returns {@link AvroCoder} if expected output is {@link GenericRecord}. + * + * @param coderRegistry the {@link org.apache.beam.sdk.Pipeline}'s CoderRegistry to identify + * Coder for expected output type of {@link #getParseFn()} + */ + @SuppressWarnings("unchecked") // Validation is done via #isGenericRecordOutput + private Coder inferCoder(CoderRegistry coderRegistry) { + if (isGenericRecordOutput()) { + Schema coderSchema = getSchema(); + + if (isSplittable()) { + coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema(); + } + return (Coder) + AvroCoder.of(checkNotNull(coderSchema, "coder schema should not be null")); + } + + // If not GenericRecord infer it from ParseFn. + try { + return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn())); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().", + e); + } } @DoFn.BoundedPerElement - static class SplitReadFn extends DoFn { + static class SplitReadFn extends DoFn { private Class modelClass; private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class); private String requestSchemaString; // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte. private static final long SPLIT_LIMIT = 64000000; - SplitReadFn(GenericData model, Schema requestSchema) { + private final SerializableFunction parseFn; + + SplitReadFn( + GenericData model, Schema requestSchema, SerializableFunction parseFn) { this.modelClass = model != null ? model.getClass() : null; this.requestSchemaString = requestSchema != null ? requestSchema.toString() : null; + this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null"); } ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception { @@ -388,7 +480,7 @@ ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exceptio public void processElement( @Element FileIO.ReadableFile file, RestrictionTracker tracker, - OutputReceiver outputReceiver) + OutputReceiver outputReceiver) throws Exception { LOG.debug( "start " @@ -468,7 +560,7 @@ record = recordReader.read(); file.toString()); continue; } - outputReceiver.output(record); + outputReceiver.output(parseFn.apply(record)); } catch (RuntimeException e) { throw new ParquetDecodingException( @@ -618,12 +710,16 @@ public Progress getProgress() { } } - static class ReadFn extends DoFn { + static class ReadFn extends DoFn { private Class modelClass; - ReadFn(GenericData model) { + private final SerializableFunction parseFn; + + ReadFn(GenericData model, SerializableFunction parseFn) { + this.modelClass = model != null ? model.getClass() : null; + this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null"); } @ProcessElement @@ -647,7 +743,7 @@ public void processElement(ProcessContext processContext) throws Exception { try (ParquetReader reader = builder.build()) { GenericRecord read; while ((read = reader.read()) != null) { - processContext.output(read); + processContext.output(parseFn.apply(read)); } } } @@ -838,6 +934,23 @@ public void close() throws IOException { } } + /** + * Passthrough function to provide seamless backward compatibility to ParquetIO's functionality. + */ + @VisibleForTesting + static class GenericRecordPassthroughFn + implements SerializableFunction { + + static GenericRecordPassthroughFn create() { + return new GenericRecordPassthroughFn(); + } + + @Override + public GenericRecord apply(GenericRecord input) { + return input; + } + } + /** Disallow construction of utility class. */ private ParquetIO() {} } diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java index 684ff5f8be101..5e19004cd74d4 100644 --- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java +++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java @@ -17,26 +17,35 @@ */ package org.apache.beam.sdk.io.parquet; +import static java.util.stream.Collectors.toList; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.io.JsonEncoder; import org.apache.avro.reflect.ReflectData; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.parquet.ParquetIO.GenericRecordPassthroughFn; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; @@ -136,8 +145,10 @@ public void testBlockTracker() throws Exception { @Test public void testSplitBlockWithLimit() { - ParquetIO.ReadFiles.SplitReadFn testFn = new ParquetIO.ReadFiles.SplitReadFn(null, null); - ArrayList blockList = new ArrayList(); + ParquetIO.ReadFiles.SplitReadFn testFn = + new ParquetIO.ReadFiles.SplitReadFn<>( + null, null, ParquetIO.GenericRecordPassthroughFn.create()); + ArrayList blockList = new ArrayList<>(); ArrayList rangeList; BlockMetaData testBlock = mock(BlockMetaData.class); when(testBlock.getTotalByteSize()).thenReturn((long) 60); @@ -194,6 +205,31 @@ public void testWriteAndReadWithSplit() { readPipeline.run().waitUntilFinish(); } + @Test + public void testWriteAndReadFilesAsJsonForWithSplitForUnknownSchema() { + List records = generateGenericRecords(1000); + + mainPipeline + .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA))) + .apply( + FileIO.write() + .via(ParquetIO.sink(SCHEMA)) + .to(temporaryFolder.getRoot().getAbsolutePath())); + mainPipeline.run().waitUntilFinish(); + + PCollection readBackAsJsonWithSplit = + readPipeline + .apply(Create.of(temporaryFolder.getRoot().getAbsolutePath() + "/*")) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches()) + .apply( + ParquetIO.parseFilesGenericRecords(ParseGenericRecordAsJsonFn.create()) + .withSplit()); + + PAssert.that(readBackAsJsonWithSplit).containsInAnyOrder(convertRecordsToJson(records)); + readPipeline.run().waitUntilFinish(); + } + @Test public void testWriteAndReadFiles() { List records = generateGenericRecords(1000); @@ -216,6 +252,43 @@ public void testWriteAndReadFiles() { mainPipeline.run().waitUntilFinish(); } + @Test + public void testReadFilesAsJsonForUnknownSchemaFiles() { + List records = generateGenericRecords(1000); + List expectedJsonRecords = convertRecordsToJson(records); + + PCollection writeThenRead = + mainPipeline + .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA))) + .apply( + FileIO.write() + .via(ParquetIO.sink(SCHEMA)) + .to(temporaryFolder.getRoot().getAbsolutePath())) + .getPerDestinationOutputFilenames() + .apply(Values.create()) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches()) + .apply(ParquetIO.parseFilesGenericRecords(ParseGenericRecordAsJsonFn.create())); + + assertEquals(1000, expectedJsonRecords.size()); + PAssert.that(writeThenRead).containsInAnyOrder(expectedJsonRecords); + + mainPipeline.run().waitUntilFinish(); + } + + @Test + public void testReadFilesUnknownSchemaFilesForGenericRecordThrowException() { + IllegalArgumentException illegalArgumentException = + assertThrows( + IllegalArgumentException.class, + () -> + ParquetIO.parseFilesGenericRecords(GenericRecordPassthroughFn.create()) + .expand(null)); + + assertEquals( + "Schema is required for GenericRecord output.", illegalArgumentException.getMessage()); + } + private List generateGenericRecords(long count) { ArrayList data = new ArrayList<>(); GenericRecordBuilder builder = new GenericRecordBuilder(SCHEMA); @@ -345,4 +418,32 @@ public void testWriteAndReadwithSplitUsingReflectDataSchemaWithDataModel() { PAssert.that(readBack).containsInAnyOrder(records); readPipeline.run().waitUntilFinish(); } + + /** Returns list of JSON representation of GenericRecords. */ + private static List convertRecordsToJson(List records) { + return records.stream().map(ParseGenericRecordAsJsonFn.create()::apply).collect(toList()); + } + + /** Sample Parse function that converts GenericRecord as JSON. for testing. */ + private static class ParseGenericRecordAsJsonFn + implements SerializableFunction { + + public static ParseGenericRecordAsJsonFn create() { + return new ParseGenericRecordAsJsonFn(); + } + + @Override + public String apply(GenericRecord input) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try { + JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(input.getSchema(), baos, true); + new GenericDatumWriter(input.getSchema()).write(input, jsonEncoder); + jsonEncoder.flush(); + } catch (IOException ioException) { + throw new RuntimeException("error converting record to JSON", ioException); + } + return baos.toString(); + } + } } From 06868797ecee81d764489f5d94ca312ba280c8a4 Mon Sep 17 00:00:00 2001 From: anantd Date: Wed, 16 Dec 2020 04:42:07 +0800 Subject: [PATCH 2/6] Update ParquertIO capability in CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index d5f9fdd89e07d..7a57db4306143 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -58,6 +58,7 @@ * ReadFromMongoDB/WriteToMongoDB will mask password in display_data (Python) ([BEAM-11444](https://issues.apache.org/jira/browse/BEAM-11444).) * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * There is a new transform `ReadAllFromBigQuery` that can receive multiple requests to read data from BigQuery at pipeline runtime. See [PR 13170](https://github.com/apache/beam/pull/13170), and [BEAM-9650](https://issues.apache.org/jira/browse/BEAM-9650). +* ParquetIO can now read files with an unknown schema. See [PR-13554](https://github.com/apache/beam/pull/13554) and ([BEAM-11460](https://issues.apache.org/jira/browse/BEAM-11460)) ## New Features / Improvements From de6d2686e813cbb9a5e510aed46f8d990d351992 Mon Sep 17 00:00:00 2001 From: anantd Date: Wed, 16 Dec 2020 06:46:21 +0800 Subject: [PATCH 3/6] improve backward compatibility by creating separate `Parse` and `ParseFiles` implementation for supporting files with unknown schema. --- .../apache/beam/sdk/io/parquet/ParquetIO.java | 247 ++++++++++++------ .../beam/sdk/io/parquet/ParquetIOTest.java | 13 +- 2 files changed, 177 insertions(+), 83 deletions(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 8195d64b97792..d499ef57b9a6a 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -44,8 +44,11 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.ReadFn; +import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.SplitReadFn; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; @@ -162,29 +165,33 @@ *

Reading records of an unknown schema

* *

To read records from files whose schema is unknown at pipeline construction time or differs - * between files, use {@link #parseFilesGenericRecords(SerializableFunction)} - in this case, you - * will need to specify a parsing function for converting each {@link GenericRecord} into a value of - * your custom type. - * - *

It expects a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO} - * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}. + * between files, use {@link #parseGenericRecords(SerializableFunction)} - in this case, you will + * need to specify a parsing function for converting each {@link GenericRecord} into a value of your + * custom type. * *

For example: * *

{@code
  * Pipeline p = ...;
  *
- * PCollection filepatterns = p.apply(...);
- *
  * PCollection records =
- *     filepatterns.apply(FileIO.matchAll())
- *     .apply(FileIO.readMatches())
- *     .apply(ParquetIO.parseGenericRecords(new SerializableFunction() {
+ *     p.apply(ParquetIO.parseGenericRecords(new SerializableFunction() {
  *       public Foo apply(GenericRecord record) {
  *         // If needed, access the schema of the record using record.getSchema()
  *         return ...;
- *       }
- *     }));
+ *       }}));
+ *
+ * // For reading from filepatterns
+ *  PCollection filepatterns = p.apply(...);
+ *
+ *  PCollection records =
+ *     filepatterns
+ *       .apply(ParquetIO.parseFilesGenericRecords(new SerializableFunction() {
+ *         public Foo apply(GenericRecord record) {
+ *         // If needed, access the schema of the record using record.getSchema()
+ *         return ...;
+ *         }
+ *       }));
  * }
* *

Writing Parquet files

@@ -235,18 +242,33 @@ public static Read read(Schema schema) { * Like {@link #read(Schema)}, but reads each file in a {@link PCollection} of {@link * org.apache.beam.sdk.io.FileIO.ReadableFile}, which allows more flexible usage. */ - public static ReadFiles readFiles(Schema schema) { - return parseFilesGenericRecords(GenericRecordPassthroughFn.create()) - .toBuilder() + public static ReadFiles readFiles(Schema schema) { + return new AutoValue_ParquetIO_ReadFiles.Builder() + .setSplittable(false) .setSchema(schema) .build(); } - public static ReadFiles parseFilesGenericRecords( - SerializableFunction parseFn) { - return new AutoValue_ParquetIO_ReadFiles.Builder() + /** + * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet files matching the + * pattern) and converts to user defined type using provided parseFn. + */ + public static Parse parseGenericRecords(SerializableFunction parseFn) { + return new AutoValue_ParquetIO_Parse.Builder() + .setParseFn(parseFn) .setSplittable(false) + .build(); + } + + /** + * Reads {@link GenericRecord} from Parquet files and converts to user defined type using provided + * {@code parseFn}. + */ + public static ParseFiles parseFilesGenericRecords( + SerializableFunction parseFn) { + return new AutoValue_ParquetIO_ParseFiles.Builder() .setParseFn(parseFn) + .setSplittable(false) .build(); } @@ -343,78 +365,89 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** Implementation of {@link #readFiles(Schema)}. */ + /** Implementation of {@link #parseGenericRecords(SerializableFunction)}. */ @AutoValue - public abstract static class ReadFiles - extends PTransform, PCollection> { - - abstract @Nullable Schema getSchema(); - - abstract @Nullable GenericData getAvroDataModel(); - - abstract @Nullable Schema getEncoderSchema(); + public abstract static class Parse extends PTransform> { + abstract @Nullable ValueProvider getFilepattern(); - abstract @Nullable Schema getProjectionSchema(); + abstract SerializableFunction getParseFn(); abstract boolean isSplittable(); - abstract SerializableFunction getParseFn(); - abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { - abstract Builder setSchema(Schema schema); + abstract Builder setFilepattern(ValueProvider inputFiles); - abstract Builder setAvroDataModel(GenericData model); + abstract Builder setParseFn(SerializableFunction parseFn); - abstract Builder setEncoderSchema(Schema schema); + abstract Builder setSplittable(boolean splittable); - abstract Builder setProjectionSchema(Schema schema); + abstract Parse build(); + } - abstract Builder setSplittable(boolean split); + public Parse from(ValueProvider inputFiles) { + return toBuilder().setFilepattern(inputFiles).build(); + } - abstract Builder setParseFn(SerializableFunction parseFn); + public Parse from(String inputFiles) { + return from(ValueProvider.StaticValueProvider.of(inputFiles)); + } - abstract ReadFiles build(); + public Parse withSplit() { + return toBuilder().setSplittable(true).build(); } - /** - * Define the Avro data model; see {@link AvroParquetReader.Builder#withDataModel(GenericData)}. - */ - public ReadFiles withAvroDataModel(GenericData model) { - return toBuilder().setAvroDataModel(model).build(); + @Override + public PCollection expand(PBegin input) { + checkNotNull(getFilepattern(), "Filepattern cannot be null."); + return input + .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches()) + .apply( + parseFilesGenericRecords(getParseFn()) + .toBuilder() + .setSplittable(isSplittable()) + .build()); } + } - public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) { - return toBuilder() - .setProjectionSchema(projectionSchema) - .setEncoderSchema(encoderSchema) - .setSplittable(true) - .build(); + /** Implementation of {@link #parseFilesGenericRecords(SerializableFunction)}. */ + @AutoValue + public abstract static class ParseFiles + extends PTransform, PCollection> { + + abstract SerializableFunction getParseFn(); + + abstract boolean isSplittable(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setParseFn(SerializableFunction parseFn); + + abstract Builder setSplittable(boolean split); + + abstract ParseFiles build(); } - /** Enable the Splittable reading. */ - public ReadFiles withSplit() { + + public ParseFiles withSplit() { return toBuilder().setSplittable(true).build(); } @Override - public PCollection expand(PCollection input) { - checkArgument( - !isGenericRecordOutput() || getSchema() != null, - "Schema is required for GenericRecord output."); - CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry(); + public PCollection expand(PCollection input) { + checkArgument(!isGenericRecordOutput(), "Parse can't be used for reading as GenericRecord."); - if (isSplittable()) { - return input - .apply( - ParDo.of( - new SplitReadFn<>(getAvroDataModel(), getProjectionSchema(), getParseFn()))) - .setCoder(inferCoder(coderRegistry)); - } - return input - .apply(ParDo.of(new ReadFn<>(getAvroDataModel(), getParseFn()))) - .setCoder(inferCoder(coderRegistry)); + PCollection parsedRecords = + isSplittable() + ? input.apply(ParDo.of(new SplitReadFn<>(null, null, getParseFn()))) + : input.apply(ParDo.of(new ReadFn<>(null, getParseFn()))); + + return parsedRecords.setCoder(inferCoder(input.getPipeline().getCoderRegistry())); } /** Returns true if expected output is {@code PCollection}. */ @@ -431,16 +464,9 @@ private boolean isGenericRecordOutput() { * @param coderRegistry the {@link org.apache.beam.sdk.Pipeline}'s CoderRegistry to identify * Coder for expected output type of {@link #getParseFn()} */ - @SuppressWarnings("unchecked") // Validation is done via #isGenericRecordOutput private Coder inferCoder(CoderRegistry coderRegistry) { if (isGenericRecordOutput()) { - Schema coderSchema = getSchema(); - - if (isSplittable()) { - coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema(); - } - return (Coder) - AvroCoder.of(checkNotNull(coderSchema, "coder schema should not be null")); + throw new IllegalArgumentException("Parse can't be used for reading as GenericRecord."); } // If not GenericRecord infer it from ParseFn. @@ -452,6 +478,77 @@ private Coder inferCoder(CoderRegistry coderRegistry) { e); } } + } + + /** Implementation of {@link #readFiles(Schema)}. */ + @AutoValue + public abstract static class ReadFiles + extends PTransform, PCollection> { + + abstract @Nullable Schema getSchema(); + + abstract @Nullable GenericData getAvroDataModel(); + + abstract @Nullable Schema getEncoderSchema(); + + abstract @Nullable Schema getProjectionSchema(); + + abstract boolean isSplittable(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setSchema(Schema schema); + + abstract Builder setAvroDataModel(GenericData model); + + abstract Builder setEncoderSchema(Schema schema); + + abstract Builder setProjectionSchema(Schema schema); + + abstract Builder setSplittable(boolean split); + + abstract ReadFiles build(); + } + + /** + * Define the Avro data model; see {@link AvroParquetReader.Builder#withDataModel(GenericData)}. + */ + public ReadFiles withAvroDataModel(GenericData model) { + return toBuilder().setAvroDataModel(model).build(); + } + + public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) { + return toBuilder() + .setProjectionSchema(projectionSchema) + .setEncoderSchema(encoderSchema) + .setSplittable(true) + .build(); + } + /** Enable the Splittable reading. */ + public ReadFiles withSplit() { + return toBuilder().setSplittable(true).build(); + } + + @Override + public PCollection expand(PCollection input) { + checkNotNull(getSchema(), "Schema can not be null"); + if (isSplittable()) { + Schema coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema(); + return input + .apply( + ParDo.of( + new SplitReadFn<>( + getAvroDataModel(), + getProjectionSchema(), + GenericRecordPassthroughFn.create()))) + .setCoder(AvroCoder.of(coderSchema)); + } + return input + .apply(ParDo.of(new ReadFn<>(getAvroDataModel(), GenericRecordPassthroughFn.create()))) + .setCoder(AvroCoder.of(getSchema())); + } @DoFn.BoundedPerElement static class SplitReadFn extends DoFn { @@ -468,7 +565,7 @@ static class SplitReadFn extends DoFn { this.modelClass = model != null ? model.getClass() : null; this.requestSchemaString = requestSchema != null ? requestSchema.toString() : null; - this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null"); + this.parseFn = checkNotNull(parseFn, "GenericRecord parse function can't be null"); } ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception { diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java index 5e19004cd74d4..76831d13425d8 100644 --- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java +++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java @@ -218,13 +218,10 @@ public void testWriteAndReadFilesAsJsonForWithSplitForUnknownSchema() { mainPipeline.run().waitUntilFinish(); PCollection readBackAsJsonWithSplit = - readPipeline - .apply(Create.of(temporaryFolder.getRoot().getAbsolutePath() + "/*")) - .apply(FileIO.matchAll()) - .apply(FileIO.readMatches()) - .apply( - ParquetIO.parseFilesGenericRecords(ParseGenericRecordAsJsonFn.create()) - .withSplit()); + readPipeline.apply( + ParquetIO.parseGenericRecords(ParseGenericRecordAsJsonFn.create()) + .from(temporaryFolder.getRoot().getAbsolutePath() + "/*") + .withSplit()); PAssert.that(readBackAsJsonWithSplit).containsInAnyOrder(convertRecordsToJson(records)); readPipeline.run().waitUntilFinish(); @@ -286,7 +283,7 @@ public void testReadFilesUnknownSchemaFilesForGenericRecordThrowException() { .expand(null)); assertEquals( - "Schema is required for GenericRecord output.", illegalArgumentException.getMessage()); + "Parse can't be used for reading as GenericRecord.", illegalArgumentException.getMessage()); } private List generateGenericRecords(long count) { From 431831bd818a9ba4a60050a762ec83f250bcf696 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Thu, 24 Dec 2020 22:59:14 +0800 Subject: [PATCH 4/6] Fix review comments by @iemejia 1. Fix Javadoc example by using consistent words 1. Other indentation and space fixes --- .../apache/beam/sdk/io/parquet/ParquetIO.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index d499ef57b9a6a..044656d03154b 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -175,23 +175,29 @@ * Pipeline p = ...; * * PCollection records = - * p.apply(ParquetIO.parseGenericRecords(new SerializableFunction() { - * public Foo apply(GenericRecord record) { - * // If needed, access the schema of the record using record.getSchema() - * return ...; - * }})); + * p.apply( + * ParquetIO.parseGenericRecords( + * new SerializableFunction() { + * public Foo apply(GenericRecord record) { + * // If needed, access the schema of the record using record.getSchema() + * return ...; + * } + * }) + * .setFilePattern(...)); * - * // For reading from filepatterns - * PCollection filepatterns = p.apply(...); + * // For reading from files + * PCollection files = p.apply(...); * * PCollection records = - * filepatterns - * .apply(ParquetIO.parseFilesGenericRecords(new SerializableFunction() { - * public Foo apply(GenericRecord record) { - * // If needed, access the schema of the record using record.getSchema() - * return ...; - * } - * })); + * files + * .apply( + * ParquetIO.parseFilesGenericRecords( + * new SerializableFunction() { + * public Foo apply(GenericRecord record) { + * // If needed, access the schema of the record using record.getSchema() + * return ...; + * } + * })); * } * *

Writing Parquet files

@@ -814,7 +820,6 @@ static class ReadFn extends DoFn { private final SerializableFunction parseFn; ReadFn(GenericData model, SerializableFunction parseFn) { - this.modelClass = model != null ? model.getClass() : null; this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null"); } From 6faeda36070e4cec5aee1a7c91eeee9bf3d48872 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Thu, 24 Dec 2020 23:02:15 +0800 Subject: [PATCH 5/6] Make GenericRecordPassthroughFn singleton --- .../main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index 044656d03154b..ddfbbeb682028 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -1043,8 +1043,10 @@ public void close() throws IOException { static class GenericRecordPassthroughFn implements SerializableFunction { + private static final GenericRecordPassthroughFn singleton = new GenericRecordPassthroughFn(); + static GenericRecordPassthroughFn create() { - return new GenericRecordPassthroughFn(); + return singleton; } @Override From afe11c646ee7cd0098844f927d33b002c877ded0 Mon Sep 17 00:00:00 2001 From: anantd Date: Thu, 24 Dec 2020 23:34:57 +0800 Subject: [PATCH 6/6] fix spotless Apply --- .../src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index ddfbbeb682028..74099cff3875f 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -1044,7 +1044,7 @@ static class GenericRecordPassthroughFn implements SerializableFunction { private static final GenericRecordPassthroughFn singleton = new GenericRecordPassthroughFn(); - + static GenericRecordPassthroughFn create() { return singleton; }