Skip to content

Commit

Permalink
Improve the json parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
ningyougang committed Oct 14, 2020
1 parent 88c24e7 commit f9dd602
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -427,21 +427,42 @@ object EventMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(format.read(msg.parseJson))
}

case class ByteSizeMessage(userMemory: ByteSize) extends Message {
override def serialize = ByteSizeMessage.serdes.write(this).compactPrint
case class UserMemoryMessage(userMemory: ByteSize) extends Message {
override def serialize = UserMemoryMessage.serdes.write(this).compactPrint
}

object ByteSizeMessage extends DefaultJsonProtocol {
implicit val serdes = new RootJsonFormat[ByteSizeMessage] {
override def write(message: ByteSizeMessage): JsValue = {
object UserMemoryMessage extends DefaultJsonProtocol {
implicit val serdes = new RootJsonFormat[UserMemoryMessage] {
override def write(message: UserMemoryMessage): JsValue = {
JsObject("userMemory" -> JsString(message.userMemory.toString))
}

override def read(json: JsValue): ByteSizeMessage = {
override def read(json: JsValue): UserMemoryMessage = {
val userMemory = fromField[String](json, "userMemory")
new ByteSizeMessage(ByteSize.fromString(userMemory))
new UserMemoryMessage(ByteSize.fromString(userMemory))
}
}

def parse(msg: String) = Try(serdes.read(msg.parseJson))
}

case class ConfigMemory(invoker: Int, memory: ByteSize)
case class ConfigMemoryList(items: List[ConfigMemory])

object ConfigMemoryProtocol extends DefaultJsonProtocol {
implicit val serdes = new RootJsonFormat[ByteSize] {
override def write(obj: ByteSize): JsValue = JsObject("memory" -> JsString(obj.toString))

override def read(json: JsValue): ByteSize = {
json match {
case JsString(memory) => ByteSize.fromString(memory)
case _ => throw new DeserializationException("Could not deserialize ByteSize")
}
}
}
implicit val configMemoryFormat = jsonFormat2(ConfigMemory)
implicit object configMemoryListJsonFormat extends RootJsonFormat[ConfigMemoryList] {
def read(value: JsValue) = ConfigMemoryList(value.convertTo[List[ConfigMemory]])
def write(f: ConfigMemoryList) = ???
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.openwhisk.common.{
TransactionId
}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessagingProvider}
import org.apache.openwhisk.core.connector.{ConfigMemoryList, MessagingProvider, UserMemoryMessage}
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
import org.apache.openwhisk.core.entitlement._
Expand Down Expand Up @@ -190,48 +190,28 @@ class Controller(val instance: ControllerInstanceId,
/**
* config user memory of ContainerPool
*/
import org.apache.openwhisk.core.connector.ConfigMemoryProtocol._
private val configMemory = {
implicit val executionContext = actorSystem.dispatcher
(path("config" / "memory") & post) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerCredentials.username && password == controllerCredentials.password) {
entity(as[String]) { memory =>
try {
val userMemoryMessage = ByteSizeMessage(ByteSize.fromString(memory))
if (userMemoryMessage.userMemory.size == 0) {
complete(StatusCodes.BadRequest, "user memory must be positive")
} else {
parameter('limit.?) { limit =>
limit match {
case Some(targetValue) =>
val pattern = """\d+:\d"""
if (targetValue.matches(pattern)) {
val invokerArray = targetValue.split(":")
val beginIndex = invokerArray(0).toInt
val finishIndex = invokerArray(1).toInt
if (finishIndex < beginIndex) {
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
} else {
val targetInvokers = (beginIndex to finishIndex).toList
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, Some(targetInvokers))
logging.info(this, "config user memory request is already sent to target invokers")
complete(StatusCodes.Accepted)
}
} else {
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
}
case None =>
loadBalancer.sendUserMemoryToInvokers(userMemoryMessage, None)
logging.info(this, "config user memory request is already sent to all invokers")
complete(StatusCodes.Accepted)
}
}
val configMemoryList = memory.parseJson.convertTo[ConfigMemoryList]

val existIllegalUserMemory = configMemoryList.items.exists { configMemory =>
MemoryLimit.MIN_MEMORY.compare(configMemory.memory) > 0
}
if (existIllegalUserMemory) {
complete(StatusCodes.BadRequest, s"user memory can't be less than ${MemoryLimit.MIN_MEMORY}")
} else {
configMemoryList.items.foreach { configMemory =>
val invoker = configMemory.invoker
val userMemoryMessage = UserMemoryMessage(configMemory.memory)
loadBalancer.sendUserMemoryToInvoker(userMemoryMessage, invoker)
}
} catch {
case ex: IllegalArgumentException =>
logging.info(this, s"error message: ${ex.getMessage}")
complete(StatusCodes.BadRequest, ex.getMessage)
complete(StatusCodes.Accepted)
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ trait LoadBalancer {
* @param userMemory
* @param targetInvokers
*/
def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {}
def sendUserMemoryToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {}

/**
* Returns a message indicating the health of the containers and/or container pool in general.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ class ShardingContainerPoolBalancer(
}

/** send user memory to invokers */
override def sendUserMemoryToInvokers(userMemoryMessage: ByteSizeMessage, targetInvokers: Option[List[Int]]): Unit = {
override def sendUserMemoryToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {
schedulingState.invokers.filter { invoker =>
targetInvokers.getOrElse(schedulingState.invokers.map(_.id.instance)).contains(invoker.id.instance)
invoker.id.instance == targetInvoker
} foreach { invokerHealth =>
val topic = s"invoker${invokerHealth.id.toInt}"
messageProducer.send(topic, userMemoryMessage).andThen {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.openwhisk.core.containerpool

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.connector.{ByteSizeMessage, MessageFeed}
import org.apache.openwhisk.core.connector.{MessageFeed, UserMemoryMessage}
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
Expand Down Expand Up @@ -305,11 +305,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
case message: ByteSizeMessage =>
case userMemoryMessage: UserMemoryMessage =>
logging.info(
this,
s"user memory is reconfigured from ${latestUserMemory.toString} to ${message.userMemory.toString}")
latestUserMemory = message.userMemory
s"user memory is reconfigured from ${latestUserMemory.toString} to ${userMemoryMessage.userMemory.toString}")
latestUserMemory = userMemoryMessage.userMemory
case UserMemoryQuery =>
sender() ! latestUserMemory.toString
case EmitMetrics =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class InvokerReactive(
Future(
ActivationMessage
.parse(new String(bytes, StandardCharsets.UTF_8))
.orElse(ByteSizeMessage.parse(new String(bytes, StandardCharsets.UTF_8))))
.orElse(UserMemoryMessage.parse(new String(bytes, StandardCharsets.UTF_8))))
.flatMap(Future.fromTry)
.flatMap {
case msg: ActivationMessage =>
Expand Down Expand Up @@ -259,7 +259,7 @@ class InvokerReactive(
logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
Future.successful(())
}
case msg: ByteSizeMessage =>
case msg: UserMemoryMessage =>
pool ! msg
activationFeed ! MessageFeed.Processed
Future.successful(())
Expand Down

0 comments on commit f9dd602

Please sign in to comment.