[FLINK-25583] Support compacting small files for FileSink. #18680

Closed · wants to merge 6 commits
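For context, here is a minimal sketch of how the compaction support added by this PR is meant to be used. The FileCompactStrategy builder calls and the ConcatFileCompactor below are assumptions based on the overall shape of this change set, not something this diff shows directly:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

// Build a row-format FileSink that compacts small files before making them visible.
FileSink<String> sink =
        FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                .enableCompact(
                        // Assumed builder API: trigger compaction every 5 checkpoints.
                        FileCompactStrategy.Builder.newBuilder()
                                .enableCompactionOnCheckpoint(5)
                                .build(),
                        // Assumed compactor: concatenates the small files directly.
                        new ConcatFileCompactor())
                .build();

With compaction enabled, the writer produces dot-prefixed (hidden) files, the compact coordinator groups committables into compaction requests, and the compactor operators merge the files before the committer makes the results visible.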
@@ -225,7 +225,6 @@ org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter
org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
@@ -23,11 +23,21 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorStateHandlerFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandlerFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestTypeInfo;
import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriter;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
@@ -36,6 +46,10 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
@@ -46,6 +60,7 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.types.Either;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
@@ -109,7 +124,8 @@
public class FileSink<IN>
implements StatefulSink<IN, FileWriterBucketState>,
TwoPhaseCommittingSink<IN, FileSinkCommittable>,
WithCompatibleState {
WithCompatibleState,
WithPreCommitTopology<IN, FileSinkCommittable> {

private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;

@@ -177,6 +193,74 @@ public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
}

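/**
 * Adds the pre-commit compaction topology between the writers and the committers. When
 * compaction is enabled, the committables are rebalanced to a single-parallelism compact
 * coordinator, which groups them into {@link CompactorRequest}s according to the {@link
 * FileCompactStrategy}; the requests are then executed by compactor operators running with
 * the same parallelism as the committers. When compaction is not enabled, state handlers
 * are inserted instead, so that state remaining from a previous execution with compaction
 * enabled can still be processed.
 */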
@Override
public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
if (strategy == null) {
// Compaction is not enabled. State handlers are still added to process any remaining
// states of the compact coordinator and the compactor operators, left over from a
// previous execution with compaction enabled.
SingleOutputStreamOperator<
Either<CommittableMessage<FileSinkCommittable>, CompactorRequest>>
coordinatorOp =
committableStream
.forward()
.transform(
"CompactorCoordinator",
new EitherTypeInfo<>(
committableStream.getType(),
new CompactorRequestTypeInfo(
bucketsBuilder
::getCommittableSerializer)),
new CompactCoordinatorStateHandlerFactory(
bucketsBuilder::getCommittableSerializer))
.setParallelism(committableStream.getParallelism())
.uid("FileSinkCompactorCoordinator");

return coordinatorOp
.forward()
.transform(
"CompactorOperator",
committableStream.getType(),
new CompactorOperatorStateHandlerFactory(
bucketsBuilder::getCommittableSerializer,
bucketsBuilder::createBucketWriter))
.setParallelism(committableStream.getParallelism())
.uid("FileSinkCompactorOperator");
}

// An explicit rebalance is required here; otherwise the forward partitioner, which is in
// fact the partitioner from the writers to the committers, would be inherited.
SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
committableStream
.rebalance()
.transform(
"CompactorCoordinator",
new CompactorRequestTypeInfo(
bucketsBuilder::getCommittableSerializer),
new CompactCoordinatorFactory(
strategy, bucketsBuilder::getCommittableSerializer))
.setParallelism(1)
.uid("FileSinkCompactorCoordinator");

// The parallelism of the compactors is not configurable at present, since it must be
// identical to that of the committers; otherwise the committable summary and the
// committables might be distributed to different committers, which would cause a failure.
TypeInformation<CommittableMessage<FileSinkCommittable>> committableType =
committableStream.getType();
return coordinatorOp
.transform(
"CompactorOperator",
committableType,
new CompactorOperatorFactory(
strategy,
bucketsBuilder.getFileCompactor(),
bucketsBuilder::getCommittableSerializer,
bucketsBuilder::createBucketWriter))
.setParallelism(committableStream.getParallelism())
.uid("FileSinkCompactorOperator");
}

/** The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. */
@Internal
private abstract static class BucketsBuilder<IN, T extends BucketsBuilder<IN, T>>
@@ -204,6 +288,15 @@ abstract SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializ
@Internal
abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
throws IOException;

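// The compact strategy configured via enableCompact(), or null if compaction is not enabled.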
@Internal
abstract FileCompactStrategy getCompactStrategy();

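// The file compactor configured via enableCompact(), or null if compaction is not enabled.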
@Internal
abstract FileCompactor getFileCompactor();

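// Creates the bucket writer used by the writer, the committer, and the compactor operators.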
@Internal
abstract BucketWriter<IN, String> createBucketWriter() throws IOException;
}

/** A builder for configuring the sink for row-wise encoding formats. */
@@ -226,6 +319,10 @@ public static class RowFormatBuilder<IN, T extends RowFormatBuilder<IN, T>>

private OutputFileConfig outputFileConfig;

private FileCompactStrategy compactStrategy;

private FileCompactor fileCompactor;

protected RowFormatBuilder(
Path basePath, Encoder<IN> encoder, BucketAssigner<IN, String> bucketAssigner) {
this(
@@ -275,21 +372,40 @@ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
return self();
}

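/**
 * Enables compaction of small files produced by this sink. Note that since files are
 * committed before they are compacted, the writer prefixes its files with a dot to keep
 * them hidden until compaction has taken place.
 *
 * @param strategy decides when compaction is triggered and which files take part in it
 * @param compactor performs the actual compaction of the selected files
 */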
public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
this.compactStrategy = checkNotNull(strategy);
this.fileCompactor = checkNotNull(compactor);
return self();
}

/** Creates the actual sink. */
public FileSink<IN> build() {
return new FileSink<>(this);
}

@Override
FileWriter<IN> createWriter(InitContext context) throws IOException {
OutputFileConfig writerFileConfig;
if (compactStrategy == null) {
writerFileConfig = outputFileConfig;
} else {
// Compaction is enabled. Files are always committed before being compacted, so the
// files written by the writer must be hidden.
writerFileConfig =
OutputFileConfig.builder()
.withPartPrefix("." + outputFileConfig.getPartPrefix())
.withPartSuffix(outputFileConfig.getPartSuffix())
.build();
}

return new FileWriter<>(
basePath,
context.metricGroup(),
bucketAssigner,
bucketFactory,
createBucketWriter(),
rollingPolicy,
outputFileConfig,
writerFileConfig,
context.getProcessingTimeService(),
bucketCheckInterval);
}
@@ -299,6 +415,16 @@ FileCommitter createCommitter() throws IOException {
return new FileCommitter(createBucketWriter());
}

@Override
FileCompactStrategy getCompactStrategy() {
return compactStrategy;
}

@Override
FileCompactor getFileCompactor() {
return fileCompactor;
}

@Override
SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer()
throws IOException {
@@ -319,7 +445,7 @@ SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
}

private BucketWriter<IN, String> createBucketWriter() throws IOException {
BucketWriter<IN, String> createBucketWriter() throws IOException {
return new RowWiseBucketWriter<>(
FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder);
}
@@ -357,6 +483,10 @@ public static class BulkFormatBuilder<IN, T extends BulkFormatBuilder<IN, T>>

private OutputFileConfig outputFileConfig;

private FileCompactStrategy compactStrategy;

private FileCompactor fileCompactor;

protected BulkFormatBuilder(
Path basePath,
BulkWriter.Factory<IN> writerFactory,
@@ -424,21 +554,40 @@ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
outputFileConfig);
}

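/**
 * Enables compaction of small files produced by this sink; see the documentation on
 * {@code RowFormatBuilder#enableCompact} above. The same hidden-file convention applies.
 *
 * @param strategy decides when compaction is triggered and which files take part in it
 * @param compactor performs the actual compaction of the selected files
 */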
public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
this.compactStrategy = checkNotNull(strategy);
this.fileCompactor = checkNotNull(compactor);
return self();
}

/** Creates the actual sink. */
public FileSink<IN> build() {
return new FileSink<>(this);
}

@Override
FileWriter<IN> createWriter(InitContext context) throws IOException {
OutputFileConfig writerFileConfig;
if (compactStrategy == null) {
writerFileConfig = outputFileConfig;
} else {
// Compaction is enabled. Files are always committed before being compacted, so the
// files written by the writer must be hidden.
writerFileConfig =
OutputFileConfig.builder()
.withPartPrefix("." + outputFileConfig.getPartPrefix())
.withPartSuffix(outputFileConfig.getPartSuffix())
.build();
}

return new FileWriter<>(
basePath,
context.metricGroup(),
bucketAssigner,
bucketFactory,
createBucketWriter(),
rollingPolicy,
outputFileConfig,
writerFileConfig,
context.getProcessingTimeService(),
bucketCheckInterval);
}
@@ -448,6 +597,16 @@ FileCommitter createCommitter() throws IOException {
return new FileCommitter(createBucketWriter());
}

@Override
FileCompactStrategy getCompactStrategy() {
return compactStrategy;
}

@Override
FileCompactor getFileCompactor() {
return fileCompactor;
}

@Override
SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer()
throws IOException {
@@ -468,7 +627,7 @@ SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
}

private BucketWriter<IN, String> createBucketWriter() throws IOException {
BucketWriter<IN, String> createBucketWriter() throws IOException {
return new BulkBucketWriter<>(
FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory);
}