Permalink
Browse files

Fault tolerance broken with replication factor 1; kafka-691; patched …

…by Maxime Brugidou; reviewed by Jun Rao
  • Loading branch information...
1 parent ddd66cb commit b71e6dc352770f22daec0c9a3682138666f032be @junrao junrao committed Jan 10, 2013
@@ -24,9 +24,6 @@ private class DefaultPartitioner[T](props: VerifiableProperties = null) extends
private val random = new java.util.Random
def partition(key: T, numPartitions: Int): Int = {
- if(key == null)
- random.nextInt(numPartitions)
- else
- Utils.abs(key.hashCode) % numPartitions
+ Utils.abs(key.hashCode) % numPartitions
}
}
@@ -113,5 +113,15 @@ class ProducerConfig private (val props: VerifiableProperties)
val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+ /**
+ * The producer generally refreshes the topic metadata from brokers when there is a failure
+ * (partition missing, leader not available...). It will also poll regularly (default: every 10min
+ * so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure.
+ * If you set this to zero, the metadata will get refreshed after each message sent (not recommended)
+ * Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
+ * a message the metadata is never refreshed
+ */
+ val topicMetadataRefreshIntervalMs = props.getInt("producer.metadata.refresh.interval.ms", 600000)
+
validate(this)
}
@@ -21,13 +21,12 @@ import kafka.common._
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
-import kafka.utils.{Utils, Logging}
+import kafka.utils.{Utils, Logging, SystemTime}
import scala.collection.{Seq, Map}
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}
-
class DefaultEventHandler[K,V](config: ProducerConfig,
private val partitioner: Partitioner[K],
private val encoder: Encoder[V],
@@ -43,6 +42,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val lock = new Object()
+ private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
+ private var lastTopicMetadataRefreshTime = 0L
+ private val topicMetadataToRefresh = Set.empty[String]
+
private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
@@ -58,6 +61,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+ topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
+ if (topicMetadataRefreshInterval >= 0 &&
+ SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
+ Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
+ topicMetadataToRefresh.clear
+ lastTopicMetadataRefreshTime = SystemTime.milliseconds
+ }
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
if (outstandingProduceRequests.size > 0) {
// back off and update the topic metadata cache before attempting another send operation
@@ -133,9 +143,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
try {
for (message <- messages) {
val topicPartitionsList = getPartitionListForTopic(message)
- val totalNumPartitions = topicPartitionsList.length
-
- val partitionIndex = getPartition(message.key, totalNumPartitions)
+ val partitionIndex = getPartition(message.key, topicPartitionsList)
val brokerPartition = topicPartitionsList(partitionIndex)
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
@@ -184,17 +192,24 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* Retrieves the partition id and throws an UnknownTopicOrPartitionException if
* the value of partition is not between 0 and numPartitions-1
* @param key the partition key
- * @param numPartitions the total number of available partitions
+ * @param topicPartitionList the list of available partitions
* @return the partition id
*/
- private def getPartition(key: K, numPartitions: Int): Int = {
+ private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
+ val numPartitions = topicPartitionList.size
if(numPartitions <= 0)
throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
"\n Valid values are > 0")
val partition =
- if(key == null)
- Utils.abs(partitionCounter.getAndIncrement()) % numPartitions
- else
+ if(key == null) {
+ // If the key is null, we don't really need a partitioner so we just send to the next
+ // available partition
+ val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
+ if (availablePartitions.isEmpty)
+ throw new LeaderNotAvailableException("No leader for any partition")
+ val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
+ availablePartitions(index).partitionId
+ } else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +

0 comments on commit b71e6dc

Please sign in to comment.