Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export gcs options #30772

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 53 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class FileIO {
// Default thresholds that trigger flushing buffered records to a tmp file: a record count, a
// byte count and a maximum buffering duration. Mainly used for writing unbounded data to avoid
// generating too many small files. write() and writeDynamic() start from these values; the
// withFileTriggering* methods on Write replace them.
private static final int FILE_TRIGGERING_RECORD_COUNT = 100000;
private static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64 MiB
private static final Duration FILE_TRIGGERING_RECORD_BUFFERING_DURATION =
Duration.standardSeconds(5);
private static final Logger LOG = LoggerFactory.getLogger(FileIO.class);

/**
Expand Down Expand Up @@ -395,6 +401,9 @@ public static <InputT> Write<Void, InputT> write() {
.setIgnoreWindowing(false)
.setAutoSharding(false)
.setNoSpilling(false)
.setFileTriggeringRecordCount(FILE_TRIGGERING_RECORD_COUNT)
.setFileTriggeringByteCount(FILE_TRIGGERING_BYTE_COUNT)
.setFileTriggeringRecordBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
.build();
}

Expand All @@ -409,6 +418,9 @@ public static <DestT, InputT> Write<DestT, InputT> writeDynamic() {
.setIgnoreWindowing(false)
.setAutoSharding(false)
.setNoSpilling(false)
.setFileTriggeringRecordCount(FILE_TRIGGERING_RECORD_COUNT)
.setFileTriggeringByteCount(FILE_TRIGGERING_BYTE_COUNT)
.setFileTriggeringRecordBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
.build();
}

Expand Down Expand Up @@ -1045,6 +1057,12 @@ public static FileNaming relativeFileNaming(

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

/** Record-count threshold; passed to {@link WriteFiles} in expand when auto-sharding is on. */
public abstract int getFileTriggeringRecordCount();

/** Byte-count threshold; passed to {@link WriteFiles} in expand when auto-sharding is on. */
public abstract int getFileTriggeringByteCount();

/** Buffering-time limit; passed to {@link WriteFiles} in expand when auto-sharding is on. */
public abstract Duration getFileTriggeringRecordBufferingDuration();

abstract Builder<DestinationT, UserT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -1096,6 +1114,14 @@ abstract Builder<DestinationT, UserT> setSharding(
abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

/** Sets the value returned by {@code getFileTriggeringRecordCount()}. */
abstract Builder<DestinationT, UserT> setFileTriggeringRecordCount(
int fileTriggeringRecordCount);

/** Sets the value returned by {@code getFileTriggeringByteCount()}. */
abstract Builder<DestinationT, UserT> setFileTriggeringByteCount(int fileTriggeringByteCount);

/** Sets the value returned by {@code getFileTriggeringRecordBufferingDuration()}. */
abstract Builder<DestinationT, UserT> setFileTriggeringRecordBufferingDuration(
Duration fileTriggeringRecordBufferingDuration);

abstract Write<DestinationT, UserT> build();
}

Expand Down Expand Up @@ -1338,6 +1364,21 @@ public Write<DestinationT, UserT> withBadRecordErrorHandler(
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

/**
 * Returns a copy of this transform whose record-count threshold for flushing buffered records
 * is {@code fileTriggeringRecordCount}. The threshold is only handed to {@link WriteFiles} when
 * auto-sharding is enabled.
 *
 * <p>See {@link WriteFiles#withFileTriggeringRecordCount(int)}.
 */
public Write<DestinationT, UserT> withFileTriggeringRecordCount(int fileTriggeringRecordCount) {
  Builder<DestinationT, UserT> updated =
      toBuilder().setFileTriggeringRecordCount(fileTriggeringRecordCount);
  return updated.build();
}

/**
 * Returns a copy of this transform whose byte-count threshold for flushing buffered records is
 * {@code fileTriggeringByteCount}. The threshold is only handed to {@link WriteFiles} when
 * auto-sharding is enabled.
 *
 * <p>See {@link WriteFiles#withFileTriggeringByteCount(int)}.
 */
public Write<DestinationT, UserT> withFileTriggeringByteCount(int fileTriggeringByteCount) {
  Builder<DestinationT, UserT> updated =
      toBuilder().setFileTriggeringByteCount(fileTriggeringByteCount);
  return updated.build();
}

/**
 * Returns a copy of this transform whose maximum buffering time before records are flushed is
 * {@code fileTriggeringRecordBufferingDuration}. The limit is only handed to {@link WriteFiles}
 * when auto-sharding is enabled.
 *
 * <p>See {@link WriteFiles#withFileTriggeringRecordBufferingDuration(Duration)}.
 */
public Write<DestinationT, UserT> withFileTriggeringRecordBufferingDuration(
    Duration fileTriggeringRecordBufferingDuration) {
  Builder<DestinationT, UserT> updated =
      toBuilder()
          .setFileTriggeringRecordBufferingDuration(fileTriggeringRecordBufferingDuration);
  return updated.build();
}

@VisibleForTesting
Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
if (getDynamic()) {
Expand Down Expand Up @@ -1424,6 +1465,10 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
resolvedSpec.setAutoSharding(getAutoSharding());
resolvedSpec.setNoSpilling(getNoSpilling());
// Carry over the user-configured triggering thresholds, like the other settings copied into
// the resolved spec, so it reports the same values that are handed to WriteFiles instead of
// silently reverting to the static defaults.
resolvedSpec.setFileTriggeringRecordCount(getFileTriggeringRecordCount());
resolvedSpec.setFileTriggeringByteCount(getFileTriggeringByteCount());
resolvedSpec.setFileTriggeringRecordBufferingDuration(
getFileTriggeringRecordBufferingDuration());

Write<DestinationT, UserT> resolved = resolvedSpec.build();
WriteFiles<UserT, DestinationT, ?> writeFiles =
Expand All @@ -1440,8 +1485,15 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
writeFiles = writeFiles.withWindowedWrites();
}
if (getAutoSharding()) {
writeFiles = writeFiles.withAutoSharding();
writeFiles =
writeFiles
.withAutoSharding()
.withFileTriggeringByteCount(getFileTriggeringByteCount())
.withFileTriggeringRecordCount(getFileTriggeringRecordCount())
.withFileTriggeringRecordBufferingDuration(
getFileTriggeringRecordBufferingDuration());
}

if (getNoSpilling()) {
writeFiles = writeFiles.withNoSpilling();
}
Expand Down
64 changes: 63 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@
})
public class TextIO {
private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
// Default thresholds that trigger flushing buffered records to a tmp file: a record count, a
// byte count and a maximum buffering duration. Mainly used for writing unbounded data to avoid
// generating too many small files. writeCustomType() starts from these values; the
// withFileTriggering* methods replace them.
private static final int FILE_TRIGGERING_RECORD_COUNT = 100000;
private static final int FILE_TRIGGERING_BYTE_COUNT = 64 * 1024 * 1024; // 64 MiB
private static final Duration FILE_TRIGGERING_RECORD_BUFFERING_DURATION =
Duration.standardSeconds(5);

/**
* A {@link PTransform} that reads from one or more text files and returns a bounded {@link
Expand Down Expand Up @@ -278,6 +284,9 @@ public static <UserT> TypedWrite<UserT, Void> writeCustomType() {
.setNoSpilling(false)
.setSkipIfEmpty(false)
.setAutoSharding(false)
.setFileTriggeringRecordCount(FILE_TRIGGERING_RECORD_COUNT)
.setFileTriggeringByteCount(FILE_TRIGGERING_BYTE_COUNT)
.setFileTriggeringRecordBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
.build();
}

Expand Down Expand Up @@ -706,6 +715,12 @@ public abstract static class TypedWrite<UserT, DestinationT>
/** Whether to enable autosharding. */
abstract boolean getAutoSharding();

/** Record-count threshold; reported in display data when auto-sharding is enabled. */
abstract int getFileTriggeringRecordCount();

/** Byte-count threshold; reported in display data when auto-sharding is enabled. */
abstract int getFileTriggeringByteCount();

/** Buffering-time limit; reported in display data (as seconds) when auto-sharding is enabled. */
abstract Duration getFileTriggeringRecordBufferingDuration();

/** Whether to skip the spilling of data caused by having maxNumWritersPerBundle. */
abstract boolean getNoSpilling();

Expand Down Expand Up @@ -761,6 +776,14 @@ abstract Builder<UserT, DestinationT> setNumShards(

abstract Builder<UserT, DestinationT> setAutoSharding(boolean windowedWrites);

/** Sets the value returned by {@code getFileTriggeringRecordCount()}. */
abstract Builder<UserT, DestinationT> setFileTriggeringRecordCount(
int fileTriggeringRecordCount);

/** Sets the value returned by {@code getFileTriggeringByteCount()}. */
abstract Builder<UserT, DestinationT> setFileTriggeringByteCount(int fileTriggeringByteCount);

/** Sets the value returned by {@code getFileTriggeringRecordBufferingDuration()}. */
abstract Builder<UserT, DestinationT> setFileTriggeringRecordBufferingDuration(
Duration fileTriggeringRecordBufferingDuration);

abstract Builder<UserT, DestinationT> setNoSpilling(boolean noSpilling);

abstract Builder<UserT, DestinationT> setSkipIfEmpty(boolean noSpilling);
Expand Down Expand Up @@ -1009,6 +1032,23 @@ public TypedWrite<UserT, DestinationT> withAutoSharding() {
return toBuilder().setAutoSharding(true).build();
}

/**
 * Returns a copy of this transform with the record-count threshold for flushing buffered records
 * set to {@code fileTriggeringRecordCount}.
 *
 * <p>See {@link WriteFiles#withFileTriggeringRecordCount(int)}.
 */
public TypedWrite<UserT, DestinationT> withFileTriggeringRecordCount(
    int fileTriggeringRecordCount) {
  Builder<UserT, DestinationT> updated =
      toBuilder().setFileTriggeringRecordCount(fileTriggeringRecordCount);
  return updated.build();
}

/**
 * Returns a copy of this transform with the byte-count threshold for flushing buffered records
 * set to {@code fileTriggeringByteCount}.
 *
 * <p>See {@link WriteFiles#withFileTriggeringByteCount(int)}.
 */
public TypedWrite<UserT, DestinationT> withFileTriggeringByteCount(
    int fileTriggeringByteCount) {
  Builder<UserT, DestinationT> updated =
      toBuilder().setFileTriggeringByteCount(fileTriggeringByteCount);
  return updated.build();
}

/**
 * Returns a copy of this transform with the maximum buffering time before records are flushed
 * set to {@code fileTriggeringRecordBufferingDuration}.
 *
 * <p>See {@link WriteFiles#withFileTriggeringRecordBufferingDuration(Duration)}.
 */
public TypedWrite<UserT, DestinationT> withFileTriggeringRecordBufferingDuration(
    Duration fileTriggeringRecordBufferingDuration) {
  Builder<UserT, DestinationT> updated =
      toBuilder()
          .setFileTriggeringRecordBufferingDuration(fileTriggeringRecordBufferingDuration);
  return updated.build();
}

/** See {@link WriteFiles#withNoSpilling()}. */
public TypedWrite<UserT, DestinationT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
Expand Down Expand Up @@ -1125,7 +1165,6 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

resolveDynamicDestinations().populateDisplayData(builder);
builder
.addIfNotNull(
Expand All @@ -1139,6 +1178,15 @@ public void populateDisplayData(DisplayData.Builder builder) {
DisplayData.item(
"writableByteChannelFactory", getWritableByteChannelFactory().toString())
.withLabel("Compression/Transformation Type"));
if (getAutoSharding()) {
builder
.add(DisplayData.item("triggeringByteCount", getFileTriggeringByteCount()))
.add(DisplayData.item("triggeringRecordCount", getFileTriggeringRecordCount()))
.add(
DisplayData.item(
"triggeringRecordBufferingDuration",
getFileTriggeringRecordBufferingDuration().getStandardSeconds()));
}
}
}

Expand Down Expand Up @@ -1286,6 +1334,20 @@ public Write withAutoSharding() {
return new Write(inner.withAutoSharding());
}

/** See {@link TypedWrite#withFileTriggeringRecordCount(int)}. */
public Write withFileTriggeringRecordCount(int fileTriggeringRecordCount) {
return new Write(inner.withFileTriggeringRecordCount(fileTriggeringRecordCount));
}

/** See {@link TypedWrite#withFileTriggeringByteCount(int)}. */
public Write withFileTriggeringByteCount(int fileTriggeringByteCount) {
return new Write(inner.withFileTriggeringByteCount(fileTriggeringByteCount));
}

/** See {@link TypedWrite#withFileTriggeringRecordBufferingDuration(Duration)}. */
public Write withFileTriggeringRecordBufferingDuration(
Duration fileTriggeringRecordBufferingDuration) {
return new Write(
inner.withFileTriggeringRecordBufferingDuration(fileTriggeringRecordBufferingDuration));
}

/** See {@link TypedWrite#withNoSpilling}. */
public Write withNoSpilling() {
return new Write(inner.withNoSpilling());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, Out
.setSkipIfEmpty(false)
.setBadRecordErrorHandler(new DefaultErrorHandler<>())
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
.setFileTriggeringRecordCount(FILE_TRIGGERING_RECORD_COUNT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of repeating these everywhere we create the builder, how about adding a static create method that constructs the builder and sets the defaults, rather than calling the builder constructor directly? That could unify the other defaults as well.

pseudocode:
in AutoValue_WriteFiles.Builder

static Builder create() {
return new AutoValue_WriteFiles.Builder<UserT, DestinationT, OutputT>.Builder()
.setFileTriggeringByteCount(FILE_TRIGGERING_BYTE_COUNT) // 64MiB as of now
}

and make constructor private

I believe that you can override the builder constructor to set the defaults. That seems better in that the defaults will live in a single location instead of being duplicated across different files.

.setFileTriggeringByteCount(FILE_TRIGGERING_BYTE_COUNT) // 64MiB as of now
.setFileTriggeringRecordBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
.build();
}

Expand Down Expand Up @@ -204,6 +207,12 @@ public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, Out

public abstract BadRecordRouter getBadRecordRouter();

/** Record count used as the batch size of the "ShardAndBatch" GroupIntoBatches step. */
public abstract int getFileTriggeringRecordCount();

/** Byte size used as the batch byte limit of the "ShardAndBatch" GroupIntoBatches step. */
public abstract int getFileTriggeringByteCount();

/** Maximum buffering duration of the "ShardAndBatch" GroupIntoBatches step. */
public abstract Duration getFileTriggeringRecordBufferingDuration();

abstract Builder<UserT, DestinationT, OutputT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -238,6 +247,15 @@ abstract Builder<UserT, DestinationT, OutputT> setBadRecordErrorHandler(
abstract Builder<UserT, DestinationT, OutputT> setBadRecordRouter(
BadRecordRouter badRecordRouter);

/** Sets the value returned by {@code getFileTriggeringRecordCount()}. */
abstract Builder<UserT, DestinationT, OutputT> setFileTriggeringRecordCount(
int fileTriggeringRecordCount);

/** Sets the value returned by {@code getFileTriggeringByteCount()}. */
abstract Builder<UserT, DestinationT, OutputT> setFileTriggeringByteCount(
int fileTriggeringByteCount);

/** Sets the value returned by {@code getFileTriggeringRecordBufferingDuration()}. */
abstract Builder<UserT, DestinationT, OutputT> setFileTriggeringRecordBufferingDuration(
Duration fileTriggeringRecordBufferingDuration);

abstract WriteFiles<UserT, DestinationT, OutputT> build();
}

Expand Down Expand Up @@ -369,6 +387,23 @@ public WriteFiles<UserT, DestinationT, OutputT> withBadRecordErrorHandler(
.build();
}

/**
 * Returns a copy of this transform that uses {@code fileTriggeringRecordCount} as the number of
 * buffered records after which a batch is emitted by the "ShardAndBatch" {@link
 * GroupIntoBatches} step.
 */
public WriteFiles<UserT, DestinationT, OutputT> withFileTriggeringRecordCount(
    int fileTriggeringRecordCount) {
  Builder<UserT, DestinationT, OutputT> updated =
      toBuilder().setFileTriggeringRecordCount(fileTriggeringRecordCount);
  return updated.build();
}

/**
 * Returns a copy of this transform that uses {@code fileTriggeringByteCount} as the byte size
 * limit of a batch in the "ShardAndBatch" {@link GroupIntoBatches} step.
 */
public WriteFiles<UserT, DestinationT, OutputT> withFileTriggeringByteCount(
    int fileTriggeringByteCount) {
  Builder<UserT, DestinationT, OutputT> updated =
      toBuilder().setFileTriggeringByteCount(fileTriggeringByteCount);
  return updated.build();
}

/**
 * Returns a copy of this transform that uses {@code fileTriggeringRecordBufferingDuration} as
 * the maximum buffering duration of the "ShardAndBatch" {@link GroupIntoBatches} step.
 */
public WriteFiles<UserT, DestinationT, OutputT> withFileTriggeringRecordBufferingDuration(
    Duration fileTriggeringRecordBufferingDuration) {
  Builder<UserT, DestinationT, OutputT> updated =
      toBuilder()
          .setFileTriggeringRecordBufferingDuration(fileTriggeringRecordBufferingDuration);
  return updated.build();
}

@Override
public void validate(PipelineOptions options) {
getSink().validate(options);
Expand Down Expand Up @@ -466,6 +501,15 @@ public void populateDisplayData(DisplayData.Builder builder) {
DisplayData.item("numShards", getNumShardsProvider())
.withLabel("Fixed Number of Shards"));
}
if (getWithAutoSharding()) {
builder
.add(DisplayData.item("triggeringByteCount", getFileTriggeringByteCount()))
.add(DisplayData.item("triggeringRecordCount", getFileTriggeringRecordCount()))
.add(
DisplayData.item(
"triggeringRecordBufferingDuration",
getFileTriggeringRecordBufferingDuration().getStandardSeconds()));
}
}

private DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
Expand Down Expand Up @@ -900,7 +944,8 @@ public PCollection<List<FileResult<DestinationT>>> expand(PCollection<UserT> inp
// the same time batches the input records. The sharding behavior depends on runners. The
// batching is per window and we also emit the batches if there are a certain number of
// records buffered or they have been buffered for a certain time, controlled by
// FILE_TRIGGERING_RECORD_COUNT and BUFFERING_DURATION respectively.
// FILE_TRIGGERING_RECORD_COUNT and BUFFERING_DURATION respectively if the user hasn't passed
// an override.
//
// TODO(https://github.com/apache/beam/issues/20928): The implementation doesn't currently
// work with merging windows.
Expand All @@ -919,9 +964,9 @@ public PCollection<List<FileResult<DestinationT>>> expand(PCollection<UserT> inp
.setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
.apply(
"ShardAndBatch",
GroupIntoBatches.<Integer, UserT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(FILE_TRIGGERING_BYTE_COUNT)
.withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
GroupIntoBatches.<Integer, UserT>ofSize(getFileTriggeringRecordCount())
.withByteSize(getFileTriggeringByteCount())
.withMaxBufferingDuration(getFileTriggeringRecordBufferingDuration())
.withShardedKey())
.setCoder(
KvCoder.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,35 @@ public void testWriteDisplayData() {
assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED"));
}

/**
 * Verifies that an auto-sharded {@link TextIO.Write} reports its user-supplied file-triggering
 * thresholds in display data alongside the usual file naming items.
 */
@Test
public void testAutoShardedWriteDisplayData() {
  // TODO: Java core test failing on windows, https://github.com/apache/beam/issues/20467
  assumeFalse(SystemUtils.IS_OS_WINDOWS);
  // Every triggering value below differs from the TextIO defaults (100000 records, 64 MiB,
  // 5 seconds), so the assertions fail if an override is dropped and the default is reported.
  TextIO.Write write =
      TextIO.write()
          .to("/foo")
          .withSuffix("bar")
          .withShardNameTemplate("-SS-of-NN-")
          .withAutoSharding()
          .withFileTriggeringByteCount(1000)
          .withFileTriggeringRecordBufferingDuration(Duration.standardSeconds(10))
          .withFileTriggeringRecordCount(100)
          .withFooter("myFooter")
          .withHeader("myHeader");

  DisplayData displayData = DisplayData.from(write);

  assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
  assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
  assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
  assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
  assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
  assertThat(displayData, hasDisplayItem("triggeringByteCount", 1000));
  assertThat(displayData, hasDisplayItem("triggeringRecordCount", 100));
  // The duration is published in whole seconds.
  assertThat(displayData, hasDisplayItem("triggeringRecordBufferingDuration", 10));
  assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "UNCOMPRESSED"));
}

@Test
public void testWriteDisplayDataValidateThenHeader() {
TextIO.Write write = TextIO.write().to("foo").withHeader("myHeader");
Expand Down