-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OpenWhisk User Events #4584
OpenWhisk User Events #4584
Changes from 63 commits
b8be6a3
79472f5
fd045a6
79874c9
2606557
9f76657
b581155
5b53b14
bd5198d
4ed0f20
05bba0a
dc647f0
43ccd3c
196e17e
82d3e8a
3448f7c
cddb3a8
6abf7b3
8d54bdb
9e85892
2387fc9
b8e5786
65a00ef
5adff69
2355539
b8f83e2
d926df7
ba54fbf
65dabf5
355de66
685b3f4
80d2238
7a7ab91
78231f7
529d54d
6402e14
ab6e746
1f6338c
af51a02
b62584a
d7cf007
870e6da
c7b558d
19037a7
156eb9f
0ebf922
394ad49
9b22c7c
44451a3
bbea918
956ed11
3dc4c06
6344437
074d542
7030786
77fa8dc
81b3668
8e4746b
a46c4b8
43a9e12
f859355
5b96f3e
89bff99
f04275a
4965397
a38c668
cc6329d
e8d9234
fa27d59
8081a1f
ff9a141
9876c4c
38b3174
09bd41e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,8 @@ import scala.util.Try | |
import spray.json._ | ||
import org.apache.openwhisk.common.TransactionId | ||
import org.apache.openwhisk.core.entity._ | ||
import scala.concurrent.duration._ | ||
import java.util.concurrent.TimeUnit | ||
|
||
/** Basic trait for messages that are sent on a message bus connector. */ | ||
trait Message { | ||
|
@@ -194,22 +196,53 @@ object EventMessageBody extends DefaultJsonProtocol { | |
|
||
case class Activation(name: String, | ||
statusCode: Int, | ||
duration: Long, | ||
waitTime: Long, | ||
initTime: Long, | ||
duration: Duration, | ||
waitTime: Duration, | ||
initTime: Duration, | ||
kind: String, | ||
conductor: Boolean, | ||
memory: Int, | ||
causedBy: Option[String]) | ||
extends EventMessageBody { | ||
val typeName = "Activation" | ||
val typeName = Activation.typeName | ||
override def serialize = toJson.compactPrint | ||
|
||
def toJson = Activation.activationFormat.write(this) | ||
|
||
def status: String = statusCode match { | ||
// Defined in ActivationResponse | ||
case 0 => Activation.statusSuccess | ||
case 1 => Activation.statusApplicationError | ||
case 2 => Activation.statusDeveloperError | ||
case 3 => Activation.statusInternalError | ||
case x => x.toString | ||
} | ||
|
||
def isColdStart: Boolean = initTime != Duration.Zero | ||
} | ||
|
||
object Activation extends DefaultJsonProtocol { | ||
|
||
val typeName = "Activation" | ||
def parse(msg: String) = Try(activationFormat.read(msg.parseJson)) | ||
|
||
val statusSuccess = "success" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these would get removed as well if you reuse existing code to map status code to a string. |
||
val statusApplicationError = "application_error" | ||
val statusDeveloperError = "developer_error" | ||
val statusInternalError = "internal_error" | ||
|
||
private implicit val durationFormat = new RootJsonFormat[Duration] { | ||
override def write(obj: Duration): JsValue = obj match { | ||
case o if o.isFinite() => JsNumber(o.toMillis) | ||
case _ => JsNumber.zero | ||
} | ||
|
||
override def read(json: JsValue): Duration = json match { | ||
case JsNumber(n) if n <= 0 => Duration.Zero | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fwiw this would suggest a bug. |
||
case JsNumber(n) => toDuration(n.longValue()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit; drop |
||
} | ||
} | ||
|
||
implicit val activationFormat = | ||
jsonFormat( | ||
Activation.apply _, | ||
|
@@ -223,6 +256,15 @@ object Activation extends DefaultJsonProtocol { | |
"memory", | ||
"causedBy") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all these field names are defined in WhiskActivation - if there are some not defined there, they should be - this keeps the list in one canonical place. While not related to this PR, we should fix this either here or independently. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 I'll create an issue for this |
||
|
||
/** | ||
* Extract namespace and action from name | ||
* ex. whisk.system/apimgmt/createApi -> (whisk.system, apimgmt/createApi) | ||
*/ | ||
def getNamespaceAndActionName(name: String): (String, String) = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it may be helpful to use |
||
val nameArr = name.split("/", 2) | ||
(nameArr(0), nameArr(1)) | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason not to split these in the |
||
/** Constructs an "Activation" event from a WhiskActivation */ | ||
def from(a: WhiskActivation): Try[Activation] = { | ||
for { | ||
|
@@ -234,9 +276,9 @@ object Activation extends DefaultJsonProtocol { | |
Activation( | ||
fqn, | ||
a.response.statusCode, | ||
a.duration.getOrElse(0), | ||
a.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0), | ||
a.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0), | ||
toDuration(a.duration.getOrElse(0)), | ||
toDuration(a.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0)), | ||
toDuration(a.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0)), | ||
kind, | ||
a.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false), | ||
a.annotations | ||
|
@@ -246,6 +288,8 @@ object Activation extends DefaultJsonProtocol { | |
a.annotations.getAs[String](WhiskActivation.causedByAnnotation).toOption) | ||
} | ||
} | ||
|
||
def toDuration(milliseconds: Long) = new FiniteDuration(milliseconds, TimeUnit.MILLISECONDS) | ||
} | ||
|
||
case class Metric(metricName: String, metricValue: Long) extends EventMessageBody { | ||
|
@@ -255,6 +299,7 @@ case class Metric(metricName: String, metricValue: Long) extends EventMessageBod | |
} | ||
|
||
object Metric extends DefaultJsonProtocol { | ||
val typeName = "Metric" | ||
chetanmeh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def parse(msg: String) = Try(metricFormat.read(msg.parseJson)) | ||
implicit val metricFormat = jsonFormat(Metric.apply _, "metricName", "metricValue") | ||
} | ||
|
@@ -280,5 +325,5 @@ object EventMessage extends DefaultJsonProtocol { | |
} | ||
} | ||
|
||
def parse(msg: String) = format.read(msg.parseJson) | ||
def parse(msg: String) = Try(format.read(msg.parseJson)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
* | ||
!transformEnvironment.sh | ||
!init.sh | ||
!build/distributions | ||
!Dockerfile |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
FROM scala | ||
|
||
ENV UID=1001 \ | ||
NOT_ROOT_USER=owuser | ||
|
||
# Copy app jars | ||
ADD build/distributions/user-events.tar / | ||
|
||
COPY init.sh / | ||
RUN chmod +x init.sh | ||
|
||
RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash ${NOT_ROOT_USER} | ||
USER ${NOT_ROOT_USER} | ||
|
||
# Prometheus port | ||
EXPOSE 9095 | ||
CMD ["./init.sh", "0"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
<!-- | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
--> | ||
|
||
# ![OpenWhisk User Events](https://raw.githubusercontent.com/apache/openwhisk/master/core/monitoring/user-events/images/demo_landing.png) | ||
|
||
# OpenWhisk User Events | ||
|
||
This service connects to `events` topic and publishes the events to various services like Prometheus, Datadog etc via Kamon. Refer to [user specific metrics][1] on how to enable them. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest to extend https://github.com/apache/openwhisk/blob/master/docs/metrics.md#user-specific-metrics to point to the metrics emitter provided by this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 yes good idea, adding a reference |
||
|
||
|
||
## Local Run | ||
>First configure and run `openwhisk docker-compose` that can be found in the [openwhisk-tools][2] project. | ||
|
||
- Start service inside the cluster (on the same docker-compose network: `openwhisk_default`) | ||
- The service will be available on port `9095` | ||
- The endpoint for exposing the metrics for Prometheus can be found on `/metrics`. | ||
|
||
## Usage | ||
|
||
The service needs the following env variables to be set | ||
|
||
- `KAFKA_HOSTS` - For local env it can be set to `172.17.0.1:9093`. When using [OpenWhisk Devtools][2] based setup use `kafka` | ||
|
||
Integrations | ||
------------ | ||
|
||
#### Prometheus | ||
The docker container would run the service and expose the metrics in format required by [Prometheus][3] at `9095` port | ||
|
||
#### Grafana | ||
The `Openwhisk - Action Performance Metrics` Grafana[4] dashboard is available on localhost port `3000` at this address: | ||
http://localhost:3000/d/Oew1lvymk/openwhisk-action-performance-metrics | ||
|
||
The latest version of the dashboard can be found in the "compose/dashboard/openwhisk_events.json" | ||
|
||
[1]: https://github.com/apache/incubator-openwhisk/blob/master/docs/metrics.md#user-specific-metrics | ||
[2]: https://github.com/apache/incubator-openwhisk-devtools/tree/master/docker-compose | ||
[3]: https://hub.docker.com/r/prom/prometheus/ | ||
[4]: https://hub.docker.com/r/grafana/grafana/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
apply plugin: 'scala' | ||
apply plugin: 'application' | ||
apply plugin: 'org.scoverage' | ||
|
||
ext.dockerImageName = 'user-events' | ||
apply from: "../../../gradle/docker.gradle" | ||
distDocker.dependsOn ':common:scala:distDocker', 'distTar' | ||
|
||
project.archivesBaseName = "openwhisk-user-events" | ||
|
||
repositories { | ||
mavenCentral() | ||
} | ||
|
||
dependencies { | ||
compile "org.scala-lang:scala-library:${gradle.scala.version}" | ||
compile project(':common:scala') | ||
|
||
compile 'com.typesafe.akka:akka-stream-kafka_2.12:0.22' | ||
|
||
compile 'io.prometheus:simpleclient:0.6.0' | ||
compile 'io.prometheus:simpleclient_common:0.6.0' | ||
|
||
testCompile 'junit:junit:4.11' | ||
testCompile 'org.scalatest:scalatest_2.12:3.0.1' | ||
testCompile 'net.manub:scalatest-embedded-kafka_2.12:2.0.0' | ||
testCompile 'com.typesafe.akka:akka-testkit_2.12:2.5.17' | ||
testCompile 'com.typesafe.akka:akka-stream-testkit_2.12:2.5.17' | ||
testCompile 'com.typesafe.akka:akka-http-testkit_2.12:10.1.5' | ||
} | ||
|
||
tasks.withType(ScalaCompile) { | ||
scalaCompileOptions.additionalParameters = gradle.scala.compileFlags | ||
} | ||
|
||
mainClassName = "org.apache.openwhisk.core.monitoring.metrics.Main" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ActivationResponse already has
messageForCode()
- can we use that instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah makes sense, I'll use that