Skip to content

Commit

Permalink
Adjust user memory via api
Browse files Browse the repository at this point in the history
  • Loading branch information
ningyougang committed Sep 28, 2020
1 parent af16122 commit 03d0caf
Show file tree
Hide file tree
Showing 15 changed files with 439 additions and 93 deletions.
2 changes: 2 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ controller_username | default('controller.user') }}"
password: "{{ controller_password | default('controller.pass') }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
Expand Down
3 changes: 3 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"

"CONFIG_whisk_credentials_controller_username": "{{ controller.username }}"
"CONFIG_whisk_credentials_controller_password": "{{ controller.password }}"

"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.common

case class ControllerCredentials(username: String, password: String)
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,6 @@ object ConfigKeys {
val parameterStorage = "whisk.parameter-storage"

val azBlob = "whisk.azure-blob"

val controllerCredentials = "whisk.credentials.controller"
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,22 @@ object EventMessage extends DefaultJsonProtocol {

def parse(msg: String) = Try(format.read(msg.parseJson))
}

case class ByteSizeMessage(userMemory: ByteSize) extends Message {
override def serialize = ByteSizeMessage.serdes.write(this).compactPrint
}

object ByteSizeMessage extends DefaultJsonProtocol {
implicit val serdes = new RootJsonFormat[ByteSizeMessage] {
override def write(message: ByteSizeMessage): JsValue = {
JsObject("userMemory" -> JsString(message.userMemory.toString))
}

override def read(json: JsValue): ByteSizeMessage = {
val userMemory = fromField[String](json, "userMemory")
new ByteSizeMessage(ByteSize.fromString(userMemory))
}
}

def parse(msg: String) = Try(serdes.read(msg.parseJson))
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
Expand All @@ -31,9 +32,16 @@ import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.common.{
AkkaLogging,
ConfigMXBean,
ControllerCredentials,
Logging,
LoggingMarkers,
TransactionId
}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessagingProvider}
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
import org.apache.openwhisk.core.entitlement._
Expand Down Expand Up @@ -97,7 +105,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configMemory
}

// initialize datastores
Expand Down Expand Up @@ -176,6 +184,63 @@ class Controller(val instance: ControllerInstanceId,
LogLimit.config,
runtimes,
List(apiV1.basepath()))

private val controllerCredentials = loadConfigOrThrow[ControllerCredentials](ConfigKeys.controllerCredentials)

/**
* config user memory of ContainerPool
*/
private val configMemory = {
implicit val executionContext = actorSystem.dispatcher
(path("config" / "memory") & post) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerCredentials.username && password == controllerCredentials.password) {
entity(as[String]) { memory =>
try {
val userMemoryMessage = ByteSizeMessage(ByteSize.fromString(memory))
if (userMemoryMessage.userMemory.size == 0) {
complete(StatusCodes.BadRequest, "user memory must be positive")
} else {
parameter('limit.?) { limit =>
limit match {
case Some(targetValue) =>
val pattern = """\d+:\d"""
if (targetValue.matches(pattern)) {
val invokerArray = targetValue.split(":")
val beginIndex = invokerArray(0).toInt
val finishIndex = invokerArray(1).toInt
if (finishIndex < beginIndex) {
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
} else {
val targetInvokers = (beginIndex to finishIndex).toList
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, Some(targetInvokers))
logging.info(this, "config user memory request is already sent to target invokers")
complete(StatusCodes.Accepted)
}
} else {
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
}
case None =>
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, None)
logging.info(this, "config user memory request is already sent to all invokers")
complete(StatusCodes.Accepted)
}
}
}
} catch {
case ex: IllegalArgumentException =>
logging.info(this, s"error message: ${ex.getMessage}")
complete(StatusCodes.BadRequest, ex.getMessage)
}
}
} else {
complete(StatusCodes.Unauthorized, "username or password is wrong")
}
case _ => complete(StatusCodes.Unauthorized)
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ trait LoadBalancer {
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]

/**
* send user memory to invokers
*
* @param userMemory
* @param targetInvokers
*/
def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {}

/**
* Returns a message indicating the health of the containers and/or container pool in general.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

/**
* A loadbalancer that schedules workload based on a hashing-algorithm.
Expand Down Expand Up @@ -316,6 +317,21 @@ class ShardingContainerPoolBalancer(
}
}

/** send user memory to invokers */
override def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {
schedulingState.invokers.filter { invoker =>
targetInvokers.getOrElse(schedulingState.invokers.map(_.id.instance)).contains(invoker.id.instance)
} foreach { invokerHealth =>
val topic = s"invoker${invokerHealth.id.toInt}"
messageProducer.send(topic, userMemoryMessage).andThen {
case Success(_) =>
logging.info(this, s"successfully posted user memory configuration to topic $topic")
case Failure(_) =>
logging.error(this, s"failed posted user memory configuration to topic $topic")
}
}
}

override val invokerPool =
invokerPoolFactory.createInvokerPool(
actorSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.openwhisk.core.containerpool

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessageFeed}
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
Expand All @@ -37,6 +37,8 @@ case class ColdStartKey(kind: String, memory: ByteSize)

case class WorkerData(data: ContainerData, state: WorkerState)

object UserMemoryQuery

case object EmitMetrics

case object AdjustPrewarmedContainer
Expand Down Expand Up @@ -74,6 +76,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
var latestUserMemory = poolConfig.userMemory
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
// buffered here to keep order of computation.
// Otherwise actions with small memory-limits could block actions with large memory limits.
Expand Down Expand Up @@ -216,7 +219,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
s"Rescheduling Run message, too many message in the pool, " +
s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " +
s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " +
s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
s"maxContainersMemory ${latestUserMemory.toMB} MB, " +
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
s"waiting messages: ${runBuffer.size}")(r.msg.transid)
Expand Down Expand Up @@ -302,6 +305,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
case message: ByteSizeMessage =>
logging.info(
this,
s"user memory is reconfigured from ${latestUserMemory.toString} to ${message.userMemory.toString}")
latestUserMemory = message.userMemory
case UserMemoryQuery =>
sender() ! latestUserMemory.toString
case EmitMetrics =>
emitMetrics()

Expand Down Expand Up @@ -441,7 +451,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
* @return true, if there is enough space for the given amount of memory.
*/
def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
memoryConsumptionOf(pool) + memory.toMB <= latestUserMemory.toMB
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.invoker

import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common.{Logging, TransactionId}

import org.apache.openwhisk.http.BasicRasService

import scala.concurrent.ExecutionContext

/**
* Implements web server to handle certain REST API calls.
*/
class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext,
val actorSystem: ActorSystem,
val logger: Logging)
extends BasicRasService {

override def routes(implicit transid: TransactionId): Route = {
super.routes ~ {
(path("config" / "memory") & get) {
invoker.getUserMemory()
}
}
}
}

object DefaultInvokerServer extends InvokerServerProvider {
override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new DefaultInvokerServer(invoker)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker

import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
Expand Down Expand Up @@ -210,7 +211,9 @@ trait InvokerProvider extends Spi {
}

// this trait can be used to add common implementation
trait InvokerCore {}
trait InvokerCore {
def getUserMemory(): Route
}

/**
* An Spi for providing RestAPI implementation for invoker.
Expand All @@ -220,9 +223,3 @@ trait InvokerServerProvider extends Spi {
def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
}

object DefaultInvokerServer extends InvokerServerProvider {
override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new BasicRasService {}
}
Loading

0 comments on commit 03d0caf

Please sign in to comment.