Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
support preemption extender at /preempt_if_not_throttled
Browse files Browse the repository at this point in the history
  • Loading branch information
everpeace committed Mar 13, 2019
1 parent 1340e6b commit 3f812a0
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 74 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -52,7 +52,7 @@ This creates:
"urlPrefix": "http://extender.kube-throttler/",
"filterVerb": "check_throttle",
"prioritizeVerb": "",
"preemptVerb": "",
"preemptVerb": "preempt_if_not_throttled",
"bindVerb": "",
"weight": 1,
"enableHttps": false,
Expand Down
4 changes: 2 additions & 2 deletions example/my-scheduler.yaml
Expand Up @@ -32,7 +32,7 @@ data:
"urlPrefix": "http://kube-throttler.kube-throttler/",
"filterVerb": "check_throttle",
"prioritizeVerb": "",
"preemptVerb": "",
"preemptVerb": "preempt_if_not_throttled",
"bindVerb": "",
"weight": 1,
"enableHttps": false,
Expand Down Expand Up @@ -63,7 +63,7 @@ spec:
name: my-scheduler-config
containers:
- name: my-scheduler-ctr
image: gcr.io/google_containers/hyperkube:v1.11.1
image: gcr.io/google_containers/hyperkube:v1.12.1
imagePullPolicy: IfNotPresent
args:
- kube-scheduler
Expand Down
175 changes: 116 additions & 59 deletions src/main/scala/com/github/everpeace/k8s/throttler/Routes.scala
Expand Up @@ -17,6 +17,9 @@
package com.github.everpeace.k8s.throttler

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ContentTypes._
import akka.http.scaladsl.model.headers.`Content-Type`
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import akka.stream.ActorMaterializer
Expand Down Expand Up @@ -70,7 +73,7 @@ class Routes(
}

def all =
readinessProbe(requestHandlerReady).toRoute ~ livenessProbe(controllerAlive).toRoute ~ checkThrottle
readinessProbe(requestHandlerReady).toRoute ~ livenessProbe(controllerAlive).toRoute ~ checkThrottle ~ preemptIfNotThrottled

def errorResult(arg: ExtenderArgs, message: String): v1.ExtenderFilterResult = {
val nodeNames = if (arg.nodes.nonEmpty) {
Expand Down Expand Up @@ -117,64 +120,118 @@ class Routes(

def checkThrottle = path("check_throttle") {
post {
entity(as[v1.ExtenderArgs]) { extenderArgs =>
val pod = extenderArgs.pod
system.log.info("checking throttle status for pod {}", pod.key)
onComplete(requestHandleActor ? CheckThrottleRequest(pod)) {
// some throttles are active!! no nodes are schedulable
case Success(
Throttled(p,
activeThrottles,
activeClusterThrottles,
noSpaceThrottles,
noSpaceClusterThrottles)) if p == pod =>
val activeThrottleMessage = activeThrottles.toList.toNel
.map { thrs =>
val names = thrs.map(thr => thr.namespace -> thr.name).toList
s"throttles[active]=${names.mkString(",")}"
}

val activeClusterThrottleMessage = activeClusterThrottles.toList.toNel
.map { thrs =>
val names = thrs.map(_.name).toList
s"clusterthrottles[active]=${names.mkString(",")}"
}

val noSpaceThrottleMessage = noSpaceThrottles.toList.toNel
.map { thrs =>
val names = thrs.map(thr => thr.namespace -> thr.name).toList
s"throttles[insufficient]=${names.mkString(",")}"
}

val noSpaceClusterThrottleMessage = noSpaceClusterThrottles.toList.toNel
.map { thrs =>
val names = thrs.map(_.name).toList
s"clusterthrottles[insufficient]=${names.mkString(",")}"
}

val aggregatedMessage =
List(activeThrottleMessage,
activeClusterThrottleMessage,
noSpaceThrottleMessage,
noSpaceClusterThrottleMessage).filter(_.nonEmpty).map(_.get).mkString(", ")

val message = s"pod ${pod.key} is unschedulable due to $aggregatedMessage"
system.log.info(message)
complete(unSchedulableResult(extenderArgs, message))

// no throttles are active!! all nodes are schedulable.
case Success(NotThrottled(p)) if p == pod =>
system.log.info(
"pod {} is schedulable because no 'throttled' throttles/clusterthrottles for the pod.",
pod.key)
complete(schedulableResult(extenderArgs))

// failure. no nodes are schedulable.
case Failure(exp) =>
val message =
s"exception occurred in checking throttles for pod ${pod.key}: ${exp.getMessage}"
system.log.error(message)
complete(errorResult(extenderArgs, message))
// Request/response logging for the filter endpoint. The marker must be this
// route's own name: it was previously "preempt_if_not_throttled" (copied from the
// preemption route), which made the two endpoints indistinguishable in the logs.
logRequestResult("check_throttle") {
  entity(as[v1.ExtenderArgs]) { extenderArgs =>
    val pod = extenderArgs.pod
    system.log.info("checking throttle status for pod {}", pod.key)
    // NOTE(review): the Throttled/NotThrottled cases are guarded by `p == pod`; a
    // reply carrying a different pod matches no case and raises a MatchError.
    // Confirm whether the handler can ever answer with another pod.
    onComplete((requestHandleActor ? CheckThrottleRequest(pod)).mapTo[CheckThrottleResponse]) {
      // some throttles are active!! no nodes are schedulable
      case Success(
          Throttled(p,
                    activeThrottles,
                    activeClusterThrottles,
                    noSpaceThrottles,
                    noSpaceClusterThrottles)) if p == pod =>
        // Each fragment is None when its throttle set is empty, so that only the
        // non-empty categories show up in the aggregated message.
        val activeThrottleMessage = activeThrottles.toList.toNel
          .map { thrs =>
            val names = thrs.map(thr => thr.namespace -> thr.name).toList
            s"throttles[active]=${names.mkString(",")}"
          }

        val activeClusterThrottleMessage = activeClusterThrottles.toList.toNel
          .map { thrs =>
            val names = thrs.map(_.name).toList
            s"clusterthrottles[active]=${names.mkString(",")}"
          }

        val noSpaceThrottleMessage = noSpaceThrottles.toList.toNel
          .map { thrs =>
            val names = thrs.map(thr => thr.namespace -> thr.name).toList
            s"throttles[insufficient]=${names.mkString(",")}"
          }

        val noSpaceClusterThrottleMessage = noSpaceClusterThrottles.toList.toNel
          .map { thrs =>
            val names = thrs.map(_.name).toList
            s"clusterthrottles[insufficient]=${names.mkString(",")}"
          }

        // `flatten` drops the empty fragments without calling `.get` on an Option.
        val aggregatedMessage =
          List(activeThrottleMessage,
               activeClusterThrottleMessage,
               noSpaceThrottleMessage,
               noSpaceClusterThrottleMessage).flatten.mkString(", ")

        val message = s"pod ${pod.key} is unschedulable due to $aggregatedMessage"
        system.log.info(message)
        complete(unSchedulableResult(extenderArgs, message))

      // no throttles are active!! all nodes are schedulable.
      case Success(NotThrottled(p)) if p == pod =>
        system.log.info(
          "pod {} is schedulable because no 'throttled' throttles/clusterthrottles for the pod.",
          pod.key)
        complete(schedulableResult(extenderArgs))

      // the handler has not finished initializing. no nodes are schedulable.
      case Success(NotReady) =>
        val message = s"throttler is not ready in checking throttle status of pod ${pod.key}"
        system.log.error(message)
        complete(errorResult(extenderArgs, message))

      // failure. no nodes are schedulable.
      case Failure(exp) =>
        val message =
          s"exception occurred in checking throttles for pod ${pod.key}: ${exp.getMessage}"
        system.log.error(message)
        complete(errorResult(extenderArgs, message))
    }
  }
}
}
}

/** Preemption result that nominates no victims on any node. */
val noVictims = v1.ExtenderPreemptionResult(Map.empty)

/**
 * Builds a preemption result that echoes back the victims proposed in `args`.
 *
 * Full victims (`nodeNameToVictims`) are reduced to their meta form (pod UID plus
 * the PDB-violation count). Meta victims already present in
 * `args.nodeNameToMetaVictims` are echoed as well: per the upstream scheduler
 * extender API only one of the two maps is populated, depending on the extender's
 * `nodeCacheCapable` setting, so ignoring the meta map would answer "no victims"
 * to a node-cache-capable scheduler. If a node appears in both maps, the entry
 * derived from the full victims wins.
 *
 * @param args preemption arguments received from the scheduler
 * @return the victims of `args`, keyed by node name, in meta form
 */
def echoedResult(args: v1.ExtenderPreemptionArgs): v1.ExtenderPreemptionResult = {
  // A strict `map` is used instead of `mapValues`, which returns a lazy view that
  // would redo the conversion on every traversal (logging, then serialization).
  val fromFullVictims = args.nodeNameToVictims.map {
    case (nodeName, victims) =>
      nodeName -> v1.MetaVictims(
        pods = victims.pods.map(p => v1.MetaPod(p.uid)),
        numPDBViolations = victims.numPDBViolations
      )
  }
  v1.ExtenderPreemptionResult(args.nodeNameToMetaVictims ++ fromFullVictims)
}

def preemptIfNotThrottled = path("preempt_if_not_throttled") {
post {
// Request/response logging for the preemption endpoint.
logRequestResult("preempt_if_not_throttled") {
  entity(as[v1.ExtenderPreemptionArgs]) { extenderArgs =>
    val pod = extenderArgs.pod

    // Shared reply for every case in which the throttle status could not be
    // determined: HTTP 500 with an empty JSON object as the body.
    def internalError =
      complete(StatusCodes.InternalServerError,
               List(`Content-Type`(`application/json`)),
               "{}")

    system.log.info("checking throttle status of pod {} for preemption", pod.key)
    onComplete((requestHandleActor ? CheckThrottleRequest(pod)).mapTo[CheckThrottleResponse]) {
      // throttled pods must not evict anything: answer with an empty victim map.
      case Success(Throttled(_, _, _, _, _)) =>
        val message = s"pod ${pod.key} is throttled. no victims should be selected."
        system.log.info(message)
        complete(noVictims)

      // not throttled: hand the scheduler's proposed victims back.
      case Success(NotThrottled(_)) =>
        val result = echoedResult(extenderArgs)
        // Strict per-node victim counts, computed once and used only for the log line.
        val numVictims = result.nodeNameToMetaVictims.map {
          case (nodeName, victims) => nodeName -> victims.pods.length
        }
        // This branch handles the NOT throttled case; the message used to say
        // "is throttled", which contradicted the decision being logged.
        val message = s"pod ${pod.key} is not throttled. echo victims: $numVictims"
        system.log.info(message)
        complete(result)

      case Success(NotReady) =>
        val message =
          s"throttler is not ready in checking throttle status of pod ${pod.key} for preemption"
        system.log.error(message)
        internalError

      case Failure(exp) =>
        val message =
          s"exception occurred in checking throttles of pod ${pod.key} for preemption: ${exp.getMessage}"
        system.log.error(message)
        internalError
    }
  }
}
}
Expand Down
Expand Up @@ -161,15 +161,17 @@ object ThrottleRequestHandler {
case object IsReady

case class CheckThrottleRequest(pod: Pod)
case object NotReady
sealed trait CheckThrottleResponse
case object NotReady extends CheckThrottleResponse
case class Throttled(
pod: Pod,
activeThrottles: Set[v1alpha1.Throttle],
activeClusterThrottles: Set[v1alpha1.ClusterThrottle],
insufficientThrottles: Set[v1alpha1.Throttle],
insufficientClusterThrottles: Set[v1alpha1.ClusterThrottle])
extends CheckThrottleResponse

case class NotThrottled(pod: Pod)
case class NotThrottled(pod: Pod) extends CheckThrottleResponse

def props() = Props(new ThrottleRequestHandler())
}
35 changes: 32 additions & 3 deletions src/main/scala/io/k8s/pkg/scheduler/api/v1/package.scala
Expand Up @@ -32,6 +32,15 @@ package object v1 {
failedNodes: Map[String, String] = Map.empty,
error: Option[String] = None)

// Data types for the scheduler extender preemption call.
// NOTE(review): these presumably mirror the Kubernetes scheduler extender API
// types of the same names - confirm against the targeted Kubernetes version.

/** Reference to a pod by its UID only. */
case class MetaPod(uid: String)
/** Pods proposed as victims, as full pod objects, together with a PDB-violation count. */
case class Victims(pods: List[Pod], numPDBViolations: Int)
/** Same as [[Victims]], but the pods are carried as [[MetaPod]] references. */
case class MetaVictims(pods: List[MetaPod], numPDBViolations: Int)
/**
 * Arguments of a preemption request.
 *
 * @param pod                   the pod the request is about
 * @param nodeNameToVictims     victims in full form, keyed by node name
 * @param nodeNameToMetaVictims victims in meta form, keyed by node name
 */
case class ExtenderPreemptionArgs(
pod: Pod,
nodeNameToVictims: Map[String, Victims],
nodeNameToMetaVictims: Map[String, MetaVictims])
/** Result of a preemption request: victims in meta form, keyed by node name. */
case class ExtenderPreemptionResult(nodeNameToMetaVictims: Map[String, MetaVictims])

object Implicits {
implicit val nodeFormat: Format[Node] = (
(JsPath \ "metadata").lazyFormat[ObjectMeta](objectMetaFormat) and
Expand All @@ -49,17 +58,37 @@ package object v1 {
unlift((nl: NodeList) => Option(nl.metadata, nl.items))
)
implicit val extenderArgsFmt: Format[ExtenderArgs] = (
(JsPath \ "Pod").format[Pod](podFormat) and
(JsPath \ "Pod").format[Pod] and
(JsPath \ "Nodes").formatNullable[NodeList](nodeListFormat) and
(JsPath \ "NodeNames").formatMaybeEmptyList[String]
)(ExtenderArgs.apply _, unlift(ExtenderArgs.unapply))
)(ExtenderArgs.apply, unlift(ExtenderArgs.unapply))

implicit val extenderFilterResult: Format[ExtenderFilterResult] = (
(JsPath \ "Nodes").formatNullable[NodeList](nodeListFormat) and
(JsPath \ "NodeNames").formatMaybeEmptyList[String] and
(JsPath \ "FailedNodes").formatMaybeEmptyMap[String] and
(JsPath \ "Error").formatNullable[String]
)(ExtenderFilterResult.apply _, unlift(ExtenderFilterResult.unapply))
)(ExtenderFilterResult.apply, unlift(ExtenderFilterResult.unapply))

// JSON formats for the preemption extender payloads. Declaration order matters:
// each format relies on the implicit formats declared above it.

/** Reads/writes a [[MetaPod]] as an object with a single "UID" field. */
implicit val metaPodFmt: Format[MetaPod] =
  (JsPath \ "UID")
    .format[String]
    .inmap[MetaPod](uid => MetaPod(uid), metaPod => metaPod.uid)

/** Reads/writes [[Victims]] from the "Pods" and "NumPDBViolations" fields. */
implicit val victimsFmt: Format[Victims] = (
  (JsPath \ "Pods").formatMaybeEmptyList[Pod] and
    (JsPath \ "NumPDBViolations").format[Int]
)(
  (pods: List[Pod], numPDBViolations: Int) => Victims(pods, numPDBViolations),
  (victims: Victims) => (victims.pods, victims.numPDBViolations)
)

/** Reads/writes [[MetaVictims]] from the "Pods" and "NumPDBViolations" fields. */
implicit val metaVictimsFmt: Format[MetaVictims] = (
  (JsPath \ "Pods").formatMaybeEmptyList[MetaPod] and
    (JsPath \ "NumPDBViolations").format[Int]
)(
  (pods: List[MetaPod], numPDBViolations: Int) => MetaVictims(pods, numPDBViolations),
  (victims: MetaVictims) => (victims.pods, victims.numPDBViolations)
)

/** Reads/writes [[ExtenderPreemptionArgs]] from "Pod", "NodeNameToVictims" and "NodeNameToMetaVictims". */
implicit val extenderPreemptionArgsFmt: Format[ExtenderPreemptionArgs] = (
  (JsPath \ "Pod").format[Pod] and
    (JsPath \ "NodeNameToVictims").formatMaybeEmptyMap[Victims] and
    (JsPath \ "NodeNameToMetaVictims").formatMaybeEmptyMap[MetaVictims]
)(
  (pod: Pod, victims: Map[String, Victims], metaVictims: Map[String, MetaVictims]) =>
    ExtenderPreemptionArgs(pod, victims, metaVictims),
  (args: ExtenderPreemptionArgs) =>
    (args.pod, args.nodeNameToVictims, args.nodeNameToMetaVictims)
)

/** Reads/writes an [[ExtenderPreemptionResult]] as an object with a single "NodeNameToMetaVictims" field. */
implicit val extenderPreemptionResultFmt: Format[ExtenderPreemptionResult] =
  (JsPath \ "NodeNameToMetaVictims")
    .formatMaybeEmptyMap[MetaVictims]
    .inmap[ExtenderPreemptionResult](
      metaVictims => ExtenderPreemptionResult(metaVictims),
      result => result.nodeNameToMetaVictims
    )
}

}
2 changes: 2 additions & 0 deletions src/test/resources/reference.conf
@@ -0,0 +1,2 @@
akka.loglevel = "DEBUG"

0 comments on commit 3f812a0

Please sign in to comment.