diff --git a/docs/content/docs/how-to/writing-tables.md b/docs/content/docs/how-to/writing-tables.md
index dcdfae3cf9e0..14c19c4213ef 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -96,6 +96,9 @@ Use `INSERT INTO` to apply records and changes to tables.
INSERT INTO MyTable SELECT ...
```
+Table Store shuffles data by bucket in the sink phase. To mitigate data skew, Table Store can also
+shuffle data by partition fields. To enable this, set the table option `sink.partition-shuffle` to `true`.
+
{{< /tab >}}
{{< tab "Spark3" >}}
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index d5d40b365533..6431a767c212 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,5 +26,11 @@
Integer |
Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
+
+ sink.partition-shuffle |
+ false |
+ Boolean |
+ The option to enable shuffle data by dynamic partition fields in sink phase for table store. |
+
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index 2bf58bac1cf9..ce2a2c975c2a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -57,6 +57,13 @@ public class FlinkConnectorOptions {
public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
+ /**
+ * Whether the sink shuffles records by partition in addition to bucket. Disabled by default;
+ * read by {@code FlinkSinkBuilder} and passed to {@code BucketStreamPartitioner}.
+ */
+ public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION =
+ ConfigOptions.key("sink.partition-shuffle")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "The option to enable shuffle data by dynamic partition fields in sink phase for table store.");
+
public static final ConfigOption SCAN_PARALLELISM =
ConfigOptions.key("scan.parallelism")
.intType()
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index 7b3d6d291f9f..d8bd0c20e357 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -24,32 +24,48 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.FlinkRowWrapper;
+import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.sink.BucketComputer;
+import org.apache.flink.table.store.table.sink.PartitionComputer;
+
+import java.util.Objects;
+import java.util.function.Function;
/** A {@link StreamPartitioner} to partition records by bucket. */
public class BucketStreamPartitioner extends StreamPartitioner {
private final TableSchema schema;
+ private final boolean shuffleByPartitionEnable;
- private transient BucketComputer computer;
- private transient int numberOfChannels;
+ private transient Function partitioner;
- public BucketStreamPartitioner(TableSchema schema) {
+ /**
+ * @param schema table schema handed to the bucket and partition computers in {@link #setup}
+ * @param shuffleByPartitionEnable whether the channel is derived from bucket and partition
+ * instead of the bucket alone
+ */
+ public BucketStreamPartitioner(TableSchema schema, boolean shuffleByPartitionEnable) {
this.schema = schema;
+ this.shuffleByPartitionEnable = shuffleByPartitionEnable;
}
@Override
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
- this.computer = new BucketComputer(schema);
- this.numberOfChannels = numberOfChannels;
+ BucketComputer bucketComputer = new BucketComputer(schema);
+ if (shuffleByPartitionEnable) {
+ PartitionComputer partitionComputer = new PartitionComputer(schema);
+ // Math.floorMod keeps the channel index in [0, numberOfChannels) for every hash
+ // value; Math.abs(Integer.MIN_VALUE) is still negative and would yield a
+ // negative channel.
+ partitioner =
+ row ->
+ Math.floorMod(
+ Objects.hash(
+ bucketComputer.bucket(row),
+ partitionComputer.partition(row)),
+ numberOfChannels);
+ } else {
+ partitioner = row -> bucketComputer.bucket(row) % numberOfChannels;
+ }
}
@Override
public int selectChannel(SerializationDelegate> record) {
- return computer.bucket(new FlinkRowWrapper(record.getInstance().getValue()))
- % numberOfChannels;
+ // partitioner is transient and only assigned in setup(int), so setup must have run on
+ // this instance before records are routed.
+ return partitioner.apply(new FlinkRowWrapper(record.getInstance().getValue()));
}
@Override
@@ -69,6 +85,6 @@ public boolean isPointwise() {
@Override
public String toString() {
- return "bucket-assigner";
+ // Use a distinct name when partition shuffling is on so the two modes can be told apart.
+ return shuffleByPartitionEnable ? "bucket-partition-assigner" : "bucket-assigner";
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index ce24aa4cf1e2..366247caabab 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -83,7 +83,11 @@ public DataStreamSink> sinkFrom(DataStream input) {
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
+ return sinkFrom(input, initialCommitUser, createWriteProvider(initialCommitUser));
+ }
+ public DataStreamSink> sinkFrom(
+ DataStream input, String commitUser, StoreSinkWrite.Provider sinkProvider) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
@@ -101,8 +105,7 @@ public DataStreamSink> sinkFrom(DataStream input) {
input.transform(
WRITER_NAME,
typeInfo,
- createWriteOperator(
- createWriteProvider(initialCommitUser), isStreaming))
+ createWriteOperator(sinkProvider, isStreaming))
.setParallelism(input.getParallelism());
SingleOutputStreamOperator> committed =
@@ -111,7 +114,7 @@ public DataStreamSink> sinkFrom(DataStream input) {
typeInfo,
new CommitterOperator(
streamingCheckpointEnabled,
- initialCommitUser,
+ commitUser,
createCommitterFactory(streamingCheckpointEnabled),
createCommittableStateManager()))
.setParallelism(1)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index d93e1577dd09..7d0e570c1e76 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -18,11 +18,13 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
@@ -41,6 +43,8 @@ public class FlinkSinkBuilder {
@Nullable private Map overwritePartition;
@Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
+ @Nullable private String commitUser;
+ @Nullable private StoreSinkWrite.Provider sinkProvider;
public FlinkSinkBuilder(FileStoreTable table) {
this.table = table;
@@ -71,8 +75,21 @@ public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
return this;
}
+ /**
+ * Overrides the commit user and the writer provider used when the sink is built. Both values
+ * must be non-null for the override to take effect in {@link #build()}.
+ */
+ @VisibleForTesting
+ public FlinkSinkBuilder withSinkProvider(
+ String commitUser, StoreSinkWrite.Provider sinkProvider) {
+ this.sinkProvider = sinkProvider;
+ this.commitUser = commitUser;
+ return this;
+ }
+
public DataStreamSink> build() {
- BucketStreamPartitioner partitioner = new BucketStreamPartitioner(table.schema());
+ BucketStreamPartitioner partitioner =
+ new BucketStreamPartitioner(
+ table.schema(),
+ table.options()
+ .toConfiguration()
+ .getBoolean(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION));
PartitionTransformation partitioned =
new PartitionTransformation<>(input.getTransformation(), partitioner);
if (parallelism != null) {
@@ -82,6 +99,8 @@ public DataStreamSink> build() {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
FileStoreSink sink =
new FileStoreSink(table, lockFactory, overwritePartition, logSinkFunction);
- return sink.sinkFrom(new DataStream<>(env, partitioned));
+ return commitUser != null && sinkProvider != null
+ ? sink.sinkFrom(new DataStream<>(env, partitioned), commitUser, sinkProvider)
+ : sink.sinkFrom(new DataStream<>(env, partitioned));
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java
new file mode 100644
index 000000000000..c7588063300b
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreShuffleBucketTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.connector.CatalogITCaseBase;
+import org.apache.flink.table.store.connector.FlinkConnectorOptions;
+import org.apache.flink.table.store.data.BinaryRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.table.store.connector.LogicalTypeConversion.toLogicalType;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests that the sink shuffles records by bucket, optionally combined with partition. */
+public class FileStoreShuffleBucketTest extends CatalogITCaseBase {
+ private static final int TOTAL_SOURCE_RECORD_COUNT = 1000;
+
+ @Before
+ public void after() throws Exception {
+ super.before();
+ CollectStoreSinkWrite.writeRowsMap.clear();
+ }
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE T (a INT, b INT, c INT, d INT, PRIMARY KEY (a, b) NOT ENFORCED) PARTITIONED BY (a)");
+ }
+
+ @Test
+ public void testShuffleByBucket() throws Exception {
+ FileStoreTable table =
+ FileStoreTableFactory.create(LocalFileIO.create(), getTableDirectory("T"));
+
+ insertDataToTable(table);
+
+ // Only one task will write records shuffled by bucket
+ assertEquals(1, CollectStoreSinkWrite.writeRowsMap.size());
+ }
+
+ @Test
+ public void testShuffleByBucketPartition() throws Exception {
+ FileStoreTable originalTable =
+ FileStoreTableFactory.create(LocalFileIO.create(), getTableDirectory("T"));
+ Map<String, String> dynamicOptions = originalTable.options().toMap();
+ dynamicOptions.put(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key(), "true");
+ FileStoreTable table = originalTable.copy(dynamicOptions);
+
+ insertDataToTable(table);
+
+ // Two tasks will write records shuffled by bucket and partition
+ assertEquals(2, CollectStoreSinkWrite.writeRowsMap.size());
+ }
+
+ private void insertDataToTable(FileStoreTable table) throws Exception {
+ RowType rowType = toLogicalType(table.rowType());
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(2);
+
+ List<RowData> sourceDataList = generateData();
+ DataStreamSource<RowData> sourceStream =
+ env.fromCollection(sourceDataList, InternalTypeInfo.of(rowType));
+ new FlinkSinkBuilder(table)
+ .withInput(sourceStream)
+ .withParallelism(env.getParallelism())
+ .withSinkProvider(
+ "testUser",
+ (StoreSinkWrite.Provider)
+ (table1, context, ioManager) ->
+ (StoreSinkWrite) new CollectStoreSinkWrite())
+ .build();
+ env.execute();
+
+ assertEquals(
+ TOTAL_SOURCE_RECORD_COUNT,
+ CollectStoreSinkWrite.writeRowsMap.values().stream().mapToInt(List::size).sum());
+ }
+
+ private List<RowData> generateData() {
+ List<RowData> rowDataList = new ArrayList<>(TOTAL_SOURCE_RECORD_COUNT);
+ for (int i = 0; i < TOTAL_SOURCE_RECORD_COUNT; i++) {
+ rowDataList.add(GenericRowData.of(i, i + 1, i + 2, i + 3));
+ }
+ return rowDataList;
+ }
+
+ /** Collects every row each writer instance receives, so tests can count active writers. */
+ private static class CollectStoreSinkWrite implements StoreSinkWrite {
+ private static final Map<CollectStoreSinkWrite, List<InternalRow>> writeRowsMap =
+ new ConcurrentHashMap<>();
+
+ @Override
+ public SinkRecord write(InternalRow rowData) throws Exception {
+ List<InternalRow> rows = writeRowsMap.computeIfAbsent(this, key -> new ArrayList<>());
+ rows.add(rowData);
+ return null;
+ }
+
+ @Override
+ public SinkRecord toLogRecord(SinkRecord record) {
+ return record;
+ }
+
+ @Override
+ public void compact(BinaryRow partition, int bucket, boolean fullCompaction)
+ throws Exception {}
+
+ @Override
+ public void notifyNewFiles(
+ long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {}
+
+ @Override
+ public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
+ throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/PartitionComputer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/PartitionComputer.java
new file mode 100644
index 000000000000..dd611afefd70
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/PartitionComputer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.codegen.Projection;
+import org.apache.flink.table.store.data.BinaryRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.types.RowType;
+
+/** Extracts the partition of a row by projecting it onto the partition key fields. */
+public class PartitionComputer {
+ private final Projection projection;
+
+ public PartitionComputer(TableSchema tableSchema) {
+ this(tableSchema.logicalRowType(), tableSchema.projection(tableSchema.partitionKeys()));
+ }
+
+ public PartitionComputer(RowType rowType, int[] partitionKeys) {
+ projection = CodeGenUtils.newProjection(rowType, partitionKeys);
+ }
+
+ public BinaryRow partition(InternalRow row) {
+ return projection.apply(row);
+ }
+}