[FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders

* Add support for bulk encoders (a usage sketch follows the file summary below).
* Expose more options in the rolling policy.
* Allow any object to be returned as the bucket id from the bucketer.
kl0u authored and StephanEwen committed Jul 20, 2018
1 parent d309e61 commit b56c75c
Showing 26 changed files with 1,969 additions and 599 deletions.
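
As a usage sketch of what the commit message describes: the snippet below wires a BulkWriter.Factory into the refactored sink. It assumes this refactoring gives StreamingFileSink a forBulkFormat(Path, BulkWriter.Factory) builder; that builder shape and all names in the snippet are illustrative, not taken from this diff.

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class BulkSinkUsageSketch {

    // Attaches a bulk-encoded StreamingFileSink to a stream; the factory
    // decides how records are batched and encoded into each part file.
    public static void attach(DataStream<String> stream, BulkWriter.Factory<String> factory) {
        StreamingFileSink<String> sink = StreamingFileSink
                .forBulkFormat(new Path("file:///tmp/output"), factory)
                .build();
        stream.addSink(sink);
    }
}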
flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java (new file)
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.serialization;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;
import java.io.Serializable;

/**
* An encoder that encodes data in a bulk fashion, encoding many records at a time.
*
* <p>Examples of bulk encoding include most compressed formats, such as Parquet
* and ORC, which encode batches of records into blocks of column vectors.
*
* <p>The bulk encoder may be stateful and is bound to a single stream during its
* lifetime.
*
* @param <T> The type of the elements encoded through this encoder.
*/
@PublicEvolving
public interface BulkWriter<T> {

/**
* Adds an element to the encoder. The encoder may temporarily buffer the element,
* or immediately write it to the stream.
*
* <p>It may be that adding this element fills up an internal buffer and causes the
* encoding and flushing of a batch of internally buffered elements.
*
* @param element The element to add.
* @throws IOException Thrown, if the element cannot be added to the encoder,
* or if the output stream throws an exception.
*/
void addElement(T element) throws IOException;

/**
* Flushes all intermediate buffered data to the output stream.
* Note that frequent flushing may reduce the efficiency of the encoding.
*
* @throws IOException Thrown if the encoder cannot be flushed, or if the output
* stream throws an exception.
*/
void flush() throws IOException;

/**
* Finishes the writing. This must flush all internal buffers, finish the encoding,
* and write any footers.
*
* <p>The writer is not expected to handle any more records via {@link #addElement(Object)} after
* this method is called.
*
* <p><b>Important:</b> This method MUST NOT close the stream that the writer writes to.
* Closing the stream is expected to happen through the invoker of this method afterwards.
*
* @throws IOException Thrown if the finalization fails.
*/
void finish() throws IOException;

// ------------------------------------------------------------------------

/**
* A factory that creates a {@link BulkWriter}.
* @param <T> The type of record to write.
*/
@FunctionalInterface
interface Factory<T> extends Serializable {

/**
* Creates a writer that writes to the given stream.
*
* @param out The output stream to write the encoded data to.
* @return A new {@link BulkWriter} writing to the given stream.
* @throws IOException Thrown if the writer cannot be opened, or if the output
* stream throws an exception.
*/
BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
}
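
To make the contract above concrete, here is a minimal BulkWriter sketch that gzip-compresses records line by line. The gzip framing is only an example of a block-oriented encoding and is not part of this commit; note how finish() writes the trailer but deliberately leaves the wrapped stream open, as the javadoc requires.

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

/** A sketch: gzip-compresses the toString() of each record, one per line. */
public class GzipLineWriter<T> implements BulkWriter<T> {

    private final GZIPOutputStream out;

    public GzipLineWriter(FSDataOutputStream stream) throws IOException {
        // Wrap the sink's stream; we never close it ourselves (see finish()).
        this.out = new GZIPOutputStream(stream);
    }

    @Override
    public void addElement(T element) throws IOException {
        out.write((element.toString() + '\n').getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void finish() throws IOException {
        // Writes the gzip trailer without closing the wrapped stream,
        // as the BulkWriter contract requires.
        out.finish();
    }

    /** The factory can simply be a constructor reference. */
    public static <T> BulkWriter.Factory<T> factory() {
        return GzipLineWriter::new;
    }
}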
flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -64,6 +64,11 @@ public FileStatus getFileStatus(Path f) throws IOException {
return unsafeFileSystem.getFileStatus(f);
}

@Override
public RecoverableWriter createRecoverableWriter() throws IOException {
return unsafeFileSystem.createRecoverableWriter();
}

@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
return unsafeFileSystem.getFileBlockLocations(file, start, len);
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
@@ -30,6 +29,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
@@ -39,25 +39,25 @@
* queried to see in which bucket this element should be written to.
*/
@PublicEvolving
public class Bucket<IN> {
public class Bucket<IN, BucketID> {

private static final String PART_PREFIX = "part";

private final String bucketId;
private final BucketID bucketId;

private final Path bucketPath;

private final int subtaskIndex;

private final Encoder<IN> encoder;
private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;

private final RecoverableWriter fsWriter;

private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();

private long partCounter;

private PartFileHandler<IN> currentPart;
private PartFileWriter<IN, BucketID> currentPart;

private List<RecoverableWriter.CommitRecoverable> pending;

@@ -68,26 +68,26 @@ public Bucket(
RecoverableWriter fsWriter,
int subtaskIndex,
long initialPartCounter,
Encoder<IN> writer,
BucketState bucketstate) throws IOException {
PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
BucketState<BucketID> bucketState) throws IOException {

this(fsWriter, subtaskIndex, bucketstate.getBucketId(), bucketstate.getBucketPath(), initialPartCounter, writer);
this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory);

// the constructor must have already initialized the filesystem writer
Preconditions.checkState(fsWriter != null);

// we try to resume the previous in-progress file, if the filesystem
// supports such operation. If not, we just commit the file and start fresh.

final RecoverableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress();
if (resumable != null) {
currentPart = PartFileHandler.resumeFrom(
bucketId, fsWriter, resumable, bucketstate.getCreationTime());
currentPart = partFileFactory.resumeFrom(
bucketId, fsWriter, resumable, bucketState.getCreationTime());
}

// we commit pending files for previous checkpoints up to the last successful one
// (the checkpoint from which we are recovering)
for (List<RecoverableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
for (List<RecoverableWriter.CommitRecoverable> commitables: bucketState.getPendingPerCheckpoint().values()) {
for (RecoverableWriter.CommitRecoverable commitable: commitables) {
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
}
@@ -100,26 +100,26 @@ public Bucket(
public Bucket(
RecoverableWriter fsWriter,
int subtaskIndex,
String bucketId,
BucketID bucketId,
Path bucketPath,
long initialPartCounter,
Encoder<IN> writer) {
PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory) {

this.fsWriter = Preconditions.checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
this.encoder = Preconditions.checkNotNull(writer);
this.partFileFactory = Preconditions.checkNotNull(partFileFactory);

this.pending = new ArrayList<>();
}

public PartFileInfo getInProgressPartInfo() {
public PartFileInfo<BucketID> getInProgressPartInfo() {
return currentPart;
}

public String getBucketId() {
public BucketID getBucketId() {
return bucketId;
}

@@ -137,18 +137,18 @@ public boolean isActive() {

void write(IN element, long currentTime) throws IOException {
Preconditions.checkState(currentPart != null, "bucket has been closed");
currentPart.write(element, encoder, currentTime);
currentPart.write(element, currentTime);
}

void rollPartFile(final long currentTime) throws IOException {
closePartFile();
currentPart = PartFileHandler.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
currentPart = partFileFactory.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
partCounter++;
}

void merge(final Bucket<IN> bucket) throws IOException {
void merge(final Bucket<IN, BucketID> bucket) throws IOException {
Preconditions.checkNotNull(bucket);
Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath()));
Preconditions.checkState(Objects.equals(bucket.getBucketPath(), bucketPath));

// there should be no pending files in the "to-merge" state.
Preconditions.checkState(bucket.pending.isEmpty());
@@ -176,7 +176,7 @@ public void dispose() {
}
}

public void commitUpToCheckpoint(long checkpointId) throws IOException {
public void onCheckpointAcknowledgment(long checkpointId) throws IOException {
Preconditions.checkNotNull(fsWriter);

Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
@@ -193,7 +193,7 @@ public void commitUpToCheckpoint(long checkpointId) throws IOException {
}
}

public BucketState snapshot(long checkpointId) throws IOException {
public BucketState<BucketID> onCheckpoint(long checkpointId) throws IOException {
RecoverableWriter.ResumeRecoverable resumable = null;
long creationTime = Long.MAX_VALUE;

@@ -206,7 +206,7 @@ public BucketState snapshot(long checkpointId) throws IOException {
pendingPerCheckpoint.put(checkpointId, pending);
pending = new ArrayList<>();
}
return new BucketState(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
return new BucketState<>(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
}

private Path getNewPartPath() {
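
The bucketer that produces these ids is not part of this excerpt, so the sketch below only shows the kind of structured key the new BucketID type parameter admits in place of the old hard-coded String; the class itself is hypothetical.

import java.io.Serializable;
import java.util.Objects;

/**
 * A hypothetical structured bucket id. Before this change the bucket id was
 * fixed to String; after it, any type with proper equals/hashCode can serve
 * as the BucketID parameter of Bucket<IN, BucketID>.
 */
public final class DateHourBucketId implements Serializable {

    private final String date; // e.g. "2018-07-20"
    private final int hour;    // 0..23

    public DateHourBucketId(String date, int hour) {
        this.date = Objects.requireNonNull(date);
        this.hour = hour;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DateHourBucketId)) {
            return false;
        }
        DateHourBucketId that = (DateHourBucketId) o;
        return hour == that.hour && date.equals(that.date);
    }

    @Override
    public int hashCode() {
        return Objects.hash(date, hour);
    }

    @Override
    public String toString() {
        return date + "--" + hour;
    }
}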
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;

@@ -30,20 +29,20 @@
* A factory able to create {@link Bucket buckets} for the {@link StreamingFileSink}.
*/
@Internal
public interface BucketFactory<IN> extends Serializable {
interface BucketFactory<IN, BucketID> extends Serializable {

Bucket<IN> getNewBucket(
RecoverableWriter fsWriter,
int subtaskIndex,
String bucketId,
Path bucketPath,
long initialPartCounter,
Encoder<IN> writer) throws IOException;
Bucket<IN, BucketID> getNewBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) throws IOException;

Bucket<IN> restoreBucket(
RecoverableWriter fsWriter,
int subtaskIndex,
long initialPartCounter,
Encoder<IN> writer,
BucketState bucketstate) throws IOException;
Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final BucketState<BucketID> bucketState) throws IOException;
}
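
A factory implementation is presumably a thin pass-through to the two Bucket constructors shown earlier. The sketch below uses a hypothetical class name (the commit's actual default factory is not in this excerpt) and must live in the same package, since the interface is package-private.

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;

import java.io.IOException;

/** Hypothetical default factory: delegates straight to the Bucket constructors. */
class SimpleBucketFactory<IN, BucketID> implements BucketFactory<IN, BucketID> {

    private static final long serialVersionUID = 1L;

    @Override
    public Bucket<IN, BucketID> getNewBucket(
            final RecoverableWriter fsWriter,
            final int subtaskIndex,
            final BucketID bucketId,
            final Path bucketPath,
            final long initialPartCounter,
            final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) throws IOException {
        return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileWriterFactory);
    }

    @Override
    public Bucket<IN, BucketID> restoreBucket(
            final RecoverableWriter fsWriter,
            final int subtaskIndex,
            final long initialPartCounter,
            final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
            final BucketState<BucketID> bucketState) throws IOException {
        return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileWriterFactory, bucketState);
    }
}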
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -32,9 +32,9 @@
* The state of the {@link Bucket} that is to be checkpointed.
*/
@Internal
public class BucketState {
public class BucketState<BucketID> {

private final String bucketId;
private final BucketID bucketId;

/**
* The base path for the bucket, i.e. the directory where all the part files are stored.
@@ -59,10 +59,10 @@ public class BucketState {
private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint;

public BucketState(
final String bucketId,
final BucketID bucketId,
final Path bucketPath,
final long creationTime,
final @Nullable RecoverableWriter.ResumeRecoverable inProgress,
@Nullable final RecoverableWriter.ResumeRecoverable inProgress,
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
@@ -72,7 +72,7 @@ public BucketState(
this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint);
}

public String getBucketId() {
public BucketID getBucketId() {
return bucketId;
}

@@ -85,7 +85,7 @@ public long getCreationTime() {
}

@Nullable
public RecoverableWriter.ResumeRecoverable getCurrentInProgress() {
public RecoverableWriter.ResumeRecoverable getInProgress() {
return inProgress;
}

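
To tie the renamed lifecycle methods together: a sketch of one checkpoint round trip against a Bucket, using only methods visible in this diff. The driver class is hypothetical and assumes it sits in the same package as Bucket and BucketState.

import java.io.IOException;

/** Hypothetical driver illustrating the checkpoint protocol from this diff. */
class BucketCheckpointSketch {

    static <IN, ID> void checkpointRoundTrip(Bucket<IN, ID> bucket, long checkpointId) throws IOException {
        // On snapshot: persist the in-progress part file and park all
        // pending committables under this checkpoint id.
        BucketState<ID> state = bucket.onCheckpoint(checkpointId);

        // Once the checkpoint is acknowledged, commit everything pending
        // up to and including that checkpoint.
        bucket.onCheckpointAcknowledgment(checkpointId);

        // On restore, `state` would instead be handed to
        // BucketFactory#restoreBucket(...) to resume the in-progress file.
    }
}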
