Skip to content

Commit

Permalink
Allow kafka group.id to be configured (#5867)
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre De Paepe <pierre.de-paepe@corp.ovh.com>

Closes #3976
  • Loading branch information
pdepaepe authored and bernd committed Apr 17, 2019
1 parent 998cb6f commit dd5c025
Showing 1 changed file with 10 additions and 2 deletions.
Expand Up @@ -72,13 +72,13 @@
import static com.codahale.metrics.MetricRegistry.name;

public class KafkaTransport extends ThrottleableTransport {
public static final String GROUP_ID = "graylog2";
public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
public static final String CK_ZOOKEEPER = "zookeeper";
public static final String CK_TOPIC_FILTER = "topic_filter";
public static final String CK_THREADS = "threads";
public static final String CK_OFFSET_RESET = "offset_reset";
public static final String CK_GROUP_ID = "group_id";

// See https://kafka.apache.org/090/documentation.html for available values for "auto.offset.reset".
private static final ImmutableMap<String, String> OFFSET_RESET_VALUES = ImmutableMap.of(
Expand All @@ -87,6 +87,7 @@ public class KafkaTransport extends ThrottleableTransport {
);

private static final String DEFAULT_OFFSET_RESET = "largest";
private static final String DEFAULT_GROUP_ID = "graylog2";

private static final Logger LOG = LoggerFactory.getLogger(KafkaTransport.class);

Expand Down Expand Up @@ -185,7 +186,7 @@ public void run() {

final Properties props = new Properties();

props.put("group.id", GROUP_ID);
props.put("group.id", configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
props.put("client.id", "gl2-" + nodeId + "-" + input.getId());

props.put("fetch.min.bytes", String.valueOf(configuration.getInt(CK_FETCH_MIN_BYTES)));
Expand Down Expand Up @@ -382,6 +383,13 @@ public ConfigurationRequest getRequestedConfiguration() {
"What to do when there is no initial offset in ZooKeeper or if an offset is out of range",
ConfigurationField.Optional.OPTIONAL));

cr.addField(new TextField(
CK_GROUP_ID,
"Consumer group id",
DEFAULT_GROUP_ID,
"Name of the consumer group the Kafka input belongs to",
ConfigurationField.Optional.OPTIONAL));

return cr;
}
}
Expand Down

0 comments on commit dd5c025

Please sign in to comment.