From 1b0baa162bd87efd69040eb787de8d6624f14c85 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Mon, 30 Jul 2018 11:37:41 +0200 Subject: [PATCH] [FLINK-10029][DataStream API] Refactoring the StreamingFileSink code. --- .../api/functions/sink/filesystem/Bucket.java | 281 +++++++++++------- .../{Bucketer.java => BucketAssigner.java} | 30 +- .../sink/filesystem/BucketFactory.java | 4 +- .../sink/filesystem/BucketState.java | 56 ++-- .../filesystem/BucketStateSerializer.java | 10 +- .../functions/sink/filesystem/Buckets.java | 222 +++++++------- .../sink/filesystem/BulkPartWriter.java | 2 +- ...ory.java => DefaultBucketFactoryImpl.java} | 14 +- .../sink/filesystem/PartFileInfo.java | 2 +- .../sink/filesystem/RowWisePartWriter.java | 2 +- .../sink/filesystem/StreamingFileSink.java | 78 ++--- .../BasePathBucketAssigner.java} | 12 +- .../DateTimeBucketAssigner.java} | 18 +- .../SimpleVersionedStringSerializer.java | 4 +- .../rollingpolicies/DefaultRollingPolicy.java | 35 ++- .../OnCheckpointRollingPolicy.java | 8 + .../filesystem/BucketStateSerializerTest.java | 12 +- .../sink/filesystem/BucketsTest.java | 6 +- .../sink/filesystem/BulkWriterTest.java | 2 +- .../LocalStreamingFileSinkTest.java | 18 +- .../sink/filesystem/RollingPolicyTest.java | 8 +- .../functions/sink/filesystem/TestUtils.java | 16 +- 22 files changed, 457 insertions(+), 383 deletions(-) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{Bucketer.java => BucketAssigner.java} (69%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{DefaultBucketFactory.java => DefaultBucketFactoryImpl.java} (85%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{bucketers/BasePathBucketer.java => bucketassigners/BasePathBucketAssigner.java} (84%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{bucketers/DateTimeBucketer.java => bucketassigners/DateTimeBucketAssigner.java} (87%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/{bucketers => bucketassigners}/SimpleVersionedStringSerializer.java (96%) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java index a350096e38b31..fbedaac28e659 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.util.Preconditions; @@ -34,11 +34,11 @@ /** * A bucket is the directory organization of the output of the {@link StreamingFileSink}. * - *

For each incoming element in the {@code BucketingSink}, the user-specified - * {@link Bucketer Bucketer} is - * queried to see in which bucket this element should be written to. + *

For each incoming element in the {@code StreamingFileSink}, the user-specified + * {@link BucketAssigner Bucketer} is queried to see in which bucket this element should + * be written to. */ -@PublicEvolving +@Internal public class Bucket { private static final String PART_PREFIX = "part"; @@ -53,57 +53,27 @@ public class Bucket { private final RecoverableWriter fsWriter; - private final Map> pendingPerCheckpoint = new HashMap<>(); + private final RollingPolicy rollingPolicy; - private long partCounter; - - private PartFileWriter currentPart; - - private List pending; - - /** - * Constructor to restore a bucket from checkpointed state. - */ - public Bucket( - RecoverableWriter fsWriter, - int subtaskIndex, - long initialPartCounter, - PartFileWriter.PartFileFactory partFileFactory, - BucketState bucketState) throws IOException { + private final Map> pendingPartsPerCheckpoint = new HashMap<>(); - this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory); - - // the constructor must have already initialized the filesystem writer - Preconditions.checkState(fsWriter != null); - - // we try to resume the previous in-progress file, if the filesystem - // supports such operation. If not, we just commit the file and start fresh. + private long partCounter; - final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress(); - if (resumable != null) { - currentPart = partFileFactory.resumeFrom( - bucketId, fsWriter, resumable, bucketState.getCreationTime()); - } + private PartFileWriter inProgressPart; - // we commit pending files for previous checkpoints to the last successful one - // (from which we are recovering from) - for (List commitables: bucketState.getPendingPerCheckpoint().values()) { - for (RecoverableWriter.CommitRecoverable commitable: commitables) { - fsWriter.recoverForCommit(commitable).commitAfterRecovery(); - } - } - } + private List pendingPartsForCurrentCheckpoint; /** * Constructor to create a new empty bucket. */ - public Bucket( - RecoverableWriter fsWriter, - int subtaskIndex, - BucketID bucketId, - Path bucketPath, - long initialPartCounter, - PartFileWriter.PartFileFactory partFileFactory) { + private Bucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final BucketID bucketId, + final Path bucketPath, + final long initialPartCounter, + final PartFileWriter.PartFileFactory partFileFactory, + final RollingPolicy rollingPolicy) { this.fsWriter = Preconditions.checkNotNull(fsWriter); this.subtaskIndex = subtaskIndex; @@ -111,117 +81,214 @@ public Bucket( this.bucketPath = Preconditions.checkNotNull(bucketPath); this.partCounter = initialPartCounter; this.partFileFactory = Preconditions.checkNotNull(partFileFactory); + this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy); - this.pending = new ArrayList<>(); + this.pendingPartsForCurrentCheckpoint = new ArrayList<>(); } /** - * Gets the information available for the currently - * open part file, i.e. the one we are currently writing to. - * - *

This will be null if there is no currently open part file. This - * is the case when we have a new, just created bucket or a bucket - * that has not received any data after the closing of its previously - * open in-progress file due to the specified rolling policy. - * - * @return The information about the currently in-progress part file - * or {@code null} if there is no open part file. + * Constructor to restore a bucket from checkpointed state. */ - public PartFileInfo getInProgressPartInfo() { - return currentPart; + private Bucket( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final long initialPartCounter, + final PartFileWriter.PartFileFactory partFileFactory, + final RollingPolicy rollingPolicy, + final BucketState bucketState) throws IOException { + + this( + fsWriter, + subtaskIndex, + bucketState.getBucketId(), + bucketState.getBucketPath(), + initialPartCounter, + partFileFactory, + rollingPolicy); + + restoreInProgressFile(bucketState); + commitRecoveredPendingFiles(bucketState); + } + + private void restoreInProgressFile(final BucketState state) throws IOException { + // we try to resume the previous in-progress file + if (state.hasInProgressResumableFile()) { + final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile(); + inProgressPart = partFileFactory.resumeFrom( + bucketId, fsWriter, resumable, state.getInProgressFileCreationTime()); + } } - public BucketID getBucketId() { + private void commitRecoveredPendingFiles(final BucketState state) throws IOException { + + // we commit pending files for checkpoints that precess the last successful one, from which we are recovering + for (List committables: state.getCommittableFilesPerCheckpoint().values()) { + for (RecoverableWriter.CommitRecoverable committable: committables) { + fsWriter.recoverForCommit(committable).commitAfterRecovery(); + } + } + } + + BucketID getBucketId() { return bucketId; } - public Path getBucketPath() { + Path getBucketPath() { return bucketPath; } - public long getPartCounter() { + long getPartCounter() { return partCounter; } - public boolean isActive() { - return currentPart != null || !pending.isEmpty() || !pendingPerCheckpoint.isEmpty(); + boolean isActive() { + return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty(); + } + + void merge(final Bucket bucket) throws IOException { + Preconditions.checkNotNull(bucket); + Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath)); + + // There should be no pending files in the "to-merge" states. + // The reason is that: + // 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()). + // So a snapshot, including the one we are recovering from, will never contain such files. + // 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()). 
+ + Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty()); + Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty()); + + RecoverableWriter.CommitRecoverable committable = bucket.closePartFile(); + if (committable != null) { + pendingPartsForCurrentCheckpoint.add(committable); + } } void write(IN element, long currentTime) throws IOException { - Preconditions.checkState(currentPart != null, "bucket has been closed"); - currentPart.write(element, currentTime); + if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) { + rollPartFile(currentTime); + } + inProgressPart.write(element, currentTime); } - void rollPartFile(final long currentTime) throws IOException { + private void rollPartFile(final long currentTime) throws IOException { closePartFile(); - currentPart = partFileFactory.openNew(bucketId, fsWriter, getNewPartPath(), currentTime); + inProgressPart = partFileFactory.openNew(bucketId, fsWriter, assembleNewPartPath(), currentTime); partCounter++; } - void merge(final Bucket bucket) throws IOException { - Preconditions.checkNotNull(bucket); - Preconditions.checkState(Objects.equals(bucket.getBucketPath(), bucketPath)); + private Path assembleNewPartPath() { + return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter); + } - // there should be no pending files in the "to-merge" states. - Preconditions.checkState(bucket.pending.isEmpty()); - Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty()); + private RecoverableWriter.CommitRecoverable closePartFile() throws IOException { + RecoverableWriter.CommitRecoverable committable = null; + if (inProgressPart != null) { + committable = inProgressPart.closeForCommit(); + pendingPartsForCurrentCheckpoint.add(committable); + inProgressPart = null; + } + return committable; + } - RecoverableWriter.CommitRecoverable commitable = bucket.closePartFile(); - if (commitable != null) { - pending.add(commitable); + void disposePartFile() { + if (inProgressPart != null) { + inProgressPart.dispose(); } } - RecoverableWriter.CommitRecoverable closePartFile() throws IOException { - RecoverableWriter.CommitRecoverable commitable = null; - if (currentPart != null) { - commitable = currentPart.closeForCommit(); - pending.add(commitable); - currentPart = null; + BucketState onReceptionOfCheckpoint(long checkpointId) throws IOException { + prepareBucketForCheckpointing(checkpointId); + + RecoverableWriter.ResumeRecoverable inProgressResumable = null; + long inProgressFileCreationTime = Long.MAX_VALUE; + + if (inProgressPart != null) { + inProgressResumable = inProgressPart.persist(); + inProgressFileCreationTime = inProgressPart.getCreationTime(); } - return commitable; + + return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint); } - public void dispose() { - if (currentPart != null) { - currentPart.dispose(); + private void prepareBucketForCheckpointing(long checkpointId) throws IOException { + if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) { + closePartFile(); + } + + if (!pendingPartsForCurrentCheckpoint.isEmpty()) { + pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint); + pendingPartsForCurrentCheckpoint = new ArrayList<>(); } } - public void onCheckpointAcknowledgment(long checkpointId) throws IOException { + void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException { Preconditions.checkNotNull(fsWriter); 
Iterator>> it = - pendingPerCheckpoint.entrySet().iterator(); + pendingPartsPerCheckpoint.entrySet().iterator(); while (it.hasNext()) { Map.Entry> entry = it.next(); + if (entry.getKey() <= checkpointId) { - for (RecoverableWriter.CommitRecoverable commitable : entry.getValue()) { - fsWriter.recoverForCommit(commitable).commit(); + for (RecoverableWriter.CommitRecoverable committable : entry.getValue()) { + fsWriter.recoverForCommit(committable).commit(); } it.remove(); } } } - public BucketState onCheckpoint(long checkpointId) throws IOException { - RecoverableWriter.ResumeRecoverable resumable = null; - long creationTime = Long.MAX_VALUE; - - if (currentPart != null) { - resumable = currentPart.persist(); - creationTime = currentPart.getCreationTime(); + void onProcessingTime(long timestamp) throws IOException { + if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) { + closePartFile(); } + } - if (!pending.isEmpty()) { - pendingPerCheckpoint.put(checkpointId, pending); - pending = new ArrayList<>(); - } - return new BucketState<>(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint); + // --------------------------- Static Factory Methods ----------------------------- + + /** + * Creates a new empty {@code Bucket}. + * @param fsWriter the filesystem-specific {@link RecoverableWriter}. + * @param subtaskIndex the index of the subtask creating the bucket. + * @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}. + * @param bucketPath the path to where the part files for the bucket will be written to. + * @param initialPartCounter the initial counter for the part files of the bucket. + * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers. + * @param the type of input elements to the sink. + * @param the type of the identifier of the bucket, as returned by the {@link BucketAssigner} + * @return The new Bucket. + */ + static Bucket getNew( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final BucketID bucketId, + final Path bucketPath, + final long initialPartCounter, + final PartFileWriter.PartFileFactory partFileFactory, + final RollingPolicy rollingPolicy) { + return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy); } - private Path getNewPartPath() { - return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter); + /** + * Restores a {@code Bucket} from the state included in the provided {@link BucketState}. + * @param fsWriter the filesystem-specific {@link RecoverableWriter}. + * @param subtaskIndex the index of the subtask creating the bucket. + * @param initialPartCounter the initial counter for the part files of the bucket. + * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers. + * @param bucketState the initial state of the restored bucket. + * @param the type of input elements to the sink. + * @param the type of the identifier of the bucket, as returned by the {@link BucketAssigner} + * @return The restored Bucket. 
+ */ + static Bucket restore( + final RecoverableWriter fsWriter, + final int subtaskIndex, + final long initialPartCounter, + final PartFileWriter.PartFileFactory partFileFactory, + final RollingPolicy rollingPolicy, + final BucketState bucketState) throws IOException { + return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java similarity index 69% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java index a7052cb219ba5..a9b0200b110ce 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java @@ -28,49 +28,45 @@ import java.io.Serializable; /** - * A bucketer is used with a {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} - * to determine the {@link org.apache.flink.streaming.api.functions.sink.filesystem.Bucket} each incoming element + * A BucketAssigner is used with a {@link StreamingFileSink} to determine the {@link Bucket} each incoming element * should be put into. * *

The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing - * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the - * element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time. + * a set of active buckets. Whenever a new element arrives it will ask the {@code BucketAssigner} for the bucket the + * element should fall in. The {@code BucketAssigner} can, for example, determine buckets based on system time. * * @param The type of input elements. - * @param The type of the object returned by the {@link #getBucketId(Object, Bucketer.Context)}. This has to have + * @param The type of the object returned by the {@link #getBucketId(Object, BucketAssigner.Context)}. This has to have * a correct {@link #hashCode()} and {@link #equals(Object)} method. In addition, the {@link Path} * to the created bucket will be the result of the {@link #toString()} of this method, appended to - * the {@code basePath} specified in the - * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink} + * the {@code basePath} specified in the {@link StreamingFileSink StreamingFileSink}. */ @PublicEvolving -public interface Bucketer extends Serializable { +public interface BucketAssigner extends Serializable { /** * Returns the identifier of the bucket the provided element should be put into. * @param element The current element being processed. - * @param context The {@link SinkFunction.Context context} used by the - * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}. + * @param context The {@link SinkFunction.Context context} used by the {@link StreamingFileSink sink}. * * @return A string representing the identifier of the bucket the element should be put into. - * This actual path to the bucket will result from the concatenation of the returned string - * and the {@code base path} provided during the initialization of the - * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}. + * The actual path to the bucket will result from the concatenation of the returned string + * and the {@code base path} provided during the initialization of the {@link StreamingFileSink sink}. */ - BucketID getBucketId(IN element, Bucketer.Context context); + BucketID getBucketId(IN element, BucketAssigner.Context context); /** * @return A {@link SimpleVersionedSerializer} capable of serializing/deserializing the elements * of type {@code BucketID}. That is the type of the objects returned by the - * {@link #getBucketId(Object, Bucketer.Context)}. + * {@link #getBucketId(Object, BucketAssigner.Context)}. */ SimpleVersionedSerializer getSerializer(); /** - * Context that the {@link Bucketer} can use for getting additional data about + * Context that the {@link BucketAssigner} can use for getting additional data about * an input record. * - *

The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Bucketer.Context)} call. + *

The context is only valid for the duration of a {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)} call. * Do not store the context and use afterwards! */ @PublicEvolving diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java index 0c6b587b421f0..2306fafff6adb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java @@ -37,12 +37,14 @@ Bucket getNewBucket( final BucketID bucketId, final Path bucketPath, final long initialPartCounter, - final PartFileWriter.PartFileFactory partFileWriterFactory) throws IOException; + final PartFileWriter.PartFileFactory partFileWriterFactory, + final RollingPolicy rollingPolicy) throws IOException; Bucket restoreBucket( final RecoverableWriter fsWriter, final int subtaskIndex, final long initialPartCounter, final PartFileWriter.PartFileFactory partFileWriterFactory, + final RollingPolicy rollingPolicy, final BucketState bucketState) throws IOException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java index bb49e3a5d2593..18ef32f9a26b1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java @@ -32,64 +32,68 @@ * The state of the {@link Bucket} that is to be checkpointed. */ @Internal -public class BucketState { +class BucketState { private final BucketID bucketId; - /** - * The base path for the bucket, i.e. the directory where all the part files are stored. - */ + /** The directory where all the part files of the bucket are stored. */ private final Path bucketPath; /** - * The creation time of the currently open part file, or {@code Long.MAX_VALUE} if there is no open part file. + * The creation time of the currently open part file, + * or {@code Long.MAX_VALUE} if there is no open part file. */ - private final long creationTime; + private final long inProgressFileCreationTime; /** - * A {@link RecoverableWriter.ResumeRecoverable} for the currently open part file, or null - * if there is no currently open part file. + * A {@link RecoverableWriter.ResumeRecoverable} for the currently open + * part file, or null if there is no currently open part file. */ @Nullable - private final RecoverableWriter.ResumeRecoverable inProgress; + private final RecoverableWriter.ResumeRecoverable inProgressResumableFile; /** - * The {@link RecoverableWriter.CommitRecoverable files} pending to be committed, organized by checkpoint id. + * The {@link RecoverableWriter.CommitRecoverable files} pending to be + * committed, organized by checkpoint id. 
*/ - private final Map> pendingPerCheckpoint; + private final Map> committableFilesPerCheckpoint; - public BucketState( + BucketState( final BucketID bucketId, final Path bucketPath, - final long creationTime, - @Nullable final RecoverableWriter.ResumeRecoverable inProgress, - final Map> pendingPerCheckpoint + final long inProgressFileCreationTime, + @Nullable final RecoverableWriter.ResumeRecoverable inProgressResumableFile, + final Map> pendingCommittablesPerCheckpoint ) { this.bucketId = Preconditions.checkNotNull(bucketId); this.bucketPath = Preconditions.checkNotNull(bucketPath); - this.creationTime = creationTime; - this.inProgress = inProgress; - this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint); + this.inProgressFileCreationTime = inProgressFileCreationTime; + this.inProgressResumableFile = inProgressResumableFile; + this.committableFilesPerCheckpoint = Preconditions.checkNotNull(pendingCommittablesPerCheckpoint); } - public BucketID getBucketId() { + BucketID getBucketId() { return bucketId; } - public Path getBucketPath() { + Path getBucketPath() { return bucketPath; } - public long getCreationTime() { - return creationTime; + long getInProgressFileCreationTime() { + return inProgressFileCreationTime; + } + + boolean hasInProgressResumableFile() { + return inProgressResumableFile != null; } @Nullable - public RecoverableWriter.ResumeRecoverable getInProgress() { - return inProgress; + RecoverableWriter.ResumeRecoverable getInProgressResumableFile() { + return inProgressResumableFile; } - public Map> getPendingPerCheckpoint() { - return pendingPerCheckpoint; + Map> getCommittableFilesPerCheckpoint() { + return committableFilesPerCheckpoint; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java index cf9b8057bd107..04de2462d6709 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java @@ -90,20 +90,20 @@ public BucketState deserialize(int version, byte[] serialized) throws void serializeV1(BucketState state, DataOutputView out) throws IOException { SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out); out.writeUTF(state.getBucketPath().toString()); - out.writeLong(state.getCreationTime()); + out.writeLong(state.getInProgressFileCreationTime()); // put the current open part file - final RecoverableWriter.ResumeRecoverable currentPart = state.getInProgress(); - if (currentPart != null) { + if (state.hasInProgressResumableFile()) { + final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile(); out.writeBoolean(true); - SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out); + SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out); } else { out.writeBoolean(false); } // put the map of pending files per checkpoint - final Map> pendingCommitters = state.getPendingPerCheckpoint(); + final Map> pendingCommitters = state.getCommittableFilesPerCheckpoint(); // manually keep the version here to safe some bytes out.writeInt(commitableSerializer.getVersion()); diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index e62c425fc2feb..405285ea6204c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -26,9 +27,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.IOException; @@ -44,19 +42,18 @@ * this class to the lifecycle of the operator. * * @param The type of input elements. - * @param The type of ids for the buckets, as returned by the {@link Bucketer}. + * @param The type of ids for the buckets, as returned by the {@link BucketAssigner}. */ +@Internal public class Buckets { - private static final Logger LOG = LoggerFactory.getLogger(Buckets.class); - // ------------------------ configuration fields -------------------------- private final Path basePath; private final BucketFactory bucketFactory; - private final Bucketer bucketer; + private final BucketAssigner bucketAssigner; private final PartFileWriter.PartFileFactory partFileWriterFactory; @@ -70,33 +67,33 @@ public class Buckets { private final Map> activeBuckets; - private long maxPartCounterUsed; + private long maxPartCounter; - private final RecoverableWriter fileSystemWriter; + private final RecoverableWriter fsWriter; // --------------------------- State Related Fields ----------------------------- private final BucketStateSerializer bucketStateSerializer; /** - * A private constructor creating a new empty bucket manager. + * A constructor creating a new empty bucket manager. * * @param basePath The base path for our buckets. - * @param bucketer The {@link Bucketer} provided by the user. + * @param bucketAssigner The {@link BucketAssigner} provided by the user. * @param bucketFactory The {@link BucketFactory} to be used to create buckets. * @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data. * @param rollingPolicy The {@link RollingPolicy} as specified by the user. 
*/ Buckets( final Path basePath, - final Bucketer bucketer, + final BucketAssigner bucketAssigner, final BucketFactory bucketFactory, final PartFileWriter.PartFileFactory partFileWriterFactory, final RollingPolicy rollingPolicy, final int subtaskIndex) throws IOException { this.basePath = Preconditions.checkNotNull(basePath); - this.bucketer = Preconditions.checkNotNull(bucketer); + this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner); this.bucketFactory = Preconditions.checkNotNull(bucketFactory); this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory); this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy); @@ -105,75 +102,86 @@ public class Buckets { this.activeBuckets = new HashMap<>(); this.bucketerContext = new Buckets.BucketerContext(); - this.fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter(); + this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter(); this.bucketStateSerializer = new BucketStateSerializer<>( - fileSystemWriter.getResumeRecoverableSerializer(), - fileSystemWriter.getCommitRecoverableSerializer(), - bucketer.getSerializer() + fsWriter.getResumeRecoverableSerializer(), + fsWriter.getCommitRecoverableSerializer(), + bucketAssigner.getSerializer() ); - this.maxPartCounterUsed = 0L; + this.maxPartCounter = 0L; } /** * Initializes the state after recovery from a failure. + * + *

During this process:
  1. we set the initial value for the part counter to the maximum value used before across all tasks and buckets.
     This guarantees that we do not overwrite valid data,
  2. we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),
  3. we resume writing to the previous in-progress file of each bucket, and
  4. if we receive multiple states for the same bucket, we merge them.
 + *
* @param bucketStates the state holding recovered state about active buckets. * @param partCounterState the state holding the max previously used part counters. - * @throws Exception + * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any + * in-progress/pending part files */ void initializeState(final ListState bucketStates, final ListState partCounterState) throws Exception { + initializePartCounter(partCounterState); + initializeActiveBuckets(bucketStates); + } - // When resuming after a failure: - // 1) we get the max part counter used before in order to make sure that we do not overwrite valid data - // 2) we commit any pending files for previous checkpoints (previous to the last successful one) - // 3) we resume writing to the previous in-progress file of each bucket, and - // 4) if we receive multiple states for the same bucket, we merge them. - - // get the max counter + private void initializePartCounter(final ListState partCounterState) throws Exception { long maxCounter = 0L; for (long partCounter: partCounterState.get()) { maxCounter = Math.max(partCounter, maxCounter); } - maxPartCounterUsed = maxCounter; + maxPartCounter = maxCounter; + } - // get the restored buckets - for (byte[] recoveredState : bucketStates.get()) { - final BucketState bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize( - bucketStateSerializer, recoveredState); + private void initializeActiveBuckets(final ListState bucketStates) throws Exception { + for (byte[] serializedRecoveredState : bucketStates.get()) { + final BucketState recoveredState = + SimpleVersionedSerialization.readVersionAndDeSerialize( + bucketStateSerializer, serializedRecoveredState); + handleRestoredBucketState(recoveredState); + } + } - final BucketID bucketId = bucketState.getBucketId(); + private void handleRestoredBucketState(final BucketState recoveredState) throws Exception { + final BucketID bucketId = recoveredState.getBucketId(); - LOG.info("Recovered bucket for {}", bucketId); + final Bucket restoredBucket = bucketFactory + .restoreBucket( + fsWriter, + subtaskIndex, + maxPartCounter, + partFileWriterFactory, + rollingPolicy, + recoveredState + ); - final Bucket restoredBucket = bucketFactory.restoreBucket( - fileSystemWriter, - subtaskIndex, - maxPartCounterUsed, - partFileWriterFactory, - bucketState - ); - - final Bucket existingBucket = activeBuckets.get(bucketId); - if (existingBucket == null) { - activeBuckets.put(bucketId, restoredBucket); - } else { - existingBucket.merge(restoredBucket); - } + updateActiveBucketId(bucketId, restoredBucket); + } - if (LOG.isDebugEnabled()) { - LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(), - subtaskIndex, assembleBucketPath(bucketId)); - } + private void updateActiveBucketId(final BucketID bucketId, final Bucket restoredBucket) throws IOException { + final Bucket bucket = activeBuckets.get(bucketId); + if (bucket != null) { + bucket.merge(restoredBucket); + } else { + activeBuckets.put(bucketId, restoredBucket); } } - void publishUpToCheckpoint(long checkpointId) throws IOException { + void commitUpToCheckpoint(final long checkpointId) throws IOException { final Iterator>> activeBucketIt = activeBuckets.entrySet().iterator(); while (activeBucketIt.hasNext()) { - Bucket bucket = activeBucketIt.next().getValue(); - bucket.onCheckpointAcknowledgment(checkpointId); + final Bucket bucket = activeBucketIt.next().getValue(); + bucket.onSuccessfulCompletionOfCheckpoint(checkpointId); if 
(!bucket.isActive()) { // We've dealt with all the pending files and the writer for this bucket is not currently open. @@ -185,35 +193,32 @@ void publishUpToCheckpoint(long checkpointId) throws IOException { void snapshotState( final long checkpointId, - final ListState bucketStates, - final ListState partCounterState) throws Exception { + final ListState bucketStatesContainer, + final ListState partCounterStateContainer) throws Exception { Preconditions.checkState( - fileSystemWriter != null && bucketStateSerializer != null, - "sink has not been initialized" - ); + fsWriter != null && bucketStateSerializer != null, + "sink has not been initialized"); + + snapshotActiveBuckets(checkpointId, bucketStatesContainer); + partCounterStateContainer.add(maxPartCounter); + } + + private void snapshotActiveBuckets( + final long checkpointId, + final ListState bucketStatesContainer) throws Exception { for (Bucket bucket : activeBuckets.values()) { - final PartFileInfo info = bucket.getInProgressPartInfo(); + final BucketState bucketState = bucket.onReceptionOfCheckpoint(checkpointId); - if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) { - bucket.closePartFile(); - } + final byte[] serializedBucketState = SimpleVersionedSerialization + .writeVersionAndSerialize(bucketStateSerializer, bucketState); - final BucketState bucketState = bucket.onCheckpoint(checkpointId); - bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState)); + bucketStatesContainer.add(serializedBucketState); } - - partCounterState.add(maxPartCounterUsed); } - /** - * Called on every incoming element to write it to its final location. - * @param value the element itself. - * @param context the {@link SinkFunction.Context context} available to the sink function. - * @throws Exception - */ - void onElement(IN value, SinkFunction.Context context) throws Exception { + void onElement(final IN value, final SinkFunction.Context context) throws Exception { final long currentProcessingTime = context.currentProcessingTime(); // setting the values in the bucketer context @@ -222,79 +227,56 @@ void onElement(IN value, SinkFunction.Context context) throws Exception { context.currentWatermark(), currentProcessingTime); - final BucketID bucketId = bucketer.getBucketId(value, bucketerContext); + final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext); + final Bucket bucket = getOrCreateBucketForBucketId(bucketId); + bucket.write(value, currentProcessingTime); + + // we update the global max counter here because as buckets become inactive and + // get removed from the list of active buckets, at the time when we want to create + // another part file for the bucket, if we start from 0 we may overwrite previous parts. + + this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter()); + } + private Bucket getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException { Bucket bucket = activeBuckets.get(bucketId); if (bucket == null) { final Path bucketPath = assembleBucketPath(bucketId); bucket = bucketFactory.getNewBucket( - fileSystemWriter, + fsWriter, subtaskIndex, bucketId, bucketPath, - maxPartCounterUsed, - partFileWriterFactory); + maxPartCounter, + partFileWriterFactory, + rollingPolicy); activeBuckets.put(bucketId, bucket); } - - final PartFileInfo info = bucket.getInProgressPartInfo(); - if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) { - - // info will be null if there is no currently open part file. 
This - // is the case when we have a new, just created bucket or a bucket - // that has not received any data after the closing of its previously - // open in-progress file due to the specified rolling policy. - - bucket.rollPartFile(currentProcessingTime); - } - bucket.write(value, currentProcessingTime); - - // we update the counter here because as buckets become inactive and - // get removed in the initializeState(), at the time we snapshot they - // may not be there to take them into account during checkpointing. - updateMaxPartCounter(bucket.getPartCounter()); + return bucket; } void onProcessingTime(long timestamp) throws Exception { for (Bucket bucket : activeBuckets.values()) { - final PartFileInfo info = bucket.getInProgressPartInfo(); - if (info != null && rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) { - bucket.closePartFile(); - } + bucket.onProcessingTime(timestamp); } } void close() { if (activeBuckets != null) { - activeBuckets.values().forEach(Bucket::dispose); + activeBuckets.values().forEach(Bucket::disposePartFile); } } - /** - * Assembles the final bucket {@link Path} that will be used for the provided bucket in the - * underlying filesystem. - * @param bucketId the id of the bucket as returned by the {@link Bucketer}. - * @return The resulting path. - */ private Path assembleBucketPath(BucketID bucketId) { return new Path(basePath, bucketId.toString()); } /** - * Updates the state keeping track of the maximum used part - * counter across all local active buckets. - * @param candidate the part counter that will potentially replace the current {@link #maxPartCounterUsed}. - */ - private void updateMaxPartCounter(long candidate) { - maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate); - } - - /** - * The {@link Bucketer.Context} exposed to the - * {@link Bucketer#getBucketId(Object, Bucketer.Context)} + * The {@link BucketAssigner.Context} exposed to the + * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)} * whenever a new incoming element arrives. */ - private static final class BucketerContext implements Bucketer.Context { + private static final class BucketerContext implements BucketAssigner.Context { @Nullable private Long elementTimestamp; @@ -309,8 +291,8 @@ private BucketerContext() { this.currentProcessingTime = Long.MIN_VALUE; } - void update(@Nullable Long element, long watermark, long processingTime) { - this.elementTimestamp = element; + void update(@Nullable Long elementTimestamp, long watermark, long processingTime) { + this.elementTimestamp = elementTimestamp; this.currentWatermark = watermark; this.currentProcessingTime = processingTime; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java index 7b8c8fe5c0470..005ae4e737fb9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java @@ -66,7 +66,7 @@ RecoverableWriter.CommitRecoverable closeForCommit() throws IOException { /** * A factory that creates {@link BulkPartWriter BulkPartWriters}. * @param The type of input elements. - * @param The type of ids for the buckets, as returned by the {@link Bucketer}. + * @param The type of ids for the buckets, as returned by the {@link BucketAssigner}. 
*/ static class Factory implements PartFileWriter.PartFileFactory { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java similarity index 85% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java index 532138f1ba20c..1d2edbcaeb543 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java @@ -28,7 +28,7 @@ * A factory returning {@link Bucket buckets}. */ @Internal -class DefaultBucketFactory implements BucketFactory { +class DefaultBucketFactoryImpl implements BucketFactory { private static final long serialVersionUID = 1L; @@ -39,15 +39,17 @@ public Bucket getNewBucket( final BucketID bucketId, final Path bucketPath, final long initialPartCounter, - final PartFileWriter.PartFileFactory partFileWriterFactory) { + final PartFileWriter.PartFileFactory partFileWriterFactory, + final RollingPolicy rollingPolicy) { - return new Bucket<>( + return Bucket.getNew( fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, - partFileWriterFactory); + partFileWriterFactory, + rollingPolicy); } @Override @@ -56,13 +58,15 @@ public Bucket restoreBucket( final int subtaskIndex, final long initialPartCounter, final PartFileWriter.PartFileFactory partFileWriterFactory, + final RollingPolicy rollingPolicy, final BucketState bucketState) throws IOException { - return new Bucket<>( + return Bucket.restore( fsWriter, subtaskIndex, initialPartCounter, partFileWriterFactory, + rollingPolicy, bucketState); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java index dbd62a27d8c13..7750acd7e5877 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java @@ -32,7 +32,7 @@ public interface PartFileInfo { /** * @return The bucket identifier of the current buffer, as returned by the - * {@link Bucketer#getBucketId(Object, Bucketer.Context)}. + * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}. */ BucketID getBucketId(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java index 269b12c12f5b2..2478b79a52797 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java @@ -54,7 +54,7 @@ void write(IN element, long currentTime) throws IOException { /** * A factory that creates {@link RowWisePartWriter RowWisePartWriters}. * @param The type of input elements. 
- * @param The type of ids for the buckets, as returned by the {@link Bucketer}. + * @param The type of ids for the buckets, as returned by the {@link BucketAssigner}. */ static class Factory implements PartFileWriter.PartFileFactory { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index 7635e37247823..6c7e135caf820 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -58,11 +58,11 @@ * These part files contain the actual output data. * * - *

The sink uses a {@link Bucketer} to determine in which bucket directory each element should - * be written to inside the base directory. The {@code Bucketer} can, for example, use time or - * a property of the element to determine the bucket directory. The default {@code Bucketer} is a - * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify - * a custom {@code Bucketer} using the {@code setBucketer(Bucketer)} method, after calling + *

The sink uses a {@link BucketAssigner} to determine in which bucket directory each element should + * be written to inside the base directory. The {@code BucketAssigner} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code BucketAssigner} is a + * {@link DateTimeBucketAssigner} which will create one new bucket every hour. You can specify + * a custom {@code BucketAssigner} using the {@code setBucketAssigner(bucketAssigner)} method, after calling * {@link StreamingFileSink#forRowFormat(Path, Encoder)} or * {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}. * @@ -151,7 +151,7 @@ private StreamingFileSink( */ public static StreamingFileSink.RowFormatBuilder forRowFormat( final Path basePath, final Encoder encoder) { - return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketer<>()); + return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>()); } /** @@ -164,7 +164,7 @@ public static StreamingFileSink.RowFormatBuilder forRowFormat( */ public static StreamingFileSink.BulkFormatBuilder forBulkFormat( final Path basePath, final BulkWriter.Factory writerFactory) { - return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketer<>()); + return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>()); } /** @@ -191,50 +191,45 @@ public static class RowFormatBuilder extends StreamingFileSink.Buc private final Encoder encoder; - private final Bucketer bucketer; + private final BucketAssigner bucketAssigner; private final RollingPolicy rollingPolicy; private final BucketFactory bucketFactory; - RowFormatBuilder(Path basePath, Encoder encoder, Bucketer bucketer) { - this(basePath, encoder, bucketer, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactory<>()); + RowFormatBuilder(Path basePath, Encoder encoder, BucketAssigner bucketAssigner) { + this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>()); } private RowFormatBuilder( Path basePath, Encoder encoder, - Bucketer bucketer, - RollingPolicy rollingPolicy, + BucketAssigner assigner, + RollingPolicy policy, long bucketCheckInterval, BucketFactory bucketFactory) { this.basePath = Preconditions.checkNotNull(basePath); this.encoder = Preconditions.checkNotNull(encoder); - this.bucketer = Preconditions.checkNotNull(bucketer); - this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy); + this.bucketAssigner = Preconditions.checkNotNull(assigner); + this.rollingPolicy = Preconditions.checkNotNull(policy); this.bucketCheckInterval = bucketCheckInterval; this.bucketFactory = Preconditions.checkNotNull(bucketFactory); } public StreamingFileSink.RowFormatBuilder withBucketCheckInterval(final long interval) { - return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, interval, bucketFactory); + return new RowFormatBuilder<>(basePath, encoder, bucketAssigner, rollingPolicy, interval, bucketFactory); } - public StreamingFileSink.RowFormatBuilder withBucketer(final Bucketer bucketer) { - return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), rollingPolicy, bucketCheckInterval, bucketFactory); + public StreamingFileSink.RowFormatBuilder withBucketAssigner(final BucketAssigner assigner) { + return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), rollingPolicy, 
bucketCheckInterval, bucketFactory); } public StreamingFileSink.RowFormatBuilder withRollingPolicy(final RollingPolicy policy) { - return new RowFormatBuilder<>(basePath, encoder, bucketer, Preconditions.checkNotNull(policy), bucketCheckInterval, bucketFactory); + return new RowFormatBuilder<>(basePath, encoder, bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval, bucketFactory); } - public StreamingFileSink.RowFormatBuilder withBucketerAndPolicy(final Bucketer bucketer, final RollingPolicy policy) { - return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactory<>()); - } - - @VisibleForTesting - StreamingFileSink.RowFormatBuilder withBucketFactory(final BucketFactory factory) { - return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, bucketCheckInterval, Preconditions.checkNotNull(factory)); + public StreamingFileSink.RowFormatBuilder withBucketAssignerAndPolicy(final BucketAssigner assigner, final RollingPolicy policy) { + return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>()); } /** Creates the actual sink. */ @@ -246,12 +241,17 @@ public StreamingFileSink build() { Buckets createBuckets(int subtaskIndex) throws IOException { return new Buckets<>( basePath, - bucketer, + bucketAssigner, bucketFactory, new RowWisePartWriter.Factory<>(encoder), rollingPolicy, subtaskIndex); } + + @VisibleForTesting + StreamingFileSink.RowFormatBuilder withBucketFactory(final BucketFactory factory) { + return new RowFormatBuilder<>(basePath, encoder, bucketAssigner, rollingPolicy, bucketCheckInterval, Preconditions.checkNotNull(factory)); + } } /** @@ -268,38 +268,38 @@ public static class BulkFormatBuilder extends StreamingFileSink.Bu private final BulkWriter.Factory writerFactory; - private final Bucketer bucketer; + private final BucketAssigner bucketAssigner; private final BucketFactory bucketFactory; - BulkFormatBuilder(Path basePath, BulkWriter.Factory writerFactory, Bucketer bucketer) { - this(basePath, writerFactory, bucketer, 60L * 1000L, new DefaultBucketFactory<>()); + BulkFormatBuilder(Path basePath, BulkWriter.Factory writerFactory, BucketAssigner assigner) { + this(basePath, writerFactory, assigner, 60L * 1000L, new DefaultBucketFactoryImpl<>()); } private BulkFormatBuilder( Path basePath, BulkWriter.Factory writerFactory, - Bucketer bucketer, + BucketAssigner assigner, long bucketCheckInterval, BucketFactory bucketFactory) { this.basePath = Preconditions.checkNotNull(basePath); this.writerFactory = writerFactory; - this.bucketer = Preconditions.checkNotNull(bucketer); + this.bucketAssigner = Preconditions.checkNotNull(assigner); this.bucketCheckInterval = bucketCheckInterval; this.bucketFactory = Preconditions.checkNotNull(bucketFactory); } public StreamingFileSink.BulkFormatBuilder withBucketCheckInterval(long interval) { - return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, interval, bucketFactory); + return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, interval, bucketFactory); } - public StreamingFileSink.BulkFormatBuilder withBucketer(Bucketer bucketer) { - return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(bucketer), bucketCheckInterval, new DefaultBucketFactory<>()); + public StreamingFileSink.BulkFormatBuilder withBucketAssigner(BucketAssigner assigner) { 
+ return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>()); } @VisibleForTesting StreamingFileSink.BulkFormatBuilder withBucketFactory(final BucketFactory factory) { - return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, bucketCheckInterval, Preconditions.checkNotNull(factory)); + return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, bucketCheckInterval, Preconditions.checkNotNull(factory)); } /** Creates the actual sink. */ @@ -311,10 +311,10 @@ public StreamingFileSink build() { Buckets createBuckets(int subtaskIndex) throws IOException { return new Buckets<>( basePath, - bucketer, + bucketAssigner, bucketFactory, new BulkPartWriter.Factory<>(writerFactory), - new OnCheckpointRollingPolicy<>(), + OnCheckpointRollingPolicy.build(), subtaskIndex); } } @@ -337,7 +337,7 @@ public void initializeState(FunctionInitializationContext context) throws Except @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - buckets.publishUpToCheckpoint(checkpointId); + buckets.commitUpToCheckpoint(checkpointId); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java similarity index 84% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java index c35ba8031c26f..3633004cc5d15 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java @@ -16,23 +16,23 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers; +package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; /** - * A {@link Bucketer} that does not perform any + * A {@link BucketAssigner} that does not perform any * bucketing of files. All files are written to the base path. 
*/ @PublicEvolving -public class BasePathBucketer implements Bucketer { +public class BasePathBucketAssigner implements BucketAssigner { private static final long serialVersionUID = -6033643155550226022L; @Override - public String getBucketId(T element, Bucketer.Context context) { + public String getBucketId(T element, BucketAssigner.Context context) { return ""; } @@ -44,6 +44,6 @@ public SimpleVersionedSerializer getSerializer() { @Override public String toString() { - return "BasePathBucketer"; + return "BasePathBucketAssigner"; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java similarity index 87% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java index d226d200f2f79..5d30f39fa2ed0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java @@ -16,17 +16,17 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers; +package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import java.text.SimpleDateFormat; import java.util.Date; /** - * A {@link Bucketer} that assigns to buckets based on current system time. + * A {@link BucketAssigner} that assigns to buckets based on current system time. * * *

The {@code DateTimeBucketer} will create directories of the following form: @@ -52,7 +52,7 @@ * */ @PublicEvolving -public class DateTimeBucketer implements Bucketer { +public class DateTimeBucketAssigner implements BucketAssigner { private static final long serialVersionUID = 1L; @@ -65,7 +65,7 @@ public class DateTimeBucketer implements Bucketer { /** * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. */ - public DateTimeBucketer() { + public DateTimeBucketAssigner() { this(DEFAULT_FORMAT_STRING); } @@ -75,12 +75,12 @@ public DateTimeBucketer() { * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine * the bucket path. */ - public DateTimeBucketer(String formatString) { + public DateTimeBucketAssigner(String formatString) { this.formatString = formatString; } @Override - public String getBucketId(IN element, Bucketer.Context context) { + public String getBucketId(IN element, BucketAssigner.Context context) { if (dateFormatter == null) { dateFormatter = new SimpleDateFormat(formatString); } @@ -94,8 +94,6 @@ public SimpleVersionedSerializer getSerializer() { @Override public String toString() { - return "DateTimeBucketer{" + - "formatString='" + formatString + '\'' + - '}'; + return "DateTimeBucketAssigner{formatString='" + formatString + '\'' + '}'; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java similarity index 96% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java index d025af976507e..4726aff967c9a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers; +package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; @@ -29,6 +30,7 @@ /** * A {@link SimpleVersionedSerializer} implementation for Strings. 
*/ +@PublicEvolving public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer { private static final Charset CHARSET = StandardCharsets.UTF_8; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java index b4b1cebf88cdb..7c75f1c5e2563 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java @@ -87,7 +87,10 @@ public boolean shouldRollOnProcessingTime(final PartFileInfo partFileS * To finalize it and have the actual policy, call {@code .create()}. */ public static DefaultRollingPolicy.PolicyBuilder create() { - return new DefaultRollingPolicy.PolicyBuilder(); + return new DefaultRollingPolicy.PolicyBuilder( + DEFAULT_MAX_PART_SIZE, + DEFAULT_ROLLOVER_INTERVAL, + DEFAULT_INACTIVITY_INTERVAL); } /** @@ -96,42 +99,46 @@ public static DefaultRollingPolicy.PolicyBuilder create() { @PublicEvolving public static final class PolicyBuilder { - private long partSize = DEFAULT_MAX_PART_SIZE; + private final long partSize; - private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL; + private final long rolloverInterval; - private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL; + private final long inactivityInterval; - private PolicyBuilder() {} + private PolicyBuilder( + final long partSize, + final long rolloverInterval, + final long inactivityInterval) { + this.partSize = partSize; + this.rolloverInterval = rolloverInterval; + this.inactivityInterval = inactivityInterval; + } /** * Sets the part size above which a part file will have to roll. * @param size the allowed part size. */ - public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long size) { + public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final long size) { Preconditions.checkState(size > 0L); - this.partSize = size; - return this; + return new PolicyBuilder(size, rolloverInterval, inactivityInterval); } /** * Sets the interval of allowed inactivity after which a part file will have to roll. * @param interval the allowed inactivity interval. */ - public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(long interval) { + public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(final long interval) { Preconditions.checkState(interval > 0L); - this.inactivityInterval = interval; - return this; + return new PolicyBuilder(partSize, rolloverInterval, interval); } /** * Sets the max time a part file can stay open before having to roll. * @param interval the desired rollover interval. 
*/ - public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(long interval) { + public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(final long interval) { Preconditions.checkState(interval > 0L); - this.rolloverInterval = interval; - return this; + return new PolicyBuilder(partSize, interval, inactivityInterval); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java index 9ad8172e9de90..53fce082c3bd6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java @@ -18,16 +18,20 @@ package org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo; import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; /** * A {@link RollingPolicy} which rolls on every checkpoint. */ +@PublicEvolving public class OnCheckpointRollingPolicy implements RollingPolicy { private static final long serialVersionUID = 1L; + private OnCheckpointRollingPolicy() {} + @Override public boolean shouldRollOnCheckpoint(PartFileInfo partFileState) { return true; @@ -42,4 +46,8 @@ public boolean shouldRollOnEvent(PartFileInfo partFileState, IN elemen public boolean shouldRollOnProcessingTime(PartFileInfo partFileState, long currentTime) { return false; } + + public static OnCheckpointRollingPolicy build() { + return new OnCheckpointRollingPolicy<>(); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java index 3d5be6340ff15..55360a4a35380 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java @@ -25,7 +25,7 @@ import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.junit.Assert; import org.junit.ClassRule; @@ -75,8 +75,8 @@ public void testSerializationEmpty() throws IOException { final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); Assert.assertEquals(testBucket, recoveredState.getBucketPath()); - Assert.assertNull(recoveredState.getInProgress()); - Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty()); + Assert.assertNull(recoveredState.getInProgressResumableFile()); + Assert.assertTrue(recoveredState.getCommittableFilesPerCheckpoint().isEmpty()); } @Test @@ -166,7 +166,7 @@ public void testSerializationFull() throws IOException { 
Assert.assertEquals(bucketPath, recoveredState.getBucketPath()); - final Map> recoveredRecoverables = recoveredState.getPendingPerCheckpoint(); + final Map> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint(); Assert.assertEquals(5L, recoveredRecoverables.size()); // recover and commit @@ -238,9 +238,9 @@ public void testSerializationNullInProgress() throws IOException { final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes); Assert.assertEquals(bucketPath, recoveredState.getBucketPath()); - Assert.assertNull(recoveredState.getInProgress()); + Assert.assertNull(recoveredState.getInProgressResumableFile()); - final Map> recoveredRecoverables = recoveredState.getPendingPerCheckpoint(); + final Map> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint(); Assert.assertEquals(5L, recoveredRecoverables.size()); // recover and commit diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java index 5e8eb6d9ba7c7..25622d14466fe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java @@ -22,7 +22,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.junit.Assert; import org.junit.ClassRule; @@ -58,7 +58,7 @@ private void testCorrectPassingOfContext(Long timestamp, long watermark, long pr final Buckets buckets = StreamingFileSink .forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>()) - .withBucketer(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime)) + .withBucketAssigner(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime)) .createBuckets(2); buckets.onElement("TEST", new SinkFunction.Context() { @@ -79,7 +79,7 @@ public Long timestamp() { }); } - private static class VarifyingBucketer implements Bucketer { + private static class VarifyingBucketer implements BucketAssigner { private static final long serialVersionUID = 7729086510972377578L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java index 7b6b82cd4eb72..20890c07a930f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java @@ -59,7 +59,7 @@ public void testCustomBulkWriter() throws Exception { 10L, new TestUtils.TupleToStringBucketer(), new TestBulkWriterFactory(), - new DefaultBucketFactory<>()) + new DefaultBucketFactoryImpl<>()) ) { testHarness.setup(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java index a0c438e184705..ab487d168bc62 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java @@ -491,8 +491,8 @@ public void testMaxCounterUponRecovery() throws Exception { OperatorSubtaskState mergedSnapshot; - final TestBucketFactory first = new TestBucketFactory(); - final TestBucketFactory second = new TestBucketFactory(); + final TestBucketFactoryImpl first = new TestBucketFactoryImpl(); + final TestBucketFactoryImpl second = new TestBucketFactoryImpl(); final RollingPolicy, String> rollingPolicy = DefaultRollingPolicy .create() @@ -526,8 +526,8 @@ public void testMaxCounterUponRecovery() throws Exception { ); } - final TestBucketFactory firstRecovered = new TestBucketFactory(); - final TestBucketFactory secondRecovered = new TestBucketFactory(); + final TestBucketFactoryImpl firstRecovered = new TestBucketFactoryImpl(); + final TestBucketFactoryImpl secondRecovered = new TestBucketFactoryImpl(); try ( OneInputStreamOperatorTestHarness, Object> testHarness1 = TestUtils.createCustomRescalingTestSink( @@ -559,7 +559,7 @@ public void testMaxCounterUponRecovery() throws Exception { ////////////////////// Helper Methods ////////////////////// - static class TestBucketFactory extends DefaultBucketFactory, String> { + static class TestBucketFactoryImpl extends DefaultBucketFactoryImpl, String> { private static final long serialVersionUID = 2794824980604027930L; @@ -572,7 +572,8 @@ public Bucket, String> getNewBucket( final String bucketId, final Path bucketPath, final long initialPartCounter, - final PartFileWriter.PartFileFactory, String> partFileWriterFactory) { + final PartFileWriter.PartFileFactory, String> partFileWriterFactory, + final RollingPolicy, String> rollingPolicy) { this.initialCounter = initialPartCounter; @@ -582,7 +583,8 @@ public Bucket, String> getNewBucket( bucketId, bucketPath, initialPartCounter, - partFileWriterFactory); + partFileWriterFactory, + rollingPolicy); } @Override @@ -591,6 +593,7 @@ public Bucket, String> restoreBucket( final int subtaskIndex, final long initialPartCounter, final PartFileWriter.PartFileFactory, String> partFileWriterFactory, + final RollingPolicy, String> rollingPolicy, final BucketState bucketState) throws IOException { this.initialCounter = initialPartCounter; @@ -600,6 +603,7 @@ public Bucket, String> restoreBucket( subtaskIndex, initialPartCounter, partFileWriterFactory, + rollingPolicy, bucketState); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java index f16a9085d9d66..851b6825d9a5e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java @@ -67,7 +67,7 @@ public void testDefaultRollingPolicy() throws Exception { new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, - new DefaultBucketFactory<>()) + new DefaultBucketFactoryImpl<>()) ) { testHarness.setup(); testHarness.open(); @@ -111,7 +111,7 
@@ public void testDefaultRollingPolicy() throws Exception { public void testRollOnCheckpointPolicy() throws Exception { final File outDir = TEMP_FOLDER.newFolder(); - final RollingPolicy, String> rollingPolicy = new OnCheckpointRollingPolicy<>(); + final RollingPolicy, String> rollingPolicy = OnCheckpointRollingPolicy.build(); try ( OneInputStreamOperatorTestHarness, Object> testHarness = TestUtils.createCustomRescalingTestSink( @@ -122,7 +122,7 @@ public void testRollOnCheckpointPolicy() throws Exception { new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, - new DefaultBucketFactory<>()) + new DefaultBucketFactoryImpl<>()) ) { testHarness.setup(); testHarness.open(); @@ -246,7 +246,7 @@ public boolean shouldRollOnProcessingTime(PartFileInfo partFileState, lo new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, - new DefaultBucketFactory<>()) + new DefaultBucketFactoryImpl<>()) ) { testHarness.setup(); testHarness.open(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java index 8d9392b3c3781..bfbc12043ee96 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java @@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -58,7 +58,7 @@ static OneInputStreamOperatorTestHarness, Object> create .withInactivityInterval(inactivityInterval) .build(); - final Bucketer, String> bucketer = new TupleToStringBucketer(); + final BucketAssigner, String> bucketer = new TupleToStringBucketer(); final Encoder> encoder = (element, stream) -> { stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8)); @@ -73,7 +73,7 @@ static OneInputStreamOperatorTestHarness, Object> create bucketer, encoder, rollingPolicy, - new DefaultBucketFactory<>()); + new DefaultBucketFactoryImpl<>()); } static OneInputStreamOperatorTestHarness, Object> createCustomRescalingTestSink( @@ -81,14 +81,14 @@ static OneInputStreamOperatorTestHarness, Object> create final int totalParallelism, final int taskIdx, final long bucketCheckInterval, - final Bucketer, String> bucketer, + final BucketAssigner, String> bucketer, final Encoder> writer, final RollingPolicy, String> rollingPolicy, final BucketFactory, String> bucketFactory) throws Exception { StreamingFileSink> sink = StreamingFileSink .forRowFormat(new Path(outDir.toURI()), writer) - .withBucketer(bucketer) + .withBucketAssigner(bucketer) .withRollingPolicy(rollingPolicy) .withBucketCheckInterval(bucketCheckInterval) .withBucketFactory(bucketFactory) @@ -102,13 +102,13 @@ static OneInputStreamOperatorTestHarness, Object> create final int totalParallelism, final int taskIdx, final long 
bucketCheckInterval, - final Bucketer, String> bucketer, + final BucketAssigner, String> bucketer, final BulkWriter.Factory> writer, final BucketFactory, String> bucketFactory) throws Exception { StreamingFileSink> sink = StreamingFileSink .forBulkFormat(new Path(outDir.toURI()), writer) - .withBucketer(bucketer) + .withBucketAssigner(bucketer) .withBucketCheckInterval(bucketCheckInterval) .withBucketFactory(bucketFactory) .build(); @@ -146,7 +146,7 @@ static Map getFileContentByPath(File directory) throws IOException return contents; } - static class TupleToStringBucketer implements Bucketer, String> { + static class TupleToStringBucketer implements BucketAssigner, String> { private static final long serialVersionUID = 1L;
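
For reference, the row-format path of the refactored API, as exercised by TestUtils and RollingPolicyTest above, composes as follows. This is a minimal, illustrative sketch only: the output path, the DataStream named `stream`, and the chosen thresholds are placeholder assumptions and not part of this patch.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RowFormatSinkExample {

    public static void attachSink(DataStream<String> stream) {
        // Rolling policy assembled with the (now immutable) PolicyBuilder; thresholds are arbitrary.
        DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
                .withMaxPartSize(128L * 1024L * 1024L)      // roll once a part file reaches ~128 MB
                .withRolloverInterval(15L * 60L * 1000L)    // ... or after 15 minutes
                .withInactivityInterval(5L * 60L * 1000L)   // ... or after 5 minutes without new data
                .build();

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/tmp/row-output"), new SimpleStringEncoder<String>())
                .withBucketAssigner(new DateTimeBucketAssigner<>())  // replaces withBucketer(new DateTimeBucketer<>())
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(60L * 1000L)
                .build();

        stream.addSink(sink);
    }
}
```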
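One behavioural consequence of the PolicyBuilder change above is worth spelling out: the builder fields are now final and every with*() call returns a fresh builder instead of mutating the receiver, so code that keeps a builder reference and ignores the return value silently keeps the defaults. A hedged before/after sketch with arbitrary placeholder thresholds:

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class PolicyBuilderUsage {

    public static DefaultRollingPolicy<String, String> buildPolicy() {
        DefaultRollingPolicy.PolicyBuilder builder = DefaultRollingPolicy.create();

        // No longer effective: the returned builder carrying the new part size is discarded.
        builder.withMaxPartSize(64L * 1024L * 1024L);

        // Correct: keep chaining (or re-assign) so every setting reaches the final builder.
        return builder
                .withMaxPartSize(64L * 1024L * 1024L)
                .withRolloverInterval(10L * 60L * 1000L)
                .withInactivityInterval(60L * 1000L)
                .build();
    }
}
```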
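The bulk-format path differs in one respect made explicit above: BulkFormatBuilder hard-wires OnCheckpointRollingPolicy.build(), so part files for bulk writers roll on every checkpoint and no withRollingPolicy() is offered. A minimal sketch, assuming a caller-supplied BulkWriter.Factory (for example a Parquet or ORC writer factory); the output path is illustrative only.

```java
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class BulkFormatSinkExample {

    public static <T> void attachBulkSink(DataStream<T> stream, BulkWriter.Factory<T> writerFactory) {
        StreamingFileSink<T> sink = StreamingFileSink
                .forBulkFormat(new Path("/tmp/bulk-output"), writerFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<>())   // hourly buckets by default
                .withBucketCheckInterval(60L * 1000L)
                .build();

        // Rolling is not configurable here: the builder always uses OnCheckpointRollingPolicy.build(),
        // so the in-progress part file is closed on every checkpoint.
        stream.addSink(sink);
    }
}
```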
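Since the Bucketer interface is renamed to BucketAssigner throughout, a user-defined assigner now implements the two methods visible in BasePathBucketAssigner and DateTimeBucketAssigner above: getBucketId(element, context) and getSerializer(). The sketch below is purely illustrative: the prefix-based bucketing rule is invented for the example, and it assumes the INSTANCE singleton of the bundled string serializer, as used by the built-in assigners, for String bucket ids.

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/**
 * Illustrative assigner: routes each String record to a bucket named after the
 * text before the first '|', falling back to "other". An example of the renamed
 * interface only, not part of this patch.
 */
public class PrefixBucketAssigner implements BucketAssigner<String, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String getBucketId(String element, BucketAssigner.Context context) {
        // The context also exposes the element timestamp, current watermark and
        // processing time, as exercised in BucketsTest above.
        int idx = element.indexOf('|');
        return idx > 0 ? element.substring(0, idx) : "other";
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // String bucket ids can reuse the serializer shipped in the bucketassigners package.
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```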