Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement LoadBalancer Strategy based on annotation #5091

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ subprojects {
'akka-discovery', 'akka-distributed-data', 'akka-protobuf', 'akka-remote', 'akka-slf4j',
'akka-stream', 'akka-stream-testkit', 'akka-testkit']
def akkaHttp = ['akka-http', 'akka-http-core', 'akka-http-spray-json', 'akka-http-testkit', 'akka-http-xml',
'akka-parsing', 'akka-http2-support']
'akka-http2-support', 'akka-parsing']

akka.forEach {
cons.add('compile', "com.typesafe.akka:${it}_${gradle.scala.depVersion}:${gradle.akka.version}")
Expand Down
4 changes: 4 additions & 0 deletions core/controller/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ whisk {
use-cluster-bootstrap: false
}
loadbalancer {
strategy {
default = ""
custom = {}
}
managed-fraction: 90%
blackbox-fraction: 10%
# factor to increase the timeout for forced active acks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.apache.openwhisk.core.loadBalancer

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector.{ActivationMessage, MessagingProvider}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.SpiLoader
import pureconfig.loadConfigOrThrow
import spray.json._
import pureconfig._
import pureconfig.generic.auto._

import scala.concurrent.Future

class MuxBalancer(config: WhiskConfig,
feedFactory: FeedFactory,
controllerInstance: ControllerInstanceId,
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider],
override val lbConfig: ShardingContainerPoolBalancerConfig =
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {

private val defaultLoadBalancer =
getClass[LoadBalancerProvider](lbConfig.strategy.default).instance(config, controllerInstance)
private val customLoadBalancerMap: Map[String, LoadBalancer] =
lbConfig.strategy.custom.foldLeft(Map.empty[String, LoadBalancer]) {
case (result, (name, strategyConfig)) =>
result + (name -> getClass[LoadBalancerProvider](strategyConfig.className).instance(config, controllerInstance))
}

/**
* Instantiates an object of the given type.
*
* Similar to SpiLoader.get, with the difference that the constructed class does not need to be declared as Spi.
* Thus there could be multiple classes implementing same interface constructed at the same time
*
* @param name the name of the class
* @tparam A expected type to return
* @return instance of the class
*/
private def getClass[A](name: String): A = {
val clazz = Class.forName(name + "$")
val classInst = clazz.getField("MODULE$").get(clazz).asInstanceOf[A]
classInst
}

override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth])
override protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) = {
// Currently do nothing
}
override protected val invokerPool: ActorRef = actorSystem.actorOf(Props.empty)

/**
* Publish a message to the loadbalancer
*
* Select the LoadBalancer based on the annotation, if available, otherwise use the default one
**/
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
action.annotations.get("activationStrategy") match {
case None =>
defaultLoadBalancer.publish(action, msg)
case Some(JsString(value)) => {
if (customLoadBalancerMap.contains(value)) {
customLoadBalancerMap(value).publish(action, msg)
} else {
defaultLoadBalancer.publish(action, msg)
}
}
case Some(_) => defaultLoadBalancer.publish(action, msg)
}
}
}

object MuxBalancer extends LoadBalancerProvider {

override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = {

new MuxBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), instance)
}

def requiredProperties =
ExecManifest.requiredProperties ++
wskApiHost
}
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,28 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
* @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
* @param timeoutAddon extra time to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
*/
case class ShardingContainerPoolBalancerConfig(managedFraction: Double,
case class ShardingContainerPoolBalancerConfig(strategy: ActivationStrategy,
managedFraction: Double,
blackboxFraction: Double,
timeoutFactor: Int,
timeoutAddon: FiniteDuration)

/**
* Configuration for the annotation-based load balancer multiplexer
*
* @param default the default strategy to be used if nothing is configured for the given annotation
* @param custom the Map of the strategy name to strategy configuration
*/
case class ActivationStrategy(default: String,
custom: Map[String, StrategyConfig])

/**
* Configuration for the strategy
*
* @param className indicates the class which will handle this strategy name
*/
case class StrategyConfig(className: String)

/**
* State kept for each activation slot until completion.
*
Expand Down
8 changes: 7 additions & 1 deletion core/standalone/src/main/resources/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ whisk {
spi {
ArtifactStoreProvider = "org.apache.openwhisk.core.database.memory.MemoryArtifactStoreProvider"
MessagingProvider = "org.apache.openwhisk.connector.lean.LeanMessagingProvider"
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.LeanBalancer"
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.MuxBalancer"
# Use cli based log store for all setups as its more stable to use
# and does not require root user access
LogStoreProvider = "org.apache.openwhisk.core.containerpool.docker.DockerCliLogStoreProvider"
Expand All @@ -56,6 +56,12 @@ whisk {
limits-actions-invokes-concurrent = 30
}

loadbalancer {
strategy {
default = "org.apache.openwhisk.core.loadBalancer.LeanBalancer"
}
}

controller {
protocol = http

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.WhiskConfig.kafkaHosts
import org.apache.openwhisk.core.entity.ControllerInstanceId
import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
import org.apache.openwhisk.core.loadBalancer.{LoadBalancer, LoadBalancerProvider, MuxBalancer}
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -109,11 +109,12 @@ class KafkaLauncher(
}

object KafkaAwareLeanBalancer extends LoadBalancerProvider {
override def requiredProperties: Map[String, String] = LeanBalancer.requiredProperties ++ kafkaHosts
override def requiredProperties: Map[String, String] = MuxBalancer.requiredProperties ++ kafkaHosts

override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
logging: Logging): LoadBalancer =
LeanBalancer.instance(whiskConfig, instance)
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging): LoadBalancer =
MuxBalancer.instance(whiskConfig, instance)
}

object KafkaLauncher {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.loadBalancer.test

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.entity.{
ActivationId,
ControllerInstanceId,
ExecManifest,
ExecutableWhiskActionMetaData,
UUID,
WhiskActivation
}
import org.apache.openwhisk.core.loadBalancer.{InvokerHealth, LoadBalancer, LoadBalancerProvider}
import org.apache.openwhisk.core.WhiskConfig._

import scala.concurrent.Future

class MockLoadBalancer(prefix: String) extends LoadBalancer {
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth])
override def clusterSize: Int = 1
override def totalActiveActivations: Future[Int] = Future.successful(1)
override def activeActivationsFor(namespace: UUID): Future[Int] =
Future.successful(0)
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
Future.successful(Future.successful(Left(ActivationId(prefix + "-mockLoadBalancerId0"))))
}
}

object MockLoadBalancerCustom extends LoadBalancerProvider {
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = {

new MockLoadBalancer("custom")
}

def requiredProperties =
ExecManifest.requiredProperties ++
wskApiHost
}

object MockLoadBalancerDefault extends LoadBalancerProvider {
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): LoadBalancer = {

new MockLoadBalancer("default")
}

def requiredProperties =
ExecManifest.requiredProperties ++
wskApiHost
}