Skip to content

Commit

Permalink
[New Scheduler] Manage container creation (#5074)
Browse files Browse the repository at this point in the history
* Manage container creation

* Add ContainerManager Test

* Add dedicated namespace

* Remove namespace

* Apply scala fmt

* Add dedicatedNamespaces filter

* Add dedicatedNamespaces test

* Move InvokerState to common

* Unify InvokerHealth message

* Add configuration for test

* Add license header

* Remove compare InvokerResourceMessage
  • Loading branch information
KeonHee committed May 26, 2021
1 parent 9c445f3 commit f1829e1
Show file tree
Hide file tree
Showing 20 changed files with 1,881 additions and 76 deletions.
1 change: 1 addition & 0 deletions ansible/group_vars/all
Expand Up @@ -456,5 +456,6 @@ etcd_connect_string: "{% set ret = [] %}\
{{ ret | join(',') }}"

scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
Expand Up @@ -560,9 +560,11 @@ object LoggingMarkers {
LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> topic))(MeasurementUnit.time.milliseconds)
else LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)

// Time that is needed to produce message in kafka
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)

def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)

/*
* General markers
*/
Expand Down
Expand Up @@ -17,5 +17,45 @@

package org.apache.openwhisk.common

import org.apache.openwhisk.core.entity.InvokerInstanceId

case object GracefulShutdown
case object Enable

// States an Invoker can be in
sealed trait InvokerState {
val asString: String
val isUsable: Boolean
}

object InvokerState {
// Invokers in this state can be used to schedule workload to
sealed trait Usable extends InvokerState { val isUsable = true }
// No workload should be scheduled to invokers in this state
sealed trait Unusable extends InvokerState { val isUsable = false }

// A completely healthy invoker, pings arriving fine, no system errors
case object Healthy extends Usable { val asString = "up" }
// The invoker can not create a container
case object Unhealthy extends Unusable { val asString = "unhealthy" }
// Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
case object Unresponsive extends Unusable { val asString = "unresponsive" }
// The invoker is down
case object Offline extends Unusable { val asString = "down" }
}

/**
* Describes an abstract invoker. An invoker is a local container pool manager that
* is in charge of the container life cycle management.
*
* @param id a unique instance identifier for the invoker
* @param status it status (healthy, unhealthy, unresponsive, offline)
*/
case class InvokerHealth(id: InvokerInstanceId, status: InvokerState) {
override def equals(obj: scala.Any): Boolean = obj match {
case that: InvokerHealth => that.id == this.id && that.status == this.status
case _ => false
}

override def toString = s"InvokerHealth($id, $status)"
}
Expand Up @@ -232,11 +232,11 @@ object TransactionId {

val systemPrefix = "sid_"

var containerCreation = TransactionId(systemPrefix + "containerCreation")
val unknown = TransactionId(systemPrefix + "unknown")
val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker startup/shutdown or GC activity
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
Expand All @@ -245,7 +245,9 @@ object TransactionId {
val controller = TransactionId(systemPrefix + "controller") // Controller startup
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
var containerCreation = TransactionId(systemPrefix + "containerCreation")
var containerDeletion = TransactionId(systemPrefix + "containerDeletion")
val warmUp = TransactionId(systemPrefix + "warmUp")

private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')

Expand Down
72 changes: 72 additions & 0 deletions common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core

import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.connector.{ActivationMessage, ContainerCreationMessage}
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity._

object WarmUp {
val warmUpActionIdentity = {
val whiskSystem = "whisk.system"
val uuid = UUID()
Identity(Subject(whiskSystem), Namespace(EntityName(whiskSystem), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
}

private val actionName = "warmUp"

// this action doesn't need to be in database
val warmUpAction = FullyQualifiedEntityName(warmUpActionIdentity.namespace.name.toPath, EntityName(actionName))

def warmUpActivation(controller: ControllerInstanceId) = {
ActivationMessage(
transid = TransactionId.warmUp,
action = warmUpAction,
revision = DocRevision.empty,
user = warmUpActionIdentity,
activationId = new ActivationIdGenerator {}.make(),
rootControllerIndex = controller,
blocking = false,
content = None,
initArgs = Set.empty)
}

def warmUpContainerCreationMessage(scheduler: SchedulerInstanceId) =
ExecManifest.runtimesManifest
.resolveDefaultRuntime("nodejs:default")
.map { manifest =>
val metadata = WhiskActionMetaData(
warmUpAction.path,
warmUpAction.name,
CodeExecMetaDataAsString(manifest, false, entryPoint = None))
ContainerCreationMessage(
TransactionId.warmUp,
warmUpActionIdentity.namespace.name.toString,
warmUpAction,
DocRevision.empty,
metadata,
scheduler,
"",
0)
}

def isWarmUpAction(fqn: FullyQualifiedEntityName): Boolean = {
fqn == warmUpAction
}
}
Expand Up @@ -203,6 +203,7 @@ object WhiskConfig {
object ConfigKeys {
val cluster = "whisk.cluster"
val loadbalancer = "whisk.loadbalancer"
val fraction = "whisk.fraction"
val buildInformation = "whisk.info"

val couchdb = "whisk.couchdb"
Expand Down
Expand Up @@ -24,4 +24,6 @@ object Annotations {
val RawHttpAnnotationName = "raw-http"
val RequireWhiskAuthAnnotation = "require-whisk-auth"
val ProvideApiKeyAnnotationName = "provide-api-key"
val InvokerResourcesAnnotationName = "invoker-resources"
val InvokerResourcesStrictPolicyAnnotationName = "invoker-resources-strict-policy"
}
Expand Up @@ -26,12 +26,8 @@ import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
import pureconfig._
import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
Expand All @@ -40,13 +36,17 @@ import org.apache.openwhisk.core.entitlement._
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity.ExecManifest.Runtimes
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.loadBalancer.{InvokerState, LoadBalancerProvider}
import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
import org.apache.openwhisk.spi.SpiLoader
import pureconfig._
import spray.json.DefaultJsonProtocol._
import spray.json._
import pureconfig.generic.auto._

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await
import scala.util.{Failure, Success}

/**
Expand Down
Expand Up @@ -43,28 +43,6 @@ case object GetStatus

case object Tick

// States an Invoker can be in
sealed trait InvokerState {
val asString: String
val isUsable: Boolean
}

object InvokerState {
// Invokers in this state can be used to schedule workload to
sealed trait Usable extends InvokerState { val isUsable = true }
// No workload should be scheduled to invokers in this state
sealed trait Unusable extends InvokerState { val isUsable = false }

// A completely healthy invoker, pings arriving fine, no system errors
case object Healthy extends Usable { val asString = "up" }
// Pings are arriving fine, the invoker returns system errors though
case object Unhealthy extends Unusable { val asString = "unhealthy" }
// Pings are arriving fine, the invoker does not respond with active-acks in the expected time though
case object Unresponsive extends Unusable { val asString = "unresponsive" }
// Pings are not arriving for this invoker
case object Offline extends Unusable { val asString = "down" }
}

// Possible answers of an activation
sealed trait InvocationFinishedResult
object InvocationFinishedResult {
Expand Down
Expand Up @@ -17,34 +17,18 @@

package org.apache.openwhisk.core.loadBalancer

import scala.concurrent.Future
import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.common.{InvokerHealth, Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.Spi

import scala.concurrent.Future
import scala.concurrent.duration._

/**
* Describes an abstract invoker. An invoker is a local container pool manager that
* is in charge of the container life cycle management.
*
* @param id a unique instance identifier for the invoker
* @param status it status (healthy, unhealthy, offline)
*/
class InvokerHealth(val id: InvokerInstanceId, val status: InvokerState) {
override def equals(obj: scala.Any): Boolean = obj match {
case that: InvokerHealth => that.id == this.id && that.status == this.status
case _ => false
}

override def toString = s"InvokerHealth($id, $status)"
}

trait LoadBalancer {

/**
Expand Down
Expand Up @@ -28,6 +28,7 @@ import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import pureconfig._
import pureconfig.generic.auto._
import org.apache.openwhisk.common._
Expand All @@ -37,7 +38,6 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader

Expand Down
Expand Up @@ -20,6 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.ContainerRemoved
Expand Down Expand Up @@ -350,23 +351,6 @@ object InvokerHealthManager {
}
}

// States an Invoker can be in
sealed trait InvokerState {
val asString: String
}

case object Offline extends InvokerState {
val asString = "down"
}

case object Healthy extends InvokerState {
val asString = "up"
}

case object Unhealthy extends InvokerState {
val asString = "unhealthy"
}

//recevied from ContainerProxy actor
case class HealthMessage(state: Boolean)

Expand Down
28 changes: 28 additions & 0 deletions core/scheduler/src/main/resources/application.conf
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

whisk{
# tracing configuration
tracing {
component = "Scheduler"
}

fraction {
managed-fraction: 90%
blackbox-fraction: 10%
}
}

0 comments on commit f1829e1

Please sign in to comment.