Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/content/docs/how-to/writing-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ Use `INSERT INTO` to apply records and changes to tables.
INSERT INTO MyTable SELECT ...
```

Table Store supports shuffle data by bucket in sink phase. To improve data skew, Table Store also
supports shuffling data by partition fields. You can add option `sink.partition-shuffle` to the table.

{{< /tab >}}

{{< tab "Spark3" >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,11 @@
<td>Integer</td>
<td>Defines a custom parallelism for the sink. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
<tr>
<td><h5>sink.partition-shuffle</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>The option to enable shuffle data by dynamic partition fields in sink phase for table store.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ public class FlinkConnectorOptions {

public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION =
ConfigOptions.key("sink.partition-shuffle")
.booleanType()
.defaultValue(false)
.withDescription(
"The option to enable shuffle data by dynamic partition fields in sink phase for table store.");

public static final ConfigOption<Integer> SCAN_PARALLELISM =
ConfigOptions.key("scan.parallelism")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,48 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.FlinkRowWrapper;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.sink.PartitionComputer;

import java.util.Objects;
import java.util.function.Function;

/** A {@link StreamPartitioner} to partition records by bucket. */
public class BucketStreamPartitioner extends StreamPartitioner<RowData> {

private final TableSchema schema;
private final boolean shuffleByPartitionEnable;

private transient BucketComputer computer;
private transient int numberOfChannels;
private transient Function<InternalRow, Integer> partitioner;

public BucketStreamPartitioner(TableSchema schema) {
public BucketStreamPartitioner(TableSchema schema, boolean shuffleByPartitionEnable) {
this.schema = schema;
this.shuffleByPartitionEnable = shuffleByPartitionEnable;
}

@Override
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
this.computer = new BucketComputer(schema);
this.numberOfChannels = numberOfChannels;
BucketComputer bucketComputer = new BucketComputer(schema);
if (shuffleByPartitionEnable) {
PartitionComputer partitionComputer = new PartitionComputer(schema);
partitioner =
row ->
Math.abs(
Objects.hash(
bucketComputer.bucket(row),
partitionComputer.partition(row)))
% numberOfChannels;
} else {
partitioner = row -> bucketComputer.bucket(row) % numberOfChannels;
}
}

@Override
public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
return computer.bucket(new FlinkRowWrapper(record.getInstance().getValue()))
% numberOfChannels;
return partitioner.apply(new FlinkRowWrapper(record.getInstance().getValue()));
}

@Override
Expand All @@ -69,6 +85,6 @@ public boolean isPointwise() {

@Override
public String toString() {
return "bucket-assigner";
return shuffleByPartitionEnable ? "bucket-partition-assigner" : "bucket-assigner";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
// When the job restarts, commitUser will be recovered from states and this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
return sinkFrom(input, initialCommitUser, createWriteProvider(initialCommitUser));
}

public DataStreamSink<?> sinkFrom(
DataStream<RowData> input, String commitUser, StoreSinkWrite.Provider sinkProvider) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
Expand All @@ -101,8 +105,7 @@ public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
input.transform(
WRITER_NAME,
typeInfo,
createWriteOperator(
createWriteProvider(initialCommitUser), isStreaming))
createWriteOperator(sinkProvider, isStreaming))
.setParallelism(input.getParallelism());

SingleOutputStreamOperator<?> committed =
Expand All @@ -111,7 +114,7 @@ public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
typeInfo,
new CommitterOperator(
streamingCheckpointEnabled,
initialCommitUser,
commitUser,
createCommitterFactory(streamingCheckpointEnabled),
createCommittableStateManager()))
.setParallelism(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.flink.table.store.connector.sink;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
Expand All @@ -41,6 +43,8 @@ public class FlinkSinkBuilder {
@Nullable private Map<String, String> overwritePartition;
@Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
@Nullable private String commitUser;
@Nullable private StoreSinkWrite.Provider sinkProvider;

public FlinkSinkBuilder(FileStoreTable table) {
this.table = table;
Expand Down Expand Up @@ -71,8 +75,21 @@ public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
return this;
}

@VisibleForTesting
public FlinkSinkBuilder withSinkProvider(
String commitUser, StoreSinkWrite.Provider sinkProvider) {
this.commitUser = commitUser;
this.sinkProvider = sinkProvider;
return this;
}

public DataStreamSink<?> build() {
BucketStreamPartitioner partitioner = new BucketStreamPartitioner(table.schema());
BucketStreamPartitioner partitioner =
new BucketStreamPartitioner(
table.schema(),
table.options()
.toConfiguration()
.getBoolean(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION));
PartitionTransformation<RowData> partitioned =
new PartitionTransformation<>(input.getTransformation(), partitioner);
if (parallelism != null) {
Expand All @@ -82,6 +99,8 @@ public DataStreamSink<?> build() {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
FileStoreSink sink =
new FileStoreSink(table, lockFactory, overwritePartition, logSinkFunction);
return sink.sinkFrom(new DataStream<>(env, partitioned));
return commitUser != null && sinkProvider != null
? sink.sinkFrom(new DataStream<>(env, partitioned), commitUser, sinkProvider)
: sink.sinkFrom(new DataStream<>(env, partitioned));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.store.connector.sink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.connector.CatalogITCaseBase;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.fs.local.LocalFileIO;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.types.logical.RowType;

import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.flink.table.store.connector.LogicalTypeConversion.toLogicalType;
import static org.junit.jupiter.api.Assertions.assertEquals;

/** Tests of shuffle data by bucket and partition. */
public class FileStoreShuffleBucketTest extends CatalogITCaseBase {
private static final int TOTAL_SOURCE_RECORD_COUNT = 1000;

@Before
public void after() throws Exception {
super.before();
CollectStoreSinkWrite.writeRowsMap.clear();
}

@Override
protected List<String> ddl() {
return Collections.singletonList(
"CREATE TABLE T (a INT, b INT, c INT, d INT, PRIMARY KEY (a, b) NOT ENFORCED) PARTITIONED BY (a)");
}

@Test
public void testShuffleByBucket() throws Exception {
FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), getTableDirectory("T"));

insertDataToTable(table);

// Only one task will write records shuffled by bucket
assertEquals(CollectStoreSinkWrite.writeRowsMap.size(), 1);
}

@Test
public void testShuffleByBucketPartition() throws Exception {
FileStoreTable originalTable =
FileStoreTableFactory.create(LocalFileIO.create(), getTableDirectory("T"));
Map<String, String> dynamicOptions = originalTable.options().toMap();
dynamicOptions.put(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key(), "true");
FileStoreTable table = originalTable.copy(dynamicOptions);

insertDataToTable(table);

// Two tasks will write records shuffled by bucket and partition
assertEquals(CollectStoreSinkWrite.writeRowsMap.size(), 2);
}

private void insertDataToTable(FileStoreTable table) throws Exception {
RowType rowType = toLogicalType(table.rowType());
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(2);

List<RowData> sourceDataList = generateData();
DataStreamSource<RowData> sourceStream =
env.fromCollection(sourceDataList, InternalTypeInfo.of(rowType));
new FlinkSinkBuilder(table)
.withInput(sourceStream)
.withParallelism(env.getParallelism())
.withSinkProvider(
"testUser",
(StoreSinkWrite.Provider)
(table1, context, ioManager) ->
(StoreSinkWrite) new CollectStoreSinkWrite())
.build();
env.execute();

assertEquals(
CollectStoreSinkWrite.writeRowsMap.values().stream().mapToInt(List::size).sum(),
TOTAL_SOURCE_RECORD_COUNT);
}

private List<RowData> generateData() {
List<RowData> rowDataList = new ArrayList<>(TOTAL_SOURCE_RECORD_COUNT);
for (int i = 0; i < TOTAL_SOURCE_RECORD_COUNT; i++) {
rowDataList.add(GenericRowData.of(i, i + 1, i + 2, i + 3));
}
return rowDataList;
}

/** Collect all received data with writer. */
private static class CollectStoreSinkWrite implements StoreSinkWrite {
private static final Map<StoreSinkWrite, List<InternalRow>> writeRowsMap =
new ConcurrentHashMap<>();

@Override
public SinkRecord write(InternalRow rowData) throws Exception {
List<InternalRow> rows = writeRowsMap.computeIfAbsent(this, key -> new ArrayList<>());
rows.add(rowData);
return null;
}

@Override
public SinkRecord toLogRecord(SinkRecord record) {
return record;
}

@Override
public void compact(BinaryRow partition, int bucket, boolean fullCompaction)
throws Exception {}

@Override
public void notifyNewFiles(
long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {}

@Override
public List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
throws IOException {
return Collections.emptyList();
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {}

@Override
public void close() throws Exception {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.store.table.sink;

import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.codegen.Projection;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.types.RowType;

/** A {@link PartitionComputer} to compute partition by partition keys. */
public class PartitionComputer {
private final Projection partitionProjection;

public PartitionComputer(TableSchema tableSchema) {
this(tableSchema.logicalRowType(), tableSchema.projection(tableSchema.partitionKeys()));
}

public PartitionComputer(RowType rowType, int[] partitionKeys) {
this.partitionProjection = CodeGenUtils.newProjection(rowType, partitionKeys);
}

public BinaryRow partition(InternalRow row) {
return this.partitionProjection.apply(row);
}
}