Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 114 additions & 22 deletions hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,21 @@
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.sink.event.InitWriterEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
Expand Down Expand Up @@ -145,6 +152,12 @@ public class StreamWriteFunction<K, I, O>
*/
private transient TotalSizeTracer tracer;

private transient ListState<WriteStatus> writeStatusState;

private final List<WriteStatus> writeStatusOfCurrentCkpt = new ArrayList<>();

private transient boolean canFlush = true;

/**
* Constructs a StreamingSinkFunction.
*
Expand All @@ -156,31 +169,63 @@ public StreamWriteFunction(Configuration config) {

@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.tracer = new TotalSizeTracer(this.config);
initBuffer();
initWriteFunction();
}

@Override
public void initializeState(FunctionInitializationContext context) {
// no operation
public void initializeState(FunctionInitializationContext context) throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

this.writeStatusState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"hudi-write-status-state",
TypeInformation.of(new TypeHint<WriteStatus>() {})
));

final List<WriteStatus> writeStatuses = new ArrayList<>();

if (context.isRestored()) {
for (WriteStatus writeStatus : this.writeStatusState.get()) {
writeStatuses.add(writeStatus);
}

// disable flush, until received event from coordinator
this.canFlush = false;
}

final InitWriterEvent event = InitWriterEvent.builder()
.taskID(taskID)
.writeStatus(writeStatuses)
.build();
this.eventGateway.sendEventToCoordinator(event);

LOG.info("Send init event to coordinator, task[{}].", taskID);
}

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
// wait for the buffer data flush out and request a new instant
flushRemaining(false);

// reset the snapshot state to the current state
this.writeStatusState.clear();
this.writeStatusState.addAll(writeStatusOfCurrentCkpt);
writeStatusOfCurrentCkpt.clear();

// disable flush, until received event from coordinator
this.canFlush = false;
}

@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) throws Exception {
bufferRecord((HoodieRecord<?>) value);
}

Expand All @@ -203,6 +248,7 @@ public void notifyCheckpointComplete(long checkpointId) {
public void endInput() {
flushRemaining(true);
this.writeClient.cleanHandles();
writeStatusOfCurrentCkpt.clear();
}

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -257,6 +303,11 @@ private void initWriteFunction() {
}
}

public void handleOperatorEvent(OperatorEvent evt) {
LOG.info("Received commit success from coordinator.");
this.canFlush = true;
}

/**
* Represents a data item in the buffer, this is needed to reduce the
* memory footprint.
Expand Down Expand Up @@ -420,6 +471,23 @@ private String getBucketID(HoodieRecord<?> record) {
return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
}

private void waitForNewInstant() throws InterruptedException {
while (!canFlush) {
final String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant != null) {
if (this.currentInstant == null || instant.compareTo(this.currentInstant) > 0) {
this.currentInstant = instant;
this.canFlush = true;
break;
}
}

LOG.info("Retry to get new instant, lastInstant is {}, now instant is {}.", this.currentInstant, instant);
// Reduce the frequency of access to the FileSystem
Thread.sleep(100);
}
}

/**
* Buffers the given record.
*
Expand All @@ -431,39 +499,57 @@ private String getBucketID(HoodieRecord<?> record) {
*
* @param value HoodieRecord
*/
private void bufferRecord(HoodieRecord<?> value) {
private void bufferRecord(HoodieRecord<?> value) throws Exception {
final String bucketID = getBucketID(value);

DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
final DataItem item = DataItem.fromHoodieRecord(value);
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);

// if current memorySize larger than maxBufferSize and no response message received from coordinator,
// should blocking until the lastPendingInstant update to avoid oom.
while (flushBuffer && !canFlush) {
LOG.warn("Trigger the flushBuffer but no response message received from coordinator, blocking until the lastPendingInstant update.");
waitForNewInstant();
}

if (flushBucket) {
flushBucket(bucket);
this.tracer.countDown(bucket.detector.totalSize);
bucket.reset();
boolean flush = flushBucket(bucket);
if (flush) {
this.tracer.countDown(bucket.detector.totalSize);
bucket.reset();
}
} else if (flushBuffer) {
// find the max size bucket and flush it out
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
final DataBucket bucketToFlush = sortedBuckets.get(0);
flushBucket(bucketToFlush);
this.tracer.countDown(bucketToFlush.detector.totalSize);
bucketToFlush.reset();
boolean flush = flushBucket(bucketToFlush);
if (flush) {
this.tracer.countDown(bucketToFlush.detector.totalSize);
bucketToFlush.reset();
}
}
bucket.records.add(item);
}

@SuppressWarnings("unchecked, rawtypes")
private void flushBucket(DataBucket bucket) {
private boolean flushBucket(DataBucket bucket) {
if (!canFlush) {
// in case there are flush before last instant commit
LOG.info("Last commit has not committed, skip.");
return false;
}

final String instant = this.writeClient.getLastPendingInstant(this.actionType);

if (instant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, cancel.");
return;
LOG.info("No inflight instant when flushing data, skip.");
return false;
}

List<HoodieRecord> records = bucket.writeBuffer();
Expand All @@ -480,17 +566,21 @@ private void flushBucket(DataBucket bucket) {
.isLastBatch(false)
.isEndInput(false)
.build();

writeStatusOfCurrentCkpt.addAll(writeStatus);
this.eventGateway.sendEventToCoordinator(event);

return true;
}

@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) {
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);

if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, cancel.");
return;
throw new HoodieException("No inflight instant when flushRemaining!");
}

final List<WriteStatus> writeStatus;
if (buckets.size() > 0) {
writeStatus = new ArrayList<>();
Expand Down Expand Up @@ -519,6 +609,8 @@ private void flushRemaining(boolean isEndInput) {
.isLastBatch(true)
.isEndInput(isEndInput)
.build();

writeStatusOfCurrentCkpt.addAll(writeStatus);
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public StreamWriteOperator(Configuration conf) {

@Override
public void handleOperatorEvent(OperatorEvent operatorEvent) {
// do nothing
sinkFunction.handleOperatorEvent(operatorEvent);
}

void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
Expand Down
Loading