diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 5390c482ac5ac..567a8a00da4a0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -176,10 +176,11 @@ public static DataStream bulkInsert(Configuration conf, RowType rowType } // main write operator with following dummy sink in the end + String opName = isBucketIndexType ? "bucket_bulk_insert" : "hoodie_bulk_insert_write"; return dataStream - .transform(opName(isBucketIndexType ? "bucket_bulk_insert" : "hoodie_bulk_insert_write", conf), + .transform(opName(opName, conf), TypeInformation.of(RowData.class), BulkInsertWriteOperator.getFactory(conf, rowType)) - .uid(opUID("bucket_bulk_insert", conf)) + .uid(opUID(opName, conf)) .setParallelism(PARALLELISM_VALUE); }