OpenWhisk User Events #4584

Merged
merged 74 commits into from
Sep 25, 2019

@selfxp (Contributor) commented Aug 16, 2019

This service connects to the `events` topic and publishes the events to services such as Prometheus and Datadog via Kamon.

Related issue and scope

  • I opened an issue to propose and discuss this change (#????)

My changes affect the following components

  • API
  • Controller
  • Message Bus (e.g., Kafka)
  • Loadbalancer
  • Invoker
  • Intrinsic actions (e.g., sequences, conductors)
  • Data stores (e.g., CouchDB)
  • Tests
  • Deployment
  • CLI
  • General tooling
  • Documentation

Types of changes

  • Bug fix (generally a non-breaking change which closes an issue).
  • Enhancement or new feature (adds new functionality).
  • Breaking change (a bug fix or enhancement which changes existing behavior).

Checklist:

  • I signed an Apache CLA.
  • I reviewed the style guides and followed the recommendations (Travis CI will check :).
  • I added tests to cover my changes.
  • [] My changes require further changes to the documentation.
  • I updated the documentation where necessary.

chetanmeh and others added 30 commits October 25, 2018 11:11
Configures various build related steps including code coverage and scan

* Enable build scan, coverage
* Enable cache and scala format check
* Configure scala format and format all files
* Add badge

Have a single Akka HTTP based server running to host the following endpoints:

1. `/metrics` - Prometheus
2. `/ping` - Health check -  If the stream reading from Kafka fails then this health check would start failing.
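The endpoint behaviour described above can be sketched as a pure function. This is only an illustration under stated assumptions: it does not use Akka HTTP, and `HealthSketch`, `handle`, `consumerRunning`, and `renderMetrics` are hypothetical names, as are the response bodies and the choice of 503 for a failed consumer.

```scala
object HealthSketch {
  // Returns (HTTP status code, body) for a request path.
  // `consumerRunning` stands in for "the Kafka stream is still alive";
  // `renderMetrics` stands in for the Prometheus text exposition.
  def handle(path: String, consumerRunning: Boolean, renderMetrics: () => String): (Int, String) =
    path match {
      case "/ping" if consumerRunning => (200, "pong")
      case "/ping"                    => (503, "Consumer not running")
      case "/metrics"                 => (200, renderMetrics())
      case _                          => (404, "Not found")
    }
}
```

Tying `/ping` to the consumer state means an orchestrator probing the endpoint can restart the service when the Kafka stream dies.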

Also add a basic test infrastructure for HTTP related code paths

Capture more metrics and add more tags to monitor cold starts etc.

Switch to the Prometheus Java Client and make the Kamon integration optional and configurable. This allows publishing metrics to Prometheus by default while still providing a way to publish to any other Kamon-supported system via config.

Extract a trait to define metric names, which can be adapted to each metric system's conventions.

Enable use of the Kamon API to track common metrics which are not specific to any action. As such metrics are bounded, it's safe to use the Kamon Prometheus Exporter to manage them.

This ensures that such metrics can be tracked via one implementation and only action-specific metrics are tracked via two implementations.
}
}

user-events {
Member

By convention, any OpenWhisk-specific config should be placed under the whisk namespace.

Please move this config under the whisk namespace. This will require a change in the code where the config is read.

@style95 (Member) commented Sep 3, 2019

This is great, as it lets us modularize the UserEvent exporter.
One question: this looks like a dedicated microservice. Should it be maintained in the core repo?

@selfxp (Contributor Author) commented Sep 4, 2019

Yeah, good question @style95. There was a vote on where this microservice should live, and the community decided to keep it in the main OW repo.
Please see the discussion here.

@style95 (Member) commented Sep 5, 2019

@selfxp Thank you for letting me know. I missed it.

val nameArr = name.split("/", 2)
(nameArr(0), nameArr(1))
}

Contributor

Any reason not to split these in the Activation definition? (and avoid parsing these later etc)

override def serialize = toJson.compactPrint

def toJson = Activation.activationFormat.write(this)

def status: String = statusCode match {
Contributor

ActivationResponse already has messageForCode() - can we use that instead?

Member

+1

Contributor Author

Yeah makes sense, I'll use that

private implicit val ec: ExecutionContext = system.dispatcher

//Record the rate of events received
private val activationCounter = Kamon.counter("openwhisk.userevents.global.activations")
Contributor

Should we use the same metrics naming convention used in invoker/controller?
e.g.

#kamon related configuration
kamon {
    environment {
      # Identifier for this service. For keeping it backward compatible setting to match previous
      # statsd name
      service = "openwhisk-statsd"
    }

then metric names like "global.activations"
This would make queries/tags/etc for user events metrics more like queries/tags/etc for invoker/controller metrics.

Member

That was done more for compatibility reasons and has issues such as gauge/counter etc. appearing in the name. As this is new work, I think we can go with the approach used in the current implementation.

duration.record(a.duration.toMillis)
}

private def updatedSettings = settings.withProperty(ConsumerConfig.CLIENT_ID_CONFIG, id)
Member

Would it be worth using a unique name for each instance, or leaving it blank (to let Kafka define a unique name), so that we can run multiple instances for scalability and failover?
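One way to give each instance its own client id is to append a random suffix to a shared prefix. This is a minimal sketch, not the PR's code: `ClientIds` and `unique` are hypothetical names, and using a UUID as the suffix is an assumption (a hostname or pod name would serve equally well).

```scala
import java.util.UUID

object ClientIds {
  // Builds a per-instance client id such as "<prefix>-<random uuid>".
  def unique(prefix: String): String = s"$prefix-${UUID.randomUUID()}"
}
```

The resulting string could then be passed wherever the fixed `id` is used today.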

def eventConsumerSettings(config: Config): ConsumerSettings[String, String] =
ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
.withGroupId("kamon")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Member

How about reading this from the configuration file, so that downstream deployments can decide the default behavior?

private val activationMetrics = new TrieMap[String, ActivationKamonMetrics]
private val limitMetrics = new TrieMap[String, LimitKamonMetrics]

override def processActivation(activation: Activation, initiatorNamespace: String): Unit = {
Member

Does initiatorNamespace here mean the namespace of the user who invokes shared actions or web actions?

Member

perhaps this pr #4609 makes it easier.

Contributor Author

yes exactly, initiator would be the namespace that makes the invocation

val metricConfig = loadConfigOrThrow[MetricConfig](config, "whisk.user-events")

val prometheusRecorder = PrometheusRecorder(prometheusReporter)
val recorders = if (metricConfig.enableKamon) Seq(prometheusRecorder, KamonRecorder) else Seq(prometheusRecorder)
Member

While I am OK with the current version, I have one small question.
Can we make the default recorder and reporter configurable as well?
Some downstream deployments may want to use KamonRecorder with other reporters as the default instead of Prometheus.
In that case, the Prometheus recorder would still record metrics that nobody uses.
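A minimal sketch of what a fully configurable recorder selection might look like. Everything here is illustrative: the stub recorders, `RecorderConfig`, and in particular the `enablePrometheus` flag are hypothetical (the code under review only exposes `enableKamon`).

```scala
// Stand-ins for the real recorder implementations.
trait MetricRecorder { def name: String }
case object PrometheusRecorderStub extends MetricRecorder { val name = "prometheus" }
case object KamonRecorderStub extends MetricRecorder { val name = "kamon" }

// `enablePrometheus` is a hypothetical flag; the defaults mirror the current behaviour.
final case class RecorderConfig(enablePrometheus: Boolean = true, enableKamon: Boolean = false)

object Recorders {
  // Includes each recorder only when its flag is set.
  def select(config: RecorderConfig): Seq[MetricRecorder] = {
    val prometheus: Option[MetricRecorder] =
      if (config.enablePrometheus) Some(PrometheusRecorderStub) else None
    val kamon: Option[MetricRecorder] =
      if (config.enableKamon) Some(KamonRecorderStub) else None
    prometheus.toSeq ++ kamon.toSeq
  }
}
```

With the defaults, this selects only the Prometheus recorder, matching the current behaviour; a Kamon-only deployment would set `enablePrometheus = false` and `enableKamon = true`.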

Contributor Author

👍 I'll create an issue to keep track of this

@rabbah (Member) left a comment

generally LGTM.

}

override def read(json: JsValue): Duration = json match {
case JsNumber(n) if n <= 0 => Duration.Zero
Member

fwiw this would suggest a bug.


override def read(json: JsValue): Duration = json match {
case JsNumber(n) if n <= 0 => Duration.Zero
case JsNumber(n) => toDuration(n.longValue())
Member

nit; drop () in longValue() and in isFinite() above.
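The clamping behaviour of the reader under review can be sketched without spray-json. This is an illustration only: `DurationSketch` and `readMillis` are hypothetical names, and treating the number as milliseconds is an assumption, since `toDuration` is not shown in the hunk.

```scala
import scala.concurrent.duration._

object DurationSketch {
  // Non-positive inputs are clamped to zero, mirroring the first case of the
  // reader; positive inputs are assumed to be milliseconds.
  def readMillis(n: BigDecimal): FiniteDuration =
    if (n <= 0) Duration.Zero else n.longValue.millis
}
```

Clamping hides negative values, which is why a reviewer notes that such input would point to a bug upstream.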

def parse(msg: String) = Try(activationFormat.read(msg.parseJson))

val statusSuccess = "success"
Member

these would get removed as well if you reuse existing code to map status code to a string.

* Extract namespace and action from name
* ex. whisk.system/apimgmt/createApi -> (whisk.system, apimgmt/createApi)
*/
def getNamespaceAndActionName(name: String): (String, String) = {
Member

it may be helpful to use EntityPath(name).toFullyQualifiedEntityName instead, as there could be an optional leading slash... up to you.
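A minimal sketch of the splitting discussed here that tolerates an optional leading slash. It uses only the standard library rather than `EntityPath`; `NameSplit` and `splitName` are hypothetical names, and returning `None` for malformed input is an assumption rather than the PR's behaviour.

```scala
object NameSplit {
  // Splits "ns/pkg/action" into ("ns", "pkg/action"), ignoring one leading slash.
  // Returns None when there is no separator or either part is empty.
  def splitName(name: String): Option[(String, String)] =
    name.stripPrefix("/").split("/", 2) match {
      case Array(ns, action) if ns.nonEmpty && action.nonEmpty => Some((ns, action))
      case _                                                   => None
    }
}
```

Unlike indexing `nameArr(1)` directly, this variant cannot throw on a name without a separator.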

@@ -223,6 +256,15 @@ object Activation extends DefaultJsonProtocol {
"memory",
"causedBy")
Member

all these field names are defined in WhiskActivation - if there are some not defined there, they should be - this keeps the list in one canonical place. While not related to this PR, we should fix this either here or independently.

Contributor Author (@selfxp, Sep 13, 2019)

👍 I'll create an issue for this

val textV4: ContentType = ContentType.parse("text/plain; version=0.0.4; charset=utf-8").right.get
}

class PrometheusEventsApi(consumer: EventConsumer, prometheus: PrometheusExporter) {
Member

how is this service authenticated/secured?

Contributor Author

It's not; I guess it depends on the environment where it's deployed. In openwhisk-deploy-kube I have a PR where the service would be exposed through the gateway (#520), so the authentication strategy could be defined there.


# OpenWhisk User Events

This service connects to `events` topic and publishes the events to various services like Prometheus, Datadog etc via Kamon. Refer to [user specific metrics][1] on how to enable them.
Member

I suggest extending https://github.com/apache/openwhisk/blob/master/docs/metrics.md#user-specific-metrics to point to the metrics emitter provided by this PR.

Contributor Author

👍 yes good idea, adding a reference

override def serialize = toJson.compactPrint
def entityPath = EntityPath(name).toFullyQualifiedEntityName
Member

Nit: define the return type for public methods, such as entityPath.

case ApplicationError => "application error"
case DeveloperError => "action developer error"
case WhiskError => "whisk internal error"
case 0 => ActivationResponse.statusSuccess
Member

Better to retain use of existing constants.

}
}

protected[core] def messageForCode(code: Int) = {
Member

nit: messageForCode is called on a critical, frequently used path. To avoid unnecessary string computation, it may be better to keep the computed values as vals and just reuse them.
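The suggestion can be sketched as follows: each message is held in a `val`, so a lookup only returns an existing reference. This is an illustration, not the merged code. The message strings come from the hunk above, but the numeric codes, the `StatusMessages` object, and the names of the non-success constants are assumptions.

```scala
object StatusMessages {
  // Numeric codes are assumed here for illustration.
  val Success = 0
  val ApplicationError = 1
  val DeveloperError = 2
  val WhiskError = 3

  // Messages are built once and reused on every call.
  val statusSuccess = "success"
  val statusApplicationError = "application error"
  val statusDeveloperError = "action developer error"
  val statusWhiskError = "whisk internal error"

  def messageForCode(code: Int): String = code match {
    case Success          => statusSuccess
    case ApplicationError => statusApplicationError
    case DeveloperError   => statusDeveloperError
    case WhiskError       => statusWhiskError
    case other            => throw new IllegalArgumentException(s"unknown status code: $other")
  }
}
```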

@@ -145,6 +145,10 @@ object WhiskActivation
val conductorAnnotation = "conductor"
val timeoutAnnotation = "timeout"

val memoryAnnotation = "memory"
Member

These are not annotations, so keep the names without the annotation suffix.

"conductor",
"memory",
"causedBy")
WhiskActivation.statusCodeAnnotation,
Member

This can be avoided, as using those annotation values as constants does not serve much purpose (and some, like memory, are not annotations). It would be fine to revert to the preexisting way.

@@ -82,8 +82,8 @@ case class WhiskActivation(namespace: EntityPath,
if (end != Instant.EPOCH) {
Map(
"end" -> end.toJson,
"duration" -> (duration getOrElse (end.toEpochMilli - start.toEpochMilli)).toJson,
"statusCode" -> response.statusCode.toJson)
WhiskActivation.durationAnnotation -> (duration getOrElse (end.toEpochMilli - start.toEpochMilli)).toJson,
Member

This change of using existing constants can be avoided. Using string literals is fine here

TestReporter.histogram(initTimeMetric, actionWithCustomPackage).size shouldBe 1
TestReporter.histogram(durationMetric, actionWithCustomPackage).size shouldBe 1

// // Default package
Member

there is duplicated annotation

@chetanmeh chetanmeh merged commit 3e89aa5 into apache:master Sep 25, 2019
@chetanmeh (Member)

Thanks to all the reviewers for the valuable feedback and to @selfxp for following up on it. This PR is now merged, and OpenWhisk setups can leverage this service to expose action-level metrics to end users.

BillZong pushed a commit to BillZong/openwhisk that referenced this pull request Nov 18, 2019
The user event service enables aggregating the metric events sent on the `events` topic and exposing them as Prometheus (or Kamon) metrics. Out-of-the-box dashboards are provided for the Grafana/Prometheus mode, which give detailed insights into performance metrics at the cluster/namespace/action level.

8 participants