Merge pull request #1 from yahoo/master
changes from upstream
joestein committed Jul 1, 2015
Parents: 97bad8d + ac71562 · Commit: eeab67d
Showing 34 changed files with 410 additions and 120 deletions.
8 changes: 6 additions & 2 deletions .travis.yml
@@ -1,5 +1,9 @@
 language: scala
+sudo: false
+jdk: oraclejdk8
+install: true
-script: "./sbt clean test"
+script: "./sbt clean coverage assembly"
 scala:
 - 2.11.5
-- 2.11.5
+#after_success:
+#  - sbt coverageReport coveralls
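The coverage task used in the new build script comes from the sbt-scoverage plugin. A minimal sketch of the plugin wiring this assumes (the plugin version shown is illustrative, not taken from this commit):

// project/plugins.sbt — provides the `coverage` and `coverageReport` commands
// referenced in .travis.yml above (the version number is an assumption)
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")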
2 changes: 1 addition & 1 deletion app/GlobalKafkaManager.scala
@@ -16,7 +16,7 @@ object GlobalKafkaManager extends GlobalSettings {
 
   override def beforeStart(app: Application): Unit = {
     Logger.info("Init kafka manager...")
-    KafkaManagerContext.getKafkaManger
+    KafkaManagerContext.getKafkaManager
     Thread.sleep(5000)
   }
 
2 changes: 1 addition & 1 deletion app/controllers/Application.scala
@@ -14,7 +14,7 @@ object Application extends Controller {
 
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
 
-  private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
+  private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
 
   def index = Action.async {
     kafkaManager.getClusterList.map { errorOrClusterList =>
2 changes: 1 addition & 1 deletion app/controllers/Cluster.scala
@@ -24,7 +24,7 @@ import scalaz.{-\/, \/-}
 object Cluster extends Controller {
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
 
-  private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
+  private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
 
   val validateName : Constraint[String] = Constraint("validate name") { name =>
     Try {
2 changes: 1 addition & 1 deletion app/controllers/KafkaManagerContext.scala
@@ -15,7 +15,7 @@ object KafkaManagerContext {
   import play.api.Play.current
 
   private[this] val kafkaManager : KafkaManager = new KafkaManager(play.api.Play.configuration.underlying)
-  def getKafkaManger : KafkaManager = kafkaManager
+  def getKafkaManager : KafkaManager = kafkaManager
   def shutdown() : Unit = {
     kafkaManager.shutdown()
   }
2 changes: 1 addition & 1 deletion app/controllers/PreferredReplicaElection.scala
@@ -23,7 +23,7 @@ import scalaz.-\/
 object PreferredReplicaElection extends Controller{
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
 
-  private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
+  private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
 
 
   val validateOperation : Constraint[String] = Constraint("validate operation value") {
2 changes: 1 addition & 1 deletion app/controllers/ReassignPartitions.scala
@@ -24,7 +24,7 @@ import scalaz.{\/-, -\/}
 object ReassignPartitions extends Controller{
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
 
-  private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
+  private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
 
   val validateOperation : Constraint[String] = Constraint("validate operation value") {
     case "confirm" => Valid
11 changes: 7 additions & 4 deletions app/controllers/Topic.scala
@@ -9,7 +9,7 @@ import java.util.Properties
 
 import kafka.manager.ActorModel.TopicIdentity
 import kafka.manager.utils.TopicConfigs
-import kafka.manager.{ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1}
+import kafka.manager.{Kafka_0_8_2_1, ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1}
 import models.FollowLink
 import models.form._
 import models.navigation.Menus
@@ -29,7 +29,7 @@ import scalaz.{\/-, -\/}
 object Topic extends Controller{
   import play.api.libs.concurrent.Execution.Implicits.defaultContext
 
-  private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
+  private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
 
   val validateName : Constraint[String] = Constraint("validate name") { name =>
     Try {
@@ -42,7 +42,8 @@ object Topic extends Controller{
 
   val kafka_0_8_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_1_1).map(n => TConfig(n,None)).toList)
   val kafka_0_8_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_2_0).map(n => TConfig(n,None)).toList)
-
+  val kafka_0_8_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_2_1).map(n => TConfig(n,None)).toList)
+
   val defaultCreateForm = Form(
     mapping(
       "topic" -> nonEmptyText.verifying(maxLength(250), validateName),
@@ -97,13 +98,14 @@ object Topic extends Controller{
           clusterConfig.version match {
             case Kafka_0_8_1_1 => defaultCreateForm.fill(kafka_0_8_1_1_Default)
             case Kafka_0_8_2_0 => defaultCreateForm.fill(kafka_0_8_2_0_Default)
+            case Kafka_0_8_2_1 => defaultCreateForm.fill(kafka_0_8_2_1_Default)
           }
         }
       }
   }
 
   def topics(c: String) = Action.async {
-    kafkaManager.getTopicListWithMoreInfo(c).map { errorOrTopicList =>
+    kafkaManager.getTopicListExtended(c).map { errorOrTopicList =>
       Ok(views.html.topic.topicList(c,errorOrTopicList))
     }
   }
@@ -201,6 +203,7 @@ object Topic extends Controller{
       val defaultConfigMap = clusterConfig.version match {
        case Kafka_0_8_1_1 => TopicConfigs.configNames(Kafka_0_8_1_1).map(n => (n,TConfig(n,None))).toMap
        case Kafka_0_8_2_0 => TopicConfigs.configNames(Kafka_0_8_2_0).map(n => (n,TConfig(n,None))).toMap
+       case Kafka_0_8_2_1 => TopicConfigs.configNames(Kafka_0_8_2_1).map(n => (n,TConfig(n,None))).toMap
      }
      val combinedMap = defaultConfigMap ++ ti.config.toMap.map(tpl => tpl._1 -> TConfig(tpl._1,Option(tpl._2)))
      defaultUpdateConfigForm.fill(UpdateTopicConfig(ti.topic,combinedMap.toList.map(_._2),ti.configReadVersion))
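Kafka_0_8_2_1, which this commit threads through every version match, presumably joins the existing version constants in kafka.manager. A minimal sketch of the assumed shape of that ADT, based only on how it is imported and pattern-matched above (the toString values are assumptions):

sealed trait KafkaVersion
case object Kafka_0_8_1_1 extends KafkaVersion { override def toString = "0.8.1.1" }
case object Kafka_0_8_2_0 extends KafkaVersion { override def toString = "0.8.2.0" }
case object Kafka_0_8_2_1 extends KafkaVersion { override def toString = "0.8.2.1" }

Modeling versions as case objects of a sealed trait keeps the matches in Topic.scala and KafkaCommandActor.scala exhaustive, so the compiler can flag any handler that forgets the new version.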
45 changes: 45 additions & 0 deletions app/controllers/api/KafkaHealthCheck.scala
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
+ * See accompanying LICENSE file.
+ */
+
+package controllers.api
+
+import controllers.KafkaManagerContext
+import play.api.libs.json._
+import play.api.mvc._
+
+object KafkaHealthCheck extends Controller {
+
+  import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+  private[this] val kafkaManager = KafkaManagerContext.getKafkaManager
+
+  def availableBrokers(c: String) = Action.async { implicit request =>
+    kafkaManager.getBrokerList(c).map { errorOrBrokerList =>
+      errorOrBrokerList.fold(
+        error => BadRequest(Json.obj("msg" -> error.msg)),
+        brokerList => Ok(Json.obj("availableBrokers" -> brokerList.list.map(bi => bi.id)))
+      )
+    }
+  }
+
+  def underReplicatedPartitions(c: String, t: String) = Action.async { implicit request =>
+    kafkaManager.getTopicIdentity(c,t).map { errorOrTopicIdentity =>
+      errorOrTopicIdentity.fold(
+        error => BadRequest(Json.obj("msg" -> error.msg)),
+        topicIdentity => Ok(Json.obj("topic" -> t, "underReplicatedPartitions" -> topicIdentity.partitionsIdentity.filter(_._2.isUnderReplicated).map{case (num, pi) => pi.partNum}))
+      )
+    }
+  }
+
+  def unavailablePartitions(c: String, t: String) = Action.async { implicit request =>
+    kafkaManager.getTopicIdentity(c,t).map { errorOrTopicIdentity =>
+      errorOrTopicIdentity.fold(
+        error => BadRequest(Json.obj("msg" -> error.msg)),
+        topicIdentity => Ok(Json.obj("topic" -> t, "unavailablePartitions" -> topicIdentity.partitionsIdentity.filter(_._2.isr.isEmpty).map{case (num, pi) => pi.partNum}))
+      )
+    }
+  }
+}
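The routes wiring for these new health-check actions is not among the files rendered here, so as a sketch, the endpoints can be exercised directly with Play's test helpers (cluster and topic names are placeholders):

// Sketch only: invokes the new actions without going through conf/routes.
import play.api.test.FakeRequest
import controllers.api.KafkaHealthCheck

val brokers = KafkaHealthCheck.availableBrokers("my-cluster")(FakeRequest())
// 200 => {"availableBrokers":[...]}   400 => {"msg":"..."}
val urp = KafkaHealthCheck.underReplicatedPartitions("my-cluster", "my-topic")(FakeRequest())
// 200 => {"topic":"my-topic","underReplicatedPartitions":[...]}
val uap = KafkaHealthCheck.unavailablePartitions("my-cluster", "my-topic")(FakeRequest())
// 200 => {"topic":"my-topic","unavailablePartitions":[...]}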

18 changes: 10 additions & 8 deletions app/kafka/manager/ActorModel.scala
@@ -36,8 +36,8 @@ object ActorModel {
   case class BVGetView(id: Int) extends BVRequest
   case class BVGetTopicMetrics(topic: String) extends BVRequest
   case object BVGetBrokerMetrics extends BVRequest
-  case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]],
-                    metrics: Option[BrokerMetrics] = None,
+  case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]], clusterConfig: ClusterConfig,
+                    metrics: Option[BrokerMetrics] = None,
                     stats: Option[BrokerClusterStats] = None) extends QueryResponse {
     def numTopics : Int = topicPartitions.size
     def numPartitions : Int = topicPartitions.values.foldLeft(0)((acc,i) => acc + i.size)
@@ -134,7 +134,7 @@ object ActorModel {
                              deleteSupported: Boolean) extends QueryResponse
   case class TopicDescriptions(descriptions: IndexedSeq[TopicDescription], lastUpdateMillis: Long) extends QueryResponse
 
-  case class BrokerList(list: IndexedSeq[BrokerIdentity]) extends QueryResponse
+  case class BrokerList(list: IndexedSeq[BrokerIdentity], clusterConfig: ClusterConfig) extends QueryResponse
 
   case class PreferredReplicaElection(startTime: DateTime, topicAndPartition: Set[TopicAndPartition], endTime: Option[DateTime]) extends QueryResponse
   case class ReassignPartitions(startTime: DateTime, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], endTime: Option[DateTime]) extends QueryResponse
@@ -200,7 +200,8 @@ object ActorModel {
                            numBrokers: Int,
                            configReadVersion: Int,
                            config: List[(String,String)],
-                           deleteSupported: Boolean,
+                           deleteSupported: Boolean,
+                           clusterConfig: ClusterConfig,
                            metrics: Option[BrokerMetrics] = None) {
 
     val replicationFactor : Int = partitionsIdentity.head._2.replicas.size
@@ -248,7 +249,7 @@ object ActorModel {
   import org.json4s.scalaz.JsonScalaz._
   import scala.language.reflectiveCalls
 
-  implicit def from(brokers: Int,td: TopicDescription, tm: Option[BrokerMetrics]) : TopicIdentity = {
+  implicit def from(brokers: Int,td: TopicDescription, tm: Option[BrokerMetrics], clusterConfig: ClusterConfig) : TopicIdentity = {
     val descJson = parse(td.description._2)
     //val partMap = (descJson \ "partitions").as[Map[String,Seq[Int]]]
     val partMap = field[Map[String,List[Int]]]("partitions")(descJson).fold({ e =>
@@ -276,11 +277,11 @@ object ActorModel {
         (-1,Map.empty[String, String])
       }
     }
-    TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList,td.deleteSupported, tm)
+    TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList,td.deleteSupported, clusterConfig, tm)
   }
 
-  implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics]) : TopicIdentity = {
-    from(bl.list.size, td, tm)
+  implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics], clusterConfig: ClusterConfig) : TopicIdentity = {
+    from(bl.list.size, td, tm, clusterConfig)
   }
 
   implicit def reassignReplicas(currentTopicIdentity: TopicIdentity,
@@ -302,6 +303,7 @@ object ActorModel {
       currentTopicIdentity.configReadVersion,
       currentTopicIdentity.config,
       currentTopicIdentity.deleteSupported,
+      currentTopicIdentity.clusterConfig,
       currentTopicIdentity.metrics)
   }
 }
20 changes: 14 additions & 6 deletions app/kafka/manager/BrokerViewCacheActor.scala
@@ -56,6 +56,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
     log.info("Stopped actor %s".format(self.path))
     log.info("Cancelling updater...")
     Try(cancellable.map(_.cancel()))
+    super.postStop()
   }
 
   override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig
@@ -146,11 +147,14 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
       brokerList <- brokerListOption
       topicDescriptions <- topicDescriptionsOption
     } {
-      val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map(TopicIdentity.from(brokerList.list.size,_,None))
+      val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map(
+        TopicIdentity.from(brokerList.list.size,_,None, config.clusterConfig))
       topicIdentities = topicIdentity.map(ti => (ti.topic, ti)).toMap
-      val topicPartitionByBroker = topicIdentity.flatMap(ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)
+      val topicPartitionByBroker = topicIdentity.flatMap(
+        ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)
 
-      if (config.clusterConfig.jmxEnabled) {
+      //check for 2*broker list size since we schedule 2 jmx calls for each broker
+      if (config.clusterConfig.jmxEnabled && hasCapacityFor(2*brokerListOption.size)) {
         implicit val ec = longRunningExecutionContext
         val brokerLookup = brokerList.list.map(bi => bi.id -> bi).toMap
         topicPartitionByBroker.foreach {
@@ -164,7 +168,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
             mbsc =>
               topicPartitions.map {
                 case (topic, id, partitions) =>
-                  (topic.topic, KafkaMetrics.getBrokerMetrics(mbsc, Option(topic.topic)))
+                  (topic.topic,
+                    KafkaMetrics.getBrokerMetrics(config.clusterConfig.version, mbsc, Option(topic.topic)))
               }
           }
           val result = tryResult match {
@@ -188,7 +193,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
           Future {
             val tryResult = KafkaJMX.doWithConnection(broker.host, broker.jmxPort) {
               mbsc =>
-                KafkaMetrics.getBrokerMetrics(mbsc)
+                KafkaMetrics.getBrokerMetrics(config.clusterConfig.version, mbsc)
             }
 
             val result = tryResult match {
@@ -201,6 +206,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
             }
           }
         }
+      } else if(config.clusterConfig.jmxEnabled) {
+        log.warning("Not scheduling update of JMX for all brokers, not enough capacity!")
       }
 
       topicPartitionByBroker.foreach {
@@ -209,7 +216,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
           case (topic, id, partitions) =>
            (topic, partitions)
         }.toMap
-        brokerTopicPartitions.put(brokerId,BVView(topicPartitionsMap, brokerMetrics.get(brokerId)))
+        brokerTopicPartitions.put(
+          brokerId,BVView(topicPartitionsMap, config.clusterConfig, brokerMetrics.get(brokerId)))
      }
    }
  }
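The new guard calls hasCapacityFor, presumably inherited from the long-running pool base class (the truncated "extends LongRunni…" context above suggests LongRunningPoolActor). A rough sketch of the assumed semantics, under the assumption that the pool feeds from a bounded work queue (the queue accessor name is illustrative):

// Sketch: true when `taskCount` more tasks still fit in the bounded queue
// backing the long-running pool (assumes a java.util.concurrent.LinkedBlockingQueue).
protected def hasCapacityFor(taskCount: Int): Boolean =
  longRunningQueue.remainingCapacity() >= taskCount

Since two JMX futures are scheduled per broker (per-topic metrics plus broker-wide metrics), checking capacity for twice the broker count up front avoids flooding the queue and then failing partway through the broker list.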
10 changes: 5 additions & 5 deletions app/kafka/manager/ClusterManagerActor.scala
@@ -86,7 +86,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
 
   private[this] val adminUtils = new AdminUtils(cmConfig.clusterConfig.version)
 
-  private[this] val ksProps = Props(classOf[KafkaStateActor],sharedClusterCurator, adminUtils.isDeleteSupported)
+  private[this] val ksProps = Props(classOf[KafkaStateActor],sharedClusterCurator, adminUtils.isDeleteSupported, cmConfig.clusterConfig)
   private[this] val kafkaStateActor : ActorPath = context.actorOf(ksProps.withDispatcher(cmConfig.pinnedDispatcherName),"kafka-state").path
 
   private[this] val bvConfig = BrokerViewCacheActorConfig(
@@ -176,7 +176,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
         bl <- eventualBrokerList
         tm <- eventualTopicMetrics
         tdO <- eventualTopicDescription
-      } yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm))))
+      } yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm,cmConfig.clusterConfig))))
       result pipeTo sender
 
     case any: Any => log.warning("cma : processQueryResponse : Received unknown message: {}", any)
@@ -263,7 +263,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
       val generated: Future[IndexedSeq[(String, Map[Int, Seq[Int]])]] = for {
         bl <- eventualBrokerList
         tds <- eventualDescriptions
-        tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
+        tis = tds.descriptions.map(TopicIdentity.from(bl, _, None,cmConfig.clusterConfig))
       } yield {
         bl.list.map(_.id.toInt)
         // check if any nonexistent broker got selected for reassignment
@@ -301,7 +301,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
       val preferredLeaderElections = for {
         bl <- eventualBrokerList
         tds <- eventualDescriptions
-        tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
+        tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, cmConfig.clusterConfig))
         toElect = tis.map(ti => ti.partitionsIdentity.values.filter(!_.isPreferredLeader).map(tpi => TopicAndPartition(ti.topic, tpi.partNum))).flatten.toSet
       } yield toElect
       preferredLeaderElections.map { toElect =>
@@ -317,7 +317,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
       val topicsAndReassignments = for {
         bl <- eventualBrokerList
         tds <- eventualDescriptions
-        tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
+        tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, cmConfig.clusterConfig))
       } yield {
         val reassignments = tis.map { ti =>
           val topicZkPath = zkPathFrom(baseTopicsZkPath, ti.topic)
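Every TopicIdentity.from call site above now threads cmConfig.clusterConfig through, and BVView and BrokerList carry it as well. A sketch of the assumed shape of ClusterConfig, reconstructed only from the fields this diff actually touches (the remaining fields are illustrative guesses):

// Only `version` and `jmxEnabled` are referenced in this diff;
// `name` and `zkHosts` are assumptions for the sake of a complete sketch.
case class ClusterConfig(
  name: String,
  version: KafkaVersion,
  zkHosts: String,
  jmxEnabled: Boolean
)

Carrying the whole ClusterConfig, rather than just the version, lets downstream consumers render version- and JMX-specific details without further plumbing.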
4 changes: 1 addition & 3 deletions app/kafka/manager/KafkaCommandActor.scala
@@ -49,8 +49,6 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend
 
   @scala.throws[Exception](classOf[Exception])
   override def postStop(): Unit = {
-    log.info("Shutting down long running executor...")
-    Try(longRunningExecutor.shutdown())
     super.postStop()
   }
 
@@ -75,7 +73,7 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend
         val result : KCCommandResult = KCCommandResult(Failure(new UnsupportedOperationException(
           s"Delete topic not supported for kafka version ${kafkaCommandActorConfig.version}")))
         sender ! result
-      case Kafka_0_8_2_0 =>
+      case Kafka_0_8_2_0 | Kafka_0_8_2_1 =>
         longRunning {
           Future {
             KCCommandResult(Try {
… (the remaining changed files in this commit were not rendered)
