(
+ () -> new UpsertRecordSerializer(rowType, partitionArity)) {};
+ }
+
+    /**
+     * Hashes the same state that {@link #equals(Object)} compares (row type and partition arity),
+     * so instances that differ only in partition arity do not systematically collide.
+     */
+    @Override
+    public int hashCode() {
+        return 31 * rowType.hashCode() + partitionArity;
+    }
+
+    /** Accepts only other {@code UpsertRecordTypeInfo} instances as equality candidates. */
+    @Override
+    public boolean canEqual(Object obj) {
+        final boolean sameKind = obj instanceof UpsertRecordTypeInfo;
+        return sameKind;
+    }
+
+    /** Two instances are equal when both their row type and their partition arity match. */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof UpsertRecordTypeInfo) {
+            UpsertRecordTypeInfo that = (UpsertRecordTypeInfo) obj;
+            boolean sameRowType = rowType.equals(that.rowType);
+            return sameRowType && partitionArity == that.partitionArity;
+        }
+        return false;
+    }
+
+    /** Returns the fixed display name of this type information. */
+    @Override
+    public String toString() {
+        final String displayName = "UpsertRecordTypeInfo";
+        return displayName;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/UpsertWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/UpsertWriteOperator.java
new file mode 100644
index 000000000000..54291e9f747b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/UpsertWriteOperator.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.PrepareCommitOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.types.VectorType.isVectorStoreFile;
+
+/**
+ * Phase 2 operator for data evolution streaming upsert. Receives tagged {@link UpsertRecord}s from
+ * {@link UpsertClassifyOperator} after the firstRowId-based shuffle. Performs partial writes for
+ * UPDATE records and normal appends for INSERT records.
+ *
+ * <p>The shuffle guarantees that all updates targeting the same file (same firstRowId) arrive at
+ * the same subtask, eliminating concurrent-write conflicts.
+ *
+ * <p>Records are only buffered in {@link #processElement}; every file is written in {@link
+ * #prepareCommit}. A {@code NULL} column in an UPDATE record means "keep the stored value".
+ */
+public class UpsertWriteOperator extends PrepareCommitOperator<UpsertRecord, Committable> {
+
+    /** Table being written; a copy of the constructor argument with a raised target file size. */
+    private final FileStoreTable table;
+
+    /** Records received since the previous {@code prepareCommit} call. */
+    private transient List<UpsertRecord> buffered;
+
+    /** Low-level writer used for both appended files and rewritten files. */
+    private transient AbstractFileStoreWrite<InternalRow> tableWrite;
+
+    /** Reader configured with {@link #readType}. */
+    private transient InnerTableRead tableRead;
+
+    /** Projection between {@link #readType} rows and {@link #fullWriteType}, used for rewrites. */
+    private transient ProjectedRow projectedRow;
+
+    private transient SnapshotManager snapshotManager;
+
+    /** Data files of the latest snapshot, grouped by their first row id. */
+    private transient Map<Long, List<DataFileMeta>> firstIdToFiles;
+
+    /** Row type of the table. */
+    private transient RowType fullWriteType;
+
+    /** {@link #fullWriteType} as returned by {@link SpecialFields#rowTypeWithRowId}. */
+    private transient RowType readType;
+
+    /** One getter per column of {@link #fullWriteType}. */
+    private transient InternalRow.FieldGetter[] fieldGetters;
+
+    /**
+     * Creates the operator.
+     *
+     * @param parameters Flink operator parameters forwarded to {@link PrepareCommitOperator}
+     * @param table table to write; kept as a copy whose target file size is set to {@code "99999
+     *     G"}, presumably so that a rewrite is never rolled into several files (the partial update
+     *     path requires exactly one output file)
+     */
+    public UpsertWriteOperator(
+            StreamOperatorParameters<Committable> parameters, FileStoreTable table) {
+        super(parameters, Options.fromMap(table.options()));
+        this.table =
+                table.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G"));
+    }
+
+    /** Creates the reader, the writer and the field getters, then loads the file metadata. */
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        this.fullWriteType = table.rowType();
+        this.buffered = new ArrayList<>();
+
+        this.snapshotManager = table.store().snapshotManager();
+
+        this.readType = SpecialFields.rowTypeWithRowId(fullWriteType);
+        this.tableRead = table.newRead().withReadType(readType);
+        this.projectedRow = ProjectedRow.from(fullWriteType, readType);
+
+        int colCount = fullWriteType.getFieldCount();
+        this.fieldGetters = new InternalRow.FieldGetter[colCount];
+        for (int i = 0; i < colCount; i++) {
+            fieldGetters[i] = InternalRow.createFieldGetter(fullWriteType.getTypeAt(i), i);
+        }
+
+        @SuppressWarnings({"unchecked", "resource"})
+        TableWriteImpl<InternalRow> writeImpl =
+                (TableWriteImpl<InternalRow>)
+                        table.newBatchWriteBuilder().newWrite().withWriteType(fullWriteType);
+        this.tableWrite = (AbstractFileStoreWrite<InternalRow>) writeImpl.getWrite();
+
+        this.firstIdToFiles = new HashMap<>();
+        refreshFileMetadata();
+    }
+
+    /** Buffers the record; all writing is deferred to {@link #prepareCommit(boolean, long)}. */
+    @Override
+    public void processElement(StreamRecord<UpsertRecord> element) throws Exception {
+        buffered.add(element.getValue());
+    }
+
+    /**
+     * Writes everything buffered since the previous call and returns one committable per written
+     * file group: first the rewritten files for UPDATE records (one per partition and first row
+     * id), then the appended files for INSERT records (one per partition).
+     *
+     * @param waitCompaction not used by this operator
+     * @param checkpointId checkpoint id attached to every returned committable
+     * @return the committables to emit, empty when nothing was buffered
+     * @throws IOException wrapping any failure raised while reading or writing files
+     */
+    @Override
+    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+            throws IOException {
+        if (buffered.isEmpty()) {
+            // Nothing arrived since the last call: avoid scanning the manifests for no reason.
+            return new ArrayList<>();
+        }
+
+        try {
+            refreshFileMetadata();
+
+            // partition -> firstRowId -> row offset inside the file -> update row.
+            // For the same offset the last buffered record wins.
+            Map<BinaryRow, TreeMap<Long, TreeMap<Long, InternalRow>>> updatesByPartition =
+                    new HashMap<>();
+            Map<BinaryRow, List<InternalRow>> insertsByPartition = new HashMap<>();
+
+            for (UpsertRecord record : buffered) {
+                if (record.isInsert()) {
+                    insertsByPartition
+                            .computeIfAbsent(record.partition(), k -> new ArrayList<>())
+                            .add(record.row());
+                } else {
+                    updatesByPartition
+                            .computeIfAbsent(record.partition(), k -> new TreeMap<>())
+                            .computeIfAbsent(record.firstRowId(), k -> new TreeMap<>())
+                            .put(record.offset(), record.row());
+                }
+            }
+            buffered.clear();
+
+            List<Committable> committables = new ArrayList<>();
+
+            for (Map.Entry<BinaryRow, TreeMap<Long, TreeMap<Long, InternalRow>>> partEntry :
+                    updatesByPartition.entrySet()) {
+                BinaryRow partition = partEntry.getKey();
+                for (Map.Entry<Long, TreeMap<Long, InternalRow>> fileEntry :
+                        partEntry.getValue().entrySet()) {
+                    CommitMessage msg =
+                            writePartialUpdate(partition, fileEntry.getKey(), fileEntry.getValue());
+                    committables.add(new Committable(checkpointId, msg));
+                }
+            }
+
+            for (Map.Entry<BinaryRow, List<InternalRow>> partEntry :
+                    insertsByPartition.entrySet()) {
+                CommitMessage msg = writeInserts(partEntry.getKey(), partEntry.getValue());
+                committables.add(new Committable(checkpointId, msg));
+            }
+
+            return committables;
+        } catch (Exception e) {
+            throw new IOException("Error in prepareCommit", e);
+        }
+    }
+
+    /** Appends {@code rows} to bucket 0 of {@code partition} and describes the new files. */
+    private CommitMessage writeInserts(BinaryRow partition, List<InternalRow> rows)
+            throws Exception {
+        RecordWriter<InternalRow> writer = tableWrite.createWriter(partition, 0);
+        try {
+            for (InternalRow row : rows) {
+                writer.write(row);
+            }
+            CommitIncrement increment = writer.prepareCommit(false);
+            List<DataFileMeta> newFiles = increment.newFilesIncrement().newFiles();
+            return new CommitMessageImpl(
+                    partition,
+                    0,
+                    null,
+                    new DataIncrement(
+                            newFiles, Collections.emptyList(), Collections.emptyList()),
+                    CompactIncrement.emptyIncrement());
+        } finally {
+            writer.close();
+        }
+    }
+
+    /** Closes the parent operator and then the writer, even when the parent fails to close. */
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            if (tableWrite != null) {
+                tableWrite.close();
+            }
+        }
+    }
+
+    /**
+     * Rebuilds {@link #firstIdToFiles} from the latest snapshot. Files without a first row id, blob
+     * files and vector store files are ignored. Does nothing when the table has no snapshot.
+     */
+    private void refreshFileMetadata() {
+        Long latestId = snapshotManager.latestSnapshotId();
+        if (latestId == null) {
+            return;
+        }
+
+        List<ManifestEntry> allEntries =
+                table.store()
+                        .newScan()
+                        .withManifestEntryFilter(
+                                entry ->
+                                        entry.file().firstRowId() != null
+                                                && !isBlobFile(entry.file().fileName())
+                                                && !isVectorStoreFile(entry.file().fileName()))
+                        .withSnapshot(latestId)
+                        .plan()
+                        .files();
+
+        firstIdToFiles.clear();
+        for (ManifestEntry entry : allEntries) {
+            long firstRowId = entry.file().nonNullFirstRowId();
+            firstIdToFiles.computeIfAbsent(firstRowId, k -> new ArrayList<>()).add(entry.file());
+        }
+    }
+
+    /**
+     * Rewrites the file group identified by {@code firstRowId}, applying {@code updates}.
+     *
+     * <p>If at least one column is {@code NULL} in every update row, only the remaining columns
+     * are written and the old files are kept. Otherwise all columns are written and the old files
+     * are reported as deleted.
+     *
+     * @param partition partition of the file group
+     * @param firstRowId first row id shared by the files to rewrite
+     * @param updates update rows keyed by their row offset inside the file group
+     * @return the commit message describing the new file and, for a full rewrite, the deleted ones
+     * @throws IllegalStateException if the files are unknown, the number of rows read differs from
+     *     the recorded row count, an update offset does not exist, or more than one file is
+     *     produced
+     */
+    private CommitMessage writePartialUpdate(
+            BinaryRow partition, long firstRowId, TreeMap<Long, InternalRow> updates)
+            throws Exception {
+        List<DataFileMeta> oldFiles =
+                firstIdToFiles.getOrDefault(firstRowId, Collections.emptyList());
+        Preconditions.checkState(
+                !oldFiles.isEmpty(),
+                String.format("Cannot find files for firstRowId: %s", firstRowId));
+
+        long rowCount = oldFiles.get(0).rowCount();
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withPartition(partition)
+                        .withBucket(0)
+                        .withDataFiles(oldFiles)
+                        .withBucketPath(
+                                table.store().pathFactory().bucketPath(partition, 0).toString())
+                        .rawConvertible(false)
+                        .build();
+
+        int[] nonNullCols = computeAnyNonNullColumns(updates);
+        boolean isPartialColumn = nonNullCols.length < fullWriteType.getFieldCount();
+
+        ProjectedRow writeProjection;
+        if (isPartialColumn) {
+            RowType partialWriteType = fullWriteType.project(nonNullCols);
+            tableWrite.withWriteType(partialWriteType);
+            writeProjection = ProjectedRow.from(partialWriteType, readType);
+        } else {
+            writeProjection = projectedRow;
+        }
+
+        // The outer finally restores the full write type even when creating or closing the writer
+        // fails, so a failed rewrite cannot leave the shared writer in partial-column mode.
+        try {
+            RecordWriter<InternalRow> writer = tableWrite.createWriter(partition, 0);
+            try {
+                long applied = 0;
+                //noinspection resource
+                try (CloseableIterator<InternalRow> reader =
+                        tableRead.createReader(dataSplit).toCloseableIterator()) {
+                    long offset = 0;
+                    while (reader.hasNext()) {
+                        InternalRow originalRow = reader.next();
+                        InternalRow updateRow = updates.get(offset);
+                        if (updateRow != null) {
+                            writeProjection.replaceRow(
+                                    mergeUpdateWithOriginal(updateRow, originalRow));
+                            applied++;
+                        } else {
+                            writeProjection.replaceRow(originalRow);
+                        }
+                        writer.write(writeProjection);
+                        offset++;
+                    }
+
+                    Preconditions.checkState(
+                            offset == rowCount,
+                            String.format(
+                                    "Written num %s not equal to original row num %s",
+                                    offset, rowCount));
+                }
+
+                // An update whose offset lies outside the file would otherwise be lost silently.
+                Preconditions.checkState(
+                        applied == updates.size(),
+                        String.format(
+                                "Applied %s of %s updates for firstRowId %s",
+                                applied, updates.size(), firstRowId));
+
+                CommitIncrement written = writer.prepareCommit(false);
+                List<DataFileMeta> newFiles = written.newFilesIncrement().newFiles();
+                Preconditions.checkState(
+                        newFiles.size() == 1, "Partial update should produce exactly one file");
+                DataFileMeta newFile = newFiles.get(0).assignFirstRowId(firstRowId);
+
+                List<DataFileMeta> deletedFiles =
+                        isPartialColumn ? Collections.emptyList() : oldFiles;
+                return new CommitMessageImpl(
+                        partition,
+                        0,
+                        null,
+                        new DataIncrement(
+                                Collections.singletonList(newFile),
+                                deletedFiles,
+                                Collections.emptyList()),
+                        CompactIncrement.emptyIncrement());
+            } finally {
+                writer.close();
+            }
+        } finally {
+            if (isPartialColumn) {
+                tableWrite.withWriteType(fullWriteType);
+            }
+        }
+    }
+
+    /**
+     * Returns, in ascending order, the indexes of the columns that are non-null in at least one of
+     * the given update rows.
+     */
+    private int[] computeAnyNonNullColumns(TreeMap<Long, InternalRow> updates) {
+        int colCount = fullWriteType.getFieldCount();
+        boolean[] anyNonNull = new boolean[colCount];
+        for (InternalRow row : updates.values()) {
+            for (int i = 0; i < colCount; i++) {
+                if (!row.isNullAt(i)) {
+                    anyNonNull[i] = true;
+                }
+            }
+        }
+        int count = 0;
+        for (boolean b : anyNonNull) {
+            if (b) {
+                count++;
+            }
+        }
+        int[] result = new int[count];
+        int idx = 0;
+        for (int i = 0; i < colCount; i++) {
+            if (anyNonNull[i]) {
+                result[idx++] = i;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Fills every {@code NULL} column of {@code updateRow} with the value stored in {@code
+     * originalRow}. Returns {@code updateRow} itself when it has no {@code NULL} column.
+     */
+    private InternalRow mergeUpdateWithOriginal(InternalRow updateRow, InternalRow originalRow) {
+        int colCount = fullWriteType.getFieldCount();
+        boolean needsMerge = false;
+        for (int i = 0; i < colCount; i++) {
+            if (updateRow.isNullAt(i)) {
+                needsMerge = true;
+                break;
+            }
+        }
+        if (!needsMerge) {
+            return updateRow;
+        }
+        GenericRow merged = new GenericRow(colCount);
+        for (int i = 0; i < colCount; i++) {
+            if (!updateRow.isNullAt(i)) {
+                merged.setField(i, fieldGetters[i].getFieldOrNull(updateRow));
+            } else {
+                merged.setField(i, fieldGetters[i].getFieldOrNull(originalRow));
+            }
+        }
+        return merged;
+    }
+
+    /** Factory for creating {@link UpsertWriteOperator}. */
+    public static class Factory extends PrepareCommitOperator.Factory<UpsertRecord, Committable> {
+
+        private final FileStoreTable table;
+
+        public Factory(FileStoreTable table) {
+            super(Options.fromMap(table.options()));
+            this.table = table;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T) new UpsertWriteOperator(parameters, table);
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return UpsertWriteOperator.class;
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionUpsertSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionUpsertSink.java
new file mode 100644
index 000000000000..4708150a1356
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionUpsertSink.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.dataevolution.UpsertClassifyOperator;
+import org.apache.paimon.flink.dataevolution.UpsertRecord;
+import org.apache.paimon.flink.dataevolution.UpsertRecordChannelComputer;
+import org.apache.paimon.flink.dataevolution.UpsertRecordTypeInfo;
+import org.apache.paimon.flink.dataevolution.UpsertWriteOperator;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.utils.ParallelismUtils.forwardParallelism;
+
+/**
+ * A {@link FlinkWriteSink} for data evolution streaming upsert. Uses a two-phase pipeline:
+ *
+ * <ol>
+ *   <li>Phase 1 ({@link UpsertClassifyOperator}): classifies records as INSERT or UPDATE using a
+ *       business-key index
+ *   <li>Network shuffle by firstRowId to ensure single-writer-per-file
+ *   <li>Phase 2 ({@link UpsertWriteOperator}): performs partial writes for updates and appends for
+ *       inserts
+ * </ol>
+ */
+public class DataEvolutionUpsertSink extends FlinkWriteSink<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Names of the columns that identify a row for upsert purposes. */
+    private final List<String> upsertKeyColumns;
+
+    /**
+     * @param table table to write to
+     * @param overwritePartition forwarded unchanged to {@link FlinkWriteSink}
+     * @param upsertKeyColumns names of the columns that identify a row
+     */
+    public DataEvolutionUpsertSink(
+            FileStoreTable table,
+            @Nullable Map<String, String> overwritePartition,
+            List<String> upsertKeyColumns) {
+        super(table, overwritePartition);
+        this.upsertKeyColumns = upsertKeyColumns;
+    }
+
+    /**
+     * Builds the classify, shuffle, write and commit topology on top of {@code input}. Both
+     * operators take their parallelism from their upstream.
+     */
+    @Override
+    public DataStreamSink<?> sinkFrom(DataStream<InternalRow> input, String initialCommitUser) {
+        // Phase 1: classify each record as INSERT or UPDATE
+        SingleOutputStreamOperator<UpsertRecord> classified =
+                input.transform(
+                        "Upsert Classify : " + table.name(),
+                        new UpsertRecordTypeInfo(table.rowType(), table.partitionKeys().size()),
+                        new UpsertClassifyOperator.Factory(table, upsertKeyColumns));
+        forwardParallelism(classified, input);
+
+        // Shuffle by firstRowId to guarantee single-writer-per-file
+        DataStream<UpsertRecord> shuffled =
+                FlinkStreamPartitioner.partition(
+                        classified, new UpsertRecordChannelComputer(), null);
+
+        // Phase 2: write (partial updates + inserts)
+        SingleOutputStreamOperator<Committable> written =
+                shuffled.transform(
+                        "Upsert Write : " + table.name(),
+                        new CommittableTypeInfo(),
+                        new UpsertWriteOperator.Factory(table));
+        forwardParallelism(written, shuffled);
+
+        return doCommit(written, initialCommitUser);
+    }
+
+    /**
+     * Not supported: this sink assembles its own operators in {@link #sinkFrom(DataStream,
+     * String)}.
+     */
+    @Override
+    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        throw new UnsupportedOperationException(
+                "DataEvolutionUpsertSink overrides sinkFrom directly");
+    }
+
+    @Override
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index ae8013b7e709..f97607404eee 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -339,6 +339,26 @@ private DataStreamSink> buildUnawareBucketSink(DataStream input)
table.primaryKeys().isEmpty(),
"Unaware bucket mode only works with append-only table for now.");
+ List upsertKeys = table.coreOptions().dataEvolutionUpsertKeyColumns();
+ if (!upsertKeys.isEmpty()) {
+ checkArgument(
+ table.coreOptions().dataEvolutionEnabled(),
+ "data-evolution.upsert-keys requires data-evolution.enabled = true.");
+ checkArgument(
+ table.coreOptions().rowTrackingEnabled(),
+ "data-evolution.upsert-keys requires row-tracking.enabled = true.");
+ DataStream keyed =
+ partition(
+ input,
+ new UpsertKeyChannelComputer(
+ table.schema(),
+ upsertKeys,
+ table.coreOptions().dataEvolutionUpsertIndexParallelism()),
+ parallelism);
+ return new DataEvolutionUpsertSink(table, overwritePartition, upsertKeys)
+ .sinkFrom(keyed);
+ }
+
if (!table.partitionKeys().isEmpty()) {
PartitionSinkStrategy strategy = table.coreOptions().partitionSinkStrategy();
if (strategy == PartitionSinkStrategy.HASH) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UpsertKeyChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UpsertKeyChannelComputer.java
new file mode 100644
index 000000000000..0bde77c14ba0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UpsertKeyChannelComputer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+
+/**
+ * A {@link ChannelComputer} that partitions records by a compound hash of partition fields and
+ * upsert key columns. Uses {@code indexParallelism} to control how many subtasks each partition can
+ * be distributed to:
+ *
+ * <pre>{@code
+ * slot    = nonNegative(partitionHash) * indexParallelism
+ *               + nonNegative(keyHash) % indexParallelism
+ * channel = slot % numChannels
+ * }</pre>
+ *
+ * <p>{@code nonNegative(h)} clears the sign bit of {@code h}.
+ *
+ * <p>With {@code indexParallelism=1}, each partition maps to exactly one subtask, so the upsert key
+ * index is loaded only once per partition. Higher values spread the load across more subtasks at
+ * the cost of redundant index loading.
+ */
+public class UpsertKeyChannelComputer implements ChannelComputer<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+    private final List<String> upsertKeyColumns;
+    private final int indexParallelism;
+
+    private transient int numChannels;
+    private transient FieldGetter[] keyFieldGetters;
+    private transient FieldGetter[] partitionFieldGetters;
+
+    /**
+     * @param schema schema providing the row type and the partition keys
+     * @param upsertKeyColumns names of the upsert key columns
+     * @param indexParallelism number of slots each partition is spread over, must be positive
+     * @throws IllegalArgumentException if {@code indexParallelism} is not positive
+     */
+    public UpsertKeyChannelComputer(
+            TableSchema schema, List<String> upsertKeyColumns, int indexParallelism) {
+        // Fail here rather than with "/ by zero" or a negative channel once records flow.
+        if (indexParallelism <= 0) {
+            throw new IllegalArgumentException(
+                    "indexParallelism must be positive, but was " + indexParallelism);
+        }
+        this.schema = schema;
+        this.upsertKeyColumns = upsertKeyColumns;
+        this.indexParallelism = indexParallelism;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        RowType rowType = schema.logicalRowType();
+        this.keyFieldGetters = createGetters(rowType, upsertKeyColumns);
+        this.partitionFieldGetters = createGetters(rowType, schema.partitionKeys());
+    }
+
+    /** Creates one getter per named column, rejecting names that are not part of the row type. */
+    private static FieldGetter[] createGetters(RowType rowType, List<String> columns) {
+        FieldGetter[] getters = new FieldGetter[columns.size()];
+        for (int i = 0; i < columns.size(); i++) {
+            String column = columns.get(i);
+            int idx = rowType.getFieldIndex(column);
+            if (idx < 0) {
+                throw new IllegalArgumentException(
+                        "Column '" + column + "' does not exist in the table schema");
+            }
+            getters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(idx), idx);
+        }
+        return getters;
+    }
+
+    @Override
+    public int channel(InternalRow record) {
+        int partHash = combinedHash(partitionFieldGetters, record);
+        int keyHash = combinedHash(keyFieldGetters, record);
+
+        // '%' binds tighter than '+': the key only selects a slot inside the partition's range.
+        long slot =
+                (long) (partHash & Integer.MAX_VALUE) * indexParallelism
+                        + (keyHash & Integer.MAX_VALUE) % indexParallelism;
+        return (int) (slot % numChannels);
+    }
+
+    /** Folds the hashes of the selected fields into one value, treating {@code null} as 0. */
+    private static int combinedHash(FieldGetter[] getters, InternalRow record) {
+        int hash = 0;
+        for (FieldGetter getter : getters) {
+            hash = hash * 31 + valueHash(getter.getFieldOrNull(record));
+        }
+        return hash;
+    }
+
+    /**
+     * Content-based hash of a field value. A {@code byte[]} inherits the identity hash of {@link
+     * Object}, which would send equal binary keys to different channels, so it is hashed by
+     * content.
+     */
+    private static int valueHash(Object value) {
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof byte[]) {
+            return java.util.Arrays.hashCode((byte[]) value);
+        }
+        return value.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by partition+upsert key "
+                + upsertKeyColumns
+                + " (indexParallelism="
+                + indexParallelism
+                + ")";
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionUpsertITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionUpsertITCase.java
new file mode 100644
index 000000000000..d0253efcf50c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/dataevolution/DataEvolutionUpsertITCase.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
+
+/** ITCase for data evolution streaming upsert via {@code data-evolution.upsert-keys}. */
+public class DataEvolutionUpsertITCase extends ActionITCaseBase {
+
+    @BeforeEach
+    public void setup() throws Exception {
+        init(warehouse);
+    }
+
+    @Test
+    public void testBasicUpsert() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)", "(3, 'c', 3.0)");
+
+        upsert("T", "id", "(1, 'a_new', 10.0)", "(3, 'c_new', 30.0)");
+
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, "a_new", 10.0), Row.of(2, "b", 2.0), Row.of(3, "c_new", 30.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testInsertOnly() throws Exception {
+        createTable("T", false);
+
+        upsert("T", "id", "(1, 'a', 1.0)", "(2, 'b', 2.0)", "(3, 'c', 3.0)");
+
+        List<Row> expected =
+                Arrays.asList(Row.of(1, "a", 1.0), Row.of(2, "b", 2.0), Row.of(3, "c", 3.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testMixedInsertAndUpdate() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)", "(3, 'c', 3.0)");
+
+        upsert(
+                "T",
+                "id",
+                "(2, 'b_new', 20.0)",
+                "(3, 'c_new', 30.0)",
+                "(4, 'd', 4.0)",
+                "(5, 'e', 5.0)");
+
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, "a", 1.0),
+                        Row.of(2, "b_new", 20.0),
+                        Row.of(3, "c_new", 30.0),
+                        Row.of(4, "d", 4.0),
+                        Row.of(5, "e", 5.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testMultipleUpserts() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)", "(3, 'c', 3.0)");
+
+        upsert("T", "id", "(1, 'a2', 10.0)", "(4, 'd', 4.0)");
+
+        // Row 4 was inserted by the previous upsert and must now be treated as an update.
+        upsert("T", "id", "(2, 'b2', 20.0)", "(4, 'd2', 40.0)");
+
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, "a2", 10.0),
+                        Row.of(2, "b2", 20.0),
+                        Row.of(3, "c", 3.0),
+                        Row.of(4, "d2", 40.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testCompositeKey() throws Exception {
+        Map<String, String> options = baseOptions();
+        sEnv.executeSql(
+                buildDdl(
+                        "T",
+                        Arrays.asList("id INT", "sub_id INT", "name STRING"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options));
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 1, 'a'), (1, 2, 'b'), (2, 1, 'c')").await();
+
+        sEnv.executeSql(
+                        "INSERT INTO T /*+ OPTIONS('data-evolution.upsert-keys'='id,sub_id') */ "
+                                + "VALUES (1, 2, 'b_new'), (2, 1, 'c_new'), (3, 1, 'd')")
+                .await();
+
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, 1, "a"),
+                        Row.of(1, 2, "b_new"),
+                        Row.of(2, 1, "c_new"),
+                        Row.of(3, 1, "d"));
+        testBatchRead("SELECT * FROM T ORDER BY id, sub_id", expected);
+    }
+
+    @Test
+    public void testPartialColumnUpdate() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)", "(3, 'c', 3.0)");
+
+        // Update only 'name' column; 'value' is NULL meaning "don't change"
+        upsert(
+                "T",
+                "id",
+                "(1, 'a_new', CAST(NULL AS DOUBLE))",
+                "(3, 'c_new', CAST(NULL AS DOUBLE))");
+
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, "a_new", 1.0), Row.of(2, "b", 2.0), Row.of(3, "c_new", 3.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testPartialColumnUpdateValueOnly() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)", "(3, 'c', 3.0)");
+
+        // Update only 'value' column; 'name' is NULL meaning "don't change"
+        upsert("T", "id", "(1, CAST(NULL AS STRING), 10.0)", "(2, CAST(NULL AS STRING), 20.0)");
+
+        List<Row> expected =
+                Arrays.asList(Row.of(1, "a", 10.0), Row.of(2, "b", 20.0), Row.of(3, "c", 3.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testPartialColumnUpdateThenFullUpdate() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)");
+
+        // First: partial update (only name)
+        upsert("T", "id", "(1, 'a_v2', CAST(NULL AS DOUBLE))");
+
+        // Second: full update (all columns non-NULL)
+        upsert("T", "id", "(2, 'b_v2', 22.0)");
+
+        List<Row> expected = Arrays.asList(Row.of(1, "a_v2", 1.0), Row.of(2, "b_v2", 22.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testPartialColumnUpdateWithInsert() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)");
+
+        // Mix partial update and new insert in the same upsert
+        upsert("T", "id", "(1, 'a_new', CAST(NULL AS DOUBLE))", "(3, 'new_row', 3.0)");
+
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, "a_new", 1.0), Row.of(2, "b", 2.0), Row.of(3, "new_row", 3.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testMultiplePartialUpdates() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)");
+
+        // First partial update: only name
+        upsert("T", "id", "(1, 'a_v2', CAST(NULL AS DOUBLE))");
+
+        // Second partial update: only value
+        upsert("T", "id", "(1, CAST(NULL AS STRING), 100.0)");
+
+        List<Row> expected = Arrays.asList(Row.of(1, "a_v2", 100.0), Row.of(2, "b", 2.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    @Test
+    public void testMixedPartialColumnUpdatesInSameBatch() throws Exception {
+        createTable("T", false);
+        batchInsert("T", "(1, 'a', 1.0)", "(2, 'b', 2.0)");
+
+        // Row 1 updates 'name' only, row 2 updates 'value' only — different columns in same batch
+        upsert("T", "id", "(1, 'a_v2', CAST(NULL AS DOUBLE))", "(2, CAST(NULL AS STRING), 20.0)");
+
+        List<Row> expected = Arrays.asList(Row.of(1, "a_v2", 1.0), Row.of(2, "b", 20.0));
+        testBatchRead("SELECT * FROM T ORDER BY id", expected);
+    }
+
+    /**
+     * Creates a table with columns {@code id}, {@code name} and {@code value}, using {@link
+     * #baseOptions()}. When {@code partitioned} is true a {@code dt} column is added and used as
+     * the partition key.
+     */
+    private void createTable(String tableName, boolean partitioned) {
+        List<String> fields = Arrays.asList("id INT", "name STRING", "`value` DOUBLE");
+        List<String> partitionKeys =
+                partitioned ? Collections.singletonList("dt") : Collections.emptyList();
+        if (partitioned) {
+            fields = Arrays.asList("id INT", "name STRING", "`value` DOUBLE", "dt STRING");
+        }
+        sEnv.executeSql(
+                buildDdl(tableName, fields, Collections.emptyList(), partitionKeys, baseOptions()));
+    }
+
+    /** Options shared by all tables: row tracking and data evolution enabled, {@code bucket=-1}. */
+    private static Map<String, String> baseOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put(ROW_TRACKING_ENABLED.key(), "true");
+        options.put(DATA_EVOLUTION_ENABLED.key(), "true");
+        options.put("bucket", "-1");
+        return options;
+    }
+
+    /** Inserts the given {@code VALUES} tuples with the batch environment and waits for the job. */
+    private void batchInsert(String tableName, String... records) throws Exception {
+        bEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `%s` VALUES %s", tableName, String.join(",", records)))
+                .await();
+    }
+
+    /**
+     * Inserts the given {@code VALUES} tuples with the streaming environment, passing {@code
+     * upsertKeys} through the {@code data-evolution.upsert-keys} hint, and waits for the job.
+     */
+    private void upsert(String tableName, String upsertKeys, String... records) throws Exception {
+        sEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `%s` /*+ OPTIONS('data-evolution.upsert-keys'='%s') */ VALUES %s",
+                                tableName, upsertKeys, String.join(",", records)))
+                .await();
+    }
+}