From 23a19d383aa409121346bc803618315142150d08 Mon Sep 17 00:00:00 2001 From: chenlei677 Date: Thu, 12 Mar 2026 18:55:39 +0800 Subject: [PATCH] fix: Fixed the issue of incorrect operator uid values in Flink bulk insert writing --- .../src/main/java/org/apache/hudi/sink/utils/Pipelines.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 5390c482ac5ac..567a8a00da4a0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -176,10 +176,11 @@ public static DataStream bulkInsert(Configuration conf, RowType rowType } // main write operator with following dummy sink in the end + String opName = isBucketIndexType ? "bucket_bulk_insert" : "hoodie_bulk_insert_write"; return dataStream - .transform(opName(isBucketIndexType ? "bucket_bulk_insert" : "hoodie_bulk_insert_write", conf), + .transform(opName(opName, conf), TypeInformation.of(RowData.class), BulkInsertWriteOperator.getFactory(conf, rowType)) - .uid(opUID("bucket_bulk_insert", conf)) + .uid(opUID(opName, conf)) .setParallelism(PARALLELISM_VALUE); }