Skip to content

Commit

Permalink
Drop the groupId from the KafkaProperties
Browse files Browse the repository at this point in the history
It is no longer possible to configure a default groupId. Thus, this
field should be removed from the KafkaProperties object

#18
  • Loading branch information
smcvb committed Oct 24, 2019
1 parent 2d6baad commit d7b6d4a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 31 deletions.
Expand Up @@ -70,8 +70,7 @@
public class KafkaProperties {

/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
* Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
*/
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

Expand All @@ -88,8 +87,7 @@ public class KafkaProperties {
/**
* Controls the mode of event processor responsible for sending messages to Kafka.
* <p>
* Depending on this, different error handling behaviours are taken in case of
* any errors during Kafka publishing.
* Depending on this, different error handling behaviours are taken in case of any errors during Kafka publishing.
* </p>
* <p>
* Possible values are "SUBSCRIBING" (default) and "TRACKING".
Expand Down Expand Up @@ -248,7 +246,8 @@ public static class Consumer {
private final Ssl ssl = new Ssl();

/**
* Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' true.
* Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit'
* true.
*/
private Integer autoCommitInterval;

Expand Down Expand Up @@ -284,11 +283,6 @@ public static class Consumer {
*/
private Integer fetchMinSize;

/**
* Unique string that identifies the consumer group this consumer belongs to.
*/
private String groupId;

/**
* Expected time in milliseconds between heartbeats to the consumer coordinator.
*/
Expand Down Expand Up @@ -374,14 +368,6 @@ public void setFetchMinSize(Integer fetchMinSize) {
this.fetchMinSize = fetchMinSize;
}

public String getGroupId() {
return this.groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

public Integer getHeartbeatInterval() {
return this.heartbeatInterval;
}
Expand Down Expand Up @@ -442,9 +428,6 @@ public Map<String, Object> buildProperties() {
if (this.fetchMinSize != null) {
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize);
}
if (this.groupId != null) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
}
if (this.heartbeatInterval != null) {
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatInterval);
}
Expand Down Expand Up @@ -503,9 +486,9 @@ public enum EventProcessorMode {
public static class Fetcher {

/**
* The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
* If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
* Must not be negative.
* The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns
* immediately with any records that are available currently in the buffer, else returns empty. Must not be
* negative.
*
* @see KafkaConsumer#poll(Duration)
*/
Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.KafkaMessageSource;
import org.axonframework.extensions.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;
Expand Down Expand Up @@ -78,8 +77,7 @@ public void testAutoConfigurationWithMinimalRequiredProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues(
"axon.kafka.default-topic=testTopic",
"axon.kafka.producer.transaction-id-prefix=foo",
"axon.kafka.consumer.group-id=bar"
"axon.kafka.producer.transaction-id-prefix=foo"
).run(context -> {
// Required bean assertions
assertThat(context.getBeanNamesForType(ProducerFactory.class)).hasSize(1);
Expand Down Expand Up @@ -132,7 +130,6 @@ public void testConsumerPropertiesAreAdjustedAsExpected() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues(
"axon.kafka.default-topic=testTopic",
"axon.kafka.consumer.group-id=bar",
// Overrides 'axon.kafka.bootstrap-servers'
"axon.kafka.bootstrap-servers=foo:1234",
"axon.kafka.properties.foo=bar",
Expand Down Expand Up @@ -181,7 +178,6 @@ public void testConsumerPropertiesAreAdjustedAsExpected() {
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class);
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
Expand Down Expand Up @@ -251,7 +247,6 @@ public void testKafkaPropertiesTrackingMode() {
// Minimal Required Properties
"axon.kafka.default-topic=testTopic",
"axon.kafka.producer.transaction-id-prefix=foo",
"axon.kafka.consumer.group-id=bar",
// Event Handling Mode
"axon.kafka.event-processor-mode=TRACKING"
).run(context -> {
Expand All @@ -277,7 +272,6 @@ public void testKafkaPropertiesSubscribingMode() {
// Minimal Required Properties
"axon.kafka.default-topic=testTopic",
"axon.kafka.producer.transaction-id-prefix=foo",
"axon.kafka.consumer.group-id=bar",
// Event Handling Mode
"axon.kafka.event-processor-mode=SUBSCRIBING"
).run(context -> {
Expand Down

0 comments on commit d7b6d4a

Please sign in to comment.