Skip to content

Commit

Permalink
KAFKA-1147 Consumer socket timeout should be greater than fetch max w…
Browse files Browse the repository at this point in the history
…ait; reviewed by Neha Narkhede and Jun Rao
  • Loading branch information
Guozhang Wang authored and nehanarkhede committed Sep 14, 2014
1 parent e22f8ed commit 3d3c544
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/consumer/ConsumerConfig.scala
Expand Up @@ -102,8 +102,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
* Set this explicitly for only testing purpose. */
val consumerId: Option[String] = Option(props.getString("consumer.id", null))

/** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
/** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")

/** the socket receive buffer for network requests */
val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -213,8 +213,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */
val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000)

/* the socket timeout for network requests */
/* the socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms. */
val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")

/* the socket receive buffer for network requests */
val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)
Expand Down

0 comments on commit 3d3c544

Please sign in to comment.