Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class ClientConfig {
/**
* How long to timeout poll requests.
*/
private final long pollTimeoutMs = 2000;
private final long pollTimeoutMs;

/**
* Constructor.
Expand All @@ -89,7 +89,7 @@ private ClientConfig(
final FilterConfig filterConfig,
final String consumerId,
final StartingPosition startingPosition) {
this(topicConfig, filterConfig, consumerId, startingPosition, new ArrayList<>(), 10, true);
this(topicConfig, filterConfig, consumerId, startingPosition, new ArrayList<>(), 10, true, 2000);
}

/**
Expand All @@ -101,6 +101,7 @@ private ClientConfig(
* @param partitionIds List of partitionIds to limit consuming from.
* @param maxResultsPerPartition How many records to poll per partition.
* @param isAutoCommitEnabled If the consumer should auto commit state or not.
* @param pollTimeoutMs poll timeout in milliseconds.
*/
private ClientConfig(
final TopicConfig topicConfig,
Expand All @@ -109,7 +110,8 @@ private ClientConfig(
final StartingPosition startingPosition,
final Collection<Integer> partitionIds,
final int maxResultsPerPartition,
final boolean isAutoCommitEnabled) {
final boolean isAutoCommitEnabled,
final long pollTimeoutMs) {

this.topicConfig = topicConfig;
this.filterConfig = filterConfig;
Expand All @@ -120,6 +122,7 @@ private ClientConfig(
this.partitionIds = Collections.unmodifiableSet(tempSet);
this.maxResultsPerPartition = maxResultsPerPartition;
this.isAutoCommitEnabled = isAutoCommitEnabled;
this.pollTimeoutMs = pollTimeoutMs;
}

public TopicConfig getTopicConfig() {
Expand Down Expand Up @@ -206,6 +209,7 @@ public static class Builder {
private StartingPosition startingPosition = StartingPosition.newResumeFromExistingState();
private int maxResultsPerPartition = 10;
private boolean autoCommit = true;
private long pollTimeoutMs = 2000;

/**
* Private constructor.
Expand Down Expand Up @@ -303,12 +307,22 @@ public Builder withAutoCommitDisabled() {
return this;
}


/**
* Declare pollTimeout in milliseconds.
*/
public Builder withPollTimeoutMs(final long pollTimeoutMs) {
this.pollTimeoutMs = pollTimeoutMs;
return this;
}

/**
* Create new ClientConfig instance.
*/
public ClientConfig build() {
return new ClientConfig(
topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition, autoCommit);
topicConfig, filterConfig, consumerId, startingPosition, limitPartitions, maxResultsPerPartition,
autoCommit, pollTimeoutMs);
}
}
}