diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java index fba1803237..8035028dec 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java @@ -160,7 +160,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { "Fluss table sink does not support partial updates for table without primary key. Please make sure the " + "number of specified columns in INSERT INTO matches columns of the Fluss table."); } - if (mergeEngineType != null) { + if (mergeEngineType != null && mergeEngineType != MergeEngineType.AGGREGATION) { throw new ValidationException( String.format( "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the " diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index 08b6b6c9f2..6916f62b75 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -1437,6 +1437,40 @@ void testComprehensiveAggregationFunctions() throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + @Test + void testPartialUpdateOnAggregationMergeEngine() throws Exception { + tEnv.executeSql( + "create table agg_partial_update (" + + "id int not null primary key not enforced, " + + "sum_a int, " + + "sum_b int" + + ") with (" + + "'table.merge-engine' = 'aggregation', " + + "'fields.sum_a.agg' = 'sum', " + + "'fields.sum_b.agg' = 'sum')"); + + // Full insert: all fields + tEnv.executeSql("INSERT INTO agg_partial_update VALUES (1, 100, 200)").await(); + + // Partial insert: only update sum_a + tEnv.executeSql("INSERT INTO agg_partial_update(id, sum_a) VALUES (1, 50)").await(); + + CloseableIterator rowIter = + tEnv.executeSql("SELECT * FROM agg_partial_update").collect(); + + List expectedRows = + Arrays.asList("+I[1, 100, 200]", "-U[1, 100, 200]", "+U[1, 150, 200]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // Partial insert on a new key (no prior full insert) — unspecified columns should be null + tEnv.executeSql("INSERT INTO agg_partial_update(id, sum_a) VALUES (2, 77)").await(); + + expectedRows = Arrays.asList("+I[2, 77, null]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + private InsertAndExpectValues rowsToInsertInto(Collection partitions) { List insertValues = new ArrayList<>(); List expectedValues = new ArrayList<>();