Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream fixes #5

Merged
merged 21 commits into from
Jul 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions app/assets/stylesheets/index.less
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,60 @@

.glow-red {
outline: none;
border-color: #ff6c47;
box-shadow: 0 0 10px #ff6c47;
border-color: #ffd1d1;
box-shadow: 0 0 10px #ffd1d1;
border-style: solid;
}

.assignment-pane {
margin: 0 auto;
}

.assignment-cell {
margin: 1% 1% 1% 1%;
border-style: solid;
border-width: thin;
padding: inherit;
padding-top: 0;
border-color: rgb(190, 190, 190);
text-align: center;
}

.assignment-cell h4 {
text-align: center;
}

.partition-cell {
display: inline-block;
vertical-align: top;
border-style: solid;
border-width: thin;
border-color: rgb(200, 200, 200);
padding: 1% 1% 1% 1%;
margin: 0 0 0 0;
text-align: left;
}

.borderless {
border: hidden;
width: 25%;
}

.sub-heading {
padding-top: 0;
padding-bottom: 0;
text-align: center;
}

.sub-heading input {
width: 50%;
}

.btn-group-vertical button .glyphicon {
float: left;
}

#selectMetrics {
width: 100%;
margin-bottom: 3%;
}
116 changes: 110 additions & 6 deletions app/controllers/ReassignPartitions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

package controllers

import kafka.manager.ActorModel.TopicList
import kafka.manager.ApiError
import kafka.manager.ActorModel._
import kafka.manager.{BrokerListExtended, ApiError, TopicListExtended}
import models.navigation.Menus
import models.{navigation, FollowLink}
import models.form._
Expand All @@ -16,7 +16,7 @@ import play.api.data.validation.{Valid, Invalid, Constraint}
import play.api.mvc._

import scala.concurrent.Future
import scalaz.{\/-, -\/}
import scalaz.{\/, \/-, -\/}

/**
* @author hiral
Expand All @@ -33,6 +33,7 @@ object ReassignPartitions extends Controller{
case any: Any => Invalid(s"Invalid operation value: $any")
}


val reassignPartitionsForm = Form(
mapping(
"operation" -> nonEmptyText.verifying(validateOperation)
Expand All @@ -49,7 +50,21 @@ object ReassignPartitions extends Controller{
}
)(RunMultipleAssignments.apply)(RunMultipleAssignments.unapply)
)


val manualReassignmentForm: Form[List[(String, List[(Int, List[Int])])]] = Form(
"topics" -> list (
tuple (
"topic" -> text,
"assignments" -> list (
tuple (
"partition" -> number,
"brokers" -> list(number)
)
)
)
)
)

val generateAssignmentsForm = Form(
mapping(
"brokers" -> seq {
Expand Down Expand Up @@ -103,8 +118,7 @@ object ReassignPartitions extends Controller{
}

def confirmMultipleAssignments(c: String) = Action.async {
val topicList = kafkaManager.getTopicList(c)
topicList.flatMap { errOrTL =>
kafkaManager.getTopicList(c).flatMap { errOrTL =>
errOrTL.fold(
{ err: ApiError =>
Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))
Expand All @@ -119,6 +133,96 @@ object ReassignPartitions extends Controller{
}
}

def manualMultipleAssignments(c: String): Action[AnyContent] = Action.async {
val topicList = kafkaManager.getTopicListExtended(c)
val brokersViews = kafkaManager.getBrokersView(c)

def flattenedTopicListExtended(topicListExtended: TopicListExtended) = {
topicListExtended.list.map {
case (topic, Some(topicIdentity)) =>
(topic, topicIdentity.partitionsIdentity.toList.map { case (partition, identity) =>
(partition, identity.replicas.toList)
})
case (topic, None) => (topic, List[(Int, List[Int])]())
} toList
}

topicList.flatMap { errOrTL =>
errOrTL.fold(
{ err: ApiError =>
Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))
},
{ topics: TopicListExtended =>
kafkaManager.getBrokerList(c).flatMap { errOrCV =>
errOrCV.fold(
{err: ApiError =>
Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))
},
{ brokers: BrokerListExtended => {
brokersViews.flatMap { errorOrBVs =>
errorOrBVs.fold (
{err: ApiError => Future.successful( Ok(views.html.topic.confirmMultipleAssignments( c, -\/(err) )))},
{bVs: Seq[BVView] => Future {
Ok(views.html.topic.manualMultipleAssignments(
c, manualReassignmentForm.fill(flattenedTopicListExtended(topics)), brokers , bVs, manualReassignmentForm.errors
))
}}
)
}
}
}
)
}
}
)
}
}

def handleManualAssignment(c: String) = Action.async { implicit request =>
def validateAssignment(assignment: List[(String, List[(Int, List[Int])])]) = {
(for {
(topic, assign) <- assignment
(partition, replicas) <- assign
} yield {
replicas.size == replicas.toSet.size
}) forall { b => b }
}

def responseScreen(title: String, errorOrResult: \/[IndexedSeq[ApiError], Unit]) = {
Ok(views.html.common.resultsOfCommand(
views.html.navigation.clusterMenu(c, title, "", Menus.clusterMenus(c)),
models.navigation.BreadCrumbs.withNamedViewAndClusterAndTopic("Manual Reassignment View", c, "", title),
errorOrResult,
title,
FollowLink("Go to topic list.", routes.Topic.topics(c).toString()),
FollowLink("Try again.", routes.Topic.topics(c).toString())
))
}

manualReassignmentForm.bindFromRequest.fold (
errors => kafkaManager.getClusterList.map { errorOrClusterList =>
responseScreen(
"Manual Reassign Partitions Failure",
-\/(IndexedSeq(ApiError("There is something really wrong with your submitted data!")))
)
},
assignment => {
if (validateAssignment(assignment)) {
kafkaManager.manualPartitionAssignments(c, assignment).map { errorOrClusterList =>
responseScreen("Manual Partitions Reassignment Successful", errorOrClusterList)
}
} else {
Future {
responseScreen(
"Manual Partitions Reassignment Failure",
-\/(IndexedSeq(ApiError("You cannot (or at least should not) assign two replicas of the same partition to the same broker!!")))
)
}
}
}
)
}

def handleGenerateAssignment(c: String, t: String) = Action.async { implicit request =>
generateAssignmentsForm.bindFromRequest.fold(
errors => Future.successful( Ok(views.html.topic.confirmAssignment( c, t, \/-(errors) ))),
Expand Down
11 changes: 8 additions & 3 deletions app/kafka/manager/ActorModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ object ActorModel {
case object BVForceUpdate extends CommandRequest
case object BVGetTopicIdentities extends BVRequest
case class BVGetView(id: Int) extends BVRequest
case object BVGetViews extends BVRequest
case class BVGetTopicMetrics(topic: String) extends BVRequest
case object BVGetBrokerMetrics extends BVRequest
case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]], clusterConfig: ClusterConfig,
Expand Down Expand Up @@ -64,6 +65,7 @@ object ActorModel {
case class CMRunPreferredLeaderElection(topics: Set[String]) extends CommandRequest
case class CMRunReassignPartition(topics: Set[String]) extends CommandRequest
case class CMGeneratePartitionAssignments(topics: Set[String], brokers: Seq[Int]) extends CommandRequest
case class CMManualPartitionAssignments(assignments: List[(String, List[(Int, List[Int])])]) extends CommandRequest


case class CMCommandResult(result: Try[Unit]) extends CommandResponse
Expand Down Expand Up @@ -314,15 +316,17 @@ object ActorModel {
bytesRejectedPerSec: MeterMetric,
failedFetchRequestsPerSec: MeterMetric,
failedProduceRequestsPerSec: MeterMetric,
messagesInPerSec: MeterMetric) {
messagesInPerSec: MeterMetric,
oSystemMetrics: OSMetric) {
def +(o: BrokerMetrics) : BrokerMetrics = {
BrokerMetrics(
o.bytesInPerSec + bytesInPerSec,
o.bytesOutPerSec + bytesOutPerSec,
o.bytesRejectedPerSec + bytesRejectedPerSec,
o.failedFetchRequestsPerSec + failedFetchRequestsPerSec,
o.failedProduceRequestsPerSec + failedProduceRequestsPerSec,
o.messagesInPerSec + messagesInPerSec)
o.messagesInPerSec + messagesInPerSec,
oSystemMetrics)
}

}
Expand All @@ -334,7 +338,8 @@ object ActorModel {
MeterMetric(0, 0, 0, 0, 0),
MeterMetric(0, 0, 0, 0, 0),
MeterMetric(0, 0, 0, 0, 0),
MeterMetric(0, 0, 0, 0, 0))
MeterMetric(0, 0, 0, 0, 0),
OSMetric(0D, 0D))
}

case class BrokerClusterStats(perMessages: BigDecimal, perIncoming: BigDecimal, perOutgoing: BigDecimal)
Expand Down
71 changes: 45 additions & 26 deletions app/kafka/manager/BrokerViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
new mutable.HashMap[String, mutable.Map[Int, BrokerMetrics]]()

private[this] var combinedBrokerMetric : Option[BrokerMetrics] = None

private[this] val EMPTY_BVVIEW = BVView(Map.empty, config.clusterConfig, Option(BrokerMetrics.DEFAULT))

override def preStart() = {
log.info("Started actor %s".format(self.path))
Expand All @@ -65,6 +67,40 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
log.error("Long running pool queue full, skipping!")
}

private def produceBViewWithBrokerClusterState(bv: BVView) : BVView = {
val bcs = for {
metrics <- bv.metrics
cbm <- combinedBrokerMetric
} yield {
val perMessages = if(cbm.messagesInPerSec.oneMinuteRate > 0) {
BigDecimal(metrics.messagesInPerSec.oneMinuteRate / cbm.messagesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
} else ZERO
val perIncoming = if(cbm.bytesInPerSec.oneMinuteRate > 0) {
BigDecimal(metrics.bytesInPerSec.oneMinuteRate / cbm.bytesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
} else ZERO
val perOutgoing = if(cbm.bytesOutPerSec.oneMinuteRate > 0) {
BigDecimal(metrics.bytesOutPerSec.oneMinuteRate / cbm.bytesOutPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
} else ZERO
BrokerClusterStats(perMessages, perIncoming, perOutgoing)
}
if(bcs.isDefined) {
bv.copy(stats=bcs)
} else {
bv
}
}

private def allBrokerViews(): Seq[BVView] = {
var bvs = mutable.MutableList[BVView]()
for (key <- brokerTopicPartitions.keySet.toSeq.sorted) {
val bv = brokerTopicPartitions.get(key).map { bv => produceBViewWithBrokerClusterState(bv) }
if (bv.isDefined) {
bvs += bv.get
}
}
bvs.asInstanceOf[Seq[BVView]]
}

override def processActorRequest(request: ActorRequest): Unit = {
request match {
case BVForceUpdate =>
Expand All @@ -74,28 +110,13 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
context.actorSelection(config.kafkaStateActorPath).tell(KSGetAllTopicDescriptions(lastUpdateMillisOption), self)
context.actorSelection(config.kafkaStateActorPath).tell(KSGetBrokers, self)

case BVGetViews =>
sender ! allBrokerViews()


case BVGetView(id) =>
sender ! brokerTopicPartitions.get(id).map { bv =>
val bcs = for {
metrics <- bv.metrics
cbm <- combinedBrokerMetric
} yield {
val perMessages = if(cbm.messagesInPerSec.oneMinuteRate > 0) {
BigDecimal(metrics.messagesInPerSec.oneMinuteRate / cbm.messagesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
} else ZERO
val perIncoming = if(cbm.bytesInPerSec.oneMinuteRate > 0) {
BigDecimal(metrics.bytesInPerSec.oneMinuteRate / cbm.bytesInPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
} else ZERO
val perOutgoing = if(cbm.bytesOutPerSec.oneMinuteRate > 0) {
BigDecimal(metrics.bytesOutPerSec.oneMinuteRate / cbm.bytesOutPerSec.oneMinuteRate * 100D).setScale(3, BigDecimal.RoundingMode.HALF_UP)
} else ZERO
BrokerClusterStats(perMessages, perIncoming, perOutgoing)
}
if(bcs.isDefined) {
bv.copy(stats=bcs)
} else {
bv
}
produceBViewWithBrokerClusterState(bv)
}

case BVGetBrokerMetrics =>
Expand All @@ -118,11 +139,9 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
case BVUpdateBrokerMetrics(id, metrics) =>
brokerMetrics += (id -> metrics)
combinedBrokerMetric = Option(brokerMetrics.values.foldLeft(BrokerMetrics.DEFAULT)((acc, m) => acc + m))
for {
bv <- brokerTopicPartitions.get(id)
} {
brokerTopicPartitions.put(id, bv.copy(metrics = Option(metrics)))
}

val updatedBVView = brokerTopicPartitions.getOrElse(id, EMPTY_BVVIEW).copy(metrics = Option(metrics))
brokerTopicPartitions.put(id, updatedBVView)

case any: Any => log.warning("bvca : processActorRequest : Received unknown message: {}", any)
}
Expand Down Expand Up @@ -209,7 +228,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
} else if(config.clusterConfig.jmxEnabled) {
log.warning("Not scheduling update of JMX for all brokers, not enough capacity!")
}

topicPartitionByBroker.foreach {
case (brokerId, topicPartitions) =>
val topicPartitionsMap : Map[TopicIdentity, IndexedSeq[Int]] = topicPartitions.map {
Expand Down
Loading