From 0d0a4152cfd362185066519ae926ac4513c7a152 Mon Sep 17 00:00:00 2001 From: feiyang_deepnova <736320652@qq.com> Date: Fri, 12 Aug 2022 11:24:56 +0800 Subject: [PATCH] [HUDI-4611] Fix the duplicate creation of config in HoodieFlinkStreamer (#6369) Co-authored-by: linfey --- .../java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 4 ++-- .../src/main/java/org/apache/hudi/util/StreamerUtil.java | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 013753b6d92..29f55f78acf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -69,12 +69,12 @@ public static void main(String[] args) throws Exception { TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps(); kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg)); + Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg); // Read from kafka source RowType rowType = - (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) + (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)) .getLogicalType(); - Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg); long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); int parallelism = env.getParallelism(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index e5e30c1c987..4b93faeaf72 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -116,10 +116,6 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) { new Path(cfg.propsFilePath), cfg.configs).getProps(); } - public static Schema getSourceSchema(FlinkStreamerConfig cfg) { - return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema(); - } - public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) { if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) { return new FilebasedSchemaProvider(conf).getSourceSchema();