Skip to content

Commit

Permalink
[SPARK-29032][CORE] Simplify Prometheus support by adding PrometheusServlet
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjoon-hyun committed Sep 10, 2019
1 parent 962e330 commit 65cf440
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")

prop.setProperty("*.sink.prometheusServlet.class",
"org.apache.spark.metrics.sink.PrometheusServlet")
prop.setProperty("*.sink.prometheusServlet.path", "/metrics/prometheus")
prop.setProperty("master.sink.prometheusServlet.path", "/metrics/master/prometheus")
prop.setProperty("applications.sink.prometheusServlet.path", "/metrics/applications/prometheus")
}

/**
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
import org.apache.spark.metrics.sink.{MetricsServlet, PrometheusServlet, Sink}
import org.apache.spark.metrics.source.{Source, StaticSources}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -83,13 +83,15 @@ private[spark] class MetricsSystem private (

// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None
private var prometheusServlet: Option[PrometheusServlet] = None

/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
/**
 * Get any UI handlers used by this metrics system; can only be called after start().
 *
 * Returns the JSON metrics servlet handlers (if configured) followed by the
 * Prometheus servlet handlers (if configured); an empty array when neither is set.
 */
def getServletHandlers: Array[ServletContextHandler] = {
  require(running, "Can only call getServletHandlers on a running MetricsSystem")
  // Fixed: a stale duplicate of the metricsServlet expression was left above the
  // combined result and evaluated for nothing; only the concatenation is needed.
  metricsServlet.map(_.getHandlers(conf)).getOrElse(Array()) ++
    prometheusServlet.map(_.getHandlers(conf)).getOrElse(Array())
}

metricsConfig.initialize()
Expand Down Expand Up @@ -201,6 +203,12 @@ private[spark] class MetricsSystem private (
classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
metricsServlet = Some(servlet)
} else if (kv._1 == "prometheusServlet") {
val servlet = Utils.classForName[PrometheusServlet](classPath)
.getConstructor(
classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
prometheusServlet = Some(servlet)
} else {
val sink = Utils.classForName[Sink](classPath)
.getConstructor(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.metrics.sink

import java.util.Properties
import javax.servlet.http.HttpServletRequest

import com.codahale.metrics.MetricRegistry
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.ui.JettyUtils._

/**
 * A metrics sink that exposes the Dropwizard [[MetricRegistry]] contents as a
 * Prometheus-compatible plain-text page mounted at the servlet path configured
 * via the "path" property.
 *
 * The output aims to be consistent with the /metrics/json servlet in terms of
 * item ordering, and with the Spark JMX sink + Prometheus JMX converter in
 * terms of key/value string format.
 *
 * @param property    sink configuration; only the "path" key is read here
 * @param registry    the metric registry whose contents are rendered
 * @param securityMgr unused directly, but required by the reflective sink
 *                    constructor contract in MetricsSystem
 */
private[spark] class PrometheusServlet(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {

  // Property key selecting the URL path this servlet is mounted at.
  val SERVLET_KEY_PATH = "path"

  val servletPath = property.getProperty(SERVLET_KEY_PATH)

  /** Jetty handlers serving the text exposition at `servletPath`; used by the web UI. */
  def getHandlers(conf: SparkConf): Array[ServletContextHandler] = {
    Array[ServletContextHandler](
      createServletHandler(servletPath,
        new ServletParams(request => getMetricsSnapshot(request), "text/plain;charset=UTF-8"),
        conf)
    )
  }

  /**
   * Renders every registered gauge, counter, histogram, meter and timer as
   * one `<normalized_key><Suffix> <value>` line per statistic.
   */
  def getMetricsSnapshot(request: HttpServletRequest): String = {
    import scala.collection.JavaConverters._

    val sb = new StringBuilder()
    registry.getGauges.asScala.foreach { case (k, v) =>
      // String-valued gauges cannot be expressed as Prometheus numbers; skip them.
      if (!v.getValue.isInstanceOf[String]) {
        sb.append(s"${normalizeKey(k)}Value ${v.getValue}\n")
      }
    }
    registry.getCounters.asScala.foreach { case (k, v) =>
      sb.append(s"${normalizeKey(k)}Count ${v.getCount}\n")
    }
    registry.getHistograms.asScala.foreach { case (k, h) =>
      val snapshot = h.getSnapshot
      val prefix = normalizeKey(k)
      sb.append(prefix + "Count " + h.getCount + "\n")
      sb.append(prefix + "Max " + snapshot.getMax + "\n")
      // Fixed: the Mean line previously reported snapshot.getMax (copy-paste bug).
      sb.append(prefix + "Mean " + snapshot.getMean + "\n")
      sb.append(prefix + "Min " + snapshot.getMin + "\n")
      sb.append(prefix + "50thPercentile " + snapshot.getMedian + "\n")
      sb.append(prefix + "75thPercentile " + snapshot.get75thPercentile + "\n")
      sb.append(prefix + "95thPercentile " + snapshot.get95thPercentile + "\n")
      sb.append(prefix + "98thPercentile " + snapshot.get98thPercentile + "\n")
      sb.append(prefix + "99thPercentile " + snapshot.get99thPercentile + "\n")
      sb.append(prefix + "999thPercentile " + snapshot.get999thPercentile + "\n")
      sb.append(prefix + "StdDev " + snapshot.getStdDev + "\n")
    }
    registry.getMeters.entrySet.iterator.asScala.foreach { kv =>
      val prefix = normalizeKey(kv.getKey)
      val meter = kv.getValue
      sb.append(prefix + "Count " + meter.getCount + "\n")
      sb.append(prefix + "MeanRate " + meter.getMeanRate + "\n")
      sb.append(prefix + "OneMinuteRate " + meter.getOneMinuteRate + "\n")
      sb.append(prefix + "FiveMinuteRate " + meter.getFiveMinuteRate + "\n")
      sb.append(prefix + "FifteenMinuteRate " + meter.getFifteenMinuteRate + "\n")
    }
    registry.getTimers.entrySet.iterator.asScala.foreach { kv =>
      val prefix = normalizeKey(kv.getKey)
      val timer = kv.getValue
      val snapshot = timer.getSnapshot
      sb.append(prefix + "Count " + timer.getCount + "\n")
      sb.append(prefix + "Max " + snapshot.getMax + "\n")
      // Fixed: the Mean line previously reported snapshot.getMax (copy-paste bug).
      sb.append(prefix + "Mean " + snapshot.getMean + "\n")
      sb.append(prefix + "Min " + snapshot.getMin + "\n")
      sb.append(prefix + "50thPercentile " + snapshot.getMedian + "\n")
      sb.append(prefix + "75thPercentile " + snapshot.get75thPercentile + "\n")
      sb.append(prefix + "95thPercentile " + snapshot.get95thPercentile + "\n")
      sb.append(prefix + "98thPercentile " + snapshot.get98thPercentile + "\n")
      sb.append(prefix + "99thPercentile " + snapshot.get99thPercentile + "\n")
      sb.append(prefix + "999thPercentile " + snapshot.get999thPercentile + "\n")
      sb.append(prefix + "StdDev " + snapshot.getStdDev + "\n")
      sb.append(prefix + "FifteenMinuteRate " + timer.getFifteenMinuteRate + "\n")
      sb.append(prefix + "FiveMinuteRate " + timer.getFiveMinuteRate + "\n")
      sb.append(prefix + "OneMinuteRate " + timer.getOneMinuteRate + "\n")
      sb.append(prefix + "MeanRate " + timer.getMeanRate + "\n")
    }
    sb.toString()
  }

  // Prometheus metric names allow only [a-zA-Z0-9_:]; collapse everything else to '_'
  // and add a fixed "metrics_" prefix plus trailing '_' so suffixes concatenate cleanly.
  private def normalizeKey(key: String): String = {
    s"metrics_${key.replaceAll("[^a-zA-Z0-9]", "_")}_"
  }

  // Servlet-based sink: nothing to schedule or tear down.
  override def start(): Unit = { }

  override def stop(): Unit = { }

  override def report(): Unit = { }
}

0 comments on commit 65cf440

Please sign in to comment.