diff --git a/paimon-core/src/main/java/org/apache/paimon/predicate/BucketSelector.java b/paimon-core/src/main/java/org/apache/paimon/predicate/BucketSelector.java index 295910f038bc..a7d5ee43c066 100644 --- a/paimon-core/src/main/java/org/apache/paimon/predicate/BucketSelector.java +++ b/paimon-core/src/main/java/org/apache/paimon/predicate/BucketSelector.java @@ -23,7 +23,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; -import org.apache.paimon.table.sink.BucketComputer; +import org.apache.paimon.table.sink.KeyAndBucketExtractor; import org.apache.paimon.types.RowType; import javax.annotation.concurrent.ThreadSafe; @@ -71,7 +71,7 @@ int[] hashCodes() { Set createBucketSet(int numBucket) { ImmutableSet.Builder builder = new ImmutableSet.Builder<>(); for (int hash : hashCodes) { - builder.add(BucketComputer.bucket(hash, numBucket)); + builder.add(KeyAndBucketExtractor.bucket(hash, numBucket)); } return builder.build(); } @@ -139,7 +139,7 @@ public static Optional create( private static int hash(List columns, InternalRowSerializer serializer) { BinaryRow binaryRow = serializer.toBinaryRow(GenericRow.of(columns.toArray())); - return BucketComputer.hashcode(binaryRow); + return KeyAndBucketExtractor.bucketKeyHashCode(binaryRow); } private static void assembleRows( diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index 1ee97d971e30..c169cd6349a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -131,8 +131,19 @@ public Map options() { return options; } + public List bucketKeys() { + List bucketKeys = originalBucketKeys(); + if (bucketKeys.isEmpty()) { + bucketKeys = trimmedPrimaryKeys(); + } + if (bucketKeys.isEmpty()) { + bucketKeys = fieldNames(); + } + return bucketKeys; + } + /** Original bucket keys, maybe empty. */ - public List originalBucketKeys() { + private List originalBucketKeys() { String key = options.get(BUCKET_KEY.key()); if (StringUtils.isNullOrWhitespaceOnly(key)) { return Collections.emptyList(); @@ -178,14 +189,7 @@ public RowType logicalPartitionType() { } public RowType logicalBucketKeyType() { - List bucketKeys = originalBucketKeys(); - if (bucketKeys.isEmpty()) { - bucketKeys = trimmedPrimaryKeys(); - } - if (bucketKeys.isEmpty()) { - bucketKeys = fieldNames(); - } - return projectedLogicalRowType(bucketKeys); + return projectedLogicalRowType(bucketKeys()); } public RowType logicalTrimmedPrimaryKeysType() { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 4f2e1537339e..8dcf5c757ca1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -31,7 +31,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.sink.SinkRecordConverter; +import org.apache.paimon.table.sink.InternalRowKeyAndBucketExtractor; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.AppendOnlySplitGenerator; import org.apache.paimon.table.source.DataSplit; @@ -125,7 +125,7 @@ public RecordReader createReader(Split split) throws IOException { public TableWriteImpl newWrite(String commitUser) { return new TableWriteImpl<>( store().newWrite(commitUser), - new SinkRecordConverter(tableSchema), + new InternalRowKeyAndBucketExtractor(tableSchema), record -> { Preconditions.checkState( record.row().getRowKind() == RowKind.INSERT, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java index 993ef5ac0a62..cab4f57c555e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java @@ -36,7 +36,7 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.BinaryTableStats; -import org.apache.paimon.table.sink.SinkRecordConverter; +import org.apache.paimon.table.sink.InternalRowKeyAndBucketExtractor; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.KeyValueTableRead; @@ -144,7 +144,7 @@ public TableWriteImpl newWrite(String commitUser) { final KeyValue kv = new KeyValue(); return new TableWriteImpl<>( store().newWrite(commitUser), - new SinkRecordConverter(tableSchema), + new InternalRowKeyAndBucketExtractor(tableSchema), record -> { switch (record.row().getRowKind()) { case INSERT: diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java index e06cd4329784..934eaa9e24e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java @@ -38,8 +38,8 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.sink.InternalRowKeyAndBucketExtractor; import org.apache.paimon.table.sink.SequenceGenerator; -import org.apache.paimon.table.sink.SinkRecordConverter; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.KeyValueTableRead; @@ -222,7 +222,7 @@ public TableWriteImpl newWrite(String commitUser) { final KeyValue kv = new KeyValue(); return new TableWriteImpl<>( store().newWrite(commitUser), - new SinkRecordConverter(tableSchema), + new InternalRowKeyAndBucketExtractor(tableSchema), record -> { long sequenceNumber = sequenceGenerator == null diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BucketComputer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BucketComputer.java deleted file mode 100644 index ea17ea0cb250..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BucketComputer.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.sink; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.codegen.CodeGenUtils; -import org.apache.paimon.codegen.Projection; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.types.RowKind; -import org.apache.paimon.types.RowType; - -import java.util.stream.IntStream; - -/** A {@link BucketComputer} to compute bucket by bucket keys or primary keys or whole row. */ -public class BucketComputer { - - private final int numBucket; - - private final Projection rowProjection; - private final Projection bucketProjection; - private final Projection pkProjection; - - public BucketComputer(TableSchema tableSchema) { - this( - new CoreOptions(tableSchema.options()).bucket(), - tableSchema.logicalRowType(), - tableSchema.projection(tableSchema.originalBucketKeys()), - tableSchema.projection(tableSchema.trimmedPrimaryKeys())); - } - - private BucketComputer(int numBucket, RowType rowType, int[] bucketKeys, int[] primaryKeys) { - this.numBucket = numBucket; - this.rowProjection = - CodeGenUtils.newProjection( - rowType, IntStream.range(0, rowType.getFieldCount()).toArray()); - this.bucketProjection = CodeGenUtils.newProjection(rowType, bucketKeys); - this.pkProjection = CodeGenUtils.newProjection(rowType, primaryKeys); - } - - private int hashRow(InternalRow row) { - if (row instanceof BinaryRow) { - RowKind rowKind = row.getRowKind(); - row.setRowKind(RowKind.INSERT); - int hash = hashcode((BinaryRow) row); - row.setRowKind(rowKind); - return hash; - } else { - return hashcode(rowProjection.apply(row)); - } - } - - public int bucket(InternalRow row) { - int hashcode = hashBucketKey(row); - return bucket(hashcode, numBucket); - } - - public int bucket(InternalRow row, BinaryRow pk) { - int hashcode = hashBucketKey(row, pk); - return bucket(hashcode, numBucket); - } - - private int hashBucketKey(InternalRow row) { - BinaryRow bucketKey = bucketProjection.apply(row); - if (bucketKey.getFieldCount() == 0) { - bucketKey = pkProjection.apply(row); - } - if (bucketKey.getFieldCount() == 0) { - return hashRow(row); - } - return bucketKey.hashCode(); - } - - private int hashBucketKey(InternalRow row, BinaryRow pk) { - BinaryRow bucketKey = bucketProjection.apply(row); - if (bucketKey.getFieldCount() == 0) { - bucketKey = pk; - } - if (bucketKey.getFieldCount() == 0) { - return hashRow(row); - } - return bucketKey.hashCode(); - } - - public static int hashcode(BinaryRow rowData) { - assert rowData.getRowKind() == RowKind.INSERT; - return rowData.hashCode(); - } - - public static int bucket(int hashcode, int numBucket) { - return Math.abs(hashcode % numBucket); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InternalRowKeyAndBucketExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InternalRowKeyAndBucketExtractor.java new file mode 100644 index 000000000000..1c0942540265 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InternalRowKeyAndBucketExtractor.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.sink; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.Projection; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.schema.TableSchema; + +/** {@link KeyAndBucketExtractor} for {@link InternalRow}. */ +public class InternalRowKeyAndBucketExtractor implements KeyAndBucketExtractor { + + private final int numBuckets; + private final boolean sameBucketKeyAndTrimmedPrimaryKey; + + private final Projection partitionProjection; + private final Projection bucketKeyProjection; + private final Projection trimmedPrimaryKeyProjection; + private final Projection logPrimaryKeyProjection; + + private InternalRow record; + + private BinaryRow partition; + private BinaryRow bucketKey; + private Integer bucket; + private BinaryRow trimmedPrimaryKey; + private BinaryRow logPrimaryKey; + + public InternalRowKeyAndBucketExtractor(TableSchema schema) { + numBuckets = new CoreOptions(schema.options()).bucket(); + sameBucketKeyAndTrimmedPrimaryKey = schema.bucketKeys().equals(schema.trimmedPrimaryKeys()); + + partitionProjection = + CodeGenUtils.newProjection( + schema.logicalRowType(), schema.projection(schema.partitionKeys())); + bucketKeyProjection = + CodeGenUtils.newProjection( + schema.logicalRowType(), schema.projection(schema.bucketKeys())); + trimmedPrimaryKeyProjection = + CodeGenUtils.newProjection( + schema.logicalRowType(), schema.projection(schema.trimmedPrimaryKeys())); + logPrimaryKeyProjection = + CodeGenUtils.newProjection( + schema.logicalRowType(), schema.projection(schema.primaryKeys())); + } + + @Override + public void setRecord(InternalRow record) { + this.record = record; + + this.partition = null; + this.bucketKey = null; + this.bucket = null; + this.trimmedPrimaryKey = null; + this.logPrimaryKey = null; + } + + @Override + public BinaryRow partition() { + if (partition == null) { + partition = partitionProjection.apply(record); + } + return partition; + } + + @Override + public int bucket() { + if (bucketKey == null) { + bucketKey = bucketKeyProjection.apply(record); + if (sameBucketKeyAndTrimmedPrimaryKey) { + trimmedPrimaryKey = bucketKey; + } + } + if (bucket == null) { + bucket = + KeyAndBucketExtractor.bucket( + KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets); + } + return bucket; + } + + @Override + public BinaryRow trimmedPrimaryKey() { + if (trimmedPrimaryKey == null) { + trimmedPrimaryKey = trimmedPrimaryKeyProjection.apply(record); + if (sameBucketKeyAndTrimmedPrimaryKey) { + bucketKey = trimmedPrimaryKey; + } + } + return trimmedPrimaryKey; + } + + @Override + public BinaryRow logPrimaryKey() { + if (logPrimaryKey == null) { + assert logPrimaryKeyProjection != null; + logPrimaryKey = logPrimaryKeyProjection.apply(record); + } + return logPrimaryKey; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/PartitionComputer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java similarity index 53% rename from paimon-core/src/main/java/org/apache/paimon/table/sink/PartitionComputer.java rename to paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java index 5858160d1b9b..66378b8739eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/PartitionComputer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java @@ -18,26 +18,34 @@ package org.apache.paimon.table.sink; -import org.apache.paimon.codegen.CodeGenUtils; -import org.apache.paimon.codegen.Projection; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.types.RowType; +import org.apache.paimon.types.RowKind; -/** A {@link PartitionComputer} to compute partition by partition keys. */ -public class PartitionComputer { - private final Projection partitionProjection; +/** + * Utility interface to extract partition keys, bucket id, primary keys for file store ({@code + * trimmedPrimaryKey}) and primary keys for external log system ({@code logPrimaryKey}) from the + * given record. + * + * @param type of record + */ +public interface KeyAndBucketExtractor { - public PartitionComputer(TableSchema tableSchema) { - this(tableSchema.logicalRowType(), tableSchema.projection(tableSchema.partitionKeys())); - } + void setRecord(T record); + + BinaryRow partition(); + + int bucket(); + + BinaryRow trimmedPrimaryKey(); + + BinaryRow logPrimaryKey(); - public PartitionComputer(RowType rowType, int[] partitionKeys) { - this.partitionProjection = CodeGenUtils.newProjection(rowType, partitionKeys); + static int bucketKeyHashCode(BinaryRow bucketKey) { + assert bucketKey.getRowKind() == RowKind.INSERT; + return bucketKey.hashCode(); } - public BinaryRow partition(InternalRow row) { - return this.partitionProjection.apply(row); + static int bucket(int hashcode, int numBuckets) { + return Math.abs(hashcode % numBuckets); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/SinkRecordConverter.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/SinkRecordConverter.java deleted file mode 100644 index e553110632a7..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/SinkRecordConverter.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.sink; - -import org.apache.paimon.codegen.CodeGenUtils; -import org.apache.paimon.codegen.Projection; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.types.RowType; - -import javax.annotation.Nullable; - -import java.util.Arrays; - -/** Converter for converting {@link InternalRow} to {@link SinkRecord}. */ -public class SinkRecordConverter { - - private final BucketComputer bucketComputer; - - private final Projection partProjection; - - private final Projection pkProjection; - - @Nullable private final Projection logPkProjection; - - public SinkRecordConverter(TableSchema tableSchema) { - this( - tableSchema.logicalRowType(), - tableSchema.projection(tableSchema.partitionKeys()), - tableSchema.projection(tableSchema.trimmedPrimaryKeys()), - tableSchema.projection(tableSchema.primaryKeys()), - new BucketComputer(tableSchema)); - } - - private SinkRecordConverter( - RowType inputType, - int[] partitions, - int[] primaryKeys, - int[] logPrimaryKeys, - BucketComputer bucketComputer) { - this.bucketComputer = bucketComputer; - this.partProjection = CodeGenUtils.newProjection(inputType, partitions); - this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys); - this.logPkProjection = - Arrays.equals(primaryKeys, logPrimaryKeys) - ? null - : CodeGenUtils.newProjection(inputType, logPrimaryKeys); - } - - public SinkRecord convert(InternalRow row) { - BinaryRow partition = partProjection.apply(row); - BinaryRow primaryKey = primaryKey(row); - int bucket = bucketComputer.bucket(row, primaryKey); - return new SinkRecord(partition, bucket, primaryKey, row); - } - - public SinkRecord convertToLogSinkRecord(SinkRecord record) { - if (logPkProjection == null) { - return record; - } - BinaryRow logPrimaryKey = logPrimaryKey(record.row()); - return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, record.row()); - } - - public BinaryRow partition(InternalRow row) { - return partProjection.apply(row).copy(); - } - - public int bucket(InternalRow row) { - return bucketComputer.bucket(row); - } - - private BinaryRow primaryKey(InternalRow row) { - return pkProjection.apply(row); - } - - private BinaryRow logPrimaryKey(InternalRow row) { - assert logPkProjection != null; - return logPkProjection.apply(row); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 338f1308c1f3..79acf0ecd866 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -40,17 +40,17 @@ public class TableWriteImpl implements InnerTableWrite, Restorable> { private final AbstractFileStoreWrite write; - private final SinkRecordConverter recordConverter; + private final KeyAndBucketExtractor keyAndBucketExtractor; private final RecordExtractor recordExtractor; private boolean batchCommitted = false; public TableWriteImpl( FileStoreWrite write, - SinkRecordConverter recordConverter, + KeyAndBucketExtractor keyAndBucketExtractor, RecordExtractor recordExtractor) { this.write = (AbstractFileStoreWrite) write; - this.recordConverter = recordConverter; + this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; } @@ -68,12 +68,14 @@ public TableWriteImpl withIOManager(IOManager ioManager) { @Override public BinaryRow getPartition(InternalRow row) { - return recordConverter.partition(row); + keyAndBucketExtractor.setRecord(row); + return keyAndBucketExtractor.partition(); } @Override public int getBucket(InternalRow row) { - return recordConverter.bucket(row); + keyAndBucketExtractor.setRecord(row); + return keyAndBucketExtractor.bucket(); } @Override @@ -82,13 +84,24 @@ public void write(InternalRow row) throws Exception { } public SinkRecord writeAndReturn(InternalRow row) throws Exception { - SinkRecord record = recordConverter.convert(row); + keyAndBucketExtractor.setRecord(row); + SinkRecord record = + new SinkRecord( + keyAndBucketExtractor.partition(), + keyAndBucketExtractor.bucket(), + keyAndBucketExtractor.trimmedPrimaryKey(), + row); write.write(record.partition(), record.bucket(), recordExtractor.extract(record)); return record; } public SinkRecord toLogRecord(SinkRecord record) { - return recordConverter.convertToLogSinkRecord(record); + keyAndBucketExtractor.setRecord(record.row()); + return new SinkRecord( + record.partition(), + record.bucket(), + keyAndBucketExtractor.logPrimaryKey(), + record.row()); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 4a0b3e0c084f..454d2d2f624b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -51,8 +51,8 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -import static org.apache.paimon.table.sink.BucketComputer.bucket; -import static org.apache.paimon.table.sink.BucketComputer.hashcode; +import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket; +import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link AppendOnlyFileStoreTable}. */ @@ -226,7 +226,7 @@ public void testSequentialRead() throws Exception { serializer .toBinaryRow(rowData(i, random.nextInt(), random.nextLong())) .copy(); - int bucket = bucket(hashcode(data), numOfBucket); + int bucket = bucket(bucketKeyHashCode(data), numOfBucket); dataPerBucket.compute( bucket, (k, v) -> { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/SinkRecordConverterTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/InternalRowKeyAndBucketExtractorTest.java similarity index 72% rename from paimon-core/src/test/java/org/apache/paimon/table/sink/SinkRecordConverterTest.java rename to paimon-core/src/test/java/org/apache/paimon/table/sink/InternalRowKeyAndBucketExtractorTest.java index 2b932c516158..a36ffb9bd6e5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/SinkRecordConverterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/InternalRowKeyAndBucketExtractorTest.java @@ -38,45 +38,43 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -/** Test for {@link SinkRecordConverter}. */ -public class SinkRecordConverterTest { +/** Test for {@link InternalRowKeyAndBucketExtractor}. */ +public class InternalRowKeyAndBucketExtractorTest { @Test public void testInvalidBucket() { - assertThatThrownBy(() -> converter("n", "b")) + assertThatThrownBy(() -> extractor("n", "b")) .hasMessageContaining("Field names [a, b, c] should contains all bucket keys [n]."); - assertThatThrownBy(() -> converter("a", "b")) + assertThatThrownBy(() -> extractor("a", "b")) .hasMessageContaining("Primary keys [b] should contains all bucket keys [a]."); - assertThatThrownBy(() -> converter("a", "a", "a,b")) + assertThatThrownBy(() -> extractor("a", "a", "a,b")) .hasMessageContaining("Bucket keys [a] should not in partition keys [a]."); } @Test public void testBucket() { GenericRow row = GenericRow.of(5, 6, 7); - assertThat(bucket(converter("a", "a,b"), row)).isEqualTo(96); - assertThat(bucket(converter("", "a"), row)).isEqualTo(96); - assertThat(bucket(converter("", "a,b"), row)).isEqualTo(27); - assertThat(bucket(converter("a,b", "a,b"), row)).isEqualTo(27); - assertThat(bucket(converter("", ""), row)).isEqualTo(40); - assertThat(bucket(converter("a,b,c", ""), row)).isEqualTo(40); - assertThat(bucket(converter("", "a,b,c"), row)).isEqualTo(40); + assertThat(bucket(extractor("a", "a,b"), row)).isEqualTo(96); + assertThat(bucket(extractor("", "a"), row)).isEqualTo(96); + assertThat(bucket(extractor("", "a,b"), row)).isEqualTo(27); + assertThat(bucket(extractor("a,b", "a,b"), row)).isEqualTo(27); + assertThat(bucket(extractor("", ""), row)).isEqualTo(40); + assertThat(bucket(extractor("a,b,c", ""), row)).isEqualTo(40); + assertThat(bucket(extractor("", "a,b,c"), row)).isEqualTo(40); } - private int bucket(SinkRecordConverter converter, InternalRow row) { - int bucket1 = converter.bucket(row); - int bucket2 = converter.convert(row).bucket(); - assertThat(bucket1).isEqualTo(bucket2); - return bucket1; + private int bucket(InternalRowKeyAndBucketExtractor extractor, InternalRow row) { + extractor.setRecord(row); + return extractor.bucket(); } - private SinkRecordConverter converter(String bk, String pk) { - return converter("", bk, pk); + private InternalRowKeyAndBucketExtractor extractor(String bk, String pk) { + return extractor("", bk, pk); } - private SinkRecordConverter converter(String partK, String bk, String pk) { + private InternalRowKeyAndBucketExtractor extractor(String partK, String bk, String pk) { RowType rowType = new RowType( Arrays.asList( @@ -98,6 +96,6 @@ private SinkRecordConverter converter(String partK, String bk, String pk) { "".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")), options, ""); - return new SinkRecordConverter(schema); + return new InternalRowKeyAndBucketExtractor(schema); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java new file mode 100644 index 000000000000..bb5d364ad86f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AbstractChannelComputer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.table.sink.KeyAndBucketExtractor; + +import java.io.Serializable; +import java.util.Objects; + +/** + * A utility class to compute which downstream channel a given record should be sent to. + * + * @param type of record + */ +public abstract class AbstractChannelComputer { + + private final int numChannels; + private final KeyAndBucketExtractor extractor; + protected final boolean shuffleByPartitionEnable; + + public AbstractChannelComputer( + int numChannels, KeyAndBucketExtractor extractor, boolean shuffleByPartitionEnable) { + this.numChannels = numChannels; + this.extractor = extractor; + this.shuffleByPartitionEnable = shuffleByPartitionEnable; + } + + public abstract int channel(T record); + + protected int channelImpl(T record, Object... otherChannelKeys) { + extractor.setRecord(record); + int bucket = extractor.bucket(); + int otherChannelKeysHash = Objects.hash(otherChannelKeys); + + if (shuffleByPartitionEnable) { + BinaryRow partition = extractor.partition(); + return Math.abs(Objects.hash(bucket, partition, otherChannelKeysHash)) % numChannels; + } else { + return Math.abs(Objects.hash(bucket, otherChannelKeysHash)) % numChannels; + } + } + + /** + * Provider of {@link AbstractChannelComputer}. + * + * @param type of record + */ + public interface Provider extends Serializable { + + AbstractChannelComputer provide(int numChannels); + + String toString(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketStreamPartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketStreamPartitioner.java deleted file mode 100644 index 5cf9fbfbd90b..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketStreamPartitioner.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.flink.FlinkRowWrapper; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.sink.BucketComputer; -import org.apache.paimon.table.sink.PartitionComputer; - -import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.table.data.RowData; - -import java.util.Objects; -import java.util.function.Function; - -/** A {@link StreamPartitioner} to partition records by bucket. */ -public class BucketStreamPartitioner extends StreamPartitioner { - - private final TableSchema schema; - private final boolean shuffleByPartitionEnable; - - private transient Function partitioner; - - public BucketStreamPartitioner(TableSchema schema, boolean shuffleByPartitionEnable) { - this.schema = schema; - this.shuffleByPartitionEnable = shuffleByPartitionEnable; - } - - @Override - public void setup(int numberOfChannels) { - super.setup(numberOfChannels); - BucketComputer bucketComputer = new BucketComputer(schema); - if (shuffleByPartitionEnable) { - PartitionComputer partitionComputer = new PartitionComputer(schema); - partitioner = - row -> - Math.abs( - Objects.hash( - bucketComputer.bucket(row), - partitionComputer.partition(row))) - % numberOfChannels; - } else { - partitioner = row -> bucketComputer.bucket(row) % numberOfChannels; - } - } - - @Override - public int selectChannel(SerializationDelegate> record) { - return partitioner.apply(new FlinkRowWrapper(record.getInstance().getValue())); - } - - @Override - public StreamPartitioner copy() { - return this; - } - - @Override - public SubtaskStateMapper getDownstreamSubtaskStateMapper() { - return SubtaskStateMapper.FULL; - } - - @Override - public boolean isPointwise() { - return false; - } - - @Override - public String toString() { - return shuffleByPartitionEnable ? "bucket-partition-assigner" : "bucket-assigner"; - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java new file mode 100644 index 000000000000..c32b56b80fcf --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketingStreamPartitioner.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * A {@link StreamPartitioner} which sends records from the same bucket to the same downstream + * channel. + * + * @param type of record + */ +public class BucketingStreamPartitioner extends StreamPartitioner { + + private final AbstractChannelComputer.Provider channelComputerProvider; + + private transient AbstractChannelComputer channelComputer; + + public BucketingStreamPartitioner(AbstractChannelComputer.Provider channelComputerProvider) { + this.channelComputerProvider = channelComputerProvider; + } + + @Override + public void setup(int numberOfChannels) { + super.setup(numberOfChannels); + channelComputer = channelComputerProvider.provide(numberOfChannels); + } + + @Override + public int selectChannel(SerializationDelegate> record) { + return channelComputer.channel(record.getInstance().getValue()); + } + + @Override + public StreamPartitioner copy() { + return this; + } + + @Override + public SubtaskStateMapper getDownstreamSubtaskStateMapper() { + return SubtaskStateMapper.FULL; + } + + @Override + public boolean isPointwise() { + return false; + } + + @Override + public String toString() { + return channelComputerProvider.toString(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 4de1363cb000..acdc76f3321b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.operation.Lock; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.streaming.api.datastream.DataStream; @@ -84,12 +85,14 @@ public FlinkSinkBuilder withSinkProvider( } public DataStreamSink build() { - BucketStreamPartitioner partitioner = - new BucketStreamPartitioner( - table.schema(), - table.coreOptions() - .toConfiguration() - .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION)); + TableSchema schema = table.schema(); + boolean shuffleByPartitionEnable = + table.coreOptions() + .toConfiguration() + .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION); + BucketingStreamPartitioner partitioner = + new BucketingStreamPartitioner<>( + new ChannelComputerProvider(schema, shuffleByPartitionEnable)); PartitionTransformation partitioned = new PartitionTransformation<>(input.getTransformation(), partitioner); if (parallelism != null) { @@ -103,4 +106,32 @@ public DataStreamSink build() { ? sink.sinkFrom(new DataStream<>(env, partitioned), commitUser, sinkProvider) : sink.sinkFrom(new DataStream<>(env, partitioned)); } + + private static class ChannelComputerProvider + implements AbstractChannelComputer.Provider { + + private static final long serialVersionUID = 1L; + + private final TableSchema schema; + private final boolean shuffleByPartitionEnable; + + private ChannelComputerProvider(TableSchema schema, boolean shuffleByPartitionEnable) { + this.schema = schema; + this.shuffleByPartitionEnable = shuffleByPartitionEnable; + } + + @Override + public AbstractChannelComputer provide(int numChannels) { + return new RowDataChannelComputer(numChannels, schema, shuffleByPartitionEnable); + } + + @Override + public String toString() { + if (shuffleByPartitionEnable) { + return "HASH[bucket, partition]"; + } else { + return "HASH[bucket]"; + } + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java new file mode 100644 index 000000000000..2ae41e577a1e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.schema.TableSchema; + +import org.apache.flink.table.data.RowData; + +/** {@link AbstractChannelComputer} for {@link RowData}. */ +public class RowDataChannelComputer extends AbstractChannelComputer { + + public RowDataChannelComputer( + int numChannels, TableSchema schema, boolean shuffleByPartitionEnable) { + super(numChannels, new RowDataKeyAndBucketExtractor(schema), shuffleByPartitionEnable); + } + + @Override + public int channel(RowData record) { + return channelImpl(record); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataKeyAndBucketExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataKeyAndBucketExtractor.java new file mode 100644 index 000000000000..8eec1c6e8ad1 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataKeyAndBucketExtractor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.FlinkRowWrapper; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.sink.InternalRowKeyAndBucketExtractor; +import org.apache.paimon.table.sink.KeyAndBucketExtractor; + +import org.apache.flink.table.data.RowData; + +/** {@link KeyAndBucketExtractor} for {@link RowData}. */ +public class RowDataKeyAndBucketExtractor implements KeyAndBucketExtractor { + + private final InternalRowKeyAndBucketExtractor wrapped; + + public RowDataKeyAndBucketExtractor(TableSchema schema) { + wrapped = new InternalRowKeyAndBucketExtractor(schema); + } + + @Override + public void setRecord(RowData record) { + wrapped.setRecord(new FlinkRowWrapper(record)); + } + + @Override + public BinaryRow partition() { + return wrapped.partition(); + } + + @Override + public int bucket() { + return wrapped.bucket(); + } + + @Override + public BinaryRow trimmedPrimaryKey() { + return wrapped.trimmedPrimaryKey(); + } + + @Override + public BinaryRow logPrimaryKey() { + return wrapped.logPrimaryKey(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java deleted file mode 100644 index 69b64740f9a8..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.sink.cdc; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.codegen.CodeGenUtils; -import org.apache.paimon.codegen.Projection; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.sink.BucketComputer; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.TypeUtils; - -import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.IntStream; - -/** - * A {@link StreamPartitioner} which partitions {@link CdcRecord}s according to the hash value of - * bucket keys (or primary keys if bucket keys are not specified). - * - *

TODO: merge this class with {@link org.apache.paimon.flink.sink.BucketStreamPartitioner} and - * refactor {@link BucketComputer} if possible. - */ -public class CdcBucketStreamPartitioner extends StreamPartitioner { - - private final int numBuckets; - private final List bucketKeys; - private final DataType[] bucketTypes; - private final List partitionKeys; - private final DataType[] partitionTypes; - private final boolean shuffleByPartitionEnable; - - private transient int numberOfChannels; - private transient Projection bucketProjection; - private transient Projection partitionProjection; - - public CdcBucketStreamPartitioner(TableSchema schema, boolean shuffleByPartitionEnable) { - List bucketKeys = schema.originalBucketKeys(); - if (bucketKeys.isEmpty()) { - bucketKeys = schema.trimmedPrimaryKeys(); - } - Preconditions.checkArgument( - bucketKeys.size() > 0, "Either bucket keys or primary keys must be defined"); - - this.numBuckets = new CoreOptions(schema.options()).bucket(); - this.bucketKeys = bucketKeys; - this.bucketTypes = getTypes(this.bucketKeys, schema); - this.partitionKeys = schema.partitionKeys(); - this.partitionTypes = getTypes(this.partitionKeys, schema); - this.shuffleByPartitionEnable = shuffleByPartitionEnable; - } - - private DataType[] getTypes(List keys, TableSchema schema) { - List types = new ArrayList<>(); - for (String key : keys) { - int idx = schema.fieldNames().indexOf(key); - types.add(schema.fields().get(idx).type()); - } - return types.toArray(new DataType[0]); - } - - @Override - public void setup(int numberOfChannels) { - super.setup(numberOfChannels); - this.numberOfChannels = numberOfChannels; - this.bucketProjection = - CodeGenUtils.newProjection( - RowType.of(bucketTypes), IntStream.range(0, bucketTypes.length).toArray()); - this.partitionProjection = - CodeGenUtils.newProjection( - RowType.of(partitionTypes), - IntStream.range(0, partitionTypes.length).toArray()); - } - - @Override - public int selectChannel( - SerializationDelegate> streamRecordSerializationDelegate) { - CdcRecord record = streamRecordSerializationDelegate.getInstance().getValue(); - BinaryRow bucketKeyRow = - toBinaryRow(record.fields(), bucketKeys, bucketTypes, bucketProjection); - int bucket = BucketComputer.bucket(bucketKeyRow.hashCode(), numBuckets); - if (shuffleByPartitionEnable) { - BinaryRow partitionKeyRow = - toBinaryRow( - record.fields(), partitionKeys, partitionTypes, partitionProjection); - return Math.abs(Objects.hash(bucket, partitionKeyRow.hashCode())) % numberOfChannels; - } else { - return bucket % numberOfChannels; - } - } - - private BinaryRow toBinaryRow( - Map fields, - List keys, - DataType[] types, - Projection projection) { - GenericRow genericRow = new GenericRow(keys.size()); - for (int i = 0; i < keys.size(); i++) { - genericRow.setField(i, TypeUtils.castFromString(fields.get(keys.get(i)), types[i])); - } - return projection.apply(genericRow); - } - - @Override - public StreamPartitioner copy() { - return null; - } - - @Override - public SubtaskStateMapper getDownstreamSubtaskStateMapper() { - return SubtaskStateMapper.FULL; - } - - @Override - public boolean isPointwise() { - return false; - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java index e6692c072780..3bc5a563cb5d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java @@ -18,17 +18,29 @@ package org.apache.paimon.flink.sink.cdc; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.TypeUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; /** A data change message from the CDC source. */ public class CdcRecord implements Serializable { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(CdcRecord.class); + private final RowKind kind; private final Map fields; @@ -46,6 +58,79 @@ public Map fields() { return fields; } + /** + * Project {@code fields} to a {@link GenericRow}. The fields of row are specified by the given + * {@code dataFields}. + * + *

NOTE: This method will always return a {@link GenericRow} even if some keys of {@code + * fields} are not in {@code dataFields}. If you want to make sure all field names of {@code + * dataFields} existed in keys of {@code fields}, use {@link CdcRecord#toGenericRow} instead. + * + * @param dataFields {@link DataField}s of the converted {@link GenericRow}. + * @return the projected {@link GenericRow}. + */ + public GenericRow project(List dataFields) { + GenericRow genericRow = new GenericRow(dataFields.size()); + for (int i = 0; i < dataFields.size(); i++) { + DataField dataField = dataFields.get(i); + genericRow.setField( + i, TypeUtils.castFromString(fields.get(dataField.name()), dataField.type())); + } + return genericRow; + } + + /** + * Convert {@code fields} to a {@link GenericRow}. The fields of row are specified by the given + * {@code dataFields}. + * + *

NOTE: This method requires all field names of {@code dataFields} existed in keys of {@code + * fields}. If you only want to convert some {@code fields}, use {@link CdcRecord#project} + * instead. + * + * @param dataFields {@link DataField}s of the converted {@link GenericRow}. + * @return if all field names of {@code dataFields} existed in keys of {@code fields} and all + * values of {@code fields} can be correctly converted to the specified type, an {@code + * Optional#of(GenericRow)} will be returned, otherwise an {@code Optional#empty()} will be + * returned + */ + public Optional toGenericRow(List dataFields) { + GenericRow genericRow = new GenericRow(dataFields.size()); + List fieldNames = + dataFields.stream().map(DataField::name).collect(Collectors.toList()); + + for (Map.Entry field : fields.entrySet()) { + String key = field.getKey(); + String value = field.getValue(); + + int idx = fieldNames.indexOf(key); + if (idx < 0) { + LOG.info("Field " + key + " not found. Waiting for schema update."); + return Optional.empty(); + } + + if (value == null) { + continue; + } + + DataType type = dataFields.get(idx).type(); + // TODO TypeUtils.castFromString cannot deal with complex types like arrays and + // maps. Change type of CdcRecord#field if needed. + try { + genericRow.setField(idx, TypeUtils.castFromString(value, type)); + } catch (Exception e) { + LOG.info( + "Failed to convert value " + + value + + " to type " + + type + + ". Waiting for schema update.", + e); + return Optional.empty(); + } + } + return Optional.of(genericRow); + } + @Override public boolean equals(Object o) { if (!(o instanceof CdcRecord)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java new file mode 100644 index 000000000000..8a815bb69a9d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.cdc; + +import org.apache.paimon.flink.sink.AbstractChannelComputer; +import org.apache.paimon.schema.TableSchema; + +/** {@link AbstractChannelComputer} for {@link CdcRecord}. */ +public class CdcRecordChannelComputer extends AbstractChannelComputer { + + public CdcRecordChannelComputer( + int numChannels, TableSchema schema, boolean shuffleByPartitionEnable) { + super(numChannels, new CdcRecordKeyAndBucketExtractor(schema), shuffleByPartitionEnable); + } + + @Override + public int channel(CdcRecord record) { + return channelImpl(record); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java new file mode 100644 index 000000000000..b1b89f8536b0 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.cdc; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.Projection; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.sink.KeyAndBucketExtractor; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import java.util.List; +import java.util.stream.IntStream; + +/** {@link KeyAndBucketExtractor} for {@link CdcRecord}. */ +public class CdcRecordKeyAndBucketExtractor implements KeyAndBucketExtractor { + + private final int numBuckets; + + private final List partitionFields; + private final Projection partitionProjection; + private final List bucketKeyFields; + private final Projection bucketKeyProjection; + + private CdcRecord record; + + private BinaryRow partition; + private BinaryRow bucketKey; + private Integer bucket; + + public CdcRecordKeyAndBucketExtractor(TableSchema schema) { + numBuckets = new CoreOptions(schema.options()).bucket(); + + RowType partitionType = schema.logicalPartitionType(); + this.partitionFields = partitionType.getFields(); + this.partitionProjection = + CodeGenUtils.newProjection( + partitionType, IntStream.range(0, partitionType.getFieldCount()).toArray()); + + RowType bucketKeyType = schema.logicalBucketKeyType(); + this.bucketKeyFields = bucketKeyType.getFields(); + this.bucketKeyProjection = + CodeGenUtils.newProjection( + bucketKeyType, IntStream.range(0, bucketKeyType.getFieldCount()).toArray()); + } + + @Override + public void setRecord(CdcRecord record) { + this.record = record; + } + + @Override + public BinaryRow partition() { + if (partition == null) { + partition = partitionProjection.apply(record.project(partitionFields)); + } + return partition; + } + + @Override + public int bucket() { + if (bucketKey == null) { + bucketKey = bucketKeyProjection.apply(record.project(bucketKeyFields)); + } + if (bucket == null) { + bucket = + KeyAndBucketExtractor.bucket( + KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets); + } + return bucket; + } + + @Override + public BinaryRow trimmedPrimaryKey() { + throw new UnsupportedOperationException(); + } + + @Override + public BinaryRow logPrimaryKey() { + throw new UnsupportedOperationException(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java index bd97100700bb..4e770e57c1ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java @@ -19,10 +19,13 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.sink.AbstractChannelComputer; +import org.apache.paimon.flink.sink.BucketingStreamPartitioner; import org.apache.paimon.flink.sink.LogSinkFunction; import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils; import org.apache.paimon.operation.Lock; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; @@ -97,12 +100,14 @@ public DataStreamSink build() { new SchemaManager(table.fileIO(), table.location()))); schemaChangeProcessFunction.getTransformation().setParallelism(1); - CdcBucketStreamPartitioner partitioner = - new CdcBucketStreamPartitioner( - table.schema(), - table.coreOptions() - .toConfiguration() - .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION)); + TableSchema schema = table.schema(); + boolean shuffleByPartitionEnable = + table.coreOptions() + .toConfiguration() + .get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION); + BucketingStreamPartitioner partitioner = + new BucketingStreamPartitioner<>( + new ChannelComputerProvider(schema, shuffleByPartitionEnable)); PartitionTransformation partitioned = new PartitionTransformation<>(parsed.getTransformation(), partitioner); if (parallelism != null) { @@ -113,4 +118,32 @@ public DataStreamSink build() { FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory, logSinkFunction); return sink.sinkFrom(new DataStream<>(env, partitioned)); } + + private static class ChannelComputerProvider + implements AbstractChannelComputer.Provider { + + private static final long serialVersionUID = 1L; + + private final TableSchema schema; + private final boolean shuffleByPartitionEnable; + + private ChannelComputerProvider(TableSchema schema, boolean shuffleByPartitionEnable) { + this.schema = schema; + this.shuffleByPartitionEnable = shuffleByPartitionEnable; + } + + @Override + public AbstractChannelComputer provide(int numChannels) { + return new CdcRecordChannelComputer(numChannels, schema, shuffleByPartitionEnable); + } + + @Override + public String toString() { + if (shuffleByPartitionEnable) { + return "HASH[bucket, partition]"; + } else { + return "HASH[bucket]"; + } + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java index c637d2f73687..5b986d4e7579 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java @@ -27,8 +27,6 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.SinkRecord; -import org.apache.paimon.types.DataType; -import org.apache.paimon.utils.TypeUtils; import org.apache.flink.runtime.state.StateInitializationContext; @@ -36,8 +34,7 @@ import java.io.IOException; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; +import java.util.Optional; /** * An {@link AbstractStoreWriteOperator} which is aware of schema changes. @@ -70,12 +67,12 @@ public void initializeState(StateInitializationContext context) throws Exception @Override protected SinkRecord processRecord(CdcRecord record) throws Exception { - Map convertedFields = tryConvert(record.fields()); - if (convertedFields == null) { + Optional optionalConverted = record.toGenericRow(table.schema().fields()); + if (!optionalConverted.isPresent()) { while (true) { table = table.copyWithLatestSchema(); - convertedFields = tryConvert(record.fields()); - if (convertedFields != null) { + optionalConverted = record.toGenericRow(table.schema().fields()); + if (optionalConverted.isPresent()) { break; } Thread.sleep(retrySleepMillis); @@ -83,58 +80,10 @@ protected SinkRecord processRecord(CdcRecord record) throws Exception { write.replace(commitUser -> table.newWrite(commitUser)); } - TableSchema schema = table.schema(); - GenericRow row = new GenericRow(schema.fields().size()); - row.setRowKind(record.kind()); - for (Map.Entry convertedField : convertedFields.entrySet()) { - String key = convertedField.getKey(); - Object value = convertedField.getValue(); - int idx = schema.fieldNames().indexOf(key); - row.setField(idx, value); - } - try { - return write.write(row); + return write.write(optionalConverted.get()); } catch (Exception e) { throw new IOException(e); } } - - private Map tryConvert(Map fields) { - Map converted = new HashMap<>(); - TableSchema schema = table.schema(); - - for (Map.Entry field : fields.entrySet()) { - String key = field.getKey(); - String value = field.getValue(); - - int idx = schema.fieldNames().indexOf(key); - if (idx < 0) { - LOG.info("Field " + key + " not found. Waiting for schema update."); - return null; - } - - if (value == null) { - converted.put(key, null); - } else { - DataType type = schema.fields().get(idx).type(); - // TODO TypeUtils.castFromString cannot deal with complex types like arrays and - // maps. Change type of CdcRecord#field if needed. - try { - converted.put(key, TypeUtils.castFromString(value, type)); - } catch (Exception e) { - LOG.info( - "Failed to convert value " - + value - + " to type " - + type - + ". Waiting for schema update.", - e); - return null; - } - } - } - - return converted; - } }