Permalink
Browse files

KAFKA-800 inSyncReplica in Partition needs some tweaks; reviewed by J…

…un Rao and Neha Narkhede
  • Loading branch information...
1 parent da1dc17 commit 485afe646af282c59927b177ddc70742349cdad8 @sriramsub sriramsub committed with nehanarkhede Mar 12, 2013
@@ -67,7 +67,9 @@ class Partition(val topic: String,
)
def isUnderReplicated(): Boolean = {
- inSyncReplicas.size < replicationFactor
+ leaderIsrUpdateLock synchronized {
+ inSyncReplicas.size < replicationFactor
+ }
}
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
@@ -339,12 +341,14 @@ class Partition(val topic: String,
}
override def toString(): String = {
- val partitionString = new StringBuilder
- partitionString.append("Topic: " + topic)
- partitionString.append("; Partition: " + partitionId)
- partitionString.append("; Leader: " + leaderReplicaIdOpt)
- partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
- partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
- partitionString.toString()
+ leaderIsrUpdateLock synchronized {
+ val partitionString = new StringBuilder
+ partitionString.append("Topic: " + topic)
+ partitionString.append("; Partition: " + partitionId)
+ partitionString.append("; Leader: " + leaderReplicaIdOpt)
+ partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+ partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+ partitionString.toString()
+ }
}
}
@@ -57,7 +57,11 @@ class ReplicaManager(val config: KafkaConfig,
newGauge(
"LeaderCount",
new Gauge[Int] {
- def getValue = leaderPartitions.size
+ def getValue = {
+ leaderPartitionsLock synchronized {
+ leaderPartitions.size
+ }
+ }
}
)
newGauge(

0 comments on commit 485afe6

Please sign in to comment.