Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[New Scheduler] Manage container creation #5074

Merged
merged 12 commits into from
May 26, 2021
1 change: 1 addition & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -456,5 +456,6 @@ etcd_connect_string: "{% set ret = [] %}\
{{ ret | join(',') }}"

scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,11 @@ object LoggingMarkers {
LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> topic))(MeasurementUnit.time.milliseconds)
else LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)

// Time needed to produce a message to Kafka, measured in milliseconds
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)

// Log marker for scheduler keep-alives: built from `scheduler`, the "keepAlive" action and `counter`,
// tagged with the given lease id (rendered as a string); carries no measurement unit.
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)

/*
* General markers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,45 @@

package org.apache.openwhisk.common

import org.apache.openwhisk.core.entity.InvokerInstanceId

// NOTE(review): presumably a signal message asking a component to shut down gracefully — confirm against its receivers
case object GracefulShutdown
// NOTE(review): presumably the counterpart signal that (re-)enables a component — confirm against its receivers
case object Enable

/**
 * The health states an invoker can be in. Every state exposes a string
 * representation (`asString`) and whether workload may be scheduled to it (`isUsable`).
 */
sealed trait InvokerState {
  val asString: String
  val isUsable: Boolean
}

object InvokerState {

  /** States in which an invoker can be used to schedule workload to. */
  sealed trait Usable extends InvokerState {
    override val isUsable: Boolean = true
  }

  /** States in which no workload should be scheduled to the invoker. */
  sealed trait Unusable extends InvokerState {
    override val isUsable: Boolean = false
  }

  /** A completely healthy invoker: pings arriving fine, no system errors. */
  case object Healthy extends Usable {
    override val asString: String = "up"
  }

  /** The invoker can not create a container. */
  case object Unhealthy extends Unusable {
    override val asString: String = "unhealthy"
  }

  /** Pings are arriving fine, but the invoker does not respond with active-acks in the expected time. */
  case object Unresponsive extends Unusable {
    override val asString: String = "unresponsive"
  }

  /** The invoker is down. */
  case object Offline extends Unusable {
    override val asString: String = "down"
  }
}

/**
 * Describes an abstract invoker. An invoker is a local container pool manager that
 * is in charge of the container life cycle management.
 *
 * Equality and hashing are the ones synthesized for the case class, i.e. two
 * instances are equal when both `id` and `status` are equal.
 *
 * @param id a unique instance identifier for the invoker
 * @param status its status (healthy, unhealthy, unresponsive, offline)
 */
case class InvokerHealth(id: InvokerInstanceId, status: InvokerState) {
  // No manual `equals`: the compiler-generated one already compares (id, status)
  // and stays consistent with the generated `hashCode`.

  // Overridden only to keep the ", "-separated rendering (the generated one uses "," without a space).
  override def toString: String = s"InvokerHealth($id, $status)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,11 @@ object TransactionId {

val systemPrefix = "sid_"

var containerCreation = TransactionId(systemPrefix + "containerCreation")
val unknown = TransactionId(systemPrefix + "unknown")
val testing = TransactionId(systemPrefix + "testing") // Common id for unit testing
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
val invokerHealthManager = TransactionId(systemPrefix + "invokerHealthManager") // Invoker health manager (NOTE(review): earlier text duplicated the `invoker` comment — confirm intended scope)
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker warmup thread that makes stem-cell containers
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker nanny thread
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message dispatcher
Expand All @@ -245,7 +245,9 @@ object TransactionId {
val controller = TransactionId(systemPrefix + "controller") // Controller startup
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
def invokerHealthActivation = TransactionId(systemPrefix + "invokerHealthActivation") // Invoker health activation
var containerCreation = TransactionId(systemPrefix + "containerCreation")
var containerDeletion = TransactionId(systemPrefix + "containerDeletion")
val warmUp = TransactionId(systemPrefix + "warmUp")

private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')

Expand Down
72 changes: 72 additions & 0 deletions common/scala/src/main/scala/org/apache/openwhisk/core/WarmUp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core

import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.connector.{ActivationMessage, ContainerCreationMessage}
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity._

/**
 * Definitions for the synthetic "warmUp" action under the "whisk.system" namespace:
 * its identity, its fully qualified name, and factories for the activation and
 * container-creation messages that refer to it.
 */
object WarmUp {

  /** Identity used for the warm-up action; a fresh UUID and secret are generated once at initialization. */
  val warmUpActionIdentity = {
    val systemName = "whisk.system"
    val id = UUID()
    val subject = Subject(systemName)
    val namespace = Namespace(EntityName(systemName), id)
    val authKey = BasicAuthenticationAuthKey(id, Secret())
    Identity(subject, namespace, authKey)
  }

  private val actionName = "warmUp"

  // this action doesn't need to be in database
  val warmUpAction = FullyQualifiedEntityName(warmUpActionIdentity.namespace.name.toPath, EntityName(actionName))

  /**
   * Builds a non-blocking activation message for the warm-up action, without
   * content and with an empty revision and empty init args.
   *
   * @param controller the controller instance recorded as `rootControllerIndex`
   */
  def warmUpActivation(controller: ControllerInstanceId) = {
    val idGenerator = new ActivationIdGenerator {}
    ActivationMessage(
      transid = TransactionId.warmUp,
      action = warmUpAction,
      revision = DocRevision.empty,
      user = warmUpActionIdentity,
      activationId = idGenerator.make(),
      rootControllerIndex = controller,
      blocking = false,
      content = None,
      initArgs = Set.empty)
  }

  /**
   * Builds a container-creation message for the warm-up action, based on the
   * runtime that "nodejs:default" resolves to in the runtimes manifest. The result
   * is whatever `resolveDefaultRuntime` yields, mapped to the message.
   *
   * @param scheduler the scheduler instance passed through to the message
   */
  def warmUpContainerCreationMessage(scheduler: SchedulerInstanceId) =
    for (runtime <- ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:default")) yield {
      val exec = CodeExecMetaDataAsString(runtime, false, entryPoint = None)
      val actionMetaData = WhiskActionMetaData(warmUpAction.path, warmUpAction.name, exec)
      val invocationNamespace = warmUpActionIdentity.namespace.name.toString
      // NOTE(review): the trailing "" and 0 look like placeholder values — confirm which fields they populate
      ContainerCreationMessage(
        TransactionId.warmUp,
        invocationNamespace,
        warmUpAction,
        DocRevision.empty,
        actionMetaData,
        scheduler,
        "",
        0)
    }

  /** Returns true when `fqn` equals the warm-up action's fully qualified name. */
  def isWarmUpAction(fqn: FullyQualifiedEntityName): Boolean = fqn == warmUpAction
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ object WhiskConfig {
object ConfigKeys {
val cluster = "whisk.cluster"
val loadbalancer = "whisk.loadbalancer"
val fraction = "whisk.fraction"
val buildInformation = "whisk.info"

val couchdb = "whisk.couchdb"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ object Annotations {
val RawHttpAnnotationName = "raw-http"
val RequireWhiskAuthAnnotation = "require-whisk-auth"
val ProvideApiKeyAnnotationName = "provide-api-key"
val InvokerResourcesAnnotationName = "invoker-resources"
val InvokerResourcesStrictPolicyAnnotationName = "invoker-resources-strict-policy"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
import pureconfig._
import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
Expand All @@ -40,13 +36,17 @@ import org.apache.openwhisk.core.entitlement._
import org.apache.openwhisk.core.entity.ActivationId.ActivationIdGenerator
import org.apache.openwhisk.core.entity.ExecManifest.Runtimes
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.loadBalancer.{InvokerState, LoadBalancerProvider}
import org.apache.openwhisk.core.loadBalancer.LoadBalancerProvider
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
import org.apache.openwhisk.spi.SpiLoader
import pureconfig._
import spray.json.DefaultJsonProtocol._
import spray.json._
import pureconfig.generic.auto._

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.duration.DurationInt
import scala.concurrent.Await
import scala.util.{Failure, Success}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,6 @@ case object GetStatus

case object Tick

/**
 * The health states an invoker can be in. Every state exposes a string
 * representation (`asString`) and whether workload may be scheduled to it (`isUsable`).
 */
sealed trait InvokerState {
  val asString: String
  val isUsable: Boolean
}

object InvokerState {

  /** States in which an invoker can be used to schedule workload to. */
  sealed trait Usable extends InvokerState {
    override val isUsable: Boolean = true
  }

  /** States in which no workload should be scheduled to the invoker. */
  sealed trait Unusable extends InvokerState {
    override val isUsable: Boolean = false
  }

  /** A completely healthy invoker: pings arriving fine, no system errors. */
  case object Healthy extends Usable {
    override val asString: String = "up"
  }

  /** Pings are arriving fine, but the invoker returns system errors. */
  case object Unhealthy extends Unusable {
    override val asString: String = "unhealthy"
  }

  /** Pings are arriving fine, but the invoker does not respond with active-acks in the expected time. */
  case object Unresponsive extends Unusable {
    override val asString: String = "unresponsive"
  }

  /** Pings are not arriving for this invoker. */
  case object Offline extends Unusable {
    override val asString: String = "down"
  }
}

// Possible answers of an activation
sealed trait InvocationFinishedResult
object InvocationFinishedResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,18 @@

package org.apache.openwhisk.core.loadBalancer

import scala.concurrent.Future
import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.common.{InvokerHealth, Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.spi.Spi

import scala.concurrent.Future
import scala.concurrent.duration._

/**
 * Describes an abstract invoker. An invoker is a local container pool manager that
 * is in charge of the container life cycle management.
 *
 * Two instances are equal when both `id` and `status` are equal; `hashCode`
 * is derived from the same two fields.
 *
 * @param id a unique instance identifier for the invoker
 * @param status its status (healthy, unhealthy, offline)
 */
class InvokerHealth(val id: InvokerInstanceId, val status: InvokerState) {
  override def equals(obj: scala.Any): Boolean = obj match {
    case that: InvokerHealth => that.id == this.id && that.status == this.status
    case _                   => false
  }

  // Must agree with `equals`, otherwise equal instances behave inconsistently in
  // hash-based collections (Set, Map keys, distinct).
  override def hashCode: Int = (id, status).##

  override def toString = s"InvokerHealth($id, $status)"
}

trait LoadBalancer {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import pureconfig._
import pureconfig.generic.auto._
import org.apache.openwhisk.common._
Expand All @@ -37,7 +38,6 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size.SizeLong
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.core.controller.Controller
import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.ContainerRemoved
Expand Down Expand Up @@ -350,23 +351,6 @@ object InvokerHealthManager {
}
}

/** The states an invoker can be in; each one exposes its string representation. */
sealed trait InvokerState {
  val asString: String
}

/** Reported with the string "down". */
case object Offline extends InvokerState { override val asString: String = "down" }

/** Reported with the string "up". */
case object Healthy extends InvokerState { override val asString: String = "up" }

/** Reported with the string "unhealthy". */
case object Unhealthy extends InvokerState { override val asString: String = "unhealthy" }

//recevied from ContainerProxy actor
case class HealthMessage(state: Boolean)

Expand Down
28 changes: 28 additions & 0 deletions core/scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

whisk{
# tracing configuration
tracing {
component = "Scheduler"
}

# NOTE(review): presumably the share of invoker capacity assigned to managed vs. blackbox
# runtimes — confirm against the code reading "whisk.fraction". The two values add up to 100%.
fraction {
managed-fraction: 90%
blackbox-fraction: 10%
}
}