From 8f9110536591506b6112c9468176332d9b71a758 Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Mon, 27 Apr 2026 14:28:14 -0700 Subject: [PATCH] Add option to avoid shuffle in bucket unaware append sink --- .../flink_connector_configuration.html | 6 ++++++ .../flink/action/cdc/SyncTableActionBase.java | 8 +++++++- .../cdc/kafka/KafkaSyncTableAction.java | 16 +++++++++++++++ .../flink/sink/cdc/CdcAppendTableSink.java | 15 +++++++++++++- .../paimon/flink/sink/cdc/CdcSinkBuilder.java | 9 +++++++++ .../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 20 +++++++++++++------ .../paimon/flink/FlinkConnectorOptions.java | 12 +++++++++++ 7 files changed, 78 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 253652cb78e8..359b8dba9834 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -362,5 +362,11 @@ Integer Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. + +
unaware-bucket.no-shuffle
+ false + Boolean + If true, the CDC sync pipeline will skip the network shuffle between source and writer operators. This is only supported for bucket-unaware (append) tables where each writer subtask independently appends data without bucket ownership constraints. This eliminates data transfer overhead when the source already provides suitable data distribution (e.g., Kafka partitions). + diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index 0f984d365419..c3d2e94bcace 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -163,6 +163,12 @@ protected EventParser.Factory buildEventParserFactory() protected void buildSink( DataStream input, EventParser.Factory parserFactory) { + createCdcSinkBuilder(input, parserFactory).build(); + } + + protected CdcSinkBuilder createCdcSinkBuilder( + DataStream input, + EventParser.Factory parserFactory) { CdcSinkBuilder sinkBuilder = new CdcSinkBuilder() .withInput(input) @@ -175,7 +181,7 @@ protected void buildSink( if (sinkParallelism != null) { sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism)); } - sinkBuilder.build(); + return sinkBuilder; } private void checkConstraints() { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index bff61ab0cde1..9c3f18f7077b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -18,8 +18,13 @@ package org.apache.paimon.flink.action.cdc.kafka; +import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase; import org.apache.paimon.flink.action.cdc.SyncJobHandler; +import org.apache.paimon.flink.sink.cdc.EventParser; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; + +import org.apache.flink.streaming.api.datastream.DataStream; import java.util.Map; @@ -33,4 +38,15 @@ public KafkaSyncTableAction( Map kafkaConfig) { super(database, table, catalogConfig, kafkaConfig, SyncJobHandler.SourceType.KAFKA); } + + @Override + protected void buildSink( + DataStream input, + EventParser.Factory parserFactory) { + boolean noShuffle = + Boolean.parseBoolean( + tableConfig.getOrDefault( + FlinkConnectorOptions.UNAWARE_BUCKET_NO_SHUFFLE.key(), "false")); + createCdcSinkBuilder(input, parserFactory).withNoShuffle(noShuffle).build(); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java index 7a5b2a0f11f8..6694601bc74a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java @@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; @@ -37,10 +38,16 @@ public class CdcAppendTableSink extends FlinkWriteSink { private final Integer parallelism; + private final boolean noShuffle; public CdcAppendTableSink(FileStoreTable table, Integer parallelism) { + this(table, parallelism, false); + } + + public CdcAppendTableSink(FileStoreTable table, Integer parallelism, boolean noShuffle) { super(table, null); this.parallelism = parallelism; + this.noShuffle = noShuffle; } @Override @@ -52,7 +59,13 @@ protected OneInputStreamOperatorFactory createWriteOpera @Override public DataStream doWrite( DataStream input, String initialCommitUser, @Nullable Integer parallelism) { - return super.doWrite(input, initialCommitUser, this.parallelism); + DataStream written = super.doWrite(input, initialCommitUser, this.parallelism); + if (noShuffle) { + // Break operator chaining between parse and write to avoid deadlock + // during schema evolution retries, without introducing a network shuffle. + ((SingleOutputStreamOperator) written).startNewChain(); + } + return written; } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java index e4518d3a26bc..3ae0deb5481c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java @@ -54,6 +54,7 @@ public class CdcSinkBuilder { private TypeMapping typeMapping = null; @Nullable private Integer parallelism; + private boolean noShuffle = false; public CdcSinkBuilder withInput(DataStream input) { this.input = input; @@ -90,6 +91,11 @@ public CdcSinkBuilder withTypeMapping(TypeMapping typeMapping) { return this; } + public CdcSinkBuilder withNoShuffle(boolean noShuffle) { + this.noShuffle = noShuffle; + return this; + } + public DataStreamSink build() { Preconditions.checkNotNull(input, "Input DataStream can not be null."); Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null."); @@ -160,6 +166,9 @@ private DataStreamSink buildForPostponeBucket(DataStream parsed) { private DataStreamSink buildForUnawareBucket(DataStream parsed) { FileStoreTable dataTable = (FileStoreTable) table; + if (noShuffle) { + return new CdcAppendTableSink(dataTable, parallelism, true).sinkFrom(parsed); + } // rebalance it to make sure schema change work to avoid infinite loop return new CdcAppendTableSink(dataTable, parallelism).sinkFrom(parsed.rebalance()); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java index ea0265297178..c7e66452a95d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java @@ -71,36 +71,43 @@ public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase { @Test @Timeout(120) public void testRandomCdcEvents() throws Exception { - innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1, false, false); + innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1, false, false, false); } @Test @Timeout(120) public void testRandomCdcEventsDynamicBucket() throws Exception { - innerTestRandomCdcEvents(-1, false, false); + innerTestRandomCdcEvents(-1, false, false, false); } @Test @Timeout(120) public void testRandomCdcEventsPostponeBucket() throws Exception { - innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false); + innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false, false); } @Disabled @Test @Timeout(120) public void testRandomCdcEventsGlobalDynamicBucket() throws Exception { - innerTestRandomCdcEvents(-1, true, false); + innerTestRandomCdcEvents(-1, true, false, false); } @Test @Timeout(120) public void testRandomCdcEventsUnawareBucket() throws Exception { - innerTestRandomCdcEvents(-1, false, true); + innerTestRandomCdcEvents(-1, false, true, false); + } + + @Test + @Timeout(120) + public void testRandomCdcEventsUnawareBucketNoShuffle() throws Exception { + innerTestRandomCdcEvents(-1, false, true, true); } private void innerTestRandomCdcEvents( - int numBucket, boolean globalIndex, boolean unawareBucketMode) throws Exception { + int numBucket, boolean globalIndex, boolean unawareBucketMode, boolean noShuffle) + throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); int numEvents = random.nextInt(1500) + 1; @@ -177,6 +184,7 @@ private void innerTestRandomCdcEvents( .withParallelism(3) .withIdentifier(Identifier.create(DATABASE_NAME, TABLE_NAME)) .withCatalogLoader(catalogLoader) + .withNoShuffle(noShuffle) .build(); // enable failure when running jobs if needed diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 89d3e2174a0f..61c741fee288 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -72,6 +72,18 @@ public class FlinkConnectorOptions { + "By default, if this option is not defined, the planner will derive the parallelism " + "for each statement individually by also considering the global configuration."); + public static final ConfigOption UNAWARE_BUCKET_NO_SHUFFLE = + ConfigOptions.key("unaware-bucket.no-shuffle") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, the CDC sync pipeline will skip the network shuffle between " + + "source and writer operators. This is only supported for " + + "bucket-unaware (append) tables where each writer subtask " + + "independently appends data without bucket ownership constraints. " + + "This eliminates data transfer overhead when the source already " + + "provides suitable data distribution (e.g., Kafka partitions)."); + public static final ConfigOption INFER_SCAN_PARALLELISM = ConfigOptions.key("scan.infer-parallelism") .booleanType()