diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 82761adf7349..a045a9276c54 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -120,8 +120,8 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
       return dataStream
-          .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_bulk_insert", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(DummySink.INSTANCE)
           .name("dummy");
@@ -152,7 +152,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
       }
     }
     return dataStream
-        .transform(opIdentifier("hoodie_bulk_insert_write", conf),
+        .transform(opName("hoodie_bulk_insert_write", conf),
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
@@ -196,8 +196,8 @@ public static DataStream<Object> append(
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
-        .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+        .transform(opName("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
+        .uid(opUID("hoodie_stream_write", conf))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
   }
 
@@ -254,7 +254,7 @@ private static DataStream<HoodieRecord> streamBootstrap(
               TypeInformation.of(HoodieRecord.class),
               new BootstrapOperator<>(conf))
           .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+          .uid(opUID("index_bootstrap", conf));
     }
 
     return dataStream1;
@@ -280,7 +280,7 @@ private static DataStream<HoodieRecord> boundedBootstrap(
             TypeInformation.of(HoodieRecord.class),
             new BatchBootstrapOperator<>(conf))
         .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
-        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+        .uid(opUID("batch_index_bootstrap", conf));
   }
 
   /**
@@ -320,8 +320,8 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStrea
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("bucket_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<Object> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -332,12 +332,12 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStrea
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
               new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid(opUID("bucket_assigner", conf))
           .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
+          .uid(opUID("stream_write", conf))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
   }
@@ -435,10 +435,14 @@ public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) {
         .name("dummy");
   }
 
-  public static String opIdentifier(String operatorN, Configuration conf) {
+  public static String opName(String operatorN, Configuration conf) {
     return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
   }
 
+  public static String opUID(String operatorN, Configuration conf) {
+    return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME);
+  }
+
   /**
    * Dummy sink that does nothing.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4533ef717960..f4c301892e58 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -37,6 +37,7 @@
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
@@ -188,9 +189,11 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
         InputFormat<RowData, ?> inputFormat = getInputFormat(true);
         OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
         SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
+            .uid(Pipelines.opUID("split_monitor", conf))
             .setParallelism(1)
             .keyBy(MergeOnReadInputSplit::getFileId)
             .transform("split_reader", typeInfo, factory)
+            .uid(Pipelines.opUID("split_reader", conf))
             .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
         return new DataStreamSource<>(source);
       } else {