Permalink
Browse files

KAFKA-513 Add state change log to Kafka brokers; reviewed by Neha Narkhede
  • Loading branch information...
Swapnil Ghike authored and nehanarkhede committed Mar 6, 2013
1 parent 2e64c6a commit 2457bc49ef39c70622816250025eefc3bfcc7640
Showing with 414 additions and 136 deletions.
  1. +1 −1 bin/kafka-run-class.sh
  2. +12 −1 config/log4j.properties
  3. +2 −2 core/src/main/scala/kafka/api/FetchRequest.scala
  4. +13 −8 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
  5. +2 −2 core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
  6. +2 −2 core/src/main/scala/kafka/api/OffsetRequest.scala
  7. +2 −2 core/src/main/scala/kafka/api/OffsetResponse.scala
  8. +2 −2 core/src/main/scala/kafka/api/ProducerRequest.scala
  9. +3 −2 core/src/main/scala/kafka/api/ProducerResponse.scala
  10. +1 −1 core/src/main/scala/kafka/api/RequestOrResponse.scala
  11. +2 −2 core/src/main/scala/kafka/api/StopReplicaRequest.scala
  12. +3 −2 core/src/main/scala/kafka/api/StopReplicaResponse.scala
  13. +2 −2 core/src/main/scala/kafka/api/TopicMetadataRequest.scala
  14. +2 −1 core/src/main/scala/kafka/api/TopicMetadataResponse.scala
  15. +21 −16 core/src/main/scala/kafka/cluster/Partition.scala
  16. +1 −1 core/src/main/scala/kafka/common/Topic.scala
  17. +22 −12 core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  18. +2 −2 core/src/main/scala/kafka/controller/KafkaController.scala
  19. +51 −30 core/src/main/scala/kafka/controller/PartitionStateMachine.scala
  20. +20 −13 core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
  21. +5 −2 core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
  22. +47 −24 core/src/main/scala/kafka/server/ReplicaManager.scala
  23. +188 −0 core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
  24. +1 −1 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
  25. +7 −5 core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
View
@@ -80,4 +80,4 @@ else
JAVA="$JAVA_HOME/bin/java"
fi
-$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@
+$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@"
View
@@ -36,6 +36,12 @@ log4j.appender.requestAppender.File=kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -53,5 +59,10 @@ log4j.additivity.kafka.network.RequestChannel$=false
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false
-log4j.logger.kafka.controller=TRACE, stateChangeAppender
+log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+
@@ -59,13 +59,13 @@ object FetchRequest {
}
case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
- correlationId: Int = FetchRequest.DefaultCorrelationId,
+ override val correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
- extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
+ extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) {
/**
* Partitions the request info into a map of maps (one for each topic).
@@ -93,6 +93,7 @@ object LeaderAndIsrRequest {
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
+ val controllerId = buffer.getInt
val controllerEpoch = buffer.getInt
val partitionStateInfosCount = buffer.getInt
val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
@@ -110,30 +111,32 @@ object LeaderAndIsrRequest {
for (i <- 0 until leadersCount)
leaders += Broker.readFrom(buffer)
- new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
+ new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
}
}
case class LeaderAndIsrRequest (versionId: Short,
- correlationId: Int,
+ override val correlationId: Int,
clientId: String,
ackTimeoutMs: Int,
+ controllerId: Int,
+ controllerEpoch: Int,
partitionStateInfos: Map[(String, Int), PartitionStateInfo],
- leaders: Set[Broker],
- controllerEpoch: Int)
- extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
+ leaders: Set[Broker])
+ extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
- def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker],
+ def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
controllerEpoch: Int, correlationId: Int) = {
this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
- partitionStateInfos, liveBrokers, controllerEpoch)
+ controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
+ buffer.putInt(controllerId)
buffer.putInt(controllerEpoch)
buffer.putInt(partitionStateInfos.size)
for((key, value) <- partitionStateInfos){
@@ -151,6 +154,7 @@ case class LeaderAndIsrRequest (versionId: Short,
4 /* correlation id */ +
(2 + clientId.length) /* client id */ +
4 /* ack timeout */ +
+ 4 /* controller id */ +
4 /* controller epoch */ +
4 /* number of partitions */
for((key, value) <- partitionStateInfos)
@@ -165,10 +169,11 @@ case class LeaderAndIsrRequest (versionId: Short,
val leaderAndIsrRequest = new StringBuilder
leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
leaderAndIsrRequest.append("; Version: " + versionId)
+ leaderAndIsrRequest.append("; Controller: " + controllerId)
+ leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
leaderAndIsrRequest.append("; ClientId: " + clientId)
leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
- leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
leaderAndIsrRequest.toString()
@@ -41,10 +41,10 @@ object LeaderAndIsrResponse {
}
-case class LeaderAndIsrResponse(correlationId: Int,
+case class LeaderAndIsrResponse(override val correlationId: Int,
responseMap: Map[(String, Int), Short],
errorCode: Short = ErrorMapping.NoError)
- extends RequestOrResponse {
+ extends RequestOrResponse(correlationId = correlationId) {
def sizeInBytes(): Int ={
var size =
4 /* correlation id */ +
@@ -57,10 +57,10 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
versionId: Short = OffsetRequest.CurrentVersion,
- correlationId: Int = 0,
+ override val correlationId: Int = 0,
clientId: String = OffsetRequest.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId)
- extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
+ extends RequestOrResponse(Some(RequestKeys.OffsetsKey), correlationId) {
def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
@@ -47,9 +47,9 @@ object OffsetResponse {
case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
-case class OffsetResponse(correlationId: Int,
+case class OffsetResponse(override val correlationId: Int,
partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
- extends RequestOrResponse {
+ extends RequestOrResponse(correlationId = correlationId) {
lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
@@ -54,12 +54,12 @@ object ProducerRequest {
}
case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
- correlationId: Int,
+ override val correlationId: Int,
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
- extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
+ extends RequestOrResponse(Some(RequestKeys.ProduceKey), correlationId) {
/**
* Partitions the data into a map of maps (one for each topic).
@@ -43,8 +43,9 @@ object ProducerResponse {
case class ProducerResponseStatus(error: Short, offset: Long)
-case class ProducerResponse(correlationId: Int,
- status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
+case class ProducerResponse(override val correlationId: Int,
+ status: Map[TopicAndPartition, ProducerResponseStatus])
+ extends RequestOrResponse(correlationId = correlationId) {
/**
* Partitions the status map into a map of maps (one for each topic).
@@ -27,7 +27,7 @@ object Request {
}
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging{
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging{
def sizeInBytes: Int
@@ -53,13 +53,13 @@ object StopReplicaRequest extends Logging {
}
case class StopReplicaRequest(versionId: Short,
- correlationId: Int,
+ override val correlationId: Int,
clientId: String,
ackTimeoutMs: Int,
deletePartitions: Boolean,
partitions: Set[(String, Int)],
controllerEpoch: Int)
- extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
+ extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
@@ -42,9 +42,10 @@ object StopReplicaResponse {
}
-case class StopReplicaResponse(val correlationId: Int,
+case class StopReplicaResponse(override val correlationId: Int,
val responseMap: Map[(String, Int), Short],
- val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+ val errorCode: Short = ErrorMapping.NoError)
+ extends RequestOrResponse(correlationId = correlationId) {
def sizeInBytes(): Int ={
var size =
4 /* correlation id */ +
@@ -47,10 +47,10 @@ object TopicMetadataRequest extends Logging {
}
case class TopicMetadataRequest(val versionId: Short,
- val correlationId: Int,
+ override val correlationId: Int,
val clientId: String,
val topics: Seq[String])
- extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
+ extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){
def this(topics: Seq[String], correlationId: Int) =
this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)
@@ -34,7 +34,8 @@ object TopicMetadataResponse {
}
case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
- correlationId: Int) extends RequestOrResponse {
+ override val correlationId: Int)
+ extends RequestOrResponse(correlationId = correlationId) {
val sizeInBytes: Int = {
val brokers = extractBrokers(topicsMetadata).values
4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
@@ -25,6 +25,7 @@ import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
import kafka.common.ErrorMapping
import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+import org.apache.log4j.Logger
/**
@@ -51,7 +52,8 @@ class Partition(val topic: String,
* In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
- this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
+ this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
+ private val stateChangeLogger = Logger.getLogger("state.change.logger")
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -124,15 +126,17 @@ class Partition(val topic: String,
* 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
* 4. set the new leader and ISR
*/
- def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = {
+ def makeLeader(controllerId: Int, topic: String, partitionId: Int,
+ leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
if (leaderEpoch >= leaderAndIsr.leaderEpoch){
- info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
- .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+ stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
+ partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
return false
}
- trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -159,22 +163,21 @@ class Partition(val topic: String,
* 3. set the leader and set ISR to empty
* 4. start a fetcher to the new leader
*/
- def makeFollower(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
- liveBrokers: Set[Broker]): Boolean = {
+ def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+ liveBrokers: Set[Broker], correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
- if (leaderEpoch >= leaderAndIsr.leaderEpoch){
- info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
- .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+ if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
+ stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
+ partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
return false
}
- trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
val newLeaderBrokerId: Int = leaderAndIsr.leader
- info("Starting the follower state transition to follow leader %d for topic %s partition %d"
- .format(newLeaderBrokerId, topic, partitionId))
liveBrokers.find(_.id == newLeaderBrokerId) match {
case Some(leaderBroker) =>
// stop fetcher thread to previous leader
@@ -189,16 +192,18 @@ class Partition(val topic: String,
// start fetcher thread to current leader
replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
case None => // leader went down
- warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
- " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+ stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
+ " controller %d epoch %d since leader %d for partition [%s,%d] became unavailable during the state change operation"
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+ newLeaderBrokerId, topic, partitionId))
}
true
}
}
def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
leaderIsrUpdateLock synchronized {
- debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId))
+ debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
val replica = getOrCreateReplica(replicaId)
replica.logEndOffset = offset
@@ -20,7 +20,7 @@ package kafka.common
import util.matching.Regex
object Topic {
- private val legalChars = "[a-zA-Z0-9\\._\\-]"
+ val legalChars = "[a-zA-Z0-9\\._\\-]"
private val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")
Oops, something went wrong.

0 comments on commit 2457bc4

Please sign in to comment.