Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute OpenWhisk action via Ignite and Firecracker VM #4556

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2b8a88e
Add docker file to build ignite compatible nodejs image
chetanmeh Jul 11, 2019
71f1480
Switch to centos for default dockerfile
chetanmeh Jul 12, 2019
e45392b
Implement base skeleton for IgniteContainerFactory
chetanmeh Jul 12, 2019
d79b0b8
Ignore parameters which are Docker specific
chetanmeh Jul 12, 2019
f6c9ec6
Rename config as its client specific
chetanmeh Jul 12, 2019
6a087da
Derive the ignite api
chetanmeh Jul 12, 2019
3cb1933
Add base logic to run commands
chetanmeh Jul 12, 2019
f4a7f29
Pass DockerApi to Ignite client and implement skeleton methods
chetanmeh Jul 12, 2019
5c96ed9
Implement ip address api via Docker api
chetanmeh Jul 12, 2019
83ab5c9
Implement logic to determine containerId
chetanmeh Jul 12, 2019
ba2c4d0
Implement support getting dockerId and remove
chetanmeh Jul 12, 2019
37ab3e6
Implement run support
chetanmeh Jul 12, 2019
218301e
Implement support for cleaning all launched vms
chetanmeh Jul 12, 2019
61b6605
Add toString impl to log vm details
chetanmeh Jul 12, 2019
41253e3
Add a ignored test
chetanmeh Jul 12, 2019
93c0150
Fix docker file and add a build script
chetanmeh Jul 12, 2019
e379ab3
Fix build issues
chetanmeh Jul 12, 2019
c8d78ad
Fix config path
chetanmeh Jul 12, 2019
bff66f7
fix command names
chetanmeh Jul 12, 2019
b5a220a
Log test run
chetanmeh Jul 12, 2019
b5bc415
Add manifest file to use with standalone for ignite
chetanmeh Jul 12, 2019
61d6101
Add ignite specific runtime
chetanmeh Jul 12, 2019
dc277db
Rename the file
chetanmeh Jul 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -446,6 +446,11 @@ object LoggingMarkers {
val CONTAINER_CLIENT_RETRIES =
LogMarkerToken(containerClient, "retries", counter)(MeasurementUnit.none)

def INVOKER_IGNITE_CMD(cmd: String) =
LogMarkerToken(invoker, "ignite", start, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.time.milliseconds)
def INVOKER_IGNITE_CMD_TIMEOUT(cmd: String) =
LogMarkerToken(invoker, "ignite", timeout, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.none)

val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, "totalCapacityBlackBox", counter)(MeasurementUnit.none)
val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, "totalCapacityManaged", counter)(MeasurementUnit.none)

Expand Down
Expand Up @@ -264,4 +264,7 @@ object ConfigKeys {

val whiskConfig = "whisk.config"
val swaggerUi = "whisk.swagger-ui"

val ignite = "whisk.ignite"
val igniteClient = s"$ignite.client"
}
17 changes: 17 additions & 0 deletions core/invoker/src/main/resources/application.conf
Expand Up @@ -117,4 +117,21 @@ whisk {
invoker {
protocol: http
}

ignite {
# to pass additional args to 'docker run'; format is `{key1: [v1, v2], key2: [v1, v2]}`
extra-args {

}
client {
timeouts {
create: 10 minutes
version: 10 seconds
inspect: 1 minute
rm: 1 minute
run: 5 minutes
ps: 1 minute
}
}
}
}
Expand Up @@ -195,7 +195,8 @@ class DockerClient(dockerHost: Option[String] = None,
def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] =
runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), config.timeouts.inspect).map(_.toBoolean)

protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
protected[containerpool] def runCmd(args: Seq[String], timeout: Duration)(
implicit transid: TransactionId): Future[String] = {
val cmd = dockerCmd ++ args
val start = transid.started(
this,
Expand Down
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.containerpool.ignite

import java.io.FileNotFoundException
import java.nio.file.{Files, Paths}

import akka.actor.ActorSystem
import akka.event.Logging.{ErrorLevel, InfoLevel}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.containerpool.docker.{
DockerClient,
DockerClientConfig,
ProcessRunner,
ProcessTimeoutException
}
import org.apache.openwhisk.core.containerpool.{ContainerAddress, ContainerId}
import pureconfig.loadConfigOrThrow

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

case class IgniteTimeoutConfig(create: Duration,
version: Duration,
inspect: Duration,
rm: Duration,
run: Duration,
ps: Duration)

case class IgniteClientConfig(timeouts: IgniteTimeoutConfig)

class IgniteClient(dockerClient: DockerClient,
config: IgniteClientConfig = loadConfigOrThrow[IgniteClientConfig](ConfigKeys.igniteClient),
dockerConfig: DockerClientConfig = loadConfigOrThrow[DockerClientConfig](ConfigKeys.dockerClient))(
override implicit val executionContext: ExecutionContext,
implicit val system: ActorSystem,
implicit val log: Logging)
extends IgniteApi
with ProcessRunner {

protected val igniteCmd: Seq[String] = {
val alternatives = List("/usr/bin/ignite", "/usr/local/bin/ignite")

val dockerBin = Try {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

igniteBin

alternatives.find(a => Files.isExecutable(Paths.get(a))).get
} getOrElse {
throw new FileNotFoundException(s"Couldn't locate ignite binary (tried: ${alternatives.mkString(", ")}).")
}
Seq(dockerBin, "-q")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

igniteBin

}

// Invoke ignite CLI to determine client version.
// If the ignite client version cannot be determined, an exception will be thrown and instance initialization will fail.
// Rationale: if we cannot invoke `ignite version` successfully, it is unlikely subsequent `ignite` invocations will succeed.
protected def getClientVersion(): String = {
//TODO Ignite currently does not support formatting. So just get and log the verbatim version details
val vf = executeProcess(igniteCmd ++ Seq("version"), config.timeouts.version)
.andThen {
case Success(version) => log.info(this, s"Detected ignite client version $version")
case Failure(e) =>
log.error(this, s"Failed to determine ignite client version: ${e.getClass} - ${e.getMessage}")
}
Await.result(vf, 2 * config.timeouts.version)
}
val clientVersion: String = getClientVersion()

protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
val cmd = igniteCmd ++ args
val start = transid.started(
this,
LoggingMarkers.INVOKER_IGNITE_CMD(args.head),
s"running ${cmd.mkString(" ")} (timeout: $timeout)",
logLevel = InfoLevel)
executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
case Failure(pte: ProcessTimeoutException) =>
transid.failed(this, start, pte.getMessage, ErrorLevel)
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_IGNITE_CMD_TIMEOUT(args.head))
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
}

override def inspectIPAddress(containerId: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] =
dockerClient.inspectIPAddress(containerId, "bridge")

override def containerId(igniteId: IgniteId)(implicit transid: TransactionId): Future[ContainerId] = {
//Each ignite vm would be backed by a Docker container whose name would be `ignite-<vm id>`
//Use that to find the backing containerId
dockerClient
.runCmd(Seq("inspect", "--format", s"{{.Id}}", s"ignite-${igniteId.asString}"), config.timeouts.inspect)
.flatMap {
case "<no value>" => Future.failed(new NoSuchElementException)
case stdout => Future.successful(ContainerId(stdout))
}
}

override def run(image: String, args: Seq[String])(implicit transid: TransactionId): Future[IgniteId] = {
runCmd(Seq("run", image) ++ args, config.timeouts.run).map(IgniteId.apply)
}

private val importedImages = new TrieMap[String, Boolean]()
private val importsInFlight = TrieMap[String, Future[Boolean]]()
override def importImage(image: String)(implicit transid: TransactionId): Future[Boolean] = {
//TODO Add support for latest
if (importedImages.contains(image)) Future.successful(true)
else {
importsInFlight.getOrElseUpdate(
image, {
runCmd(Seq("image", "import", image), config.timeouts.create)
.map { stdout =>
log.info(this, s"Imported image $image - $stdout")
true
}
.andThen {
case _ =>
importsInFlight.remove(image)
importedImages.put(image, true)
}
})
}
}

override def rm(igniteId: IgniteId)(implicit transid: TransactionId): Future[Unit] =
runCmd(Seq("vm", "rm", igniteId.asString), config.timeouts.rm).map(_ => ())

override def stop(igniteId: IgniteId)(implicit transid: TransactionId): Future[Unit] =
runCmd(Seq("vm", "stop", igniteId.asString), config.timeouts.rm).map(_ => ())

override def listRunningVMs()(implicit transid: TransactionId): Future[Seq[VMInfo]] = {
//Each ignite vm has a backing container whose label is set to vm name and name to vm id
val filter = "--format='{{.ID }}|{{ .Label \"ignite.name\" }}|{{.Names}}'"
val cmd = Seq("ps", "--no-trunc", filter)
runCmd(cmd, config.timeouts.ps).map(_.linesIterator.toSeq.map(VMInfo.apply))
}
}

case class VMInfo(containerId: ContainerId, igniteId: IgniteId, name: String)

object VMInfo {
def apply(value: String): VMInfo = {
val Array(conatinerId, name, vmId) = value.split("|")
val igniteId = vmId.split("-").last
new VMInfo(ContainerId(conatinerId), IgniteId(igniteId), name)
}
}

trait IgniteApi {
protected implicit val executionContext: ExecutionContext

def inspectIPAddress(containerId: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress]

def containerId(igniteId: IgniteId)(implicit transid: TransactionId): Future[ContainerId]

def run(image: String, args: Seq[String])(implicit transid: TransactionId): Future[IgniteId]

def importImage(image: String)(implicit transid: TransactionId): Future[Boolean]

def rm(igniteId: IgniteId)(implicit transid: TransactionId): Future[Unit]

def stop(igniteId: IgniteId)(implicit transid: TransactionId): Future[Unit]

def listRunningVMs()(implicit transid: TransactionId): Future[Seq[VMInfo]]

def stopAndRemove(igniteId: IgniteId)(implicit transid: TransactionId): Future[Unit] = {
for {
_ <- stop(igniteId)
_ <- rm(igniteId)
} yield Unit
}
}
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.containerpool.ignite

import java.time.Instant

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.containerpool.logging.LogLine
import org.apache.openwhisk.core.containerpool.{
BlackboxStartupError,
Container,
ContainerAddress,
ContainerId,
WhiskContainerStartupError
}
import org.apache.openwhisk.core.entity.ByteSize
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
import spray.json._

import scala.concurrent.{ExecutionContext, Future}

case class IgniteId(asString: String) {
require(asString.nonEmpty, "IgniteId must not be empty")
}

object IgniteContainer {

def create(transid: TransactionId,
image: ImageName,
memory: ByteSize = 256.MB,
cpuShares: Int = 0,
name: Option[String] = None)(implicit
as: ActorSystem,
ec: ExecutionContext,
log: Logging,
config: IgniteConfig,
ignite: IgniteApi): Future[IgniteContainer] = {
implicit val tid: TransactionId = transid

val params = config.extraArgs.flatMap {
case (key, valueList) => valueList.toList.flatMap(Seq(key, _))
}
//TODO Environment handling

//TODO cpus - VM vCPU count, 1 or even numbers between 1 and 32 (default 1)
//It does not map to cpuShares currently. We may use it proportionally

//size - VM filesystem size, for example 5GB or 2048MB (default 4.0 GB)
val args = Seq("--cpus", 1.toString, "--memory", s"${memory.toMB}m", "--size", "1GB") ++ name
.map(n => Seq("--name", n))
.getOrElse(Seq.empty) ++ params

val imageToUse = image.publicImageName
for {
importSuccessful <- ignite.importImage(imageToUse)
igniteId <- ignite.run(imageToUse, args).recoverWith {
case _ =>
if (importSuccessful) {
Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
} else {
Future.failed(BlackboxStartupError(Messages.imagePullError(imageToUse)))
}
}
containerId <- ignite.containerId(igniteId)
ip <- ignite.inspectIPAddress(containerId).recoverWith {
// remove the container immediately if inspect failed as
// we cannot recover that case automatically
case _ =>
ignite.rm(igniteId)
Future.failed(WhiskContainerStartupError(Messages.resourceProvisionError))
}
} yield new IgniteContainer(containerId, ip, igniteId)
}

}

class IgniteContainer(protected val id: ContainerId, protected val addr: ContainerAddress, igniteId: IgniteId)(
implicit
override protected val as: ActorSystem,
protected val ec: ExecutionContext,
protected val logging: Logging,
ignite: IgniteApi)
extends Container {

override def destroy()(implicit transid: TransactionId): Future[Unit] = {
super.destroy()
ignite.stopAndRemove(igniteId)
}

private val logMsg = "LogMessage are collected via Docker CLI"
override def logs(limit: ByteSize, waitForSentinel: Boolean)(
implicit transid: TransactionId): Source[ByteString, Any] =
Source.single(ByteString(LogLine(logMsg, "stdout", Instant.now.toString).toJson.compactPrint))

override def toString() = s"igniteId: ${igniteId.asString}, docker: ${id.asString}, address: $addr"
}