Skip to content

Commit

Permalink
OpenWhisk User Events (#4584)
Browse files Browse the repository at this point in the history
The user event service enables aggregating the metric events sent on `events` topic and expose them as Prometheus (or Kamon) metrics. Out of the box dashboards are provided for the Grafana/Prometheus mode which provide detailed insights on performance metrics at cluster/namespace/action level.
  • Loading branch information
selfxp authored and chetanmeh committed Sep 25, 2019
1 parent 5de865c commit 3e89aa5
Show file tree
Hide file tree
Showing 37 changed files with 4,139 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import scala.util.Try
import spray.json._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity._
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
import org.apache.openwhisk.core.entity.ActivationResponse.statusForCode

/** Basic trait for messages that are sent on a message bus connector. */
trait Message {
Expand Down Expand Up @@ -283,22 +286,47 @@ object EventMessageBody extends DefaultJsonProtocol {

case class Activation(name: String,
statusCode: Int,
duration: Long,
waitTime: Long,
initTime: Long,
duration: Duration,
waitTime: Duration,
initTime: Duration,
kind: String,
conductor: Boolean,
memory: Int,
causedBy: Option[String])
extends EventMessageBody {
val typeName = "Activation"
val typeName = Activation.typeName
override def serialize = toJson.compactPrint
def entityPath: FullyQualifiedEntityName = EntityPath(name).toFullyQualifiedEntityName

def toJson = Activation.activationFormat.write(this)

def status: String = statusForCode(statusCode)

def isColdStart: Boolean = initTime != Duration.Zero

def namespace: String = entityPath.path.root.name

def action: String = entityPath.fullPath.relativePath.get.namespace

}

object Activation extends DefaultJsonProtocol {

val typeName = "Activation"
def parse(msg: String) = Try(activationFormat.read(msg.parseJson))

private implicit val durationFormat = new RootJsonFormat[Duration] {
override def write(obj: Duration): JsValue = obj match {
case o if o.isFinite => JsNumber(o.toMillis)
case _ => JsNumber.zero
}

override def read(json: JsValue): Duration = json match {
case JsNumber(n) if n <= 0 => Duration.Zero
case JsNumber(n) => toDuration(n.longValue)
}
}

implicit val activationFormat =
jsonFormat(
Activation.apply _,
Expand All @@ -323,9 +351,9 @@ object Activation extends DefaultJsonProtocol {
Activation(
fqn,
a.response.statusCode,
a.duration.getOrElse(0),
a.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
a.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
toDuration(a.duration.getOrElse(0)),
toDuration(a.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0)),
toDuration(a.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0)),
kind,
a.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
a.annotations
Expand All @@ -335,6 +363,8 @@ object Activation extends DefaultJsonProtocol {
a.annotations.getAs[String](WhiskActivation.causedByAnnotation).toOption)
}
}

def toDuration(milliseconds: Long) = new FiniteDuration(milliseconds, TimeUnit.MILLISECONDS)
}

case class Metric(metricName: String, metricValue: Long) extends EventMessageBody {
Expand All @@ -344,6 +374,7 @@ case class Metric(metricName: String, metricValue: Long) extends EventMessageBod
}

object Metric extends DefaultJsonProtocol {
val typeName = "Metric"
def parse(msg: String) = Try(metricFormat.read(msg.parseJson))
implicit val metricFormat = jsonFormat(Metric.apply _, "metricName", "metricValue")
}
Expand All @@ -369,5 +400,5 @@ object EventMessage extends DefaultJsonProtocol {
}
}

def parse(msg: String) = format.read(msg.parseJson)
def parse(msg: String) = Try(format.read(msg.parseJson))
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,23 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
val DeveloperError = 2 // action ran but failed to handle an error, or action did not run and failed to initialize
val WhiskError = 3 // internal system error

val statusSuccess = "success"
val statusApplicationError = "application_error"
val statusDeveloperError = "action_developer_error"
val statusWhiskError = "whisk_internal_error"

protected[core] def statusForCode(code: Int) = {
require(code >= 0 && code <= 3)
code match {
case Success => statusSuccess
case ApplicationError => statusApplicationError
case DeveloperError => statusDeveloperError
case WhiskError => statusWhiskError
}
}

protected[core] def messageForCode(code: Int) = {
require(code >= Success && code <= WhiskError)
require(code >= 0 && code <= 3)
code match {
case Success => "success"
case ApplicationError => "application error"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ object WhiskActivation
val conductorAnnotation = "conductor"
val timeoutAnnotation = "timeout"

val memory = "memory"
val duration = "duration"
val statusCode = "statusCode"

/** Some field names for compositions */
val actionField = "action"
val paramsField = "params"
Expand Down
5 changes: 5 additions & 0 deletions core/monitoring/user-events/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*
!transformEnvironment.sh
!init.sh
!build/distributions
!Dockerfile
34 changes: 34 additions & 0 deletions core/monitoring/user-events/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM scala

ENV UID=1001 \
NOT_ROOT_USER=owuser

# Copy app jars
ADD build/distributions/user-events.tar /

COPY init.sh /
RUN chmod +x init.sh

RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash ${NOT_ROOT_USER}
USER ${NOT_ROOT_USER}

# Prometheus port
EXPOSE 9095
CMD ["./init.sh", "0"]
55 changes: 55 additions & 0 deletions core/monitoring/user-events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

# ![OpenWhisk User Events](https://raw.githubusercontent.com/apache/openwhisk/master/core/monitoring/user-events/images/demo_landing.png)

# OpenWhisk User Events

This service connects to `events` topic and publishes the events to various services like Prometheus, Datadog etc via Kamon. Refer to [user specific metrics][1] on how to enable them.


## Local Run
>First configure and run `openwhisk docker-compose` that can be found in the [openwhisk-tools][2] project.
- Start service inside the cluster (on the same docker-compose network: `openwhisk_default`)
- The service will be available on port `9095`
- The endpoint for exposing the metrics for Prometheus can be found on `/metrics`.

## Usage

The service needs the following env variables to be set

- `KAFKA_HOSTS` - For local env it can be set to `172.17.0.1:9093`. When using [OpenWhisk Devtools][2] based setup use `kafka`

Integrations
------------

#### Prometheus
The docker container would run the service and expose the metrics in format required by [Prometheus][3] at `9095` port

#### Grafana
The `Openwhisk - Action Performance Metrics` Grafana[4] dashboard is available on localhost port `3000` at this address:
http://localhost:3000/d/Oew1lvymk/openwhisk-action-performance-metrics

The latest version of the dashboard can be found in the "compose/dashboard/openwhisk_events.json"

[1]: https://github.com/apache/incubator-openwhisk/blob/master/docs/metrics.md#user-specific-metrics
[2]: https://github.com/apache/incubator-openwhisk-devtools/tree/master/docker-compose
[3]: https://hub.docker.com/r/prom/prometheus/
[4]: https://hub.docker.com/r/grafana/grafana/
53 changes: 53 additions & 0 deletions core/monitoring/user-events/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'scala'
apply plugin: 'application'
apply plugin: 'org.scoverage'

ext.dockerImageName = 'user-events'
apply from: "../../../gradle/docker.gradle"
distDocker.dependsOn ':common:scala:distDocker', 'distTar'

project.archivesBaseName = "openwhisk-user-events"

repositories {
mavenCentral()
}

dependencies {
compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile project(':common:scala')

compile 'com.typesafe.akka:akka-stream-kafka_2.12:0.22'

compile 'io.prometheus:simpleclient:0.6.0'
compile 'io.prometheus:simpleclient_common:0.6.0'

testCompile 'junit:junit:4.11'
testCompile 'org.scalatest:scalatest_2.12:3.0.1'
testCompile 'net.manub:scalatest-embedded-kafka_2.12:2.0.0'
testCompile 'com.typesafe.akka:akka-testkit_2.12:2.5.17'
testCompile 'com.typesafe.akka:akka-stream-testkit_2.12:2.5.17'
testCompile 'com.typesafe.akka:akka-http-testkit_2.12:10.1.5'
}

tasks.withType(ScalaCompile) {
scalaCompileOptions.additionalParameters = gradle.scala.compileFlags
}

mainClassName = "org.apache.openwhisk.core.monitoring.metrics.Main"
Loading

0 comments on commit 3e89aa5

Please sign in to comment.