diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java index b0ff446d..8b8712bc 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java @@ -75,7 +75,7 @@ public class ClientConfig { /** * How long to timeout poll requests. */ - private final long pollTimeoutMs = 2000; + private final long pollTimeoutMs; /** * Constructor. @@ -89,7 +89,7 @@ private ClientConfig( final FilterConfig filterConfig, final String consumerId, final StartingPosition startingPosition) { - this(topicConfig, filterConfig, consumerId, startingPosition, new ArrayList<>(), 10, true); + this(topicConfig, filterConfig, consumerId, startingPosition, new ArrayList<>(), 10, true, 2000); } /** @@ -101,6 +101,7 @@ private ClientConfig( * @param partitionIds List of partitionIds to limit consuming from. * @param maxResultsPerPartition How many records to poll per partition. * @param isAutoCommitEnabled If the consumer should auto commit state or not. + * @param pollTimeoutMs poll timeout in milliseconds. */ private ClientConfig( final TopicConfig topicConfig, @@ -109,7 +110,8 @@ private ClientConfig( final StartingPosition startingPosition, final Collection partitionIds, final int maxResultsPerPartition, - final boolean isAutoCommitEnabled) { + final boolean isAutoCommitEnabled, + final long pollTimeoutMs) { this.topicConfig = topicConfig; this.filterConfig = filterConfig; @@ -120,6 +122,7 @@ private ClientConfig( this.partitionIds = Collections.unmodifiableSet(tempSet); this.maxResultsPerPartition = maxResultsPerPartition; this.isAutoCommitEnabled = isAutoCommitEnabled; + this.pollTimeoutMs = pollTimeoutMs; } public TopicConfig getTopicConfig() { @@ -206,6 +209,7 @@ public static class Builder { private StartingPosition startingPosition = StartingPosition.newResumeFromExistingState(); private int maxResultsPerPartition = 10; private boolean autoCommit = true; + private long pollTimeoutMs = 2000; /** * Private constructor. @@ -303,12 +307,22 @@ public Builder withAutoCommitDisabled() { return this; } + + /** + * Declare pollTimeout in milliseconds. + */ + public Builder withPollTimeoutMs(final long pollTimeoutMs) { + this.pollTimeoutMs = pollTimeoutMs; + return this; + } + /** * Create new ClientConfig instance. */ public ClientConfig build() { return new ClientConfig( - topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition, autoCommit); + topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition, + autoCommit, pollTimeoutMs); } } }