From c6b7ec93ecd4b07715369a9e052f4025dee9f7be Mon Sep 17 00:00:00 2001 From: Harish Narayana Date: Thu, 27 Oct 2022 12:53:35 -0700 Subject: [PATCH 1/2] parameterize pollTimeoutMs --- .../ui/manager/kafka/config/ClientConfig.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java index b0ff446d..9f29b72e 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java @@ -75,7 +75,7 @@ public class ClientConfig { /** * How long to timeout poll requests. */ - private final long pollTimeoutMs = 2000; + private final long pollTimeoutMs; /** * Constructor. @@ -89,7 +89,7 @@ private ClientConfig( final FilterConfig filterConfig, final String consumerId, final StartingPosition startingPosition) { - this(topicConfig, filterConfig, consumerId, startingPosition, new ArrayList<>(), 10, true); + this(topicConfig, filterConfig, consumerId, startingPosition, new ArrayList<>(), 10, true, 2000); } /** @@ -101,6 +101,7 @@ private ClientConfig( * @param partitionIds List of partitionIds to limit consuming from. * @param maxResultsPerPartition How many records to poll per partition. * @param isAutoCommitEnabled If the consumer should auto commit state or not. + * @param pollTimeoutMs poll timeout in milliseconds. */ private ClientConfig( final TopicConfig topicConfig, @@ -109,7 +110,8 @@ private ClientConfig( final StartingPosition startingPosition, final Collection partitionIds, final int maxResultsPerPartition, - final boolean isAutoCommitEnabled) { + final boolean isAutoCommitEnabled, + final long pollTimeoutMs) { this.topicConfig = topicConfig; this.filterConfig = filterConfig; @@ -120,6 +122,7 @@ private ClientConfig( this.partitionIds = Collections.unmodifiableSet(tempSet); this.maxResultsPerPartition = maxResultsPerPartition; this.isAutoCommitEnabled = isAutoCommitEnabled; + this.pollTimeoutMs = pollTimeoutMs; } public TopicConfig getTopicConfig() { @@ -206,6 +209,7 @@ public static class Builder { private StartingPosition startingPosition = StartingPosition.newResumeFromExistingState(); private int maxResultsPerPartition = 10; private boolean autoCommit = true; + private long pollTimeoutMs = 2000; /** * Private constructor. @@ -303,12 +307,21 @@ public Builder withAutoCommitDisabled() { return this; } + + /** + * Declare pollTimeout in milliseconds. + */ + public Builder withPollTimeoutMs(final long pollTimeoutMs) { + this.pollTimeoutMs = pollTimeoutMs; + return this; + } + /** * Create new ClientConfig instance. */ public ClientConfig build() { return new ClientConfig( - topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition, autoCommit); + topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition, autoCommit, pollTimeoutMs); } } } From 00a5166b875f376fb54a1dcec1d8437578157669 Mon Sep 17 00:00:00 2001 From: Harish Narayana Date: Thu, 27 Oct 2022 21:55:33 -0700 Subject: [PATCH 2/2] fix(checkstyle): checkstyle violation fix --- .../kafka/webview/ui/manager/kafka/config/ClientConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java index 9f29b72e..8b8712bc 100644 --- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java +++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/config/ClientConfig.java @@ -321,7 +321,8 @@ public Builder withPollTimeoutMs(final long pollTimeoutMs) { */ public ClientConfig build() { return new ClientConfig( - topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition, autoCommit, pollTimeoutMs); + topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition, + autoCommit, pollTimeoutMs); } } }