ParquetIO.java
@@ -1054,6 +1054,8 @@ public static Sink sink(Schema schema) {
     return new AutoValue_ParquetIO_Sink.Builder()
         .setJsonSchema(schema.toString())
         .setCompressionCodec(CompressionCodecName.SNAPPY)
+        // Matches the default row group size used by ParquetWriter.
+        .setRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
         .build();
   }
 
@@ -1067,6 +1069,8 @@ public abstract static class Sink implements FileIO.Sink<GenericRecord> {
 
     abstract @Nullable SerializableConfiguration getConfiguration();
 
+    abstract int getRowGroupSize();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -1077,6 +1081,8 @@ abstract static class Builder {
 
       abstract Builder setConfiguration(SerializableConfiguration configuration);
 
+      abstract Builder setRowGroupSize(int rowGroupSize);
+
       abstract Sink build();
     }
 
@@ -1097,6 +1103,12 @@ public Sink withConfiguration(Configuration configuration) {
       return toBuilder().setConfiguration(new SerializableConfiguration(configuration)).build();
     }
 
+    /** Specify the row group size in bytes; defaults to {@code ParquetWriter.DEFAULT_BLOCK_SIZE}. */
+    public Sink withRowGroupSize(int rowGroupSize) {
+      checkArgument(rowGroupSize > 0, "rowGroupSize must be positive");
+      return toBuilder().setRowGroupSize(rowGroupSize).build();
+    }
+
     private transient @Nullable ParquetWriter<GenericRecord> writer;
 
     @Override
@@ -1108,13 +1120,14 @@ public void open(WritableByteChannel channel) throws IOException {
       BeamParquetOutputFile beamParquetOutputFile =
           new BeamParquetOutputFile(Channels.newOutputStream(channel));
 
-      this.writer =
+      AvroParquetWriter.Builder<GenericRecord> builder =
           AvroParquetWriter.<GenericRecord>builder(beamParquetOutputFile)
               .withSchema(schema)
               .withCompressionCodec(getCompressionCodec())
               .withWriteMode(OVERWRITE)
               .withConf(SerializableConfiguration.newConfiguration(getConfiguration()))
-              .build();
+              .withRowGroupSize(getRowGroupSize());
+      this.writer = builder.build();
     }
 
     @Override
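Taken together, the changes above expose the Parquet row group size through the sink builder; when `withRowGroupSize` is not called, the sink falls back to `ParquetWriter.DEFAULT_BLOCK_SIZE` (128 MB in parquet-mr). A minimal usage sketch of the new method follows; the helper class, the 1 MiB value, and the output path are illustrative, not part of this diff:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

public class WriteParquetWithRowGroupSize {
  // Writes the given records as Parquet files with ~1 MiB row groups.
  static void write(PCollection<GenericRecord> records, Schema schema, String outputDir) {
    records.apply(
        FileIO.<GenericRecord>write()
            .via(
                ParquetIO.sink(schema)
                    // Row groups are Parquet's unit of buffering on write and of
                    // splitting on read; smaller groups reduce writer memory,
                    // larger groups generally compress and scan better.
                    .withRowGroupSize(1024 * 1024))
            .to(outputDir));
  }
}
```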
ParquetIOTest.java
@@ -189,6 +189,24 @@ public void testWriteAndRead() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testWriteWithRowGroupSizeAndRead() {
+    List<GenericRecord> records = generateGenericRecords(1000);
+
+    mainPipeline
+        .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
+        .apply(
+            FileIO.<GenericRecord>write()
+                .via(ParquetIO.sink(SCHEMA).withRowGroupSize(1500))
+                .to(temporaryFolder.getRoot().getAbsolutePath()));
+    mainPipeline.run().waitUntilFinish();
+    PCollection<GenericRecord> readBack =
+        readPipeline.apply(
+            ParquetIO.read(SCHEMA).from(temporaryFolder.getRoot().getAbsolutePath() + "/*"));
+    PAssert.that(readBack).containsInAnyOrder(records);
+    readPipeline.run().waitUntilFinish();
+  }
+
   @Test
   public void testWriteAndReadWithSplit() {
     List<GenericRecord> records = generateGenericRecords(1000);
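The new test checks round-trip correctness with a deliberately tiny row group (1500 bytes for 1000 records, so each file should end up with more than one row group), but it does not assert that the setting actually took effect. A sketch of how one could verify that from the file footer using parquet-mr, assuming a single output file path; the class and helper names are illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

class RowGroupInspector {
  // Returns the number of row groups recorded in the Parquet footer; with
  // withRowGroupSize(1500) and 1000 records we would expect more than one.
  static int countRowGroups(String file) throws Exception {
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), new Configuration()))) {
      return reader.getFooter().getBlocks().size();
    }
  }
}
```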