diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index a350096e38b31..fbedaac28e659 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
@@ -34,11 +34,11 @@
/**
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
*
- * <p>For each incoming element in the {@code BucketingSink}, the user-specified
- * {@link Bucketer Bucketer} is
- * queried to see in which bucket this element should be written to.
+ * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified
+ * {@link BucketAssigner BucketAssigner} is queried to see in which bucket this element
+ * should be written.
*/
-@PublicEvolving
+@Internal
public class Bucket<IN, BucketID> {
private static final String PART_PREFIX = "part";
@@ -53,57 +53,27 @@ public class Bucket<IN, BucketID> {
private final RecoverableWriter fsWriter;
- private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
+ private final RollingPolicy<IN, BucketID> rollingPolicy;
- private long partCounter;
-
- private PartFileWriter<IN, BucketID> currentPart;
-
- private List<RecoverableWriter.CommitRecoverable> pending;
-
- /**
- * Constructor to restore a bucket from checkpointed state.
- */
- public Bucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- long initialPartCounter,
- PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
- BucketState<BucketID> bucketState) throws IOException {
+ private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint = new HashMap<>();
- this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory);
-
- // the constructor must have already initialized the filesystem writer
- Preconditions.checkState(fsWriter != null);
-
- // we try to resume the previous in-progress file, if the filesystem
- // supports such operation. If not, we just commit the file and start fresh.
+ private long partCounter;
- final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress();
- if (resumable != null) {
- currentPart = partFileFactory.resumeFrom(
- bucketId, fsWriter, resumable, bucketState.getCreationTime());
- }
+ private PartFileWriter<IN, BucketID> inProgressPart;
- // we commit pending files for previous checkpoints to the last successful one
- // (from which we are recovering from)
- for (List<RecoverableWriter.CommitRecoverable> commitables: bucketState.getPendingPerCheckpoint().values()) {
- for (RecoverableWriter.CommitRecoverable commitable: commitables) {
- fsWriter.recoverForCommit(commitable).commitAfterRecovery();
- }
- }
- }
+ private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
/**
* Constructor to create a new empty bucket.
*/
- public Bucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- BucketID bucketId,
- Path bucketPath,
- long initialPartCounter,
- PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory) {
+ private Bucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final BucketID bucketId,
+ final Path bucketPath,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) {
this.fsWriter = Preconditions.checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
@@ -111,117 +81,214 @@ public Bucket(
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
+ this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
- this.pending = new ArrayList<>();
+ this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
}
/**
- * Gets the information available for the currently
- * open part file, i.e. the one we are currently writing to.
- *
- * This will be null if there is no currently open part file. This
- * is the case when we have a new, just created bucket or a bucket
- * that has not received any data after the closing of its previously
- * open in-progress file due to the specified rolling policy.
- *
- * @return The information about the currently in-progress part file
- * or {@code null} if there is no open part file.
+ * Constructor to restore a bucket from checkpointed state.
*/
- public PartFileInfo<BucketID> getInProgressPartInfo() {
- return currentPart;
+ private Bucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
+ final BucketState<BucketID> bucketState) throws IOException {
+
+ this(
+ fsWriter,
+ subtaskIndex,
+ bucketState.getBucketId(),
+ bucketState.getBucketPath(),
+ initialPartCounter,
+ partFileFactory,
+ rollingPolicy);
+
+ restoreInProgressFile(bucketState);
+ commitRecoveredPendingFiles(bucketState);
+ }
+
+ private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
+ // we try to resume the previous in-progress file
+ if (state.hasInProgressResumableFile()) {
+ final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
+ inProgressPart = partFileFactory.resumeFrom(
+ bucketId, fsWriter, resumable, state.getInProgressFileCreationTime());
+ }
}
- public BucketID getBucketId() {
+ private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
+
+ // we commit pending files for checkpoints that precede the last successful one, from which we are recovering
+ for (List<RecoverableWriter.CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
+ for (RecoverableWriter.CommitRecoverable committable: committables) {
+ fsWriter.recoverForCommit(committable).commitAfterRecovery();
+ }
+ }
+ }
+
+ BucketID getBucketId() {
return bucketId;
}
- public Path getBucketPath() {
+ Path getBucketPath() {
return bucketPath;
}
- public long getPartCounter() {
+ long getPartCounter() {
return partCounter;
}
- public boolean isActive() {
- return currentPart != null || !pending.isEmpty() || !pendingPerCheckpoint.isEmpty();
+ boolean isActive() {
+ return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty();
+ }
+
+ void merge(final Bucket<IN, BucketID> bucket) throws IOException {
+ Preconditions.checkNotNull(bucket);
+ Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath));
+
+ // There should be no pending files in the "to-merge" states.
+ // The reason is that:
+ // 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
+ // So a snapshot, including the one we are recovering from, will never contain such files.
+ // 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
+
+ Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
+ Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+
+ RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
+ if (committable != null) {
+ pendingPartsForCurrentCheckpoint.add(committable);
+ }
}
void write(IN element, long currentTime) throws IOException {
- Preconditions.checkState(currentPart != null, "bucket has been closed");
- currentPart.write(element, currentTime);
+ if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+ rollPartFile(currentTime);
+ }
+ inProgressPart.write(element, currentTime);
}
- void rollPartFile(final long currentTime) throws IOException {
+ private void rollPartFile(final long currentTime) throws IOException {
closePartFile();
- currentPart = partFileFactory.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
+ inProgressPart = partFileFactory.openNew(bucketId, fsWriter, assembleNewPartPath(), currentTime);
partCounter++;
}
- void merge(final Bucket<IN, BucketID> bucket) throws IOException {
- Preconditions.checkNotNull(bucket);
- Preconditions.checkState(Objects.equals(bucket.getBucketPath(), bucketPath));
+ private Path assembleNewPartPath() {
+ return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
+ }
- // there should be no pending files in the "to-merge" states.
- Preconditions.checkState(bucket.pending.isEmpty());
- Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty());
+ private RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
+ RecoverableWriter.CommitRecoverable committable = null;
+ if (inProgressPart != null) {
+ committable = inProgressPart.closeForCommit();
+ pendingPartsForCurrentCheckpoint.add(committable);
+ inProgressPart = null;
+ }
+ return committable;
+ }
- RecoverableWriter.CommitRecoverable commitable = bucket.closePartFile();
- if (commitable != null) {
- pending.add(commitable);
+ void disposePartFile() {
+ if (inProgressPart != null) {
+ inProgressPart.dispose();
}
}
- RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
- RecoverableWriter.CommitRecoverable commitable = null;
- if (currentPart != null) {
- commitable = currentPart.closeForCommit();
- pending.add(commitable);
- currentPart = null;
+ BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
+ prepareBucketForCheckpointing(checkpointId);
+
+ RecoverableWriter.ResumeRecoverable inProgressResumable = null;
+ long inProgressFileCreationTime = Long.MAX_VALUE;
+
+ if (inProgressPart != null) {
+ inProgressResumable = inProgressPart.persist();
+ inProgressFileCreationTime = inProgressPart.getCreationTime();
}
- return commitable;
+
+ return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
}
- public void dispose() {
- if (currentPart != null) {
- currentPart.dispose();
+ private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
+ if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
+ closePartFile();
+ }
+
+ if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
+ pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
+ pendingPartsForCurrentCheckpoint = new ArrayList<>();
}
}
- public void onCheckpointAcknowledgment(long checkpointId) throws IOException {
+ void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
Preconditions.checkNotNull(fsWriter);
Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
- pendingPerCheckpoint.entrySet().iterator();
+ pendingPartsPerCheckpoint.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+
if (entry.getKey() <= checkpointId) {
- for (RecoverableWriter.CommitRecoverable commitable : entry.getValue()) {
- fsWriter.recoverForCommit(commitable).commit();
+ for (RecoverableWriter.CommitRecoverable committable : entry.getValue()) {
+ fsWriter.recoverForCommit(committable).commit();
}
it.remove();
}
}
}
- public BucketState<BucketID> onCheckpoint(long checkpointId) throws IOException {
- RecoverableWriter.ResumeRecoverable resumable = null;
- long creationTime = Long.MAX_VALUE;
-
- if (currentPart != null) {
- resumable = currentPart.persist();
- creationTime = currentPart.getCreationTime();
+ void onProcessingTime(long timestamp) throws IOException {
+ if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
+ closePartFile();
}
+ }
- if (!pending.isEmpty()) {
- pendingPerCheckpoint.put(checkpointId, pending);
- pending = new ArrayList<>();
- }
- return new BucketState<>(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
+ // --------------------------- Static Factory Methods -----------------------------
+
+ /**
+ * Creates a new empty {@code Bucket}.
+ * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
+ * @param subtaskIndex the index of the subtask creating the bucket.
+ * @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}.
+ * @param bucketPath the path to where the part files for the bucket will be written to.
+ * @param initialPartCounter the initial counter for the part files of the bucket.
+ * @param partFileFactory the {@link PartFileWriter.PartFileFactory} used to create part file writers.
+ * @param <IN> the type of input elements to the sink.
+ * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}.
+ * @return The new Bucket.
+ */
+ static <IN, BucketID> Bucket<IN, BucketID> getNew(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final BucketID bucketId,
+ final Path bucketPath,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) {
+ return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy);
}
- private Path getNewPartPath() {
- return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
+ /**
+ * Restores a {@code Bucket} from the state included in the provided {@link BucketState}.
+ * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
+ * @param subtaskIndex the index of the subtask creating the bucket.
+ * @param initialPartCounter the initial counter for the part files of the bucket.
+ * @param partFileFactory the {@link PartFileWriter.PartFileFactory} used to create part file writers.
+ * @param bucketState the initial state of the restored bucket.
+ * @param <IN> the type of input elements to the sink.
+ * @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}.
+ * @return The restored Bucket.
+ */
+ static <IN, BucketID> Bucket<IN, BucketID> restore(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
+ final BucketState<BucketID> bucketState) throws IOException {
+ return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState);
}
}
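With this change the write() path above delegates every rolling decision to the injected RollingPolicy (shouldRollOnEvent, shouldRollOnCheckpoint, shouldRollOnProcessingTime). A minimal sketch of a custom policy against that contract follows; the exact interface shape (generics, throws clauses) and the PartFileInfo.getSize() accessor are assumptions inferred from the call sites in this diff, not quoted from it:

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

import java.io.IOException;

/** Sketch: rolls part files at a fixed size and on every checkpoint. */
class SizeAndCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

	private static final long serialVersionUID = 1L;

	private final long maxPartSizeBytes; // e.g. 128L << 20 for 128 MB

	SizeAndCheckpointRollingPolicy(long maxPartSizeBytes) {
		this.maxPartSizeBytes = maxPartSizeBytes;
	}

	@Override
	public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
		return true; // keeps pending state small, like OnCheckpointRollingPolicy
	}

	@Override
	public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
		// assumption: PartFileInfo exposes the current part size
		return partFileState.getSize() >= maxPartSizeBytes;
	}

	@Override
	public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
		return false; // no time-based rolling in this sketch
	}
}
```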
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
similarity index 69%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
index a7052cb219ba5..a9b0200b110ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
@@ -28,49 +28,45 @@
import java.io.Serializable;
/**
- * A bucketer is used with a {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}
- * to determine the {@link org.apache.flink.streaming.api.functions.sink.filesystem.Bucket} each incoming element
+ * A BucketAssigner is used with a {@link StreamingFileSink} to determine the {@link Bucket} each incoming element
* should be put into.
*
* The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
- * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the
- * element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time.
+ * a set of active buckets. Whenever a new element arrives it will ask the {@code BucketAssigner} for the bucket the
+ * element should fall in. The {@code BucketAssigner} can, for example, determine buckets based on system time.
*
* @param <IN> The type of input elements.
- * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, Bucketer.Context)}. This has to have
+ * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, BucketAssigner.Context)}. This has to have
* a correct {@link #hashCode()} and {@link #equals(Object)} method. In addition, the {@link Path}
* to the created bucket will be the result of the {@link #toString()} of this method, appended to
- * the {@code basePath} specified in the
- * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * the {@code basePath} specified in the {@link StreamingFileSink StreamingFileSink}.
*/
@PublicEvolving
-public interface Bucketer<IN, BucketID> extends Serializable {
+public interface BucketAssigner<IN, BucketID> extends Serializable {
/**
* Returns the identifier of the bucket the provided element should be put into.
* @param element The current element being processed.
- * @param context The {@link SinkFunction.Context context} used by the
- * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
+ * @param context The {@link SinkFunction.Context context} used by the {@link StreamingFileSink sink}.
*
* @return A string representing the identifier of the bucket the element should be put into.
- * This actual path to the bucket will result from the concatenation of the returned string
- * and the {@code base path} provided during the initialization of the
- * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
+ * The actual path to the bucket will result from the concatenation of the returned string
+ * and the {@code base path} provided during the initialization of the {@link StreamingFileSink sink}.
*/
- BucketID getBucketId(IN element, Bucketer.Context context);
+ BucketID getBucketId(IN element, BucketAssigner.Context context);
/**
* @return A {@link SimpleVersionedSerializer} capable of serializing/deserializing the elements
* of type {@code BucketID}. That is the type of the objects returned by the
- * {@link #getBucketId(Object, Bucketer.Context)}.
+ * {@link #getBucketId(Object, BucketAssigner.Context)}.
*/
SimpleVersionedSerializer<BucketID> getSerializer();
/**
- * Context that the {@link Bucketer} can use for getting additional data about
+ * Context that the {@link BucketAssigner} can use for getting additional data about
* an input record.
*
- * The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Bucketer.Context)} call.
+ * <p>The context is only valid for the duration of a {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)} call.
* Do not store the context and use afterwards!
*/
@PublicEvolving
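For reference, a minimal assigner against the renamed interface — bucketing by a key prefix instead of time. This is a sketch, not part of the PR; the SimpleVersionedStringSerializer.INSTANCE singleton is an assumption about the bucketassigners package touched later in this diff:

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/** Sketch: buckets String records by the text before the first ':'. */
class KeyPrefixBucketAssigner implements BucketAssigner<String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String getBucketId(String element, BucketAssigner.Context context) {
		int split = element.indexOf(':');
		return split < 0 ? "unkeyed" : element.substring(0, split);
	}

	@Override
	public SimpleVersionedSerializer<String> getSerializer() {
		// assumption: the String serializer exposes a singleton INSTANCE field
		return SimpleVersionedStringSerializer.INSTANCE;
	}
}
```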
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 0c6b587b421f0..2306fafff6adb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -37,12 +37,14 @@ Bucket<IN, BucketID> getNewBucket(
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) throws IOException;
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) throws IOException;
Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState) throws IOException;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index bb49e3a5d2593..18ef32f9a26b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -32,64 +32,68 @@
* The state of the {@link Bucket} that is to be checkpointed.
*/
@Internal
-public class BucketState<BucketID> {
+class BucketState<BucketID> {
private final BucketID bucketId;
- /**
- * The base path for the bucket, i.e. the directory where all the part files are stored.
- */
+ /** The directory where all the part files of the bucket are stored. */
private final Path bucketPath;
/**
- * The creation time of the currently open part file, or {@code Long.MAX_VALUE} if there is no open part file.
+ * The creation time of the currently open part file,
+ * or {@code Long.MAX_VALUE} if there is no open part file.
*/
- private final long creationTime;
+ private final long inProgressFileCreationTime;
/**
- * A {@link RecoverableWriter.ResumeRecoverable} for the currently open part file, or null
- * if there is no currently open part file.
+ * A {@link RecoverableWriter.ResumeRecoverable} for the currently open
+ * part file, or null if there is no currently open part file.
*/
@Nullable
- private final RecoverableWriter.ResumeRecoverable inProgress;
+ private final RecoverableWriter.ResumeRecoverable inProgressResumableFile;
/**
- * The {@link RecoverableWriter.CommitRecoverable files} pending to be committed, organized by checkpoint id.
+ * The {@link RecoverableWriter.CommitRecoverable files} pending to be
+ * committed, organized by checkpoint id.
*/
- private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint;
+ private final Map<Long, List<RecoverableWriter.CommitRecoverable>> committableFilesPerCheckpoint;
- public BucketState(
+ BucketState(
final BucketID bucketId,
final Path bucketPath,
- final long creationTime,
- @Nullable final RecoverableWriter.ResumeRecoverable inProgress,
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
+ final long inProgressFileCreationTime,
+ @Nullable final RecoverableWriter.ResumeRecoverable inProgressResumableFile,
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
- this.creationTime = creationTime;
- this.inProgress = inProgress;
- this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint);
+ this.inProgressFileCreationTime = inProgressFileCreationTime;
+ this.inProgressResumableFile = inProgressResumableFile;
+ this.committableFilesPerCheckpoint = Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
}
- public BucketID getBucketId() {
+ BucketID getBucketId() {
return bucketId;
}
- public Path getBucketPath() {
+ Path getBucketPath() {
return bucketPath;
}
- public long getCreationTime() {
- return creationTime;
+ long getInProgressFileCreationTime() {
+ return inProgressFileCreationTime;
+ }
+
+ boolean hasInProgressResumableFile() {
+ return inProgressResumableFile != null;
}
@Nullable
- public RecoverableWriter.ResumeRecoverable getInProgress() {
- return inProgress;
+ RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
+ return inProgressResumableFile;
}
- public Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPerCheckpoint() {
- return pendingPerCheckpoint;
+ Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
+ return committableFilesPerCheckpoint;
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index cf9b8057bd107..04de2462d6709 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -90,20 +90,20 @@ public BucketState<BucketID> deserialize(int version, byte[] serialized) throws
void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
out.writeUTF(state.getBucketPath().toString());
- out.writeLong(state.getCreationTime());
+ out.writeLong(state.getInProgressFileCreationTime());
// put the current open part file
- final RecoverableWriter.ResumeRecoverable currentPart = state.getInProgress();
- if (currentPart != null) {
+ if (state.hasInProgressResumableFile()) {
+ final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
out.writeBoolean(true);
- SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out);
+ SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out);
}
else {
out.writeBoolean(false);
}
// put the map of pending files per checkpoint
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getCommittableFilesPerCheckpoint();
// manually keep the version here to save some bytes
out.writeInt(commitableSerializer.getVersion());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index e62c425fc2feb..405285ea6204c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -26,9 +27,6 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import java.io.IOException;
@@ -44,19 +42,18 @@
* this class to the lifecycle of the operator.
*
* @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
*/
+@Internal
public class Buckets<IN, BucketID> {
- private static final Logger LOG = LoggerFactory.getLogger(Buckets.class);
-
// ------------------------ configuration fields --------------------------
private final Path basePath;
private final BucketFactory<IN, BucketID> bucketFactory;
- private final Bucketer<IN, BucketID> bucketer;
+ private final BucketAssigner<IN, BucketID> bucketAssigner;
private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
@@ -70,33 +67,33 @@ public class Buckets<IN, BucketID> {
private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
- private long maxPartCounterUsed;
+ private long maxPartCounter;
- private final RecoverableWriter fileSystemWriter;
+ private final RecoverableWriter fsWriter;
// --------------------------- State Related Fields -----------------------------
private final BucketStateSerializer<BucketID> bucketStateSerializer;
/**
- * A private constructor creating a new empty bucket manager.
+ * A constructor creating a new empty bucket manager.
*
* @param basePath The base path for our buckets.
- * @param bucketer The {@link Bucketer} provided by the user.
+ * @param bucketAssigner The {@link BucketAssigner} provided by the user.
* @param bucketFactory The {@link BucketFactory} to be used to create buckets.
* @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
* @param rollingPolicy The {@link RollingPolicy} as specified by the user.
*/
Buckets(
final Path basePath,
- final Bucketer<IN, BucketID> bucketer,
+ final BucketAssigner<IN, BucketID> bucketAssigner,
final BucketFactory<IN, BucketID> bucketFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final int subtaskIndex) throws IOException {
this.basePath = Preconditions.checkNotNull(basePath);
- this.bucketer = Preconditions.checkNotNull(bucketer);
+ this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory);
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
@@ -105,75 +102,86 @@ public class Buckets<IN, BucketID> {
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
- this.fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+ this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
this.bucketStateSerializer = new BucketStateSerializer<>(
- fileSystemWriter.getResumeRecoverableSerializer(),
- fileSystemWriter.getCommitRecoverableSerializer(),
- bucketer.getSerializer()
+ fsWriter.getResumeRecoverableSerializer(),
+ fsWriter.getCommitRecoverableSerializer(),
+ bucketAssigner.getSerializer()
);
- this.maxPartCounterUsed = 0L;
+ this.maxPartCounter = 0L;
}
/**
* Initializes the state after recovery from a failure.
+ *
+ * <p>During this process:
+ * <ul>
+ * <li>we set the initial value for the part counter to the maximum value used before across all tasks and buckets.
+ * This guarantees that we do not overwrite valid data,</li>
+ * <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+ * <li>we resume writing to the previous in-progress file of each bucket, and</li>
+ * <li>if we receive multiple states for the same bucket, we merge them.</li>
+ * </ul>
+ *
* @param bucketStates the state holding recovered state about active buckets.
* @param partCounterState the state holding the max previously used part counters.
- * @throws Exception
+ * @throws Exception if anything goes wrong while retrieving the state or restoring/committing any
+ * in-progress/pending part files.
*/
void initializeState(final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception {
+ initializePartCounter(partCounterState);
+ initializeActiveBuckets(bucketStates);
+ }
- // When resuming after a failure:
- // 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
- // 2) we commit any pending files for previous checkpoints (previous to the last successful one)
- // 3) we resume writing to the previous in-progress file of each bucket, and
- // 4) if we receive multiple states for the same bucket, we merge them.
-
- // get the max counter
+ private void initializePartCounter(final ListState<Long> partCounterState) throws Exception {
long maxCounter = 0L;
for (long partCounter: partCounterState.get()) {
maxCounter = Math.max(partCounter, maxCounter);
}
- maxPartCounterUsed = maxCounter;
+ maxPartCounter = maxCounter;
+ }
- // get the restored buckets
- for (byte[] recoveredState : bucketStates.get()) {
- final BucketState<BucketID> bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
- bucketStateSerializer, recoveredState);
+ private void initializeActiveBuckets(final ListState<byte[]> bucketStates) throws Exception {
+ for (byte[] serializedRecoveredState : bucketStates.get()) {
+ final BucketState<BucketID> recoveredState =
+ SimpleVersionedSerialization.readVersionAndDeSerialize(
+ bucketStateSerializer, serializedRecoveredState);
+ handleRestoredBucketState(recoveredState);
+ }
+ }
- final BucketID bucketId = bucketState.getBucketId();
+ private void handleRestoredBucketState(final BucketState<BucketID> recoveredState) throws Exception {
+ final BucketID bucketId = recoveredState.getBucketId();
- LOG.info("Recovered bucket for {}", bucketId);
+ final Bucket<IN, BucketID> restoredBucket = bucketFactory
+ .restoreBucket(
+ fsWriter,
+ subtaskIndex,
+ maxPartCounter,
+ partFileWriterFactory,
+ rollingPolicy,
+ recoveredState
+ );
- final Bucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
- fileSystemWriter,
- subtaskIndex,
- maxPartCounterUsed,
- partFileWriterFactory,
- bucketState
- );
-
- final Bucket<IN, BucketID> existingBucket = activeBuckets.get(bucketId);
- if (existingBucket == null) {
- activeBuckets.put(bucketId, restoredBucket);
- } else {
- existingBucket.merge(restoredBucket);
- }
+ updateActiveBucketId(bucketId, restoredBucket);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
- subtaskIndex, assembleBucketPath(bucketId));
- }
+ private void updateActiveBucketId(final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
+ final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+ if (bucket != null) {
+ bucket.merge(restoredBucket);
+ } else {
+ activeBuckets.put(bucketId, restoredBucket);
}
}
- void publishUpToCheckpoint(long checkpointId) throws IOException {
+ void commitUpToCheckpoint(final long checkpointId) throws IOException {
final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
activeBuckets.entrySet().iterator();
while (activeBucketIt.hasNext()) {
- Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
- bucket.onCheckpointAcknowledgment(checkpointId);
+ final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
+ bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
if (!bucket.isActive()) {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
@@ -185,35 +193,32 @@ void publishUpToCheckpoint(long checkpointId) throws IOException {
void snapshotState(
final long checkpointId,
- final ListState<byte[]> bucketStates,
- final ListState<Long> partCounterState) throws Exception {
+ final ListState<byte[]> bucketStatesContainer,
+ final ListState<Long> partCounterStateContainer) throws Exception {
Preconditions.checkState(
- fileSystemWriter != null && bucketStateSerializer != null,
- "sink has not been initialized"
- );
+ fsWriter != null && bucketStateSerializer != null,
+ "sink has not been initialized");
+
+ snapshotActiveBuckets(checkpointId, bucketStatesContainer);
+ partCounterStateContainer.add(maxPartCounter);
+ }
+
+ private void snapshotActiveBuckets(
+ final long checkpointId,
+ final ListState<byte[]> bucketStatesContainer) throws Exception {
for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
- final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
+ final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);
- if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) {
- bucket.closePartFile();
- }
+ final byte[] serializedBucketState = SimpleVersionedSerialization
+ .writeVersionAndSerialize(bucketStateSerializer, bucketState);
- final BucketState<BucketID> bucketState = bucket.onCheckpoint(checkpointId);
- bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
+ bucketStatesContainer.add(serializedBucketState);
}
-
- partCounterState.add(maxPartCounterUsed);
}
- /**
- * Called on every incoming element to write it to its final location.
- * @param value the element itself.
- * @param context the {@link SinkFunction.Context context} available to the sink function.
- * @throws Exception
- */
- void onElement(IN value, SinkFunction.Context context) throws Exception {
+ void onElement(final IN value, final SinkFunction.Context context) throws Exception {
final long currentProcessingTime = context.currentProcessingTime();
// setting the values in the bucketer context
@@ -222,79 +227,56 @@ void onElement(IN value, SinkFunction.Context context) throws Exception {
context.currentWatermark(),
currentProcessingTime);
- final BucketID bucketId = bucketer.getBucketId(value, bucketerContext);
+ final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
+ final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+ bucket.write(value, currentProcessingTime);
+
+ // we update the global max counter here because as buckets become inactive and
+ // get removed from the list of active buckets, at the time when we want to create
+ // another part file for the bucket, if we start from 0 we may overwrite previous parts.
+
+ this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
+ }
+ private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
if (bucket == null) {
final Path bucketPath = assembleBucketPath(bucketId);
bucket = bucketFactory.getNewBucket(
- fileSystemWriter,
+ fsWriter,
subtaskIndex,
bucketId,
bucketPath,
- maxPartCounterUsed,
- partFileWriterFactory);
+ maxPartCounter,
+ partFileWriterFactory,
+ rollingPolicy);
activeBuckets.put(bucketId, bucket);
}
-
- final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
- if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
-
- // info will be null if there is no currently open part file. This
- // is the case when we have a new, just created bucket or a bucket
- // that has not received any data after the closing of its previously
- // open in-progress file due to the specified rolling policy.
-
- bucket.rollPartFile(currentProcessingTime);
- }
- bucket.write(value, currentProcessingTime);
-
- // we update the counter here because as buckets become inactive and
- // get removed in the initializeState(), at the time we snapshot they
- // may not be there to take them into account during checkpointing.
- updateMaxPartCounter(bucket.getPartCounter());
+ return bucket;
}
void onProcessingTime(long timestamp) throws Exception {
for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
- final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
- if (info != null && rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
- bucket.closePartFile();
- }
+ bucket.onProcessingTime(timestamp);
}
}
void close() {
if (activeBuckets != null) {
- activeBuckets.values().forEach(Bucket::dispose);
+ activeBuckets.values().forEach(Bucket::disposePartFile);
}
}
- /**
- * Assembles the final bucket {@link Path} that will be used for the provided bucket in the
- * underlying filesystem.
- * @param bucketId the id of the bucket as returned by the {@link Bucketer}.
- * @return The resulting path.
- */
private Path assembleBucketPath(BucketID bucketId) {
return new Path(basePath, bucketId.toString());
}
/**
- * Updates the state keeping track of the maximum used part
- * counter across all local active buckets.
- * @param candidate the part counter that will potentially replace the current {@link #maxPartCounterUsed}.
- */
- private void updateMaxPartCounter(long candidate) {
- maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
- }
-
- /**
- * The {@link Bucketer.Context} exposed to the
- * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
+ * The {@link BucketAssigner.Context} exposed to the
+ * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}
* whenever a new incoming element arrives.
*/
- private static final class BucketerContext implements Bucketer.Context {
+ private static final class BucketerContext implements BucketAssigner.Context {
@Nullable
private Long elementTimestamp;
@@ -309,8 +291,8 @@ private BucketerContext() {
this.currentProcessingTime = Long.MIN_VALUE;
}
- void update(@Nullable Long element, long watermark, long processingTime) {
- this.elementTimestamp = element;
+ void update(@Nullable Long elementTimestamp, long watermark, long processingTime) {
+ this.elementTimestamp = elementTimestamp;
this.currentWatermark = watermark;
this.currentProcessingTime = processingTime;
}
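The renames in this file pair the two halves of the checkpoint protocol more explicitly. A condensed sketch of the call flow, using only method names visible in this diff (operator plumbing elided):

```java
// on checkpoint barrier:
//   StreamingFileSink.snapshotState()
//     -> Buckets.snapshotState(checkpointId, bucketStatesContainer, partCounterStateContainer)
//          -> per active bucket: Bucket.onReceptionOfCheckpoint(checkpointId)
//             (roll if shouldRollOnCheckpoint(), persist() the in-progress part, snapshot pending parts)
//
// on checkpoint acknowledgement:
//   StreamingFileSink.notifyCheckpointComplete(checkpointId)
//     -> Buckets.commitUpToCheckpoint(checkpointId)
//          -> per active bucket: Bucket.onSuccessfulCompletionOfCheckpoint(checkpointId)
//             (commit every pending part up to checkpointId, then drop buckets that became inactive)
```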
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 7b8c8fe5c0470..005ae4e737fb9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -66,7 +66,7 @@ RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
/**
* A factory that creates {@link BulkPartWriter BulkPartWriters}.
* @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
similarity index 85%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index 532138f1ba20c..1d2edbcaeb543 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -28,7 +28,7 @@
* A factory returning {@link Bucket buckets}.
*/
@Internal
-class DefaultBucketFactory<IN, BucketID> implements BucketFactory<IN, BucketID> {
+class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, BucketID> {
private static final long serialVersionUID = 1L;
@@ -39,15 +39,17 @@ public Bucket<IN, BucketID> getNewBucket(
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) {
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy) {
- return new Bucket<>(
+ return Bucket.getNew(
fsWriter,
subtaskIndex,
bucketId,
bucketPath,
initialPartCounter,
- partFileWriterFactory);
+ partFileWriterFactory,
+ rollingPolicy);
}
@Override
@@ -56,13 +58,15 @@ public Bucket<IN, BucketID> restoreBucket(
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState) throws IOException {
- return new Bucket<>(
+ return Bucket.restore(
fsWriter,
subtaskIndex,
initialPartCounter,
partFileWriterFactory,
+ rollingPolicy,
bucketState);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
index dbd62a27d8c13..7750acd7e5877 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -32,7 +32,7 @@ public interface PartFileInfo<BucketID> {
/**
* @return The bucket identifier of the current buffer, as returned by the
- * {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
+ * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}.
*/
BucketID getBucketId();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 269b12c12f5b2..2478b79a52797 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -54,7 +54,7 @@ void write(IN element, long currentTime) throws IOException {
/**
* A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
* @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
*/
static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 7635e37247823..6c7e135caf820 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -36,7 +36,7 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -58,11 +58,11 @@
* These part files contain the actual output data.
*
*
- * The sink uses a {@link Bucketer} to determine in which bucket directory each element should
- * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
- * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
- * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
- * a custom {@code Bucketer} using the {@code setBucketer(Bucketer)} method, after calling
+ * <p>The sink uses a {@link BucketAssigner} to determine in which bucket directory each element should
+ * be written to inside the base directory. The {@code BucketAssigner} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code BucketAssigner} is a
+ * {@link DateTimeBucketAssigner} which will create one new bucket every hour. You can specify
+ * a custom {@code BucketAssigner} using the {@code withBucketAssigner(BucketAssigner)} method, after calling
* {@link StreamingFileSink#forRowFormat(Path, Encoder)} or
* {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
*
@@ -151,7 +151,7 @@ private StreamingFileSink(
*/
public static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat(
final Path basePath, final Encoder<IN> encoder) {
- return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketer<>());
+ return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
}
/**
@@ -164,7 +164,7 @@ public static StreamingFileSink.RowFormatBuilder forRowFormat(
*/
public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat(
final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
- return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketer<>());
+ return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
}
/**
@@ -191,50 +191,45 @@ public static class RowFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> {
private final Encoder<IN> encoder;
- private final Bucketer<IN, BucketID> bucketer;
+ private final BucketAssigner<IN, BucketID> bucketAssigner;
private final RollingPolicy<IN, BucketID> rollingPolicy;
private final BucketFactory<IN, BucketID> bucketFactory;
- RowFormatBuilder(Path basePath, Encoder<IN> encoder, Bucketer<IN, BucketID> bucketer) {
- this(basePath, encoder, bucketer, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactory<>());
+ RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
+ this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactoryImpl<>());
}
private RowFormatBuilder(
Path basePath,
Encoder<IN> encoder,
- Bucketer<IN, BucketID> bucketer,
- RollingPolicy<IN, BucketID> rollingPolicy,
+ BucketAssigner<IN, BucketID> assigner,
+ RollingPolicy<IN, BucketID> policy,
long bucketCheckInterval,
BucketFactory<IN, BucketID> bucketFactory) {
this.basePath = Preconditions.checkNotNull(basePath);
this.encoder = Preconditions.checkNotNull(encoder);
- this.bucketer = Preconditions.checkNotNull(bucketer);
- this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+ this.bucketAssigner = Preconditions.checkNotNull(assigner);
+ this.rollingPolicy = Preconditions.checkNotNull(policy);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
}
public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketCheckInterval(final long interval) {
- return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, interval, bucketFactory);
+ return new RowFormatBuilder<>(basePath, encoder, bucketAssigner, rollingPolicy, interval, bucketFactory);
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketer(final Bucketer<IN, BucketID> bucketer) {
- return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), rollingPolicy, bucketCheckInterval, bucketFactory);
+ public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+ return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval, bucketFactory);
}
public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
- return new RowFormatBuilder<>(basePath, encoder, bucketer, Preconditions.checkNotNull(policy), bucketCheckInterval, bucketFactory);
+ return new RowFormatBuilder<>(basePath, encoder, bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval, bucketFactory);
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketerAndPolicy(final Bucketer<IN, BucketID> bucketer, final RollingPolicy<IN, BucketID> policy) {
- return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactory<>());
- }
-
- @VisibleForTesting
- StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
- return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, bucketCheckInterval, Preconditions.checkNotNull(factory));
+ public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketAssignerAndPolicy(final BucketAssigner<IN, BucketID> assigner, final RollingPolicy<IN, BucketID> policy) {
+ return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>());
}
/** Creates the actual sink. */
@@ -246,12 +241,17 @@ public StreamingFileSink<IN> build() {
Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
return new Buckets<>(
basePath,
- bucketer,
+ bucketAssigner,
bucketFactory,
new RowWisePartWriter.Factory<>(encoder),
rollingPolicy,
subtaskIndex);
}
+
+ @VisibleForTesting
+ StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+ return new RowFormatBuilder<>(basePath, encoder, bucketAssigner, rollingPolicy, bucketCheckInterval, Preconditions.checkNotNull(factory));
+ }
}
/**
@@ -268,38 +268,38 @@ public static class BulkFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> {
private final BulkWriter.Factory<IN> writerFactory;
- private final Bucketer<IN, BucketID> bucketer;
+ private final BucketAssigner<IN, BucketID> bucketAssigner;
private final BucketFactory<IN, BucketID> bucketFactory;
- BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, Bucketer<IN, BucketID> bucketer) {
- this(basePath, writerFactory, bucketer, 60L * 1000L, new DefaultBucketFactory<>());
+ BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
+ this(basePath, writerFactory, assigner, 60L * 1000L, new DefaultBucketFactoryImpl<>());
}
private BulkFormatBuilder(
Path basePath,
BulkWriter.Factory<IN> writerFactory,
- Bucketer<IN, BucketID> bucketer,
+ BucketAssigner<IN, BucketID> assigner,
long bucketCheckInterval,
BucketFactory<IN, BucketID> bucketFactory) {
this.basePath = Preconditions.checkNotNull(basePath);
this.writerFactory = writerFactory;
- this.bucketer = Preconditions.checkNotNull(bucketer);
+ this.bucketAssigner = Preconditions.checkNotNull(assigner);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
}
public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
- return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, interval, bucketFactory);
+ return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, interval, bucketFactory);
}
- public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketer(Bucketer<IN, BucketID> bucketer) {
- return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(bucketer), bucketCheckInterval, new DefaultBucketFactory<>());
+ public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
+ return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>());
}
@VisibleForTesting
StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
- return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, bucketCheckInterval, Preconditions.checkNotNull(factory));
+ return new BulkFormatBuilder<>(basePath, writerFactory, bucketAssigner, bucketCheckInterval, Preconditions.checkNotNull(factory));
}
/** Creates the actual sink. */
@@ -311,10 +311,10 @@ public StreamingFileSink<IN> build() {
Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
return new Buckets<>(
basePath,
- bucketer,
+ bucketAssigner,
bucketFactory,
new BulkPartWriter.Factory<>(writerFactory),
- new OnCheckpointRollingPolicy<>(),
+ OnCheckpointRollingPolicy.build(),
subtaskIndex);
}
}
@@ -337,7 +337,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
- buckets.publishUpToCheckpoint(checkpointId);
+ buckets.commitUpToCheckpoint(checkpointId);
}
@Override
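Taken together, the renamed builder API reads as follows — a usage sketch of the public surface touched by this diff, assuming SimpleStringEncoder from flink-core as the row-format Encoder:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

final StreamingFileSink<String> sink = StreamingFileSink
	.forRowFormat(new Path("/base/path"), new SimpleStringEncoder<String>("UTF-8"))
	.withBucketAssigner(new DateTimeBucketAssigner<>())       // was withBucketer(...)
	.withRollingPolicy(DefaultRollingPolicy.create().build())
	.withBucketCheckInterval(60L * 1000L)
	.build();
```

Each with* call returns a fresh builder rather than mutating in place, which is why the chain above works in any order before build().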
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
similarity index 84%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
index c35ba8031c26f..3633004cc5d15 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
@@ -16,23 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
/**
- * A {@link Bucketer} that does not perform any
+ * A {@link BucketAssigner} that does not perform any
* bucketing of files. All files are written to the base path.
*/
@PublicEvolving
-public class BasePathBucketer<T> implements Bucketer<T, String> {
+public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {
private static final long serialVersionUID = -6033643155550226022L;
@Override
- public String getBucketId(T element, Bucketer.Context context) {
+ public String getBucketId(T element, BucketAssigner.Context context) {
return "";
}
@@ -44,6 +44,6 @@ public SimpleVersionedSerializer<String> getSerializer() {
@Override
public String toString() {
- return "BasePathBucketer";
+ return "BasePathBucketAssigner";
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
similarity index 87%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
index d226d200f2f79..5d30f39fa2ed0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
- * A {@link Bucketer} that assigns to buckets based on current system time.
+ * A {@link BucketAssigner} that assigns to buckets based on current system time.
*
- * <p>The {@code DateTimeBucketer} will create directories of the following form:
+ * <p>The {@code DateTimeBucketAssigner} will create directories of the following form:
@@ -52,7 +52,7 @@
*
*/
@PublicEvolving
-public class DateTimeBucketer<IN> implements Bucketer<IN> {
+public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
private static final long serialVersionUID = 1L;
@@ -65,7 +65,7 @@ public class DateTimeBucketer<IN> implements Bucketer<IN> {
/**
- * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+ * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
*/
- public DateTimeBucketer() {
+ public DateTimeBucketAssigner() {
this(DEFAULT_FORMAT_STRING);
}
@@ -75,12 +75,12 @@ public DateTimeBucketer() {
* @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
* the bucket path.
*/
- public DateTimeBucketer(String formatString) {
+ public DateTimeBucketAssigner(String formatString) {
this.formatString = formatString;
}
@Override
- public String getBucketId(IN element, Bucketer.Context context) {
+ public String getBucketId(IN element, BucketAssigner.Context context) {
if (dateFormatter == null) {
dateFormatter = new SimpleDateFormat(formatString);
}
@@ -94,8 +94,6 @@ public SimpleVersionedSerializer<String> getSerializer() {
@Override
public String toString() {
- return "DateTimeBucketer{" +
- "formatString='" + formatString + '\'' +
- '}';
+ return "DateTimeBucketAssigner{formatString='" + formatString + '\'' + '}';
}
}
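Usage sketch (placeholder path; not part of the patch). The format string controls bucket granularity: the no-arg constructor keeps the hourly default `"yyyy-MM-dd--HH"`, while a custom pattern can, for example, bucket per minute:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class MinuteBucketsSketch {
    public static StreamingFileSink<String> create() {
        // Produces buckets such as /tmp/output/2018-08-20--17-45/ (one per minute).
        return StreamingFileSink
                .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH-mm"))
                .build();
    }
}
```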
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
similarity index 96%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
index d025af976507e..4726aff967c9a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
@@ -16,8 +16,9 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
@@ -29,6 +30,7 @@
/**
* A {@link SimpleVersionedSerializer} implementation for Strings.
*/
+@PublicEvolving
public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> {
private static final Charset CHARSET = StandardCharsets.UTF_8;
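With the serializer now `@PublicEvolving`, user code may rely on it, e.g. for a custom `BucketAssigner` with String bucket IDs. A minimal round-trip sketch, assuming the class exposes its usual `INSTANCE` singleton (an assumption, since that field is outside the shown hunks):

```java
import java.io.IOException;

import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class StringSerializerRoundTrip {
    public static void main(String[] args) throws IOException {
        SimpleVersionedStringSerializer serializer = SimpleVersionedStringSerializer.INSTANCE;

        // The version is written next to the payload, so a later serializer
        // version can recognize and migrate old bytes.
        byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, "bucket-42");
        String restored = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);

        System.out.println(restored); // prints: bucket-42
    }
}
```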
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index b4b1cebf88cdb..7c75f1c5e2563 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -87,7 +87,10 @@ public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
- * To finalize it and have the actual policy, call {@code .create()}.
+ * To finalize it and have the actual policy, call {@code .build()}.
*/
public static DefaultRollingPolicy.PolicyBuilder create() {
- return new DefaultRollingPolicy.PolicyBuilder();
+ return new DefaultRollingPolicy.PolicyBuilder(
+ DEFAULT_MAX_PART_SIZE,
+ DEFAULT_ROLLOVER_INTERVAL,
+ DEFAULT_INACTIVITY_INTERVAL);
}
/**
@@ -96,42 +99,46 @@ public static DefaultRollingPolicy.PolicyBuilder create() {
@PublicEvolving
public static final class PolicyBuilder {
- private long partSize = DEFAULT_MAX_PART_SIZE;
+ private final long partSize;
- private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+ private final long rolloverInterval;
- private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+ private final long inactivityInterval;
- private PolicyBuilder() {}
+ private PolicyBuilder(
+ final long partSize,
+ final long rolloverInterval,
+ final long inactivityInterval) {
+ this.partSize = partSize;
+ this.rolloverInterval = rolloverInterval;
+ this.inactivityInterval = inactivityInterval;
+ }
/**
* Sets the part size above which a part file will have to roll.
- * @param size the allowed part size.
+ * @param size the maximum part file size, in bytes.
*/
- public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long size) {
+ public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final long size) {
Preconditions.checkState(size > 0L);
- this.partSize = size;
- return this;
+ return new PolicyBuilder(size, rolloverInterval, inactivityInterval);
}
/**
* Sets the interval of allowed inactivity after which a part file will have to roll.
- * @param interval the allowed inactivity interval.
+ * @param interval the allowed inactivity interval, in milliseconds.
*/
- public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(long interval) {
+ public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(final long interval) {
Preconditions.checkState(interval > 0L);
- this.inactivityInterval = interval;
- return this;
+ return new PolicyBuilder(partSize, rolloverInterval, interval);
}
/**
* Sets the max time a part file can stay open before having to roll.
- * @param interval the desired rollover interval.
+ * @param interval the desired rollover interval, in milliseconds.
*/
- public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(long interval) {
+ public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(final long interval) {
Preconditions.checkState(interval > 0L);
- this.rolloverInterval = interval;
- return this;
+ return new PolicyBuilder(partSize, interval, inactivityInterval);
}
/**
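The substantive change in this file is that `PolicyBuilder` becomes immutable: each `with*` call now returns a fresh builder instead of mutating `this`, so a partially configured builder can be shared or reused without later calls leaking into it. A usage sketch (the sizes and intervals are arbitrary; `build()` is the builder's finalizer, per the corrected Javadoc above):

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RollingPolicySketch {
    public static DefaultRollingPolicy<String, String> create() {
        // `base` is never mutated by the with* calls, so it can seed several
        // differently tuned policies without cross-talk.
        DefaultRollingPolicy.PolicyBuilder base = DefaultRollingPolicy
                .create()
                .withMaxPartSize(128L * 1024L * 1024L);      // roll at 128 MB, or...

        return base
                .withRolloverInterval(60L * 60L * 1000L)     // ...after one hour open, or...
                .withInactivityInterval(10L * 60L * 1000L)   // ...after 10 minutes of inactivity
                .build();
    }
}
```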
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
index 9ad8172e9de90..53fce082c3bd6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
@@ -18,16 +18,20 @@
package org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
/**
* A {@link RollingPolicy} which rolls on every checkpoint.
*/
+@PublicEvolving
public class OnCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
private static final long serialVersionUID = 1L;
+ private OnCheckpointRollingPolicy() {}
+
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
return true;
@@ -42,4 +46,8 @@ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
return false;
}
+
+ public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> build() {
+ return new OnCheckpointRollingPolicy<>();
+ }
}
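Making the constructor private and adding a generic `build()` factory keeps instantiation to a single inference-friendly call site. The intended call pattern, as a sketch:

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class CheckpointPolicySketch {
    // <IN, BucketID> are inferred from the target type; the private constructor
    // makes the factory the only way to obtain an instance.
    static final RollingPolicy<String, String> POLICY = OnCheckpointRollingPolicy.build();
}
```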
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 3d5be6340ff15..55360a4a35380 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -25,7 +25,7 @@
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -75,8 +75,8 @@ public void testSerializationEmpty() throws IOException {
final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(testBucket, recoveredState.getBucketPath());
- Assert.assertNull(recoveredState.getInProgress());
- Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
+ Assert.assertNull(recoveredState.getInProgressResumableFile());
+ Assert.assertTrue(recoveredState.getCommittableFilesPerCheckpoint().isEmpty());
}
@Test
@@ -166,7 +166,7 @@ public void testSerializationFull() throws IOException {
Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
Assert.assertEquals(5L, recoveredRecoverables.size());
// recover and commit
@@ -238,9 +238,9 @@ public void testSerializationNullInProgress() throws IOException {
final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
- Assert.assertNull(recoveredState.getInProgress());
+ Assert.assertNull(recoveredState.getInProgressResumableFile());
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
Assert.assertEquals(5L, recoveredRecoverables.size());
// recover and commit
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 5e8eb6d9ba7c7..25622d14466fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -22,7 +22,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -58,7 +58,7 @@ private void testCorrectPassingOfContext(Long timestamp, long watermark, long processingTime) throws Exception {
final Buckets<String, String> buckets = StreamingFileSink
.forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>())
- .withBucketer(new VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
+ .withBucketAssigner(new VerifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
.createBuckets(2);
buckets.onElement("TEST", new SinkFunction.Context() {
@@ -79,7 +79,7 @@ public Long timestamp() {
});
}
- private static class VarifyingBucketer implements Bucketer<String> {
+ private static class VerifyingBucketer implements BucketAssigner<String, String> {
private static final long serialVersionUID = 7729086510972377578L;
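The context object this test verifies is exactly what a user-defined assigner sees. As an illustration only (this class is hypothetical, not part of the patch), an event-time assigner could read `context.timestamp()` and fall back to processing time:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/** Buckets by the element's event timestamp rather than wall-clock time. */
public class EventTimeBucketAssigner implements BucketAssigner<String, String> {

    private static final long serialVersionUID = 1L;

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(String element, BucketAssigner.Context context) {
        // Fall back to processing time for elements without a timestamp.
        long ts = context.timestamp() != null ? context.timestamp() : context.currentProcessingTime();
        return FORMAT.format(Instant.ofEpochMilli(ts));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```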
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
index 7b6b82cd4eb72..20890c07a930f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -59,7 +59,7 @@ public void testCustomBulkWriter() throws Exception {
10L,
new TestUtils.TupleToStringBucketer(),
new TestBulkWriterFactory(),
- new DefaultBucketFactory<>())
+ new DefaultBucketFactoryImpl<>())
) {
testHarness.setup();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index a0c438e184705..ab487d168bc62 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -491,8 +491,8 @@ public void testMaxCounterUponRecovery() throws Exception {
OperatorSubtaskState mergedSnapshot;
- final TestBucketFactory first = new TestBucketFactory();
- final TestBucketFactory second = new TestBucketFactory();
+ final TestBucketFactoryImpl first = new TestBucketFactoryImpl();
+ final TestBucketFactoryImpl second = new TestBucketFactoryImpl();
final RollingPolicy<Tuple2<Integer, String>, String> rollingPolicy = DefaultRollingPolicy
.create()
@@ -526,8 +526,8 @@ public void testMaxCounterUponRecovery() throws Exception {
);
}
- final TestBucketFactory firstRecovered = new TestBucketFactory();
- final TestBucketFactory secondRecovered = new TestBucketFactory();
+ final TestBucketFactoryImpl firstRecovered = new TestBucketFactoryImpl();
+ final TestBucketFactoryImpl secondRecovered = new TestBucketFactoryImpl();
try (
OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink(
@@ -559,7 +559,7 @@ public void testMaxCounterUponRecovery() throws Exception {
////////////////////// Helper Methods //////////////////////
- static class TestBucketFactory extends DefaultBucketFactory<Tuple2<Integer, String>, String> {
+ static class TestBucketFactoryImpl extends DefaultBucketFactoryImpl<Tuple2<Integer, String>, String> {
private static final long serialVersionUID = 2794824980604027930L;
@@ -572,7 +572,8 @@ public Bucket<Tuple2<Integer, String>, String> getNewBucket(
final String bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<Tuple2<Integer, String>, String> partFileWriterFactory) {
+ final PartFileWriter.PartFileFactory<Tuple2<Integer, String>, String> partFileWriterFactory,
+ final RollingPolicy<Tuple2<Integer, String>, String> rollingPolicy) {
this.initialCounter = initialPartCounter;
@@ -582,7 +583,8 @@ public Bucket<Tuple2<Integer, String>, String> getNewBucket(
bucketId,
bucketPath,
initialPartCounter,
- partFileWriterFactory);
+ partFileWriterFactory,
+ rollingPolicy);
}
@Override
@@ -591,6 +593,7 @@ public Bucket<Tuple2<Integer, String>, String> restoreBucket(
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory