From 24cac2f3962c25de6403aaa3f12a79be56efe555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 30 Jan 2019 11:47:03 +0100 Subject: [PATCH] [BEAM-6526] Add ReadFiles transform for AvroIO --- .../java/org/apache/beam/sdk/io/AvroIO.java | 127 +++++++++++++++++- .../org/apache/beam/sdk/io/AvroIOTest.java | 36 ++++- 2 files changed, 156 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 2830a04caf9c3..5b90fa0c98d62 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -297,6 +297,23 @@ public static ReadAll readAll(Class recordClass) { .build(); } + /** + * Like {@link #read}, but reads each file in a {@link PCollection} of {@link + * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}. + */ + public static ReadFiles readFiles(Class recordClass) { + return new AutoValue_AvroIO_ReadFiles.Builder() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .setRecordClass(recordClass) + .setSchema(ReflectData.get().getSchema(recordClass)) + .setInferBeamSchema(false) + // 64MB is a reasonable value that allows to amortize the cost of opening files, + // but is not so large as to exhaust a typical runner's maximum amount of output per + // ProcessElement call. + .setDesiredBundleSizeBytes(64 * 1024 * 1024L) + .build(); + } + /** Reads Avro file(s) containing records of the specified schema. */ public static Read readGenericRecords(Schema schema) { return new AutoValue_AvroIO_Read.Builder() @@ -322,6 +339,17 @@ public static ReadAll readAllGenericRecords(Schema schema) { .build(); } + /** Like {@link #readGenericRecords(Schema)}, but for {@link FileIO.ReadableFile} collections. */ + public static ReadFiles readFilesGenericRecords(Schema schema) { + return new AutoValue_AvroIO_ReadFiles.Builder() + .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .setRecordClass(GenericRecord.class) + .setSchema(schema) + .setInferBeamSchema(false) + .setDesiredBundleSizeBytes(64 * 1024 * 1024L) + .build(); + } + /** * Reads Avro file(s) containing records of the specified schema. The schema is specified as a * JSON-encoded string. @@ -338,6 +366,11 @@ public static ReadAll readAllGenericRecords(String schema) { return readAllGenericRecords(new Schema.Parser().parse(schema)); } + /** Like {@link #readGenericRecords(String)}, but for {@link FileIO.ReadableFile} collections. */ + public static ReadFiles readFilesGenericRecords(String schema) { + return readFilesGenericRecords(new Schema.Parser().parse(schema)); + } + /** * Reads Avro file(s) containing records of an unspecified schema and converting each record to a * custom type. @@ -667,12 +700,7 @@ public PCollection expand(PCollection input) { input .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT)) - .apply( - "Read all via FileBasedSource", - new ReadAllViaFileBasedSource<>( - getDesiredBundleSizeBytes(), - new CreateSourceFn<>(getRecordClass(), getSchema().toString()), - AvroCoder.of(getRecordClass(), getSchema()))); + .apply(readFiles(getRecordClass())); return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; } @@ -705,6 +733,93 @@ public FileBasedSource apply(String input) { ///////////////////////////////////////////////////////////////////////////// + /** Implementation of {@link #readFiles}. */ + @AutoValue + public abstract static class ReadFiles + extends PTransform, PCollection> { + abstract MatchConfiguration getMatchConfiguration(); + + @Nullable + abstract Class getRecordClass(); + + @Nullable + abstract Schema getSchema(); + + abstract long getDesiredBundleSizeBytes(); + + abstract boolean getInferBeamSchema(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration); + + abstract Builder setRecordClass(Class recordClass); + + abstract Builder setSchema(Schema schema); + + abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); + + abstract Builder setInferBeamSchema(boolean infer); + + abstract ReadFiles build(); + } + + /** Sets the {@link MatchConfiguration}. */ + public ReadFiles withMatchConfiguration(MatchConfiguration configuration) { + return toBuilder().setMatchConfiguration(configuration).build(); + } + + /** Like {@link Read#withEmptyMatchTreatment}. */ + public ReadFiles withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** Like {@link Read#watchForNewFiles}. */ + @Experimental(Kind.SPLITTABLE_DO_FN) + public ReadFiles watchForNewFiles( + Duration pollInterval, TerminationCondition terminationCondition) { + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); + } + + @VisibleForTesting + ReadFiles withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { + return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); + } + + /** + * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output + * to be used by SQL and by the schema-transform library. + */ + @Experimental(Kind.SCHEMAS) + public ReadFiles withBeamSchemas(boolean withBeamSchemas) { + return toBuilder().setInferBeamSchema(withBeamSchemas).build(); + } + + @Override + public PCollection expand(PCollection input) { + checkNotNull(getSchema(), "schema"); + PCollection read = + input.apply( + "Read all via FileBasedSource", + new ReadAllViaFileBasedSource<>( + getDesiredBundleSizeBytes(), + new CreateSourceFn<>(getRecordClass(), getSchema().toString()), + AvroCoder.of(getRecordClass(), getSchema()))); + return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.include("matchConfiguration", getMatchConfiguration()); + } + } + + ///////////////////////////////////////////////////////////////////////////// + /** Implementation of {@link #parseGenericRecords}. */ @AutoValue public abstract static class Parse extends PTransform> { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 8d706864439fc..acd983e6d3272 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; +import static org.apache.beam.sdk.io.Compression.AUTO; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.apache.beam.sdk.transforms.Contextful.fn; import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs; @@ -421,6 +422,15 @@ public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable { .from(outputFile.getAbsolutePath()) .withHintMatchesManyFiles())) .containsInAnyOrder(values); + PAssert.that( + path.apply(FileIO.matchAll()) + .apply(FileIO.readMatches().withCompression(AUTO)) + .apply( + "ReadFiles", + AvroIO.readFiles(GenericClass.class) + .withBeamSchemas(withBeamSchemas) + .withDesiredBundleSizeBytes(10))) + .containsInAnyOrder(values); PAssert.that( path.apply( "ReadAll", @@ -486,6 +496,16 @@ public void testWriteThenReadMultipleFilepatterns() throws Throwable { Create.of( tmpFolder.getRoot().getAbsolutePath() + "/first*", tmpFolder.getRoot().getAbsolutePath() + "/second*")); + PAssert.that( + paths + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches().withCompression(AUTO)) + .apply( + "ReadFiles", + AvroIO.readFiles(GenericClass.class) + .withBeamSchemas(withBeamSchemas) + .withDesiredBundleSizeBytes(10))) + .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); PAssert.that( paths.apply( "Read all", @@ -549,7 +569,8 @@ public void testContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable .withNumShards(3) .withWindowedWrites()); - // Test read(), readAll(), parse(), and parseAllGenericRecords() with watchForNewFiles(). + // Test read(), readFiles(), readAll(), parse(), and parseAllGenericRecords() with + // watchForNewFiles(). PAssert.that( readPipeline.apply( "Read", @@ -576,6 +597,19 @@ public void testContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable Create.of( tmpFolder.getRoot().getAbsolutePath() + "/first*", tmpFolder.getRoot().getAbsolutePath() + "/second*")); + PAssert.that( + paths + .apply(FileIO.matchAll()) + .apply(FileIO.readMatches().withCompression(AUTO)) + .apply( + "ReadFiles", + AvroIO.readFiles(GenericClass.class) + .withBeamSchemas(withBeamSchemas) + .watchForNewFiles( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))) + .withDesiredBundleSizeBytes(10))) + .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); PAssert.that( paths.apply( "Read all",