Merge pull request #13639: [BEAM-11526] Add Beam schema support for ParquetIO reads
iemejia committed Jan 4, 2021
2 parents cf74313 + 1a617be commit 2f205f1
Showing 2 changed files with 108 additions and 28 deletions.
@@ -44,13 +44,13 @@
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.ReadFn;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.SplitReadFn;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -157,9 +157,10 @@
* column in a row is stored interleaved.
*
* <pre>{@code
* * PCollection<GenericRecord> records = pipeline.apply(ParquetIO.read(SCHEMA).from("/foo/bar").withProjection(Projection_schema,Encoder_Schema));
* * ...
* *
* PCollection<GenericRecord> records =
* pipeline
* .apply(
* ParquetIO.read(SCHEMA).from("/foo/bar").withProjection(Projection_schema,Encoder_Schema));
* }</pre>
*
* <h3>Reading records of an unknown schema</h3>
@@ -183,7 +184,7 @@
* return ...;
* }
* })
* .setFilePattern(...));
* .from(...));
*
* // For reading from files
* PCollection<FileIO.ReadableFile> files = p.apply(...);
@@ -200,6 +201,27 @@
* }));
* }</pre>
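*
* <p>For instance (a hypothetical parse function, not part of this change, assuming records with a
* string field named "name"), each record could be mapped straight to one of its fields:
*
* <pre>{@code
* PCollection<String> names =
*     p.apply(
*         ParquetIO.parseGenericRecords(
*                 new SimpleFunction<GenericRecord, String>() {
*                   public String apply(GenericRecord record) {
*                     return String.valueOf(record.get("name"));
*                   }
*                 })
*             .from("/foo/bar/*"));
* }</pre>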
*
* <h3>Inferring Beam schemas from Parquet files</h3>
*
* <p>If you want to use SQL or schema-based operations on a Parquet-based PCollection, you must
* configure the read transform to infer the Beam schema and automatically set up the Beam-related
* coders by doing:
*
* <pre>{@code
* PCollection<GenericRecord> parquetRecords =
* p.apply(ParquetIO.read(...).from(...).withBeamSchemas(true));
* }</pre>
*
* You can also use it when reading a list of file patterns from a {@code PCollection<String>}:
*
* <pre>{@code
* PCollection<String> filePatterns = p.apply(...);
*
* PCollection<GenericRecord> parquetRecords =
* filePatterns
* .apply(ParquetIO.readFiles(...).withBeamSchemas(true));
* }</pre>
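*
* <p>As a hypothetical follow-up (not shown in this change), a PCollection with an inferred Beam
* schema can be handed directly to schema-aware transforms, for example converting the records to
* {@code Row}s:
*
* <pre>{@code
* PCollection<Row> rows = parquetRecords.apply(Convert.toRows());
* }</pre>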
*
* <h3>Writing Parquet files</h3>
*
* <p>{@link ParquetIO.Sink} allows you to write a {@link PCollection} of {@link GenericRecord} into
@@ -241,7 +263,11 @@ public class ParquetIO {
* pattern).
*/
public static Read read(Schema schema) {
return new AutoValue_ParquetIO_Read.Builder().setSchema(schema).setSplittable(false).build();
return new AutoValue_ParquetIO_Read.Builder()
.setSchema(schema)
.setInferBeamSchema(false)
.setSplittable(false)
.build();
}

/**
@@ -251,6 +277,7 @@ public static Read read(Schema schema) {
public static ReadFiles readFiles(Schema schema) {
return new AutoValue_ParquetIO_ReadFiles.Builder()
.setSplittable(false)
.setInferBeamSchema(false)
.setSchema(schema)
.build();
}
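
Both factory methods now default inferBeamSchema to false, so Beam-schema support stays opt-in. A minimal usage sketch of how the opt-ins combine on the readFiles path (not part of this diff; assumes an Avro Schema SCHEMA and an existing PCollection of readable files):

PCollection<FileIO.ReadableFile> files = ...;

PCollection<GenericRecord> records =
    files.apply(
        ParquetIO.readFiles(SCHEMA)   // splittable reading and Beam-schema inference both default to false
            .withSplit()              // opt in to splittable reading
            .withBeamSchemas(true));  // opt in to a SchemaCoder instead of a plain AvroCoder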
@@ -294,13 +321,17 @@ public abstract static class Read extends PTransform<PBegin, PCollection<GenericRecord>> {

abstract @Nullable SerializableConfiguration getConfiguration();

abstract boolean getInferBeamSchema();

abstract boolean isSplittable();

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {

abstract Builder setInferBeamSchema(boolean inferBeamSchema);

abstract Builder setSplittable(boolean split);

abstract Builder setFilepattern(ValueProvider<String> filepattern);
@@ -341,6 +372,11 @@ public Read withConfiguration(Map<String, String> configuration) {
return toBuilder().setConfiguration(SerializableConfiguration.fromMap(configuration)).build();
}

@Experimental(Kind.SCHEMAS)
public Read withBeamSchemas(boolean inferBeamSchema) {
return toBuilder().setInferBeamSchema(inferBeamSchema).build();
}

/** Enable the Splittable reading. */
public Read withSplit() {
return toBuilder().setSplittable(true).build();
@@ -366,10 +402,14 @@ public PCollection<GenericRecord> expand(PBegin input) {
return inputFiles.apply(
readFiles(getSchema())
.withSplit()
.withBeamSchemas(getInferBeamSchema())
.withAvroDataModel(getAvroDataModel())
.withProjection(getProjectionSchema(), getEncoderSchema()));
}
return inputFiles.apply(readFiles(getSchema()).withAvroDataModel(getAvroDataModel()));
return inputFiles.apply(
readFiles(getSchema())
.withBeamSchemas(getInferBeamSchema())
.withAvroDataModel(getAvroDataModel()));
}
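
For context (an assumption based on the inputFiles variable above; the file-matching code sits outside this hunk), both branches delegate to readFiles, so a read(...).from(pattern) pipeline should behave roughly like the explicit FileIO composition:

// Illustrative equivalence, not taken from this diff:
p.apply(ParquetIO.read(SCHEMA).from("/foo/bar/*").withBeamSchemas(true));

p.apply(FileIO.match().filepattern("/foo/bar/*"))
    .apply(FileIO.readMatches())
    .apply(ParquetIO.readFiles(SCHEMA).withBeamSchemas(true));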

@Override
@@ -472,7 +512,7 @@ public ParseFiles<T> withSplit() {
}

@Override
public PCollection<T> expand(PCollection<ReadableFile> input) {
public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
checkArgument(!isGenericRecordOutput(), "Parse can't be used for reading as GenericRecord.");

return input
@@ -481,7 +521,7 @@ public PCollection<T> expand(PCollection<ReadableFile> input) {
}

/** Returns Splittable or normal Parquet file reading DoFn. */
private DoFn<ReadableFile, T> buildFileReadingFn() {
private DoFn<FileIO.ReadableFile, T> buildFileReadingFn() {
return isSplittable()
? new SplitReadFn<>(null, null, getParseFn(), getConfiguration())
: new ReadFn<>(null, getParseFn(), getConfiguration());
@@ -532,6 +572,8 @@ public abstract static class ReadFiles

abstract @Nullable SerializableConfiguration getConfiguration();

abstract boolean getInferBeamSchema();

abstract boolean isSplittable();

abstract Builder toBuilder();
@@ -548,6 +590,8 @@ abstract static class Builder {

abstract Builder setConfiguration(SerializableConfiguration configuration);

abstract Builder setInferBeamSchema(boolean inferBeamSchema);

abstract Builder setSplittable(boolean split);

abstract ReadFiles build();
@@ -573,6 +617,11 @@ public ReadFiles withConfiguration(Map<String, String> configuration) {
return toBuilder().setConfiguration(SerializableConfiguration.fromMap(configuration)).build();
}

@Experimental(Kind.SCHEMAS)
public ReadFiles withBeamSchemas(boolean inferBeamSchema) {
return toBuilder().setInferBeamSchema(inferBeamSchema).build();
}

/** Enable the Splittable reading. */
public ReadFiles withSplit() {
return toBuilder().setSplittable(true).build();
@@ -581,24 +630,31 @@ public ReadFiles withSplit() {
@Override
public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
checkNotNull(getSchema(), "Schema can not be null");
if (isSplittable()) {
Schema coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema();
return input
.apply(
ParDo.of(
new SplitReadFn<>(
getAvroDataModel(),
getProjectionSchema(),
GenericRecordPassthroughFn.create(),
getConfiguration())))
.setCoder(AvroCoder.of(coderSchema));
}
return input
.apply(
ParDo.of(
new ReadFn<>(
getAvroDataModel(), GenericRecordPassthroughFn.create(), getConfiguration())))
.setCoder(AvroCoder.of(getSchema()));
return input.apply(ParDo.of(getReaderFn())).setCoder(getCollectionCoder());
}

/** Returns the Parquet file reading function based on {@link #isSplittable()}. */
private DoFn<FileIO.ReadableFile, GenericRecord> getReaderFn() {
return isSplittable()
? new SplitReadFn<>(
getAvroDataModel(),
getProjectionSchema(),
GenericRecordPassthroughFn.create(),
getConfiguration())
: new ReadFn<>(
getAvroDataModel(), GenericRecordPassthroughFn.create(), getConfiguration());
}

/**
* Returns {@link org.apache.beam.sdk.schemas.SchemaCoder} when using Beam schemas, {@link
* AvroCoder} when not using Beam schemas.
*/
@Experimental(Kind.SCHEMAS)
private Coder<GenericRecord> getCollectionCoder() {
Schema coderSchema =
getProjectionSchema() != null && isSplittable() ? getEncoderSchema() : getSchema();

return getInferBeamSchema() ? AvroUtils.schemaCoder(coderSchema) : AvroCoder.of(coderSchema);
}
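
In other words, getCollectionCoder() only decides how the output coder is built. A rough illustration of the two branches (a sketch assuming an Avro Schema SCHEMA, not taken verbatim from this diff):

// withBeamSchemas(true): a SchemaCoder derived from the Avro schema, so the resulting
// PCollection carries a Beam schema usable by SQL and other schema-aware transforms.
Coder<GenericRecord> beamSchemaCoder = AvroUtils.schemaCoder(SCHEMA);

// Default: a plain AvroCoder; elements are still GenericRecords but no Beam schema is attached.
Coder<GenericRecord> avroOnlyCoder = AvroCoder.of(SCHEMA);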

@DoFn.BoundedPerElement
@@ -132,7 +132,7 @@ public void testWriteAndReadWithProjection() {
}

@Test
public void testBlockTracker() throws Exception {
public void testBlockTracker() {
OffsetRange range = new OffsetRange(0, 1);
ParquetIO.ReadFiles.BlockTracker tracker = new ParquetIO.ReadFiles.BlockTracker(range, 7, 3);
assertEquals(tracker.getProgress().getWorkRemaining(), 1.0, 0.01);
@@ -205,6 +205,29 @@ public void testWriteAndReadWithSplit() {
readPipeline.run().waitUntilFinish();
}

@Test
public void testWriteAndReadWithBeamSchema() {
List<GenericRecord> records = generateGenericRecords(1000);

mainPipeline
.apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
.apply(
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(SCHEMA))
.to(temporaryFolder.getRoot().getAbsolutePath()));

mainPipeline.run().waitUntilFinish();

PCollection<GenericRecord> readBackRecords =
readPipeline.apply(
ParquetIO.read(SCHEMA)
.from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
.withBeamSchemas(true));

PAssert.that(readBackRecords).containsInAnyOrder(records);
readPipeline.run().waitUntilFinish();
}
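
A possible extra assertion (not part of this test; assumes the usual JUnit static imports and that AvroUtils.toBeamSchema performs the expected conversion) to check that the Beam schema really was attached:

assertTrue(readBackRecords.hasSchema());
assertEquals(AvroUtils.toBeamSchema(SCHEMA), readBackRecords.getSchema());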

@Test
public void testWriteAndReadFilesAsJsonForWithSplitForUnknownSchema() {
List<GenericRecord> records = generateGenericRecords(1000);
@@ -274,6 +297,7 @@ public void testReadFilesAsJsonForUnknownSchemaFiles() {
}

@Test
@SuppressWarnings({"nullable", "ConstantConditions"} /* forced check. */)
public void testReadFilesUnknownSchemaFilesForGenericRecordThrowException() {
IllegalArgumentException illegalArgumentException =
assertThrows(
