From c45de0b53b45e4e6866e7359bb0d207c75207fae Mon Sep 17 00:00:00 2001 From: fishfishfishfishaa <792850842@qq.com> Date: Tue, 26 May 2026 00:48:19 +0800 Subject: [PATCH 1/5] [PIP-30] Improve Paimon committer in Flink --- .../paimon/flink/FlinkConnectorOptions.java | 14 + .../sink/CommitterCoordinatedFactory.java | 96 ++++ .../paimon/flink/sink/FixedBucketSink.java | 32 ++ .../apache/paimon/flink/sink/FlinkSink.java | 22 +- .../flink/sink/PrepareCommitOperator.java | 10 +- .../flink/sink/RowDataStoreWriteOperator.java | 22 + .../paimon/flink/sink/TableWriteOperator.java | 6 + .../coordinator/CommitterCoordinator.java | 256 +++++++++++ .../CoordinatedFileInfoSender.java | 270 +++++++++++ .../flink/sink/coordinator/EndInputEvent.java | 26 ++ .../coordinator/FileInfoConfirmRequest.java | 35 ++ .../coordinator/FileInfoConfirmResponse.java | 36 ++ .../flink/sink/coordinator/FileInfoEvent.java | 50 +++ .../coordinator/PaimonWriterCoordinator.java | 418 ++++++++++++++++++ .../sink/coordinator/PendingSubtask.java | 153 +++++++ .../flink/sink/coordinator/PwcState.java | 30 ++ .../sink/coordinator/PwcStateManager.java | 169 +++++++ .../sink/coordinator/PwcStateSerializer.java | 72 +++ .../sink/coordinator/WatermarkEvent.java | 34 ++ .../PaimonWriterCoordinatorTest.java | 361 +++++++++++++++ 20 files changed, 2107 insertions(+), 5 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterCoordinatedFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CommitterCoordinator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedFileInfoSender.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/EndInputEvent.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmRequest.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmResponse.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoEvent.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PendingSubtask.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcState.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateManager.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateSerializer.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WatermarkEvent.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index a1089d991ae2..41ef73ceed7c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -506,6 +506,20 @@ public class FlinkConnectorOptions { "Commit listener will be called after a successful commit. This option list custom commit " + "listener identifiers separated by comma."); + public static final ConfigOption SINK_COMMITTER_COORDINATOR_OPERATOR_ENABLED = + key("sink.committer-coordinator-operator.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enable committer coordinator to commit in Job Manager."); + + public static final ConfigOption SINK_COMMITTER_COORDINATOR_STATE_DIR = + key("sink.committer-coordinator-operator.state-dir") + .stringType() + .noDefaultValue() + .withDescription( + "Optional override for PWC state directory. " + + "If not set, uses Flink checkpoint directory."); + public static final ConfigOption SINK_WRITER_COORDINATOR_ENABLED = key("sink.writer-coordinator.enabled") .booleanType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterCoordinatedFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterCoordinatedFactory.java new file mode 100644 index 000000000000..fde829f24b76 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterCoordinatedFactory.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.flink.sink.coordinator.CoordinatedFileInfoSender; +import org.apache.paimon.flink.sink.coordinator.PaimonWriterCoordinator; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + +/** + * {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * CommitterCoordinatedFactory}. + */ +public class CommitterCoordinatedFactory + extends PrepareCommitOperator.Factory + implements CoordinatedOperatorFactory { + protected final boolean streamingCheckpointEnabled; + private final String initialCommitUser; + private final String configuredStateDir; + + RowDataStoreWriteOperator.Factory writeFactory; + private Committer.Factory committerFactory; + + protected final Long endInputWatermark; + + public CommitterCoordinatedFactory( + boolean streamingCheckpointEnabled, + String configuredStateDir, + RowDataStoreWriteOperator.Factory writeFactory, + Committer.Factory committerFactory, + String initialCommitUser, + Long endInputWatermark) { + super(writeFactory.options); + this.streamingCheckpointEnabled = streamingCheckpointEnabled; + this.configuredStateDir = configuredStateDir; + this.writeFactory = writeFactory; + this.committerFactory = committerFactory; + this.initialCommitUser = initialCommitUser; + this.endInputWatermark = endInputWatermark; + } + + @Override + public final > T createStreamOperator( + StreamOperatorParameters parameters) { + OperatorID operatorID = parameters.getStreamConfig().getOperatorID(); + TaskOperatorEventGateway gateway = + parameters + .getContainingTask() + .getEnvironment() + .getOperatorCoordinatorEventGateway(); + TableWriteOperator operator = writeFactory.createStreamOperator(parameters); + operator.setFileInfoSender( + new CoordinatedFileInfoSender( + gateway, operatorID, this.streamingCheckpointEnabled)); + return (T) operator; + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return writeFactory.getStreamOperatorClass(classLoader); + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new PaimonWriterCoordinator.WriterCoordinatorProvider( + streamingCheckpointEnabled, + operatorName, + configuredStateDir, + operatorID, + initialCommitUser, + committerFactory, + endInputWatermark); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java index c43c2ebb1ef5..cbd6236b3b1b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java @@ -28,6 +28,9 @@ import java.util.Map; +import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_COORDINATOR_OPERATOR_ENABLED; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_COORDINATOR_STATE_DIR; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_WRITER_COORDINATOR_ENABLED; /** {@link FlinkSink} for writing records into fixed bucket Paimon table. */ @@ -56,4 +59,33 @@ protected OneInputStreamOperatorFactory createWriteOpe : new RowDataStoreWriteOperator.Factory( table, logSinkFunction, writeProvider, commitUser); } + + @Override + protected OneInputStreamOperatorFactory createWriteCoordinatorFactory( + StoreSinkWrite.Provider writeProvider, + String commitUser, + boolean isStreaming, + String checkpointDir) { + Options options = table.coreOptions().toConfiguration(); + boolean commitCoordinatorEnable = options.get(SINK_COMMITTER_COORDINATOR_OPERATOR_ENABLED); + boolean coordinatorEnabled = options.get(SINK_WRITER_COORDINATOR_ENABLED); + String configuredStateDir = + options.getString( + SINK_COMMITTER_COORDINATOR_STATE_DIR.key(), checkpointDir + "/pwc"); + if (commitCoordinatorEnable) { + if (coordinatorEnabled) { + throw new UnsupportedOperationException( + "Unsupported for both writer coordinator and commit coordinator"); + } + return new CommitterCoordinatedFactory( + isStreaming, + configuredStateDir, + new RowDataStoreWriteOperator.Factory( + table, logSinkFunction, writeProvider, commitUser), + createCommitterFactory(), + commitUser, + options.get(END_INPUT_WATERMARK)); + } + return createWriteOperatorFactory(writeProvider, commitUser); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 959132ad58e0..009d1bdec4d9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.CheckpointingMode; @@ -55,6 +56,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK; import static org.apache.paimon.flink.FlinkConnectorOptions.PRECOMMIT_COMPACT; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT; +import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_COORDINATOR_OPERATOR_ENABLED; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING; @@ -132,14 +134,17 @@ public DataStream doWrite( input.transform( (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME) + " : " + table.name(), new CommittableTypeInfo(), - createWriteOperatorFactory( + createWriteCoordinatorFactory( StoreSinkWrite.createWriteProvider( table, env.getCheckpointConfig(), isStreaming, ignorePreviousFiles, hasSinkMaterializer(input)), - commitUser)); + commitUser, + isStreaming && env.getCheckpointConfig().isCheckpointingEnabled(), + env.getConfiguration() + .get(CheckpointingOptions.CHECKPOINTS_DIRECTORY))); if (parallelism == null) { forwardParallelism(written, input); } else { @@ -195,8 +200,11 @@ public DataStreamSink doCommit(DataStream written, String commit if (streamingCheckpointEnabled) { assertStreamingConfiguration(env); } - Options options = Options.fromMap(table.options()); + if (options.get(SINK_COMMITTER_COORDINATOR_OPERATOR_ENABLED)) { + return written.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); + } + OneInputStreamOperatorFactory committerOperator = createCommitterOperatorFactory( streamingCheckpointEnabled, commitUser, options.get(END_INPUT_WATERMARK)); @@ -307,6 +315,14 @@ public static void assertBatchAdaptiveParallelism( protected abstract OneInputStreamOperatorFactory createWriteOperatorFactory( StoreSinkWrite.Provider writeProvider, String commitUser); + protected OneInputStreamOperatorFactory createWriteCoordinatorFactory( + StoreSinkWrite.Provider writeProvider, + String commitUser, + boolean isStreaming, + String checkpointDir) { + return createWriteOperatorFactory(writeProvider, commitUser); + } + protected abstract Committer.Factory createCommitterFactory(); protected abstract CommittableStateManager createCommittableStateManager(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java index 35f5ff15b9ae..874b0850a9d2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PrepareCommitOperator.java @@ -112,8 +112,14 @@ public void close() throws Exception { } private void emitCommittables(boolean waitCompaction, long checkpointId) throws IOException { - prepareCommit(waitCompaction, checkpointId) - .forEach(committable -> output.collect(new StreamRecord<>(committable))); + prepareCommit(waitCompaction, checkpointId).forEach(this::collect); + handleCommittables(checkpointId); + } + + protected void handleCommittables(long checkpointId) {} + + protected void collect(OUT committable) { + output.collect(new StreamRecord<>(committable)); } protected abstract List prepareCommit(boolean waitCompaction, long checkpointId) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java index 4a9b64f7b1b2..3f343bbc8d9b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java @@ -127,6 +127,9 @@ public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); this.currentWatermark = mark.getTimestamp(); + if (sender != null) { + sender.sendWatermark(mark.getTimestamp()); + } if (logSinkFunction != null) { logSinkFunction.writeWatermark( new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp())); @@ -161,6 +164,9 @@ protected SinkRecord write(InternalRow row) throws Exception { public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); + if (sender != null) { + sender.snapshot(context.getCheckpointId()); + } if (logSinkFunction != null) { StreamingFunctionUtils.snapshotFunctionState( context, getOperatorStateBackend(), logSinkFunction); @@ -203,6 +209,22 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { } } + @Override + protected void handleCommittables(long chechpointId) { + if (sender != null) { + sender.sendToCoordinator(chechpointId); + } + } + + @Override + protected void collect(Committable committable) { + if (sender != null) { + sender.collect(committable); + } else { + super.collect(committable); + } + } + @Override protected List prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java index 70a6af8a25dc..a6df96da0cbb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter; +import org.apache.paimon.flink.sink.coordinator.CoordinatedFileInfoSender; import org.apache.paimon.flink.sink.coordinator.CoordinatedWriteRestore; import org.apache.paimon.flink.sink.coordinator.WriteOperatorCoordinator; import org.apache.paimon.flink.utils.RuntimeContextUtils; @@ -49,6 +50,7 @@ public abstract class TableWriteOperator extends PrepareCommitOperator { + public static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE; + + private transient boolean endInput; + + private final boolean streamingCheckpointEnabled; + private final String initialCommitUser; + + private final Long endInputWatermark; + protected final NavigableMap committablesPerCheckpoint; + protected final Committer.Factory committerFactory; + + protected Committer committer; + + private PwcStateManager stateManager; + + SerializableSupplier> committableSerializer; + + private transient long globalWatermark; + + public CommitterCoordinator( + boolean streamingCheckpointEnabled, + String initialCommitUser, + Committer.Factory committerFactory, + SerializableSupplier> committableSerializer, + Long endInputWatermark) { + this.streamingCheckpointEnabled = streamingCheckpointEnabled; + this.committablesPerCheckpoint = new TreeMap<>(); + this.committerFactory = checkNotNull(committerFactory); + this.initialCommitUser = initialCommitUser; + this.committableSerializer = committableSerializer; + this.globalWatermark = Long.MIN_VALUE; + this.endInputWatermark = endInputWatermark; + } + + public void init(int parallelism) throws Exception { + this.globalWatermark = Long.MIN_VALUE; + this.endInput = false; + if (committer == null) { + committer = + committerFactory.create( + Committer.createContext( + this.initialCommitUser, + null, + true, + false, + null, + parallelism, + 0)); + } + } + + public void initStateManager(OperatorID operatorId, String baseDir) throws IOException { + this.stateManager = + new PwcStateManager( + baseDir, + operatorId, + new PwcStateSerializer(this.committableSerializer.get()), + 3); + } + + public void restore(long restoredCheckpointId) throws IOException { + List ckIds = stateManager.listCheckpointIds(); + + List commitsToFilter = new ArrayList<>(); + + for (Long ckId : ckIds) { + if (ckId > restoredCheckpointId) { + stateManager.deleteState(ckId); + continue; + } + + PwcState state = stateManager.readState(ckId); + if (state != null) { + commitsToFilter.add(state.globalCommit); + } + } + + if (!commitsToFilter.isEmpty()) { + committer.filterAndCommit(commitsToFilter, true, false); + } + + // 清理 HDFS + for (Long ckId : ckIds) { + if (ckId <= restoredCheckpointId) { + stateManager.deleteState(ckId); + } + } + } + + public void processWaterMark(long watermark) { + if (watermark != Long.MAX_VALUE) { + this.globalWatermark = Math.max(globalWatermark, watermark); + } + } + + private void pollInputs(Collection e) throws Exception { + Map> grouped = committer.groupByCheckpoint(e); + + for (Map.Entry> entry : grouped.entrySet()) { + Long cp = entry.getKey(); + List committables = entry.getValue(); + // To prevent the asynchronous completion of tasks with multiple concurrent bounded + // stream inputs, which leads to some tasks passing a Committable with cp = + // END_INPUT_CHECKPOINT_ID during the endInput method call of the current checkpoint, + // while other tasks pass a Committable with END_INPUT_CHECKPOINT_ID during other + // checkpoints hence causing an error here, we have a special handling for Committables + // with END_INPUT_CHECKPOINT_ID: instead of throwing an error, we merge them. + if (cp != null + && cp == END_INPUT_CHECKPOINT_ID + && committablesPerCheckpoint.containsKey(cp)) { + // Merge the END_INPUT_CHECKPOINT_ID committables here. + GlobalCommitT commitT = + committer.combine( + cp, + globalWatermark, + committablesPerCheckpoint.get(cp), + committables); + committablesPerCheckpoint.put(cp, commitT); + } else if (committablesPerCheckpoint.containsKey(cp)) { + throw new RuntimeException( + String.format( + "Repeatedly commit the same checkpoint files. \n" + + "The previous files is %s, \n" + + "and the subsequent files is %s", + committablesPerCheckpoint.get(cp), committables)); + } else { + committablesPerCheckpoint.put(cp, toCommittables(cp, committables)); + } + } + } + + public boolean save(List committables, long checkpointId) throws Exception { + pollInputs(committables); + if (checkpointId == END_INPUT_CHECKPOINT_ID) { + endInput(); + } + persistCheckpointState(checkpointId); + return true; + } + + private void persistCheckpointState(long checkpointId) throws IOException { + PwcState state = + new PwcState( + checkpointId, + checkpointId == END_INPUT_CHECKPOINT_ID + ? this.committablesPerCheckpoint.lastEntry().getValue() + : this.committablesPerCheckpoint.get(checkpointId)); + stateManager.writeState(state); + } + + public boolean isEndInput() { + return endInput && streamingCheckpointEnabled; + } + + private void commitUpToCheckpoint(long checkpointId) throws Exception { + NavigableMap headMap = + committablesPerCheckpoint.headMap(checkpointId, true); + List committables = committables(headMap); + if (committables.isEmpty() && committer.forceCreatingSnapshot()) { + committables = + Collections.singletonList( + toCommittables(checkpointId, Collections.emptyList())); + } + + if (checkpointId == END_INPUT_CHECKPOINT_ID) { + // In new versions of Flink, if a batch job fails, it might restart from some operator + // in the middle. + // If the job is restarted from the commit operator, endInput will be called again, and + // the same commit messages will be committed again. + // So when `endInput` is called, we must check if the corresponding snapshot exists. + // However, if the snapshot does not exist, then append files must be new files. So + // there is no need to check for duplicated append files. + committer.filterAndCommit(committables, false, true); + } else { + committer.commit(committables); + } + headMap.clear(); + } + + private GlobalCommitT toCommittables(long checkpoint, List inputs) throws Exception { + return committer.combine(checkpoint, globalWatermark, inputs); + } + + private List committables(NavigableMap map) { + return new ArrayList<>(map.values()); + } + + public void endInput() throws Exception { + endInput = true; + if (endInputWatermark != null) { + globalWatermark = endInputWatermark; + } + + if (streamingCheckpointEnabled) { + return; // streaming means still have snapshot and commit! + } + + commitUpToCheckpoint(END_INPUT_CHECKPOINT_ID); + } + + public void close() throws Exception { + committablesPerCheckpoint.clear(); + if (committer != null) { + committer.close(); + } + } + + public void notifyCheckpointComplete(long checkpointId) throws Exception { + commitUpToCheckpoint(endInput ? END_INPUT_CHECKPOINT_ID : checkpointId); + stateManager.deleteState(checkpointId); + } + + public void notifyCheckpointAborted(long checkpointId) { + committablesPerCheckpoint.remove(checkpointId); + stateManager.deleteState(checkpointId); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedFileInfoSender.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedFileInfoSender.java new file mode 100644 index 000000000000..5efa4fc11ad0 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedFileInfoSender.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableSerializer; +import org.apache.paimon.table.sink.CommitMessageSerializer; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.apache.paimon.flink.sink.coordinator.CommitterCoordinator.END_INPUT_CHECKPOINT_ID; + +/** Use direct byte buffer save serialized committables. */ +public class CoordinatedFileInfoSender { + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + private static final int LENGTH_FIELD_SIZE = 4; + private static final int COUNT_FIELD_SIZE = 4; + + protected final boolean streamingCheckpointEnabled; + private final List buffers; + private ByteBuffer currentBuffer; + private transient boolean endInput; + + private final CommittableSerializer serializer; + private final TaskOperatorEventGateway gateway; + private final OperatorID operatorID; + private int totalCommittables = 0; + + public CoordinatedFileInfoSender( + TaskOperatorEventGateway gateway, + OperatorID operatorID, + int bufferSize, + boolean streamingCheckpointEnabled) { + this.gateway = gateway; + this.operatorID = operatorID; + this.streamingCheckpointEnabled = streamingCheckpointEnabled; + this.serializer = new CommittableSerializer(new CommitMessageSerializer()); + this.buffers = new ArrayList<>(); + + allocateNewBuffer(bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE); + } + + public CoordinatedFileInfoSender( + TaskOperatorEventGateway gateway, + OperatorID operatorID, + boolean streamingCheckpointEnabled) { + this(gateway, operatorID, DEFAULT_BUFFER_SIZE, streamingCheckpointEnabled); + } + + private void allocateNewBuffer(int capacity) { + this.currentBuffer = ByteBuffer.allocateDirect(capacity); + this.buffers.add(this.currentBuffer); + } + + /* + * serialize committable to Direct ByteBuffer + */ + public void collect(Committable committable) { + Preconditions.checkNotNull(committable, "Committable cannot be null"); + + try { + byte[] serialized = serializer.serialize(committable); + int dataLength = serialized.length; + + int requiredSpace = LENGTH_FIELD_SIZE + dataLength; + + if (currentBuffer.remaining() < requiredSpace) { + currentBuffer.flip(); + allocateNewBuffer(Math.max(DEFAULT_BUFFER_SIZE, requiredSpace)); + } + + currentBuffer.putInt(dataLength); + currentBuffer.put(serialized); + + totalCommittables++; + + } catch (IOException e) { + throw new RuntimeException("Failed to serialize committable to off-heap buffer", e); + } + } + + public void sendToCoordinator(long checkpointId) { + if (checkpointId == END_INPUT_CHECKPOINT_ID) { + this.endInput = true; + } + sendFileInfo(checkpointId); + } + + private void sendFileInfo(long checkpointId) { + Preconditions.checkState(!buffers.isEmpty(), "No buffers to send"); + try { + if (currentBuffer.position() > 0) { + currentBuffer.flip(); + } + + byte[] finalData = null; + if (totalCommittables != 0) { + finalData = assembleFinalData(); + } + FileInfoEvent event = new FileInfoEvent(finalData, checkpointId); + + SerializedValue serializedRequest = new SerializedValue<>(event); + gateway.sendOperatorEventToCoordinator(operatorID, serializedRequest); + } catch (IOException e) { + throw new RuntimeException( + "Failed to send FileInfo for checkpoint " + + checkpointId + + ": " + + e.getMessage(), + e); + } finally { + clear(); + } + } + + public void snapshot(long checkpointId) { + try { + FileInfoConfirmRequest request = + new FileInfoConfirmRequest(endInput ? END_INPUT_CHECKPOINT_ID : checkpointId); + SerializedValue serializedRequest = new SerializedValue<>(request); + gateway.sendRequestToCoordinator(operatorID, serializedRequest).get(); + } catch (IOException | ExecutionException | InterruptedException e) { + throw new RuntimeException( + "Failed to send snapshot for checkpoint " + + checkpointId + + ": " + + e.getMessage(), + e); + } finally { + clear(); + } + } + + private byte[] assembleFinalData() { + int actualCount = 0; + int actualTotalBytes = 0; + + List readBuffers = new ArrayList<>(); + + for (ByteBuffer buffer : buffers) { + if (buffer.position() > 0 && !buffer.isReadOnly()) { + buffer.flip(); + } + + ByteBuffer tempBuffer = buffer.duplicate(); + + while (tempBuffer.hasRemaining()) { + if (tempBuffer.remaining() < 4) { + break; + } + int length = tempBuffer.getInt(); + if (length < 0 || length > tempBuffer.remaining()) { + break; + } + + tempBuffer.position(tempBuffer.position() + length); + + actualCount++; + actualTotalBytes += length; + } + + readBuffers.add(buffer); + } + + int totalSize = COUNT_FIELD_SIZE + actualTotalBytes + (actualCount * LENGTH_FIELD_SIZE); + byte[] result = new byte[totalSize]; + ByteBuffer resultBuffer = ByteBuffer.wrap(result); + + resultBuffer.putInt(actualCount); + + for (ByteBuffer buffer : readBuffers) { + buffer.rewind(); + + while (buffer.hasRemaining()) { + if (buffer.remaining() < 4) { + break; + } + int length = buffer.getInt(); + if (length < 0 || length > buffer.remaining()) { + break; + } + + byte[] data = new byte[length]; + buffer.get(data); + + resultBuffer.putInt(length); + resultBuffer.put(data); + } + } + totalCommittables = 0; + + return result; + } + + private void clear() { + buffers.clear(); + totalCommittables = 0; + allocateNewBuffer(DEFAULT_BUFFER_SIZE); + } + + public static List deserializeCommittables(byte[] data) throws IOException { + if (data == null) { + return new ArrayList<>(); + } + ByteBuffer buffer = ByteBuffer.wrap(data); + + if (buffer.remaining() < 4) { + throw new IOException("Invalid data: too short"); + } + + int count = buffer.getInt(); + List result = new ArrayList<>(count); + + CommittableSerializer serializer = new CommittableSerializer(new CommitMessageSerializer()); + int version = serializer.getVersion(); + + for (int i = 0; i < count; i++) { + if (buffer.remaining() < 4) { + throw new IOException("Invalid data: missing length field"); + } + int length = buffer.getInt(); + if (length < 0 || length > buffer.remaining()) { + throw new IOException("Invalid data: length field corrupted"); + } + byte[] committableBytes = new byte[length]; + buffer.get(committableBytes); + + Committable committable = serializer.deserialize(version, committableBytes); + result.add(committable); + } + + return result; + } + + public void sendWatermark(long watermark) { + try { + gateway.sendOperatorEventToCoordinator( + operatorID, new SerializedValue<>(new WatermarkEvent(watermark))); + } catch (IOException e) { + throw new RuntimeException("Fail to send watermark " + watermark, e); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/EndInputEvent.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/EndInputEvent.java new file mode 100644 index 000000000000..17a078792c6d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/EndInputEvent.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +/** EndInput event to coordinator. */ +public class EndInputEvent implements OperatorEvent { + private static final long serialVersionUID = 1L; +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmRequest.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmRequest.java new file mode 100644 index 000000000000..880118ed3173 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; + +/** Confirm whether coordinator done for state save. */ +public class FileInfoConfirmRequest implements CoordinationRequest { + private static final long serialVersionUID = 1L; + private final long checkpointId; + + public FileInfoConfirmRequest(long checkpoint) { + this.checkpointId = checkpoint; + } + + public long getCheckpointId() { + return checkpointId; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmResponse.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmResponse.java new file mode 100644 index 000000000000..c1cbfb452626 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoConfirmResponse.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; + +/** Confirm coordinator done for state save. */ +public class FileInfoConfirmResponse implements CoordinationResponse { + private static final long serialVersionUID = 1L; + + private final long checkpointId; + + public FileInfoConfirmResponse(long checkpointId) { + this.checkpointId = checkpointId; + } + + public long getCheckpointId() { + return checkpointId; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoEvent.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoEvent.java new file mode 100644 index 000000000000..0ed440c9ecd0 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/FileInfoEvent.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +/** File info event to coordinator. */ +public class FileInfoEvent implements OperatorEvent { + private static final long serialVersionUID = 1L; + + // Format:| Count (4B) | Len1 (4B) | Data1 | Len2 (4B) | Data2 | ... | + private final byte[] serializedData; + private final long checkpoint; + + public FileInfoEvent(byte[] serializedData, long checkpoint) { + this.serializedData = serializedData; + this.checkpoint = checkpoint; + } + + public byte[] getSerializedData() { + return serializedData; + } + + public long checkpoint() { + return checkpoint; + } + + @Override + public String toString() { + return String.format( + "FileInfoEvent{checkpoint=%d, dataSize=%d bytes}", + checkpoint, serializedData.length); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinator.java new file mode 100644 index 000000000000..766662762b28 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinator.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.paimon.flink.sink.Committer; +import org.apache.paimon.flink.sink.TableWriteOperator; +import org.apache.paimon.manifest.ManifestCommittableSerializer; +import org.apache.paimon.utils.ExceptionUtils; +import org.apache.paimon.utils.SerializableSupplier; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.ThrowableCatchingRunnable; +import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link OperatorCoordinator} for {@link TableWriteOperator}, as global commit node to obtain the + * file info for write operators and do commit. + */ +public class PaimonWriterCoordinator implements OperatorCoordinator, CoordinationRequestHandler { + private static final Logger LOG = LoggerFactory.getLogger(PaimonWriterCoordinator.class); + private final PendingSubtask pendingSubtask; + + private final String configuredStateDir; + + private volatile boolean isFreshInstance = true; + private ScheduledExecutorService coordinatorExecutor; + private final PaimonWriterCoordinator.CoordinatorExecutorThreadFactory coordinatorThreadFactory; + + private final CompletableFuture finalCheckpointCompleted = new CompletableFuture<>(); + private OperatorCoordinator.Context operatorCoordinatorContext; + + private CommitterCoordinator coordinator; + + private boolean started; + + public PaimonWriterCoordinator( + String configuredStateDir, + boolean streamingCheckpointEnabled, + String initialCommitUser, + Committer.Factory committerFactory, + SerializableSupplier committableSerializer, + Context operatorCoordinatorContext, + CoordinatorExecutorThreadFactory coordinatorThreadFactory, + Long endInputWatermark) { + this.configuredStateDir = configuredStateDir; + this.coordinatorThreadFactory = coordinatorThreadFactory; + this.operatorCoordinatorContext = operatorCoordinatorContext; + this.coordinator = + new CommitterCoordinator( + streamingCheckpointEnabled, + initialCommitUser, + committerFactory, + committableSerializer, + endInputWatermark); + this.pendingSubtask = new PendingSubtask(this.coordinator); + } + + @Override + public void start() throws Exception { + OperatorID operatorId = operatorCoordinatorContext.getOperatorId(); + LOG.info("PWC starting, operatorId={}", operatorId); + + this.started = true; + this.coordinatorExecutor = Executors.newScheduledThreadPool(1, coordinatorThreadFactory); + this.coordinator.initStateManager(operatorId, resolveBaseDir()); + this.coordinator.init(operatorCoordinatorContext.currentParallelism()); + } + + /** + * Save subtask index and attempt number in coordinator to filter file info from writer task. + */ + @Override + public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) { + runInEventLoop( + () -> pendingSubtask.registerSubtask(subtask, attemptNumber, gateway), + "making event gateway to subtask %d (#%d) available", + subtask, + attemptNumber); + } + + /** Failed specify subtask in coordinator. */ + @Override + public void executionAttemptFailed(int subtask, int attemptNumber, Throwable throwable) { + runInEventLoop( + () -> { + LOG.info( + "Removing registered reader after failure for subtask {} (#{}) of Committer.", + subtask, + attemptNumber); + pendingSubtask.unregisterSubtask(subtask, attemptNumber, throwable); + }, + "handling subtask %d (#%d) failure", + subtask, + attemptNumber); + } + + /** + * Receive file info from writer, check whether all tasks have been reported and then flush + * state to HDFS. + */ + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) { + this.isFreshInstance = false; + runInEventLoop( + () -> { + if (!pendingSubtask.isValid(subtask, attemptNumber)) { + return; + } + if (event instanceof FileInfoEvent) { + pendingSubtask.receive(subtask, (FileInfoEvent) event); + } else if (event instanceof WatermarkEvent) { + long watermark = ((WatermarkEvent) event).getWatermark(); + coordinator.processWaterMark(watermark); + } + }, + "handling operator event %s from subtask %d (#%d)", + event, + subtask, + attemptNumber); + } + + public CompletableFuture handleCoordinationRequest( + CoordinationRequest request) { + this.isFreshInstance = false; + if (request instanceof FileInfoConfirmRequest) { + long checkpointId = ((FileInfoConfirmRequest) request).getCheckpointId(); + return pendingSubtask + .waitFor(checkpointId) + .thenApply(v -> new FileInfoConfirmResponse(checkpointId)); + } + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + new IllegalArgumentException("Unsupported request type: " + request.getClass())); + return future; + } + + @Override + public void checkpointCoordinator(long l, CompletableFuture completableFuture) { + this.isFreshInstance = false; + completableFuture.complete(new byte[0]); + } + + @Override + public void close() throws Exception { + coordinator.close(); + pendingSubtask.close(); + if (coordinatorExecutor != null) { + coordinatorExecutor.shutdownNow(); + } + } + + /** Notify coordinator to perform table commit for given checkpoint id. */ + @Override + public void notifyCheckpointComplete(long checkpointId) { + this.isFreshInstance = false; + runInEventLoop( + () -> { + try { + this.coordinator.notifyCheckpointComplete(checkpointId); + if (coordinator.isEndInput()) { + finalCheckpointCompleted.complete(null); + } + pendingSubtask.remove(checkpointId); + } catch (Exception e) { + throw new RuntimeException( + "Fail to do checkpointComplete for checkpoint: " + checkpointId, e); + } + }, + "notifying the enumerator of completion of checkpoint %d", + checkpointId); + if (coordinator.isEndInput()) { + try { + finalCheckpointCompleted.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void notifyCheckpointAborted(long checkpointId) { + runInEventLoop( + () -> { + LOG.info("Checkpoint-{} aborted, cleaning up", checkpointId); + pendingSubtask.abortCheckpoint(checkpointId); + coordinator.notifyCheckpointAborted(checkpointId); + }, + "aborting checkpoint-%d", + checkpointId); + } + + @Override + public void resetToCheckpoint(long checkpointId, byte[] bytes) throws Exception { + LOG.info( + "resetToCheckpoint: checkpointId={}, isFreshInstance={}", + checkpointId, + isFreshInstance); + if (isFreshInstance && checkpointId >= 0) { + // JM HA 恢复 + coordinator.restore(checkpointId); + } + + isFreshInstance = false; + } + + private String resolveBaseDir() { + if (configuredStateDir != null && !configuredStateDir.isEmpty()) { + return configuredStateDir; + } + + throw new IllegalStateException( + "PWC state directory not configured. " + + "Please set 'paimon.write-coordinator.state-dir'."); + } + + private void runInEventLoop( + final ThrowingRunnable action, + final String actionName, + final Object... actionNameFormatParameters) { + + ensureStarted(); + + if (coordinator == null) { + return; + } + + runInCoordinatorThread( + () -> { + try { + action.run(); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + + final String actionString = + String.format(actionName, actionNameFormatParameters); + LOG.error( + "Uncaught exception in the CommitCoordinator while {}. Triggering job failover.", + actionString, + t); + operatorCoordinatorContext.failJob(t); + } + }); + } + + public void runInCoordinatorThread(Runnable runnable) { + // when using a ScheduledThreadPool, uncaught exception handler catches only + // exceptions thrown by the threadPool, so manually call it when the exception is + // thrown by the runnable + coordinatorExecutor.execute( + new ThrowableCatchingRunnable( + throwable -> + coordinatorThreadFactory.uncaughtException( + Thread.currentThread(), throwable), + runnable)); + } + + private void ensureStarted() { + if (!started) { + throw new IllegalStateException("The coordinator has not started yet."); + } + } + + @Override + public void subtaskReset(int i, long l) {} + + /** sender for coordinator. */ + public static class WriterCoordinatorProvider implements OperatorCoordinator.Provider { + private static final long serialVersionUID = 1L; + + protected final boolean streamingCheckpointEnabled; + + private final String operatorName; + + private final String configuredStateDir; + + private final OperatorID operatorId; + + private final String initialCommitUser; + + private final Committer.Factory committerFactory; + + protected final Long endInputWatermark; + + public WriterCoordinatorProvider( + boolean streamingCheckpointEnabled, + String operatorName, + String configuredStateDir, + OperatorID operatorID, + String initialCommitUser, + Committer.Factory committerFactory, + Long endInputWatermark) { + this.streamingCheckpointEnabled = streamingCheckpointEnabled; + this.operatorName = operatorName; + this.configuredStateDir = configuredStateDir; + this.operatorId = operatorID; + this.initialCommitUser = initialCommitUser; + this.committerFactory = committerFactory; + this.endInputWatermark = endInputWatermark; + } + + @Override + public OperatorID getOperatorId() { + return operatorId; + } + + @Override + public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception { + final String coordinatorThreadName = " PaimonWriterCoordinator-" + operatorName; + PaimonWriterCoordinator.CoordinatorExecutorThreadFactory coordinatorThreadFactory = + new PaimonWriterCoordinator.CoordinatorExecutorThreadFactory( + coordinatorThreadName, context); + return new PaimonWriterCoordinator( + configuredStateDir, + streamingCheckpointEnabled, + initialCommitUser, + committerFactory, + ManifestCommittableSerializer::new, + context, + coordinatorThreadFactory, + endInputWatermark); + } + } + + /** + * A thread factory class that provides some helper methods. Because it is used to check the + * current thread, it is a one-off, do not use this ThreadFactory to create multiple threads. + */ + public static class CoordinatorExecutorThreadFactory + implements ThreadFactory, Thread.UncaughtExceptionHandler { + + private final String coordinatorThreadName; + private final ClassLoader cl; + private final Thread.UncaughtExceptionHandler errorHandler; + + @Nullable private Thread t; + + CoordinatorExecutorThreadFactory( + final String coordinatorThreadName, final OperatorCoordinator.Context context) { + this( + coordinatorThreadName, + context.getUserCodeClassloader(), + (t, e) -> { + LOG.error( + "Thread '{}' produced an uncaught exception. Failing the job.", + t.getName(), + e); + context.failJob(e); + }); + } + + CoordinatorExecutorThreadFactory( + final String coordinatorThreadName, + final ClassLoader contextClassLoader, + final Thread.UncaughtExceptionHandler errorHandler) { + this.coordinatorThreadName = coordinatorThreadName; + this.cl = contextClassLoader; + this.errorHandler = errorHandler; + } + + @Override + public synchronized Thread newThread(Runnable r) { + checkState( + t == null, + "Please using the new CoordinatorExecutorThreadFactory," + + " this factory cannot new multiple threads."); + t = new Thread(r, coordinatorThreadName); + t.setContextClassLoader(cl); + t.setUncaughtExceptionHandler(this); + return t; + } + + @Override + public synchronized void uncaughtException(Thread t, Throwable e) { + errorHandler.uncaughtException(t, e); + } + + String getCoordinatorThreadName() { + return coordinatorThreadName; + } + + boolean isCurrentThreadCoordinatorThread() { + return Thread.currentThread() == t; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PendingSubtask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PendingSubtask.java new file mode 100644 index 000000000000..30853a9ec36a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PendingSubtask.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.paimon.flink.sink.Committable; + +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** Subtask manager for commit coordinator. */ +public class PendingSubtask { + private final HashMap> + registeredSubtask; + private final CommitterCoordinator coordinator; + Map> notYetCheckpoint; + + // taskId-chk-fileInfo + Map>> temporaryCommittables = new HashMap<>(); + + private volatile ConcurrentHashMap> futures = + new ConcurrentHashMap<>(); + + public PendingSubtask(CommitterCoordinator coordinator) { + this.registeredSubtask = new HashMap<>(); + this.coordinator = coordinator; + this.notYetCheckpoint = new HashMap<>(); + } + + public void registerSubtask( + int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) { + registeredSubtask + .computeIfAbsent(subtask, a -> new HashMap<>()) + .put(attemptNumber, gateway); + } + + public void unregisterSubtask(int subtask, int attemptNumber, Throwable throwable) { + registeredSubtask.computeIfAbsent(subtask, a -> new HashMap<>()).remove(attemptNumber); + } + + public boolean isValid(int subtask, int attemptNumber) { + HashMap subtaskMap = + registeredSubtask.get(subtask); + return subtaskMap != null && subtaskMap.containsKey(attemptNumber); + } + + private boolean allReceived(long checkpoint) { + return notYetCheckpoint.get(checkpoint).isEmpty(); + } + + public void receive(int subtask, FileInfoEvent fileInfoEvent) throws Exception { + long checkpointId = fileInfoEvent.checkpoint(); + temporaryCommittables + .computeIfAbsent(subtask, k -> new HashMap<>()) + .put( + checkpointId, + CoordinatedFileInfoSender.deserializeCommittables( + fileInfoEvent.getSerializedData())); + notYetCheckpoint + .computeIfAbsent(checkpointId, k -> new HashSet<>(registeredSubtask.keySet())) + .remove(subtask); + if (allReceived(checkpointId) + && coordinator.save(getAllTemporaryCommittables(checkpointId), checkpointId)) { + signal(checkpointId); + cleanupCheckpointState(checkpointId); + } + } + + public void notifyEndInput(int subtask) throws Exception { + notYetCheckpoint + .computeIfAbsent( + CommitterCoordinator.END_INPUT_CHECKPOINT_ID, + k -> new HashSet<>(registeredSubtask.keySet())) + .remove(subtask); + if (allReceived(CommitterCoordinator.END_INPUT_CHECKPOINT_ID)) { + coordinator.endInput(); + } + } + + private List getAllTemporaryCommittables(long checkpoint) { + + List committables = new ArrayList<>(); + for (Map> map : temporaryCommittables.values()) { + committables.addAll(map.get(checkpoint)); + } + return committables; + } + + public void abortCheckpoint(long checkpointId) { + cleanupCheckpointState(checkpointId); + remove(checkpointId); + } + + private void cleanupCheckpointState(long checkpointId) { + notYetCheckpoint.remove(checkpointId); + + for (Map> subtaskState : temporaryCommittables.values()) { + subtaskState.remove(checkpointId); + } + + temporaryCommittables.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + } + + public void signal(long checkpointId) { + futures.compute( + checkpointId, + (k, existingFuture) -> { + if (existingFuture == null) { + return CompletableFuture.completedFuture(null); + } else { + existingFuture.complete(null); + return existingFuture; + } + }); + } + + public CompletableFuture waitFor(long checkpointId) { + return futures.computeIfAbsent(checkpointId, k -> new CompletableFuture<>()); + } + + public void remove(long value) { + futures.remove(value); + } + + public void close() { + notYetCheckpoint.clear(); + temporaryCommittables.clear(); + futures.clear(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcState.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcState.java new file mode 100644 index 000000000000..b325be921309 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +/** pwc state with ck and global commit. */ +public class PwcState { + public final long checkpointId; + public final GlobalCommitT globalCommit; + + public PwcState(long checkpointId, GlobalCommitT globalCommit) { + this.checkpointId = checkpointId; + this.globalCommit = globalCommit; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateManager.java new file mode 100644 index 000000000000..8f424211cad8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateManager.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** PWC HDFS state Manage using OperatorID . path: baseDir/operatorId/checkpoint-{ckId}.state */ +public class PwcStateManager { + + private static final Logger LOG = LoggerFactory.getLogger(PwcStateManager.class); + private static final String FILE_SUFFIX = ".state"; + + private final FileSystem fs; + private final Path basePath; + private final PwcStateSerializer serializer; + private final int retryTimes; + + public PwcStateManager( + String baseDir, OperatorID operatorId, PwcStateSerializer serializer, int retryTimes) + throws IOException { + + this.serializer = serializer; + this.retryTimes = retryTimes; + this.basePath = new Path(baseDir, operatorId.toHexString()); + + Configuration hadoopConf = new Configuration(); + this.fs = FileSystem.get(basePath.toUri(), hadoopConf); + + if (!fs.exists(basePath)) { + fs.mkdirs(basePath); + } + + LOG.info("PwcStateManager initialized: basePath={}", basePath); + } + + public void writeState(PwcState state) throws IOException { + long checkpointId = state.checkpointId; + Path finalPath = getPath(checkpointId); + Path tmpPath = new Path(finalPath.getParent(), finalPath.getName() + ".tmp"); + + IOException lastException = null; + + for (int i = 0; i < retryTimes; i++) { + try { + byte[] bytes = serializer.serialize(state); + + try (FSDataOutputStream out = fs.create(tmpPath, true)) { + out.write(bytes); + out.hsync(); + } + + if (fs.rename(tmpPath, finalPath)) { + LOG.debug("Wrote PWC state for checkpoint-{} to {}", checkpointId, finalPath); + return; + } + + throw new IOException("Rename failed: " + tmpPath + " -> " + finalPath); + + } catch (IOException e) { + lastException = e; + LOG.warn( + "Write state failed (attempt {}/{}): {}", + i + 1, + retryTimes, + e.getMessage()); + tryDelete(tmpPath); + } + } + + throw new IOException("Write state failed after " + retryTimes + " retries", lastException); + } + + public PwcState readState(long checkpointId) throws IOException { + Path path = getPath(checkpointId); + if (!fs.exists(path)) { + return null; + } + + try (FSDataInputStream in = fs.open(path)) { + byte[] bytes = new byte[(int) fs.getFileStatus(path).getLen()]; + in.readFully(bytes); + return serializer.deserialize(bytes); + } + } + + public void deleteState(long checkpointId) { + tryDelete(getPath(checkpointId)); + } + + public List listCheckpointIds() throws IOException { + List result = new ArrayList<>(); + + if (!fs.exists(basePath)) { + return result; + } + + FileStatus[] files = fs.listStatus(basePath, path -> path.getName().endsWith(FILE_SUFFIX)); + + for (FileStatus file : files) { + String name = file.getPath().getName(); + try { + String idStr = + name.substring( + "checkpoint-".length(), name.length() - FILE_SUFFIX.length()); + result.add(Long.parseLong(idStr)); + } catch (NumberFormatException e) { + LOG.warn("Invalid checkpoint file name: {}", name); + } + } + + return result; + } + + public void cleanupAll() throws IOException { + if (!fs.exists(basePath)) { + return; + } + + FileStatus[] files = fs.listStatus(basePath); + for (FileStatus file : files) { + fs.delete(file.getPath(), true); + } + + LOG.info("Cleaned up all PWC states in {}", basePath); + } + + private Path getPath(long checkpointId) { + return new Path(basePath, "checkpoint-" + checkpointId + FILE_SUFFIX); + } + + private void tryDelete(Path path) { + try { + if (fs.exists(path)) { + fs.delete(path, false); + LOG.debug("Deleted PWC state: {}", path); + } + } catch (IOException e) { + LOG.warn("Failed to delete PWC state: {}", path, e); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateSerializer.java new file mode 100644 index 000000000000..d7fdf4330d2e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PwcStateSerializer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.paimon.data.serializer.VersionedSerializer; + +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** pwc state serializer using VersionedSerializer. */ +public class PwcStateSerializer { + + private final VersionedSerializer globalCommitSerializer; + + public PwcStateSerializer(VersionedSerializer globalCommitSerializer) { + this.globalCommitSerializer = globalCommitSerializer; + } + + public byte[] serialize(PwcState state) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputViewStreamWrapper(baos)) { + + out.writeLong(state.checkpointId); + out.writeInt(globalCommitSerializer.getVersion()); + byte[] serialized = globalCommitSerializer.serialize(state.globalCommit); + out.writeInt(serialized.length); + out.write(serialized); + + out.flush(); + return baos.toByteArray(); + } + } + + public PwcState deserialize(byte[] bytes) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream in = new DataInputViewStreamWrapper(bais)) { + + long checkpointId = in.readLong(); + int version = in.readInt(); + + int len = in.readInt(); + byte[] serialized = new byte[len]; + in.readFully(serialized); + + GlobalCommitT globalCommit = globalCommitSerializer.deserialize(version, serialized); + + return new PwcState<>(checkpointId, globalCommit); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WatermarkEvent.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WatermarkEvent.java new file mode 100644 index 000000000000..dbf1e544d669 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WatermarkEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +/** Watermark event to commit coordinator. */ +public class WatermarkEvent implements OperatorEvent { + private final long watermark; + + public WatermarkEvent(long watermark) { + this.watermark = watermark; + } + + public long getWatermark() { + return watermark; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java new file mode 100644 index 000000000000..20c06ad0cd46 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.coordinator; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.Committer; +import org.apache.paimon.flink.sink.RowDataStoreWriteOperator; +import org.apache.paimon.flink.sink.StoreCommitter; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestCommittableSerializer; +import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.SerializableSupplier; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.SerializedValue; +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Test for {@link RowDataStoreWriteOperator}. */ +public class PaimonWriterCoordinatorTest { + private static final long CK0 = 7L; + private static final long CK1 = 8L; + private static final long CK2 = 9L; + private static final List CK0_ROW_DATA = Lists.newArrayList(GenericRow.of(0, 0L)); + private static final List CK1_ROW_DATA = + Lists.newArrayList(GenericRow.of(1, 10L), GenericRow.of(2, 20L)); + private static final List CK1_RETRY_ROW_DATA = + Lists.newArrayList(GenericRow.of(1, 10L), GenericRow.of(2, 20L), GenericRow.of(3, 30L)); + private static final List CK2_ROW_DATA = + Lists.newArrayList(GenericRow.of(3, 30L), GenericRow.of(4, 40L)); + + private static final RowType ROW_TYPE = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.BIGINT()}, new String[] {"a", "b"}); + public String configuredStateDir; + + @TempDir public java.nio.file.Path tempDir; + protected Path tablePath; + public OperatorID operatorID; + + @BeforeEach + public void before() { + tablePath = new Path(tempDir.toString()); + initialCommitUser = UUID.randomUUID().toString(); + configuredStateDir = tempDir.toString() + "/pwc"; + operatorID = new OperatorID(); + } + + protected String initialCommitUser; + + // all normal + @Test + public void testCoordinatorDefault() throws Exception { + boolean streamingCheckpointEnabled = true; + FileStoreTable table = createFileStoreTable(); + PaimonWriterCoordinator coordinator = createCoordinator(table, streamingCheckpointEnabled); + coordinator.start(); + + doneForCK0(coordinator, table); + + tryCk(CK1, CK1_ROW_DATA, coordinator, table, true, true); + assertResults(table, "0, 0", "1, 10", "2, 20"); + + coordinator.close(); + } + + // =======================support recovery============================ + + // JM normal; WriteTask failover + // CK1 fail; recover from CK1; check; CK2 done; check; + @Test + public void testFailIntentionally() throws Exception { + boolean streamingCheckpointEnabled = true; + FileStoreTable table = createFileStoreTable(); + PaimonWriterCoordinator coordinator = createCoordinator(table, streamingCheckpointEnabled); + coordinator.start(); + + doneForCK0(coordinator, table); + + tryCk(CK1, CK1_ROW_DATA, coordinator, table, streamingCheckpointEnabled, false); + assertThat(table.snapshotManager().latestSnapshotId() == CK0); + coordinator.notifyCheckpointAborted(CK1); + + coordinator.resetToCheckpoint(CK0, null); + assertResults(table, "0, 0"); + + tryCk(CK2, CK2_ROW_DATA, coordinator, table, streamingCheckpointEnabled, true); + assertResults(table, "0, 0", "3, 30", "4, 40"); + + coordinator.close(); + } + + // JM failover; JM restart from CK0; CK1 fail + // CK0 done; CK1 fail; recover from CK0; check; + @Test + public void testCoordinatorFailoverWithRecoveryCK0() throws Exception { + FileStoreTable table = createFileStoreTable(); + boolean streamingCheckpointEnabled = true; + PaimonWriterCoordinator coordinator1 = createCoordinator(table, streamingCheckpointEnabled); + coordinator1.start(); + + doneForCK0(coordinator1, table); + + tryCk(CK1, CK1_ROW_DATA, coordinator1, table, streamingCheckpointEnabled, false); + assertThat(table.snapshotManager().latestSnapshotId() == CK0); + coordinator1.close(); + + PaimonWriterCoordinator coordinator2 = createCoordinator(table, streamingCheckpointEnabled); + coordinator2.start(); + coordinator2.resetToCheckpoint(CK0, null); + assertResults(table, "0, 0"); + + coordinator2.close(); + } + + // Checkpoint coordinator not notice CK1 fail, replace + @Test + public void testCoordinatorFailoverWithRecoveryCK0withoutNotice() throws Exception { + FileStoreTable table = createFileStoreTable(); + boolean streamingCheckpointEnabled = true; + PaimonWriterCoordinator coordinator1 = createCoordinator(table, streamingCheckpointEnabled); + coordinator1.start(); + + doneForCK0(coordinator1, table); + + tryCk(CK1, CK1_ROW_DATA, coordinator1, table, streamingCheckpointEnabled, false); + assertThat(table.snapshotManager().latestSnapshotId() == CK0); + coordinator1.close(); + + PaimonWriterCoordinator coordinator2 = createCoordinator(table, streamingCheckpointEnabled); + coordinator2.start(); + coordinator2.resetToCheckpoint(CK0, null); + assertResults(table, "0, 0"); + + tryCk(CK1, CK1_RETRY_ROW_DATA, coordinator2, table, streamingCheckpointEnabled, true); + assertResults(table, "0, 0", "1, 10", "2, 20", "3, 30"); + coordinator2.close(); + } + + // JM failover; Restart from CK1; restart CK2 + // CK1 done; trying CK2 ; recover from CK1; check; CK2 done; check; + @Test + public void testCoordinatorFailoverWithRecoveryFromCK1() throws Exception { + FileStoreTable table = createFileStoreTable(); + boolean streamingCheckpointEnabled = true; + + PaimonWriterCoordinator coordinator1 = createCoordinator(table, streamingCheckpointEnabled); + coordinator1.start(); + doneForCK0(coordinator1, table); + assertThat(table.snapshotManager().latestSnapshotId() == CK1); + + tryCk(CK1, CK1_ROW_DATA, coordinator1, table, streamingCheckpointEnabled, false); + coordinator1.close(); + + PaimonWriterCoordinator coordinator2 = createCoordinator(table, streamingCheckpointEnabled); + coordinator2.start(); + coordinator2.resetToCheckpoint(CK1, null); + assertResults(table, "0, 0", "1, 10", "2, 20"); + + coordinator2.close(); + } + + // ==================== Helper Methods ==================== + + private void doneForCK0(PaimonWriterCoordinator coordinator, FileStoreTable table) + throws Exception { + tryCk(CK0, CK0_ROW_DATA, coordinator, table, true, true); + assertResults(table, "0, 0"); + } + + private void tryCk( + long ck, + List input, + PaimonWriterCoordinator coordinator, + FileStoreTable table, + boolean streamingCheckpointEnabled, + boolean doCommit) + throws Exception { + TaskOperatorEventGateway gateway = Mockito.mock(TaskOperatorEventGateway.class); + CoordinatedFileInfoSender sender = + new CoordinatedFileInfoSender(gateway, operatorID, streamingCheckpointEnabled); + StreamTableWrite write = + table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite(); + for (GenericRow row : input) { + write.write(row); + } + for (CommitMessage committable : write.prepareCommit(false, ck)) { + sender.collect(new Committable(ck, Committable.Kind.FILE, committable)); + } + + sender.sendToCoordinator(ck); + + ArgumentCaptor zeroCaptor = ArgumentCaptor.forClass(SerializedValue.class); + verify(gateway).sendOperatorEventToCoordinator(eq(operatorID), zeroCaptor.capture()); + SerializedValue capturedValue = zeroCaptor.getValue(); + OperatorEvent actualEvent = + capturedValue.deserializeValue(OperatorEvent.class.getClassLoader()); + + OperatorCoordinator.SubtaskGateway subtaskGateway = + Mockito.mock(OperatorCoordinator.SubtaskGateway.class); + // register subtask + coordinator.executionAttemptReady(0, 2, subtaskGateway); + + coordinator.handleEventFromOperator(0, 2, actualEvent); + if (doCommit) { + coordinator.notifyCheckpointComplete(ck); + } + waitForCoordinatorToProcessActions(coordinator); + write.close(); + } + + private PaimonWriterCoordinator createCoordinator( + FileStoreTable table, boolean streamingCheckpointEnabled) { + String initialCommitUser = "testUser"; + OperatorCoordinator.Context context = Mockito.mock(OperatorCoordinator.Context.class); + when(context.getOperatorId()).thenReturn(operatorID); + Committer.Factory committerFactory = createCommitterFactory(table); + SerializableSupplier committableSerializer = ManifestCommittableSerializer::new; + when(context.currentParallelism()).thenReturn(1); + PaimonWriterCoordinator.CoordinatorExecutorThreadFactory coordinatorThreadFactory = + new PaimonWriterCoordinator.CoordinatorExecutorThreadFactory("PWC", context); + Long endInputWatermark = 1000L; + + return new PaimonWriterCoordinator( + configuredStateDir, + streamingCheckpointEnabled, + initialCommitUser, + committerFactory, + committableSerializer, + context, + coordinatorThreadFactory, + endInputWatermark); + } + + static void waitForCoordinatorToProcessActions(PaimonWriterCoordinator coordinator) { + final CompletableFuture future = new CompletableFuture<>(); + coordinator.runInCoordinatorThread(() -> future.complete(null)); + + try { + future.get(); + } catch (InterruptedException e) { + throw new AssertionError("test interrupted"); + } catch (ExecutionException e) { + ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e)); + } + } + + protected Committer.Factory createCommitterFactory( + FileStoreTable table) { + return context -> + new StoreCommitter( + table, + table.newCommit(context.commitUser()) + .ignoreEmptyCommit(!context.streamingCheckpointEnabled()), + context); + } + + protected void assertResults(FileStoreTable table, String... expected) { + TableRead read = table.newReadBuilder().newRead(); + List actual = new ArrayList<>(); + table.newReadBuilder() + .newScan() + .plan() + .splits() + .forEach( + s -> { + try { + RecordReader recordReader = read.createReader(s); + CloseableIterator it = + new RecordReaderIterator<>(recordReader); + while (it.hasNext()) { + InternalRow row = it.next(); + actual.add(row.getInt(0) + ", " + row.getLong(1)); + } + it.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Collections.sort(actual); + assertThat(actual).isEqualTo(Arrays.asList(expected)); + } + + protected FileStoreTable createFileStoreTable() throws Exception { + return createFileStoreTable(options -> {}, Collections.emptyList()); + } + + protected FileStoreTable createFileStoreTable( + Consumer setOptions, List partitionKeys) throws Exception { + Options conf = new Options(); + conf.set(CoreOptions.PATH, tablePath.toString()); + conf.setString("bucket", "1"); + conf.setString("bucket-key", "a"); + setOptions.accept(conf); + SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); + schemaManager.createTable( + new Schema( + ROW_TYPE.getFields(), + partitionKeys, + Collections.emptyList(), + conf.toMap(), + "")); + return FileStoreTableFactory.create(LocalFileIO.create(), conf); + } +} From 158eeeecd7e4972edc62573077a88cfb4f600596 Mon Sep 17 00:00:00 2001 From: fishfishfishfishaa <792850842@qq.com> Date: Tue, 26 May 2026 15:54:38 +0800 Subject: [PATCH 2/5] fix conflict --- .../org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java index 827337f2bca1..ab0775692a7a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import javax.annotation.Nullable; From 0ba4578b803c8389071af9dfb7d4247421ac6fe1 Mon Sep 17 00:00:00 2001 From: fishfishfishfishaa <792850842@qq.com> Date: Tue, 26 May 2026 16:33:40 +0800 Subject: [PATCH 3/5] fix conflict --- .../java/org/apache/paimon/flink/sink/FixedBucketSink.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java index fe7462198f96..cc82988419e3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FixedBucketSink.java @@ -72,8 +72,7 @@ protected OneInputStreamOperatorFactory createWriteCoo return new CommitterCoordinatedFactory( isStreaming, configuredStateDir, - new RowDataStoreWriteOperator.Factory( - table, logSinkFunction, writeProvider, commitUser), + new RowDataStoreWriteOperator.Factory(table, writeProvider, commitUser), createCommitterFactory(), commitUser, options.get(END_INPUT_WATERMARK)); From 3a80122f8a756bf83793d7840262b3b39766d881 Mon Sep 17 00:00:00 2001 From: fishfishfishfishaa <792850842@qq.com> Date: Tue, 26 May 2026 17:05:23 +0800 Subject: [PATCH 4/5] fix ut --- .../flink/sink/coordinator/PaimonWriterCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java index 20c06ad0cd46..4671028f51aa 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/coordinator/PaimonWriterCoordinatorTest.java @@ -241,7 +241,7 @@ private void tryCk( write.write(row); } for (CommitMessage committable : write.prepareCommit(false, ck)) { - sender.collect(new Committable(ck, Committable.Kind.FILE, committable)); + sender.collect(new Committable(ck, committable)); } sender.sendToCoordinator(ck); From 9b6d5212dadce7f663ff40cd969f3f7e29d1bd90 Mon Sep 17 00:00:00 2001 From: fishfishfishfishaa <792850842@qq.com> Date: Wed, 27 May 2026 11:10:13 +0800 Subject: [PATCH 5/5] docs: regenerate config docs for new options --- docs/generated/catalog_configuration.html | 18 +++++++++++ docs/generated/cdc_configuration.html | 18 +++++++++++ docs/generated/core_configuration.html | 30 +++++++++++++++---- .../flink_catalog_configuration.html | 18 +++++++++++ .../flink_connector_configuration.html | 30 +++++++++++++++++++ .../generated/hive_catalog_configuration.html | 18 +++++++++++ .../hive_connector_configuration.html | 18 +++++++++++ docs/generated/iceberg_configuration.html | 18 +++++++++++ .../generated/jdbc_catalog_configuration.html | 18 +++++++++++ docs/generated/orc_configuration.html | 18 +++++++++++ docs/generated/rocksdb_configuration.html | 18 +++++++++++ .../spark_catalog_configuration.html | 18 +++++++++++ .../spark_connector_configuration.html | 18 +++++++++++ 13 files changed, 252 insertions(+), 6 deletions(-) diff --git a/docs/generated/catalog_configuration.html b/docs/generated/catalog_configuration.html index 2a5f25ef4370..74053392d1ca 100644 --- a/docs/generated/catalog_configuration.html +++ b/docs/generated/catalog_configuration.html @@ -1,3 +1,21 @@ + diff --git a/docs/generated/cdc_configuration.html b/docs/generated/cdc_configuration.html index 989440f124a7..a6ff6ba4754a 100644 --- a/docs/generated/cdc_configuration.html +++ b/docs/generated/cdc_configuration.html @@ -1,3 +1,21 @@ +
diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 481369245597..22e9c8bcefb6 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -1,3 +1,21 @@ +
@@ -44,6 +62,12 @@ + + + + + + @@ -434,12 +458,6 @@ - - - - - - diff --git a/docs/generated/flink_catalog_configuration.html b/docs/generated/flink_catalog_configuration.html index 91c565e347f9..6241c224eaf8 100644 --- a/docs/generated/flink_catalog_configuration.html +++ b/docs/generated/flink_catalog_configuration.html @@ -1,3 +1,21 @@ +
Boolean Write blob field using blob descriptor rather than blob bytes.
blob-compaction.enabled
falseBooleanWhether to compact blob files when compacting a data evolution table.
blob-descriptor-field
(none)Duration The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.
blob-compaction.enabled
falseBooleanWhether to compact blob files when compacting a data evolution table.
data-evolution.enabled
false
diff --git a/docs/generated/flink_connector_configuration.html b/docs/generated/flink_connector_configuration.html index 347473e479bb..9e72a5b33b74 100644 --- a/docs/generated/flink_connector_configuration.html +++ b/docs/generated/flink_connector_configuration.html @@ -1,3 +1,21 @@ +
@@ -230,6 +248,18 @@ + + + + + + + + + + + + diff --git a/docs/generated/hive_catalog_configuration.html b/docs/generated/hive_catalog_configuration.html index 66e39f6a4ff2..48adc1114fb3 100644 --- a/docs/generated/hive_catalog_configuration.html +++ b/docs/generated/hive_catalog_configuration.html @@ -1,3 +1,21 @@ +
Boolean Indicates whether to further sort data belonged to each sink task after range partitioning.
sink.committer-coordinator-operator.enabled
falseBooleanEnable committer coordinator to commit in Job Manager.
sink.committer-coordinator-operator.state-dir
(none)StringOptional override for PWC state directory. If not set, uses Flink checkpoint directory.
sink.committer-cpu
1.0
diff --git a/docs/generated/hive_connector_configuration.html b/docs/generated/hive_connector_configuration.html index a3aac1d1b16e..3e0d7fb2282f 100644 --- a/docs/generated/hive_connector_configuration.html +++ b/docs/generated/hive_connector_configuration.html @@ -1,3 +1,21 @@ +
diff --git a/docs/generated/iceberg_configuration.html b/docs/generated/iceberg_configuration.html index e0e6d9126f01..c08388c8cbc4 100644 --- a/docs/generated/iceberg_configuration.html +++ b/docs/generated/iceberg_configuration.html @@ -1,3 +1,21 @@ +
diff --git a/docs/generated/jdbc_catalog_configuration.html b/docs/generated/jdbc_catalog_configuration.html index 9ec21bc1c465..9b36f3f37257 100644 --- a/docs/generated/jdbc_catalog_configuration.html +++ b/docs/generated/jdbc_catalog_configuration.html @@ -1,3 +1,21 @@ +
diff --git a/docs/generated/orc_configuration.html b/docs/generated/orc_configuration.html index 62cd8ddd8754..c5d5d7d42075 100644 --- a/docs/generated/orc_configuration.html +++ b/docs/generated/orc_configuration.html @@ -1,3 +1,21 @@ +
diff --git a/docs/generated/rocksdb_configuration.html b/docs/generated/rocksdb_configuration.html index 3ba853bf4382..d24f2f379f27 100644 --- a/docs/generated/rocksdb_configuration.html +++ b/docs/generated/rocksdb_configuration.html @@ -1,3 +1,21 @@ +
diff --git a/docs/generated/spark_catalog_configuration.html b/docs/generated/spark_catalog_configuration.html index d8b0674b06b8..f09bfed60e91 100644 --- a/docs/generated/spark_catalog_configuration.html +++ b/docs/generated/spark_catalog_configuration.html @@ -1,3 +1,21 @@ +
diff --git a/docs/generated/spark_connector_configuration.html b/docs/generated/spark_connector_configuration.html index c4ed50b8a2f5..a6c8278f4c44 100644 --- a/docs/generated/spark_connector_configuration.html +++ b/docs/generated/spark_connector_configuration.html @@ -1,3 +1,21 @@ +