Skip to content

Commit 82c784a

Browse files
authored
[Improve]]Connector-V2\[Kafka] Set kafka consumer default group (#4271)
1 parent ec4ebb4 commit 82c784a

File tree

2 files changed

+4
-4
lines changed
  • seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka

2 files changed

+4
-4
lines changed

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class Config {
7171
public static final Option<String> CONSUMER_GROUP =
7272
Options.key("consumer.group")
7373
.stringType()
74-
.noDefaultValue()
74+
.defaultValue("SeaTunnel-Consumer-Group")
7575
.withDescription(
7676
"Kafka consumer group id, used to distinguish different consumer groups.");
7777

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ public class KafkaSource
7979
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
8080
SupportParallelism {
8181

82-
private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
83-
8482
private final ConsumerMetadata metadata = new ConsumerMetadata();
8583
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
8684
private SeaTunnelRowType typeInfo;
@@ -113,14 +111,16 @@ public void prepare(Config config) throws PrepareFailException {
113111
this.metadata.setTopic(config.getString(TOPIC.key()));
114112
if (config.hasPath(PATTERN.key())) {
115113
this.metadata.setPattern(config.getBoolean(PATTERN.key()));
114+
} else {
115+
this.metadata.setPattern(PATTERN.defaultValue());
116116
}
117117
this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
118118
this.metadata.setProperties(new Properties());
119119

120120
if (config.hasPath(CONSUMER_GROUP.key())) {
121121
this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
122122
} else {
123-
this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
123+
this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
124124
}
125125

126126
if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {

0 commit comments

Comments
 (0)