Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,5 +362,11 @@
<td>Integer</td>
<td>Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
<tr>
<td><h5>unaware-bucket.no-shuffle</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, the CDC sync pipeline will skip the network shuffle between source and writer operators. This is only supported for bucket-unaware (append) tables where each writer subtask independently appends data without bucket ownership constraints. This eliminates data transfer overhead when the source already provides suitable data distribution (e.g., Kafka partitions).</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
protected void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
createCdcSinkBuilder(input, parserFactory).build();
}

protected CdcSinkBuilder<RichCdcMultiplexRecord> createCdcSinkBuilder(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
.withInput(input)
Expand All @@ -175,7 +181,7 @@ protected void buildSink(
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
}
sinkBuilder.build();
return sinkBuilder;
}

private void checkConstraints() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;

import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.Map;

Expand All @@ -33,4 +38,15 @@ public KafkaSyncTableAction(
Map<String, String> kafkaConfig) {
super(database, table, catalogConfig, kafkaConfig, SyncJobHandler.SourceType.KAFKA);
}

@Override
protected void buildSink(
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
boolean noShuffle =
Boolean.parseBoolean(
tableConfig.getOrDefault(
FlinkConnectorOptions.UNAWARE_BUCKET_NO_SHUFFLE.key(), "false"));
createCdcSinkBuilder(input, parserFactory).withNoShuffle(noShuffle).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

import javax.annotation.Nullable;
Expand All @@ -37,10 +38,16 @@
public class CdcAppendTableSink extends FlinkWriteSink<CdcRecord> {

private final Integer parallelism;
private final boolean noShuffle;

public CdcAppendTableSink(FileStoreTable table, Integer parallelism) {
this(table, parallelism, false);
}

public CdcAppendTableSink(FileStoreTable table, Integer parallelism, boolean noShuffle) {
super(table, null);
this.parallelism = parallelism;
this.noShuffle = noShuffle;
}

@Override
Expand All @@ -52,7 +59,13 @@ protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOpera
@Override
public DataStream<Committable> doWrite(
DataStream<CdcRecord> input, String initialCommitUser, @Nullable Integer parallelism) {
return super.doWrite(input, initialCommitUser, this.parallelism);
DataStream<Committable> written = super.doWrite(input, initialCommitUser, this.parallelism);
if (noShuffle) {
// Break operator chaining between parse and write to avoid deadlock
// during schema evolution retries, without introducing a network shuffle.
((SingleOutputStreamOperator<Committable>) written).startNewChain();
}
return written;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class CdcSinkBuilder<T> {
private TypeMapping typeMapping = null;

@Nullable private Integer parallelism;
private boolean noShuffle = false;

public CdcSinkBuilder<T> withInput(DataStream<T> input) {
this.input = input;
Expand Down Expand Up @@ -90,6 +91,11 @@ public CdcSinkBuilder<T> withTypeMapping(TypeMapping typeMapping) {
return this;
}

public CdcSinkBuilder<T> withNoShuffle(boolean noShuffle) {
this.noShuffle = noShuffle;
return this;
}

public DataStreamSink<?> build() {
Preconditions.checkNotNull(input, "Input DataStream can not be null.");
Preconditions.checkNotNull(parserFactory, "Event ParserFactory can not be null.");
Expand Down Expand Up @@ -160,6 +166,9 @@ private DataStreamSink<?> buildForPostponeBucket(DataStream<CdcRecord> parsed) {

private DataStreamSink<?> buildForUnawareBucket(DataStream<CdcRecord> parsed) {
FileStoreTable dataTable = (FileStoreTable) table;
if (noShuffle) {
return new CdcAppendTableSink(dataTable, parallelism, true).sinkFrom(parsed);
}
// rebalance it to make sure schema change work to avoid infinite loop
return new CdcAppendTableSink(dataTable, parallelism).sinkFrom(parsed.rebalance());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,36 +71,43 @@ public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
@Test
@Timeout(120)
public void testRandomCdcEvents() throws Exception {
innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1, false, false);
innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1, false, false, false);
}

@Test
@Timeout(120)
public void testRandomCdcEventsDynamicBucket() throws Exception {
innerTestRandomCdcEvents(-1, false, false);
innerTestRandomCdcEvents(-1, false, false, false);
}

@Test
@Timeout(120)
public void testRandomCdcEventsPostponeBucket() throws Exception {
innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false);
innerTestRandomCdcEvents(BucketMode.POSTPONE_BUCKET, false, false, false);
}

@Disabled
@Test
@Timeout(120)
public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
innerTestRandomCdcEvents(-1, true, false);
innerTestRandomCdcEvents(-1, true, false, false);
}

@Test
@Timeout(120)
public void testRandomCdcEventsUnawareBucket() throws Exception {
innerTestRandomCdcEvents(-1, false, true);
innerTestRandomCdcEvents(-1, false, true, false);
}

@Test
@Timeout(120)
public void testRandomCdcEventsUnawareBucketNoShuffle() throws Exception {
innerTestRandomCdcEvents(-1, false, true, true);
}

private void innerTestRandomCdcEvents(
int numBucket, boolean globalIndex, boolean unawareBucketMode) throws Exception {
int numBucket, boolean globalIndex, boolean unawareBucketMode, boolean noShuffle)
throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();

int numEvents = random.nextInt(1500) + 1;
Expand Down Expand Up @@ -177,6 +184,7 @@ private void innerTestRandomCdcEvents(
.withParallelism(3)
.withIdentifier(Identifier.create(DATABASE_NAME, TABLE_NAME))
.withCatalogLoader(catalogLoader)
.withNoShuffle(noShuffle)
.build();

// enable failure when running jobs if needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ public class FlinkConnectorOptions {
+ "By default, if this option is not defined, the planner will derive the parallelism "
+ "for each statement individually by also considering the global configuration.");

public static final ConfigOption<Boolean> UNAWARE_BUCKET_NO_SHUFFLE =
ConfigOptions.key("unaware-bucket.no-shuffle")
.booleanType()
.defaultValue(false)
.withDescription(
"If true, the CDC sync pipeline will skip the network shuffle between "
+ "source and writer operators. This is only supported for "
+ "bucket-unaware (append) tables where each writer subtask "
+ "independently appends data without bucket ownership constraints. "
+ "This eliminates data transfer overhead when the source already "
+ "provides suitable data distribution (e.g., Kafka partitions).");

public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
ConfigOptions.key("scan.infer-parallelism")
.booleanType()
Expand Down
Loading