[BEAM-6526] Add ReadFiles transform for AvroIO #7672

Closed · wants to merge 1 commit
127 changes: 121 additions & 6 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -297,6 +297,23 @@ public static <T> ReadAll<T> readAll(Class<T> recordClass) {
.build();
}

/**
* Like {@link #read}, but reads each file in a {@link PCollection} of {@link
* FileIO.ReadableFile}, returned by {@link FileIO#readMatches}.
*/
public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
return new AutoValue_AvroIO_ReadFiles.Builder<T>()
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
.setRecordClass(recordClass)
.setSchema(ReflectData.get().getSchema(recordClass))
.setInferBeamSchema(false)
// 64MB is a reasonable value that amortizes the cost of opening files,
// but is not so large as to exhaust a typical runner's maximum amount of output per
// ProcessElement call.
.setDesiredBundleSizeBytes(64 * 1024 * 1024L)
Contributor:
Do you think it makes sense to extract a constant for this size and move the comment above it? Currently, multiple places use the same value (not only here), but it's documented in only two of them.

Member Author:
Yes, good idea. I will extract the constant and move the doc there.
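
A minimal sketch of that extraction (the constant name is illustrative, not something this PR introduces):

    // Hypothetical constant in AvroIO; 64MB amortizes the cost of opening files
    // without exhausting a typical runner's maximum output per ProcessElement call.
    private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;

    // Call sites would then read:
    //   .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)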

.build();
}
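
For context, a usage sketch of the new transform; the pipeline p and the GenericClass POJO are assumptions, mirroring the pattern exercised in the tests below:

    PCollection<GenericClass> records =
        p.apply(Create.of("/path/to/input-*.avro"))   // filepatterns to match
            .apply(FileIO.matchAll())                 // -> MatchResult.Metadata
            .apply(FileIO.readMatches())              // -> FileIO.ReadableFile
            .apply("ReadFiles", AvroIO.readFiles(GenericClass.class));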

/** Reads Avro file(s) containing records of the specified schema. */
public static Read<GenericRecord> readGenericRecords(Schema schema) {
return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
@@ -322,6 +339,17 @@ public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
.build();
}

/** Like {@link #readGenericRecords(Schema)}, but for {@link FileIO.ReadableFile} collections. */
public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
.setRecordClass(GenericRecord.class)
.setSchema(schema)
.setInferBeamSchema(false)
.setDesiredBundleSizeBytes(64 * 1024 * 1024L)
.build();
}

/**
* Reads Avro file(s) containing records of the specified schema. The schema is specified as a
* JSON-encoded string.
@@ -338,6 +366,11 @@ public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
return readAllGenericRecords(new Schema.Parser().parse(schema));
}

/** Like {@link #readGenericRecords(String)}, but for {@link FileIO.ReadableFile} collections. */
public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
return readFilesGenericRecords(new Schema.Parser().parse(schema));
}
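
A similar sketch for the GenericRecord variant, assuming the same pipeline p (the JSON schema is illustrative):

    String jsonSchema =
        "{\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": ["
            + "{\"name\": \"name\", \"type\": \"string\"}]}";
    PCollection<GenericRecord> records =
        p.apply(Create.of("/path/to/input-*.avro"))
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply(AvroIO.readFilesGenericRecords(jsonSchema));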

/**
* Reads Avro file(s) containing records of an unspecified schema, converting each record to a
* custom type.
@@ -667,12 +700,7 @@ public PCollection<T> expand(PCollection<String> input) {
input
.apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
.apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
- .apply(
-     "Read all via FileBasedSource",
-     new ReadAllViaFileBasedSource<>(
-         getDesiredBundleSizeBytes(),
-         new CreateSourceFn<>(getRecordClass(), getSchema().toString()),
-         AvroCoder.of(getRecordClass(), getSchema())));
+ .apply(readFiles(getRecordClass()));
return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
}

@@ -705,6 +733,93 @@ public FileBasedSource<T> apply(String input) {

/////////////////////////////////////////////////////////////////////////////

/** Implementation of {@link #readFiles}. */
@AutoValue
public abstract static class ReadFiles<T>
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
abstract MatchConfiguration getMatchConfiguration();

@Nullable
abstract Class<T> getRecordClass();

@Nullable
abstract Schema getSchema();

abstract long getDesiredBundleSizeBytes();

abstract boolean getInferBeamSchema();

abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);

abstract Builder<T> setRecordClass(Class<T> recordClass);

abstract Builder<T> setSchema(Schema schema);

abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);

abstract Builder<T> setInferBeamSchema(boolean infer);

abstract ReadFiles<T> build();
}

/** Sets the {@link MatchConfiguration}. */
public ReadFiles<T> withMatchConfiguration(MatchConfiguration configuration) {
return toBuilder().setMatchConfiguration(configuration).build();
}

/** Like {@link Read#withEmptyMatchTreatment}. */
public ReadFiles<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
}

/** Like {@link Read#watchForNewFiles}. */
@Experimental(Kind.SPLITTABLE_DO_FN)
public ReadFiles<T> watchForNewFiles(
Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
return withMatchConfiguration(
getMatchConfiguration().continuously(pollInterval, terminationCondition));
}

@VisibleForTesting
ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
}

/**
* If set to true, a Beam schema will be inferred from the Avro schema. This allows the output
* to be used by SQL and by the schema-transform library.
*/
@Experimental(Kind.SCHEMAS)
public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) {
Contributor:
👍 I like that. It seems possible to use it in ParquetIO in the same way, right (asking for the sake of future PRs)? People seem to wish for it frequently.

Member Author:
Yes, it makes sense for ParquetIO too. Worth filing a JIRA, IMO. Note that I just created this method to be consistent with the existing read() and readAll() signatures.

Contributor:
OK. FYI, there already is a JIRA: https://issues.apache.org/jira/browse/BEAM-4812.
return toBuilder().setInferBeamSchema(withBeamSchemas).build();
}
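
To illustrate the thread above, a sketch of what schema inference enables: with withBeamSchemas(true) the output carries a Beam schema and can be queried via SqlTransform from the SQL extension (pipeline p and GenericClass are assumptions, not part of this PR):

    PCollection<GenericClass> rows =
        p.apply(Create.of("/path/to/input-*.avro"))
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply(AvroIO.readFiles(GenericClass.class).withBeamSchemas(true));
    // With a schema attached, the collection is directly queryable as a table:
    rows.apply(SqlTransform.query("SELECT COUNT(*) FROM PCOLLECTION"));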

@Override
public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
checkNotNull(getSchema(), "schema");
PCollection<T> read =
input.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
getDesiredBundleSizeBytes(),
new CreateSourceFn<>(getRecordClass(), getSchema().toString()),
AvroCoder.of(getRecordClass(), getSchema())));
return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.include("matchConfiguration", getMatchConfiguration());
}
}

/////////////////////////////////////////////////////////////////////////////

/** Implementation of {@link #parseGenericRecords}. */
@AutoValue
public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;

import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
import static org.apache.beam.sdk.io.Compression.AUTO;
import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
import static org.apache.beam.sdk.transforms.Contextful.fn;
import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
@@ -421,6 +422,15 @@ public void testWriteSingleFileThenReadUsingAllMethods() throws Throwable {
.from(outputFile.getAbsolutePath())
.withHintMatchesManyFiles()))
.containsInAnyOrder(values);
PAssert.that(
path.apply(FileIO.matchAll())
.apply(FileIO.readMatches().withCompression(AUTO))
.apply(
"ReadFiles",
AvroIO.readFiles(GenericClass.class)
.withBeamSchemas(withBeamSchemas)
.withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(values);
PAssert.that(
path.apply(
"ReadAll",
Expand Down Expand Up @@ -486,6 +496,16 @@ public void testWriteThenReadMultipleFilepatterns() throws Throwable {
Create.of(
tmpFolder.getRoot().getAbsolutePath() + "/first*",
tmpFolder.getRoot().getAbsolutePath() + "/second*"));
PAssert.that(
paths
.apply(FileIO.matchAll())
.apply(FileIO.readMatches().withCompression(AUTO))
.apply(
"ReadFiles",
AvroIO.readFiles(GenericClass.class)
.withBeamSchemas(withBeamSchemas)
.withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(Iterables.concat(firstValues, secondValues));
PAssert.that(
paths.apply(
"Read all",
@@ -549,7 +569,8 @@ public void testContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable
.withNumShards(3)
.withWindowedWrites());

- // Test read(), readAll(), parse(), and parseAllGenericRecords() with watchForNewFiles().
+ // Test read(), readFiles(), readAll(), parse(), and parseAllGenericRecords() with
+ // watchForNewFiles().
PAssert.that(
readPipeline.apply(
"Read",
@@ -576,6 +597,19 @@
Create.of(
tmpFolder.getRoot().getAbsolutePath() + "/first*",
tmpFolder.getRoot().getAbsolutePath() + "/second*"));
PAssert.that(
paths
.apply(FileIO.matchAll())
.apply(FileIO.readMatches().withCompression(AUTO))
.apply(
"ReadFiles",
AvroIO.readFiles(GenericClass.class)
.withBeamSchemas(withBeamSchemas)
.watchForNewFiles(
Duration.millis(100),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))
.withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(Iterables.concat(firstValues, secondValues));
PAssert.that(
paths.apply(
"Read all",