Skip to content

Commit

Permalink
Polish "Add a configuration property for KLC's idleBetweenPolls"
Browse files Browse the repository at this point in the history
  • Loading branch information
wilkinsona committed Aug 25, 2020
1 parent 03a8937 commit e9ab269
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ private void configureContainer(ContainerProperties container) {
map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);
map.from(properties::getPollTimeout).as(Duration::toMillis).to(container::setPollTimeout);
map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold);
map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls);
map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval);
map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue)
.to(container::setMonitorInterval);
map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls);
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(this.transactionManager).to(container::setTransactionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,15 +886,14 @@ public enum Type {
private Duration ackTime;

/**
* Time between publishing idle consumer events (no data received).
* Sleep interval between Consumer.poll(Duration) calls.
*/
private Duration idleEventInterval;
private Duration idleBetweenPolls = Duration.ZERO;

/**
* The sleep interval in milliseconds between
* {@link org.apache.kafka.clients.consumer.Consumer#poll(Duration)} calls.
* Time between publishing idle consumer events (no data received).
*/
private Duration idleBetweenPolls;
private Duration idleEventInterval;

/**
* Time between checks for non-responsive consumers. If a duration suffix is not
Expand Down Expand Up @@ -978,20 +977,20 @@ public void setAckTime(Duration ackTime) {
this.ackTime = ackTime;
}

public Duration getIdleEventInterval() {
return this.idleEventInterval;
public Duration getIdleBetweenPolls() {
return this.idleBetweenPolls;
}

public void setIdleEventInterval(Duration idleEventInterval) {
this.idleEventInterval = idleEventInterval;
public void setIdleBetweenPolls(Duration idleBetweenPolls) {
this.idleBetweenPolls = idleBetweenPolls;
}

public Duration getIdleBetweenPolls() {
return idleBetweenPolls;
public Duration getIdleEventInterval() {
return this.idleEventInterval;
}

public void setIdleBetweenPolls(Duration idleBetweenPolls) {
this.idleBetweenPolls = idleBetweenPolls;
public void setIdleEventInterval(Duration idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}

public Duration getMonitorInterval() {
Expand All @@ -1017,6 +1016,7 @@ public boolean isMissingTopicsFatal() {
public void setMissingTopicsFatal(boolean missingTopicsFatal) {
this.missingTopicsFatal = missingTopicsFatal;
}

}

public static class Ssl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ void listenerProperties() {
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
Expand All @@ -392,6 +392,7 @@ void listenerProperties() {
assertThat(containerProperties.getAckTime()).isEqualTo(456L);
assertThat(containerProperties.getPollTimeout()).isEqualTo(2000L);
assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f);
assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L);
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue();
Expand Down

0 comments on commit e9ab269

Please sign in to comment.