Skip to content

Commit

Permalink
[FLINK-10029][DataStream API] Refactoring the StreamingFileSink code.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Aug 2, 2018
1 parent 94ca19e commit 1b0baa1
Show file tree
Hide file tree
Showing 22 changed files with 457 additions and 383 deletions.
Expand Up @@ -18,7 +18,7 @@


package org.apache.flink.streaming.api.functions.sink.filesystem; package org.apache.flink.streaming.api.functions.sink.filesystem;


import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
Expand All @@ -34,11 +34,11 @@
/** /**
* A bucket is the directory organization of the output of the {@link StreamingFileSink}. * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
* *
* <p>For each incoming element in the {@code BucketingSink}, the user-specified * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified
* {@link Bucketer Bucketer} is * {@link BucketAssigner Bucketer} is queried to see in which bucket this element should
* queried to see in which bucket this element should be written to. * be written to.
*/ */
@PublicEvolving @Internal
public class Bucket<IN, BucketID> { public class Bucket<IN, BucketID> {


private static final String PART_PREFIX = "part"; private static final String PART_PREFIX = "part";
Expand All @@ -53,175 +53,242 @@ public class Bucket<IN, BucketID> {


private final RecoverableWriter fsWriter; private final RecoverableWriter fsWriter;


private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>(); private final RollingPolicy<IN, BucketID> rollingPolicy;


private long partCounter; private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint = new HashMap<>();

private PartFileWriter<IN, BucketID> currentPart;

private List<RecoverableWriter.CommitRecoverable> pending;

/**
* Constructor to restore a bucket from checkpointed state.
*/
public Bucket(
RecoverableWriter fsWriter,
int subtaskIndex,
long initialPartCounter,
PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
BucketState<BucketID> bucketState) throws IOException {


this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory); private long partCounter;

// the constructor must have already initialized the filesystem writer
Preconditions.checkState(fsWriter != null);

// we try to resume the previous in-progress file, if the filesystem
// supports such operation. If not, we just commit the file and start fresh.


final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress(); private PartFileWriter<IN, BucketID> inProgressPart;
if (resumable != null) {
currentPart = partFileFactory.resumeFrom(
bucketId, fsWriter, resumable, bucketState.getCreationTime());
}


// we commit pending files for previous checkpoints to the last successful one private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
// (from which we are recovering from)
for (List<RecoverableWriter.CommitRecoverable> commitables: bucketState.getPendingPerCheckpoint().values()) {
for (RecoverableWriter.CommitRecoverable commitable: commitables) {
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
}
}
}


/** /**
* Constructor to create a new empty bucket. * Constructor to create a new empty bucket.
*/ */
public Bucket( private Bucket(
RecoverableWriter fsWriter, final RecoverableWriter fsWriter,
int subtaskIndex, final int subtaskIndex,
BucketID bucketId, final BucketID bucketId,
Path bucketPath, final Path bucketPath,
long initialPartCounter, final long initialPartCounter,
PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory) { final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy) {


this.fsWriter = Preconditions.checkNotNull(fsWriter); this.fsWriter = Preconditions.checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex; this.subtaskIndex = subtaskIndex;
this.bucketId = Preconditions.checkNotNull(bucketId); this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath); this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.partCounter = initialPartCounter; this.partCounter = initialPartCounter;
this.partFileFactory = Preconditions.checkNotNull(partFileFactory); this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);


this.pending = new ArrayList<>(); this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
} }


/** /**
* Gets the information available for the currently * Constructor to restore a bucket from checkpointed state.
* open part file, i.e. the one we are currently writing to.
*
* <p>This will be null if there is no currently open part file. This
* is the case when we have a new, just created bucket or a bucket
* that has not received any data after the closing of its previously
* open in-progress file due to the specified rolling policy.
*
* @return The information about the currently in-progress part file
* or {@code null} if there is no open part file.
*/ */
public PartFileInfo<BucketID> getInProgressPartInfo() { private Bucket(
return currentPart; final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState) throws IOException {

this(
fsWriter,
subtaskIndex,
bucketState.getBucketId(),
bucketState.getBucketPath(),
initialPartCounter,
partFileFactory,
rollingPolicy);

restoreInProgressFile(bucketState);
commitRecoveredPendingFiles(bucketState);
}

private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
// we try to resume the previous in-progress file
if (state.hasInProgressResumableFile()) {
final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
inProgressPart = partFileFactory.resumeFrom(
bucketId, fsWriter, resumable, state.getInProgressFileCreationTime());
}
} }


public BucketID getBucketId() { private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {

// we commit pending files for checkpoints that precess the last successful one, from which we are recovering
for (List<RecoverableWriter.CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
for (RecoverableWriter.CommitRecoverable committable: committables) {
fsWriter.recoverForCommit(committable).commitAfterRecovery();
}
}
}

BucketID getBucketId() {
return bucketId; return bucketId;
} }


public Path getBucketPath() { Path getBucketPath() {
return bucketPath; return bucketPath;
} }


public long getPartCounter() { long getPartCounter() {
return partCounter; return partCounter;
} }


public boolean isActive() { boolean isActive() {
return currentPart != null || !pending.isEmpty() || !pendingPerCheckpoint.isEmpty(); return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty();
}

void merge(final Bucket<IN, BucketID> bucket) throws IOException {
Preconditions.checkNotNull(bucket);
Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath));

// There should be no pending files in the "to-merge" states.
// The reason is that:
// 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
// So a snapshot, including the one we are recovering from, will never contain such files.
// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).

Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());

RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
if (committable != null) {
pendingPartsForCurrentCheckpoint.add(committable);
}
} }


void write(IN element, long currentTime) throws IOException { void write(IN element, long currentTime) throws IOException {
Preconditions.checkState(currentPart != null, "bucket has been closed"); if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
currentPart.write(element, currentTime); rollPartFile(currentTime);
}
inProgressPart.write(element, currentTime);
} }


void rollPartFile(final long currentTime) throws IOException { private void rollPartFile(final long currentTime) throws IOException {
closePartFile(); closePartFile();
currentPart = partFileFactory.openNew(bucketId, fsWriter, getNewPartPath(), currentTime); inProgressPart = partFileFactory.openNew(bucketId, fsWriter, assembleNewPartPath(), currentTime);
partCounter++; partCounter++;
} }


void merge(final Bucket<IN, BucketID> bucket) throws IOException { private Path assembleNewPartPath() {
Preconditions.checkNotNull(bucket); return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
Preconditions.checkState(Objects.equals(bucket.getBucketPath(), bucketPath)); }


// there should be no pending files in the "to-merge" states. private RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
Preconditions.checkState(bucket.pending.isEmpty()); RecoverableWriter.CommitRecoverable committable = null;
Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty()); if (inProgressPart != null) {
committable = inProgressPart.closeForCommit();
pendingPartsForCurrentCheckpoint.add(committable);
inProgressPart = null;
}
return committable;
}


RecoverableWriter.CommitRecoverable commitable = bucket.closePartFile(); void disposePartFile() {
if (commitable != null) { if (inProgressPart != null) {
pending.add(commitable); inProgressPart.dispose();
} }
} }


RecoverableWriter.CommitRecoverable closePartFile() throws IOException { BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
RecoverableWriter.CommitRecoverable commitable = null; prepareBucketForCheckpointing(checkpointId);
if (currentPart != null) {
commitable = currentPart.closeForCommit(); RecoverableWriter.ResumeRecoverable inProgressResumable = null;
pending.add(commitable); long inProgressFileCreationTime = Long.MAX_VALUE;
currentPart = null;
if (inProgressPart != null) {
inProgressResumable = inProgressPart.persist();
inProgressFileCreationTime = inProgressPart.getCreationTime();
} }
return commitable;
return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
} }


public void dispose() { private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
if (currentPart != null) { if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
currentPart.dispose(); closePartFile();
}

if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
pendingPartsForCurrentCheckpoint = new ArrayList<>();
} }
} }


public void onCheckpointAcknowledgment(long checkpointId) throws IOException { void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
Preconditions.checkNotNull(fsWriter); Preconditions.checkNotNull(fsWriter);


Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it = Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
pendingPerCheckpoint.entrySet().iterator(); pendingPartsPerCheckpoint.entrySet().iterator();


while (it.hasNext()) { while (it.hasNext()) {
Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next(); Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();

if (entry.getKey() <= checkpointId) { if (entry.getKey() <= checkpointId) {
for (RecoverableWriter.CommitRecoverable commitable : entry.getValue()) { for (RecoverableWriter.CommitRecoverable committable : entry.getValue()) {
fsWriter.recoverForCommit(commitable).commit(); fsWriter.recoverForCommit(committable).commit();
} }
it.remove(); it.remove();
} }
} }
} }


public BucketState<BucketID> onCheckpoint(long checkpointId) throws IOException { void onProcessingTime(long timestamp) throws IOException {
RecoverableWriter.ResumeRecoverable resumable = null; if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
long creationTime = Long.MAX_VALUE; closePartFile();

if (currentPart != null) {
resumable = currentPart.persist();
creationTime = currentPart.getCreationTime();
} }
}


if (!pending.isEmpty()) { // --------------------------- Static Factory Methods -----------------------------
pendingPerCheckpoint.put(checkpointId, pending);
pending = new ArrayList<>(); /**
} * Creates a new empty {@code Bucket}.
return new BucketState<>(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint); * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}.
* @param bucketPath the path to where the part files for the bucket will be written to.
* @param initialPartCounter the initial counter for the part files of the bucket.
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
* @return The new Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> getNew(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy) {
return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy);
} }


private Path getNewPartPath() { /**
return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter); * Restores a {@code Bucket} from the state included in the provided {@link BucketState}.
* @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param initialPartCounter the initial counter for the part files of the bucket.
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param bucketState the initial state of the restored bucket.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
* @return The restored Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> restore(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState) throws IOException {
return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState);
} }
} }

0 comments on commit 1b0baa1

Please sign in to comment.