From 208ba7b3063f0d3e51fa2f8600ff79fdec0e44d1 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 28 Mar 2016 10:01:43 +0800 Subject: [PATCH 01/11] Refactor and expose metrics system Source and Sink interface --- .../apache/spark/metrics/MetricsSystem.scala | 20 ++++++++---- .../spark/metrics/sink/ConsoleSink.scala | 14 +++++---- .../apache/spark/metrics/sink/CsvSink.scala | 24 +++++++------- .../spark/metrics/sink/GraphiteSink.scala | 21 +++++++------ .../apache/spark/metrics/sink/JmxSink.scala | 8 +++-- .../spark/metrics/sink/MetricsServlet.scala | 12 +++---- .../org/apache/spark/metrics/sink/Sink.scala | 31 ++++++++++++++++++- .../apache/spark/metrics/sink/Slf4jSink.scala | 14 ++++----- .../apache/spark/metrics/source/Source.scala | 25 ++++++++++++++- .../spark/metrics/sink/GangliaSink.scala | 23 +++++++------- 10 files changed, 130 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 3457a2632277..45b82699340c 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -21,6 +21,7 @@ import java.util.Properties import java.util.concurrent.TimeUnit import scala.collection.mutable +import scala.util.control.NonFatal import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} import org.eclipse.jetty.servlet.ServletContextHandler @@ -194,19 +195,26 @@ private[spark] class MetricsSystem private ( sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") if (null != classPath) { + val cls = Utils.classForName(classPath) try { - val sink = Utils.classForName(classPath) - .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) - .newInstance(kv._2, registry, securityMgr) + val sink = cls.getConstructor( + classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) + .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { - case e: Exception => - logError("Sink class " + classPath + " cannot be instantiated") - throw e + case _: NoSuchMethodException => + try { + sinks += cls.getConstructor(classOf[Properties], classOf[MetricRegistry]) + .asInstanceOf[Sink] + } catch { + case NonFatal(e) => logError("Sink class " + classPath + " cannot be instantiated", e) + } + + case NonFatal(e) => logError("Sink class " + classPath + " cannot be instantiated", e) } } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index fce556fd0382..97f324c7a580 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -25,27 +25,29 @@ import com.codahale.metrics.{ConsoleReporter, MetricRegistry} import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem -private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { +private[spark] class ConsoleSink( + property: Properties, + registry: MetricRegistry, + securityMgr: SecurityManager) extends Sink(property, registry) { val CONSOLE_DEFAULT_PERIOD = 10 val CONSOLE_DEFAULT_UNIT = "SECONDS" val CONSOLE_KEY_PERIOD = "period" val CONSOLE_KEY_UNIT = "unit" - val pollPeriod = 
Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { + private val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { case Some(s) => s.toInt case None => CONSOLE_DEFAULT_PERIOD } - val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) + private val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT) } MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) + private val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 88bba2fdbd1c..f6b5ff7e46f9 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -26,8 +26,10 @@ import com.codahale.metrics.{CsvReporter, MetricRegistry} import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem -private[spark] class CsvSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { +private[spark] class CsvSink( + property: Properties, + registry: MetricRegistry, + securityMgr: SecurityManager) extends Sink(property, registry) { val CSV_KEY_PERIOD = "period" val CSV_KEY_UNIT = "unit" val CSV_KEY_DIR = "directory" @@ -36,28 +38,28 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis val CSV_DEFAULT_UNIT = "SECONDS" val CSV_DEFAULT_DIR = "/tmp/" - val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match { + private val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match { case Some(s) => s.toInt case None => CSV_DEFAULT_PERIOD } - val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) + private val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) } MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { + private val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { case Some(s) => s case None => CSV_DEFAULT_DIR } - val reporter: CsvReporter = CsvReporter.forRegistry(registry) - .formatFor(Locale.US) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build(new File(pollDir)) + private val reporter: CsvReporter = CsvReporter.forRegistry(registry) + .formatFor(Locale.US) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(new File(pollDir)) override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index ac33e68abb49..c67ef21b60c6 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -17,7 +17,6 @@ package org.apache.spark.metrics.sink -import java.net.InetSocketAddress 
import java.util.{Locale, Properties} import java.util.concurrent.TimeUnit @@ -27,8 +26,10 @@ import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP} import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem -private[spark] class GraphiteSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { +private[spark] class GraphiteSink( + property: Properties, + registry: MetricRegistry, + securityMgr: SecurityManager) extends Sink(property, registry) { val GRAPHITE_DEFAULT_PERIOD = 10 val GRAPHITE_DEFAULT_UNIT = "SECONDS" val GRAPHITE_DEFAULT_PREFIX = "" @@ -50,20 +51,20 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric throw new Exception("Graphite sink requires 'port' property.") } - val host = propertyToOption(GRAPHITE_KEY_HOST).get - val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt + private val host = propertyToOption(GRAPHITE_KEY_HOST).get + private val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt - val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match { + private val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match { case Some(s) => s.toInt case None => GRAPHITE_DEFAULT_PERIOD } - val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) + private val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) } - val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) + private val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) @@ -73,7 +74,7 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p") } - val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) + private val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .prefixedWith(prefix) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 1992b42ac7f6..6ace6e921fc5 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -23,10 +23,12 @@ import com.codahale.metrics.{JmxReporter, MetricRegistry} import org.apache.spark.SecurityManager -private[spark] class JmxSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { +private[spark] class JmxSink( + property: Properties, + registry: MetricRegistry, + securityMgr: SecurityManager) extends Sink(property, registry) { - val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() + private val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() override def start() { reporter.start() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 68b58b849064..1d4e78de64e4 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -30,22 +30,22 @@ import 
org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.ui.JettyUtils._ private[spark] class MetricsServlet( - val property: Properties, - val registry: MetricRegistry, + property: Properties, + registry: MetricRegistry, securityMgr: SecurityManager) - extends Sink { + extends Sink(property, registry) { val SERVLET_KEY_PATH = "path" val SERVLET_KEY_SAMPLE = "sample" val SERVLET_DEFAULT_SAMPLE = false - val servletPath = property.getProperty(SERVLET_KEY_PATH) + private val servletPath = property.getProperty(SERVLET_KEY_PATH) - val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) + private val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) .getOrElse(SERVLET_DEFAULT_SAMPLE) - val mapper = new ObjectMapper().registerModule( + private val mapper = new ObjectMapper().registerModule( new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) def getHandlers(conf: SparkConf): Array[ServletContextHandler] = { diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 9fad4e7deacb..2f2e479ef20c 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -17,8 +17,37 @@ package org.apache.spark.metrics.sink -private[spark] trait Sink { +import java.util.Properties + +import com.codahale.metrics.MetricRegistry + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * The abstract class of metrics Sink. By implementing the methods and registering it through metrics + * .properties, user could register a custom metrics Sink into MetricsSystem. + * + * @param properties Properties related this specific Sink, properties are read from + * configuration file, user could define their own configurations and get + * from this parameter. + * @param metricRegistry The [[MetricRegistry]] for you to dump the collected metrics. + */ +@DeveloperApi +abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) { + + /** + * Start this metrics Sink, this will be called by MetricsSystem + */ def start(): Unit + + /** + * Stop this metrics Sink, this will be called by MetricsSystem + */ def stop(): Unit + + /** + * Report the current registered metrics. 
+ */ def report(): Unit } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala index 7fa4ba762298..b34c7d1f06a8 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala @@ -26,29 +26,29 @@ import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem private[spark] class Slf4jSink( - val property: Properties, - val registry: MetricRegistry, + property: Properties, + registry: MetricRegistry, securityMgr: SecurityManager) - extends Sink { + extends Sink(property, registry) { val SLF4J_DEFAULT_PERIOD = 10 val SLF4J_DEFAULT_UNIT = "SECONDS" val SLF4J_KEY_PERIOD = "period" val SLF4J_KEY_UNIT = "unit" - val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match { + private val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match { case Some(s) => s.toInt case None => SLF4J_DEFAULT_PERIOD } - val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) + private val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT) } MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) + private val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala index 1dda2cd83b2a..f120b4ab53ce 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala @@ -19,7 +19,30 @@ package org.apache.spark.metrics.source import com.codahale.metrics.MetricRegistry -private[spark] trait Source { +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * The interface of metrics Source. This could be mixed into user code to get different metrics + * offered by the codahale metrics library. To enable this metrics Source, user should configure + * the full classpath into metrics.properties and make the class accessible to all instances. + * + * Metrics Source will be registered into MetricsSystem to deliver the collected metrics to + * metrics Sink. + */ +@DeveloperApi +trait Source { + + /** + * The name of this metrics Source, name should be unique and will be prepended with app id and + * exeuctor id to distinguish. + * @return name of this Source + */ def sourceName: String + + /** + * A [[MetricRegistry]] in which all the collected metrics are registered. 
+ * @return a [[MetricRegistry]] object which will be registered in MetricsSystem for collection + */ def metricRegistry: MetricRegistry } diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index 0cd795f63887..be47e875a8d6 100644 --- a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -28,8 +28,10 @@ import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem -class GangliaSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { +class GangliaSink( + property: Properties, + registry: MetricRegistry, + securityMgr: SecurityManager) extends Sink(property, registry) { val GANGLIA_KEY_PERIOD = "period" val GANGLIA_DEFAULT_PERIOD = 10 @@ -59,22 +61,21 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, throw new Exception("Ganglia sink requires 'port' property.") } - val host = propertyToOption(GANGLIA_KEY_HOST).get - val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt - val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) - val dmax = propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX) - val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) + private val host = propertyToOption(GANGLIA_KEY_HOST).get + private val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt + private val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + private val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) - val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) + private val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) .getOrElse(GANGLIA_DEFAULT_PERIOD) - val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) + private val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) .map(u => TimeUnit.valueOf(u.toUpperCase)) .getOrElse(GANGLIA_DEFAULT_UNIT) MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val ganglia = new GMetric(host, port, mode, ttl) - val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + private val ganglia = new GMetric(host, port, mode, ttl) + private val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .withDMax(dmax) From 6f0ccbbbecf310c082e82d36aefbb4ef9bea236b Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 28 Mar 2016 15:31:41 +0800 Subject: [PATCH 02/11] Fix bug and add unit test --- .../apache/spark/metrics/MetricsSystem.scala | 1 + .../metrics/CustomerMetricsBuilder.scala | 63 +++++++++++++++++++ .../spark/metrics/MetricsSystemSuite.scala | 19 ++++++ 3 files changed, 83 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 45b82699340c..441fc0ca76b4 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -209,6 +209,7 @@ private[spark] class MetricsSystem private ( case _: NoSuchMethodException => try { sinks += cls.getConstructor(classOf[Properties], classOf[MetricRegistry]) + .newInstance(kv._2, registry) .asInstanceOf[Sink] } catch { case NonFatal(e) => logError("Sink class " + classPath + " cannot be instantiated", e) diff --git a/core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala b/core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala new file mode 100644 index 000000000000..515c5a1a5705 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file is placed in different package to make sure all of these components work well +// when they are outside of org.apache.spark. +package other.metrics + +import java.util.Properties + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.sink.Sink +import org.apache.spark.metrics.source.Source + +class CustomMetricsSource extends Source { + val sourceName = "fake" + + private val registry = new MetricRegistry + + registry.register(MetricRegistry.name("test1"), new Gauge[Int] { + override def getValue: Int = 1 + }) + + registry.register(MetricRegistry.name("test2"), new Gauge[Int] { + override def getValue: Int = 2 + }) + + override def metricRegistry: MetricRegistry = registry +} + +class CustomMetricsSink(properties: Properties, metricRegistry: MetricRegistry) + extends Sink(properties, metricRegistry) { + + private val prop1 = properties.getProperty("prop1") + private val prop2 = properties.getProperty("prop2") + + assert(prop1 != null) + assert(prop2 != null) + assert(metricRegistry.getGauges.keySet().contains("fake.test1")) + assert(metricRegistry.getGauges.keySet().contains("fake.test2")) + + def start(): Unit = { } + + def stop(): Unit = { } + + def report(): Unit = { } +} + + diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index a7a24114f17e..d0563599382f 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer import com.codahale.metrics.MetricRegistry import org.scalatest.{BeforeAndAfter, PrivateMethodTester} +import other.metrics.{CustomMetricsSink, CustomMetricsSource} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.master.MasterSource @@ -269,4 +270,22 @@ class MetricsSystemSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateM assert(metricName === source.sourceName) } + test("MetricsSystem plug in customized Source and Sink") { + conf.set("spark.metrics.conf.test.source.fake.class", + classOf[CustomMetricsSource].getCanonicalName) + .set("spark.metrics.conf.test.sink.fake.class", classOf[CustomMetricsSink].getCanonicalName) + .set("spark.metrics.conf.test.sink.fake.prop1", "val1") + .set("spark.metrics.conf.test.sink.fake.prop2", "val2") + + val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr) + metricsSystem.start() + + val sources = PrivateMethod[ArrayBuffer[Source]]('sources) + val sinks = PrivateMethod[ArrayBuffer[Sink]]('sinks) + assert(metricsSystem.invokePrivate(sources()).length === 1) + assert(metricsSystem.invokePrivate(sources()).filter(_.sourceName == "fake").length === 1) + assert(metricsSystem.invokePrivate(sinks()).length === 2) + assert(metricsSystem.invokePrivate(sinks()).map(_.getClass.getCanonicalName) + .contains(classOf[CustomMetricsSink].getCanonicalName)) + } } From b58a427f4d70b63ee334610fa6beb36230e2984b Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 28 Mar 2016 17:14:28 +0800 Subject: [PATCH 03/11] Add Mima exclude --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 915c7e2e2fda..ab240a5bb055 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -83,6 +83,9 @@ object MimaExcludes { // [SPARK-21087] CrossValidator, TrainValidationSplit expose sub models after fitting: Scala ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelWriter"), ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter") + + // [SPARK-14151] Expose metrics Source and Sink interface + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.metrics.sink.Sink") ) // Exclude rules for 2.2.x From a5e695389348d78ccab980326c361dbabba0a148 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 26 Jun 2017 10:37:11 +0800 Subject: [PATCH 04/11] Code improvement Change-Id: Id952f7359ef662372445864f2fa1e89ceff5bec7 --- .../apache/spark/metrics/MetricsSystem.scala | 6 ++-- .../org/apache/spark/metrics/sink/Sink.scala | 30 +++++++++---------- .../apache/spark/metrics/source/Source.scala | 30 +++++++++---------- .../spark/metrics/MetricsSystemSuite.scala | 3 +- project/MimaExcludes.scala | 2 +- 5 files changed, 35 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 441fc0ca76b4..0dd6f4dd89e9 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -195,9 +195,8 @@ private[spark] class MetricsSystem private ( sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") if (null != classPath) { - val cls = Utils.classForName(classPath) try { - val sink = cls.getConstructor( + val sink = Utils.classForName(classPath).getConstructor( classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { @@ -208,7 +207,8 @@ private[spark] class MetricsSystem private ( } catch { case _: NoSuchMethodException => try { - sinks += cls.getConstructor(classOf[Properties], classOf[MetricRegistry]) + sinks += 
Utils.classForName(classPath) + .getConstructor(classOf[Properties], classOf[MetricRegistry]) .newInstance(kv._2, registry) .asInstanceOf[Sink] } catch { diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 2f2e479ef20c..685d7e92e12d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -24,30 +24,30 @@ import com.codahale.metrics.MetricRegistry import org.apache.spark.annotation.DeveloperApi /** - * :: DeveloperApi :: - * The abstract class of metrics Sink. By implementing the methods and registering it through metrics - * .properties, user could register a custom metrics Sink into MetricsSystem. - * - * @param properties Properties related this specific Sink, properties are read from - * configuration file, user could define their own configurations and get - * from this parameter. - * @param metricRegistry The [[MetricRegistry]] for you to dump the collected metrics. - */ + * :: DeveloperApi :: + * The abstract class of metrics Sink. By implementing the methods and registering it through metrics + * .properties, user could register a custom metrics Sink into MetricsSystem. + * + * @param properties Properties related this specific Sink, properties are read from + * configuration file, user could define their own configurations and get + * from this parameter. + * @param metricRegistry The [[MetricRegistry]] for you to dump the collected metrics. + */ @DeveloperApi abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) { /** - * Start this metrics Sink, this will be called by MetricsSystem - */ + * Start this metrics Sink, this will be called by MetricsSystem + */ def start(): Unit /** - * Stop this metrics Sink, this will be called by MetricsSystem - */ + * Stop this metrics Sink, this will be called by MetricsSystem + */ def stop(): Unit /** - * Report the current registered metrics. - */ + * Report the current registered metrics. + */ def report(): Unit } diff --git a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala index f120b4ab53ce..abcd42485f56 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala @@ -22,27 +22,27 @@ import com.codahale.metrics.MetricRegistry import org.apache.spark.annotation.DeveloperApi /** - * :: DeveloperApi :: - * The interface of metrics Source. This could be mixed into user code to get different metrics - * offered by the codahale metrics library. To enable this metrics Source, user should configure - * the full classpath into metrics.properties and make the class accessible to all instances. - * - * Metrics Source will be registered into MetricsSystem to deliver the collected metrics to - * metrics Sink. - */ + * :: DeveloperApi :: + * The interface of metrics Source. This could be mixed into user code to get different metrics + * offered by the codahale metrics library. To enable this metrics Source, user should configure + * the full classpath into metrics.properties and make the class accessible to all instances. + * + * Metrics Source will be registered into MetricsSystem to deliver the collected metrics to + * metrics Sink. + */ @DeveloperApi trait Source { /** - * The name of this metrics Source, name should be unique and will be prepended with app id and - * exeuctor id to distinguish. 
- * @return name of this Source - */ + * The name of this metrics Source, name should be unique and will be prepended with app id and + * exeuctor id to distinguish. + * @return name of this Source + */ def sourceName: String /** - * A [[MetricRegistry]] in which all the collected metrics are registered. - * @return a [[MetricRegistry]] object which will be registered in MetricsSystem for collection - */ + * A [[MetricRegistry]] in which all the collected metrics are registered. + * @return a [[MetricRegistry]] object which will be registered in MetricsSystem for collection + */ def metricRegistry: MetricRegistry } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index d0563599382f..b73a19d916c2 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -282,8 +282,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val sources = PrivateMethod[ArrayBuffer[Source]]('sources) val sinks = PrivateMethod[ArrayBuffer[Sink]]('sinks) - assert(metricsSystem.invokePrivate(sources()).length === 1) - assert(metricsSystem.invokePrivate(sources()).filter(_.sourceName == "fake").length === 1) + assert(metricsSystem.invokePrivate(sources()).exists(s => s.sourceName == "fake")) assert(metricsSystem.invokePrivate(sinks()).length === 2) assert(metricsSystem.invokePrivate(sinks()).map(_.getClass.getCanonicalName) .contains(classOf[CustomMetricsSink].getCanonicalName)) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ab240a5bb055..7bc5d3fae858 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -85,7 +85,7 @@ object MimaExcludes { ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter") // [SPARK-14151] Expose metrics Source and Sink interface - ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.metrics.sink.Sink") + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.metrics.sink.Sink") ) // Exclude rules for 2.2.x From 79276e3ae147cbf73e65ab0f61bccc42dacae3bf Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 26 Jun 2017 13:15:54 +0800 Subject: [PATCH 05/11] Fix document issue Change-Id: I2f20205a1918b7323c0cbb781ce6f8c2db9dd777 --- .../src/main/scala/org/apache/spark/metrics/sink/Sink.scala | 2 +- .../main/scala/org/apache/spark/metrics/source/Source.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 685d7e92e12d..6eb24a27741b 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -31,7 +31,7 @@ import org.apache.spark.annotation.DeveloperApi * @param properties Properties related this specific Sink, properties are read from * configuration file, user could define their own configurations and get * from this parameter. - * @param metricRegistry The [[MetricRegistry]] for you to dump the collected metrics. + * @param metricRegistry The MetricRegistry for you to dump the collected metrics. 
*/ @DeveloperApi abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) { diff --git a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala index abcd42485f56..4738d6fbaa05 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala @@ -35,14 +35,14 @@ trait Source { /** * The name of this metrics Source, name should be unique and will be prepended with app id and - * exeuctor id to distinguish. + * executor id to distinguish. * @return name of this Source */ def sourceName: String /** - * A [[MetricRegistry]] in which all the collected metrics are registered. - * @return a [[MetricRegistry]] object which will be registered in MetricsSystem for collection + * A MetricRegistry in which all the collected metrics are registered. + * @return a MetricRegistry object which will be registered in MetricsSystem for collection */ def metricRegistry: MetricRegistry } From 6a027ea612d8a5d31828649525a89fc01566dacb Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sat, 8 Jul 2017 16:24:07 -0700 Subject: [PATCH 06/11] Address the merging issues Change-Id: I798ebca934554357dbd2dd3c2dca8d0cb54baa3e --- .../scala/org/apache/spark/metrics/sink/ConsoleSink.scala | 2 +- .../main/scala/org/apache/spark/metrics/sink/CsvSink.scala | 2 +- .../scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 2 +- .../main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala | 2 +- .../scala/org/apache/spark/metrics/sink/GangliaSink.scala | 5 +++-- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 97f324c7a580..4a3dd10af074 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -41,7 +41,7 @@ private[spark] class ConsoleSink( } private val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT) } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index f6b5ff7e46f9..69c673677cd2 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -44,7 +44,7 @@ private[spark] class CsvSink( } private val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index c67ef21b60c6..d03afdcd36cd 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -60,7 +60,7 @@ private[spark] class GraphiteSink( } private val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) case None => 
TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala index b34c7d1f06a8..a3574efa4e25 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala @@ -42,7 +42,7 @@ private[spark] class Slf4jSink( } private val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT) } diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index be47e875a8d6..6af87969ba8c 100644 --- a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -65,11 +65,12 @@ class GangliaSink( private val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt private val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) private val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) - .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase(Locale.ROOT))) + .getOrElse(GANGLIA_DEFAULT_MODE) private val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) .getOrElse(GANGLIA_DEFAULT_PERIOD) private val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) - .map(u => TimeUnit.valueOf(u.toUpperCase)) + .map(u => TimeUnit.valueOf(u.toUpperCase(Locale.ROOT)) .getOrElse(GANGLIA_DEFAULT_UNIT) MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) From 46e21faf2cd555b6c51c9a62f2259c5997354bf6 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 13 Nov 2017 14:34:26 +0800 Subject: [PATCH 07/11] Fix rebase conflits --- .../main/scala/org/apache/spark/metrics/sink/StatsdSink.scala | 2 +- project/MimaExcludes.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala index 859a2f6bcd45..39835e914c51 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala @@ -44,7 +44,7 @@ private[spark] class StatsdSink( val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) - extends Sink with Logging { + extends Sink(property, registry) with Logging { import StatsdSink._ val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 7bc5d3fae858..158deeae89a9 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -82,7 +82,7 @@ object MimaExcludes { // [SPARK-21087] CrossValidator, TrainValidationSplit expose sub models after fitting: Scala ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelWriter"), - ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter") + 
ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter"), // [SPARK-14151] Expose metrics Source and Sink interface ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.metrics.sink.Sink") From 38bc2a43a69ffb0b40dc5ebb0a60568225eeb798 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 27 Nov 2017 09:53:29 +0800 Subject: [PATCH 08/11] Address the comment --- .../scala/org/apache/spark/metrics/sink/GangliaSink.scala | 4 ++-- project/MimaExcludes.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index 6af87969ba8c..3028d240bdab 100644 --- a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -64,12 +64,12 @@ class GangliaSink( private val host = propertyToOption(GANGLIA_KEY_HOST).get private val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt private val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) - private val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) + private val mode = propertyToOption(GANGLIA_KEY_MODE) .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase(Locale.ROOT))) .getOrElse(GANGLIA_DEFAULT_MODE) private val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) .getOrElse(GANGLIA_DEFAULT_PERIOD) - private val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) + private val pollUnit = propertyToOption(GANGLIA_KEY_UNIT) .map(u => TimeUnit.valueOf(u.toUpperCase(Locale.ROOT)) .getOrElse(GANGLIA_DEFAULT_UNIT) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 158deeae89a9..6155c0106549 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -85,7 +85,7 @@ object MimaExcludes { ProblemFilters.exclude[FinalClassProblem]("org.apache.spark.ml.tuning.TrainValidationSplitModel$TrainValidationSplitModelWriter"), // [SPARK-14151] Expose metrics Source and Sink interface - ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.metrics.sink.Sink") + ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.metrics.sink.Sink") ) // Exclude rules for 2.2.x From f360dac77499d7d0eebc09528255922c9315fb31 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 27 Nov 2017 10:09:15 +0800 Subject: [PATCH 09/11] Fix rebase issue --- .../apache/spark/metrics/sink/GangliaSink.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index 3028d240bdab..d5cd834d6b67 100644 --- a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -17,7 +17,7 @@ package org.apache.spark.metrics.sink -import java.util.Properties +import java.util.{Locale, Properties} import java.util.concurrent.TimeUnit import com.codahale.metrics.MetricRegistry @@ -64,23 +64,24 @@ class GangliaSink( private val host = propertyToOption(GANGLIA_KEY_HOST).get private val port = 
propertyToOption(GANGLIA_KEY_PORT).get.toInt private val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + private val dmax = propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX) private val mode = propertyToOption(GANGLIA_KEY_MODE) .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase(Locale.ROOT))) .getOrElse(GANGLIA_DEFAULT_MODE) private val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) .getOrElse(GANGLIA_DEFAULT_PERIOD) private val pollUnit = propertyToOption(GANGLIA_KEY_UNIT) - .map(u => TimeUnit.valueOf(u.toUpperCase(Locale.ROOT)) + .map(u => TimeUnit.valueOf(u.toUpperCase(Locale.ROOT))) .getOrElse(GANGLIA_DEFAULT_UNIT) MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) private val ganglia = new GMetric(host, port, mode, ttl) - private val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .withDMax(dmax) - .build(ganglia) + private val reporter = GangliaReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .withDMax(dmax) + .build(ganglia) override def start() { reporter.start(pollPeriod, pollUnit) From e94def75ebb657313698876c2b6e11123a5cf895 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 27 Nov 2017 16:09:20 +0800 Subject: [PATCH 10/11] Refactor the code and add more comments --- .../apache/spark/metrics/MetricsSystem.scala | 21 +++++++++++--- .../spark/metrics/sink/ConsoleSink.scala | 27 ++++++------------ .../apache/spark/metrics/sink/CsvSink.scala | 26 ++++++----------- .../spark/metrics/sink/GraphiteSink.scala | 20 +++++-------- .../apache/spark/metrics/sink/JmxSink.scala | 8 ++++++ .../spark/metrics/sink/MetricsServlet.scala | 8 ++++++ .../org/apache/spark/metrics/sink/Sink.scala | 15 ++++++++-- .../apache/spark/metrics/sink/Slf4jSink.scala | 27 ++++++------------ .../spark/metrics/sink/StatsdSink.scala | 28 ++++++++----------- .../metrics/CustomerMetricsBuilder.scala | 8 +++++- .../spark/metrics/MetricsSystemSuite.scala | 17 +++++++++++ .../spark/metrics/sink/StatsdSinkSuite.scala | 4 +-- .../spark/metrics/sink/GangliaSink.scala | 13 --------- 13 files changed, 114 insertions(+), 108 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 0dd6f4dd89e9..e357182b12a6 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -101,7 +101,20 @@ private[spark] class MetricsSystem private ( StaticSources.allSources.foreach(registerSource) registerSources() registerSinks() - sinks.foreach(_.start) + + // Unregistered sinks that failed to start. 
+ sinks.zipWithIndex + .filterNot { case (s, _) => + try { + s.start() + true + } catch { + case NonFatal(e) => + logWarning(s"Failed to start Sink ${s.getClass.getSimpleName}", e) + false + } + } + .foreach { case (_, idx) => sinks.remove(idx) } } def stop() { @@ -196,9 +209,9 @@ private[spark] class MetricsSystem private ( val classPath = kv._2.getProperty("class") if (null != classPath) { try { - val sink = Utils.classForName(classPath).getConstructor( - classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) - .newInstance(kv._2, registry, securityMgr) + val sink = Utils.classForName(classPath) + .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) + .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 4a3dd10af074..dc180f2b22f6 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -17,35 +17,24 @@ package org.apache.spark.metrics.sink -import java.util.{Locale, Properties} +import java.util.Properties import java.util.concurrent.TimeUnit import com.codahale.metrics.{ConsoleReporter, MetricRegistry} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem +/** + * A metrics [[Sink]] which will output registered metrics to the console. + * + * @param property [[ConsoleSink]] specific properties + * @param registry A [[MetricRegistry]] holding the metrics for this sink to report + * @param securityMgr A [[SecurityManager]] used for security-related checks. + */ private[spark] class ConsoleSink( property: Properties, registry: MetricRegistry, securityMgr: SecurityManager) extends Sink(property, registry) { - val CONSOLE_DEFAULT_PERIOD = 10 - val CONSOLE_DEFAULT_UNIT = "SECONDS" - - val CONSOLE_KEY_PERIOD = "period" - val CONSOLE_KEY_UNIT = "unit" - - private val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => CONSOLE_DEFAULT_PERIOD - } - - private val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) - case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) private val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 69c673677cd2..3a1524a48a3e 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -24,32 +24,22 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics.{CsvReporter, MetricRegistry} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem +/** + * A metrics [[Sink]] which will write registered metrics to the specified directory in CSV + * format. + * + * @param property [[CsvSink]] specific properties + * @param registry A [[MetricRegistry]] holding the metrics for this sink to report + * @param securityMgr A [[SecurityManager]] used for security-related checks. 
+ */ private[spark] class CsvSink( property: Properties, registry: MetricRegistry, securityMgr: SecurityManager) extends Sink(property, registry) { - val CSV_KEY_PERIOD = "period" - val CSV_KEY_UNIT = "unit" val CSV_KEY_DIR = "directory" - - val CSV_DEFAULT_PERIOD = 10 - val CSV_DEFAULT_UNIT = "SECONDS" val CSV_DEFAULT_DIR = "/tmp/" - private val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => CSV_DEFAULT_PERIOD - } - - private val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) - case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - private val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { case Some(s) => s case None => CSV_DEFAULT_DIR diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index d03afdcd36cd..d05bb11914f9 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -24,8 +24,14 @@ import com.codahale.metrics.MetricRegistry import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem +/** + * A metrics [[Sink]] which will output registered metrics to Graphite. + * + * @param property [[GraphiteSink]] specific properties + * @param registry A [[MetricRegistry]] holding the metrics for this sink to report + * @param securityMgr A [[SecurityManager]] used for security-related checks. + */ private[spark] class GraphiteSink( property: Properties, registry: MetricRegistry, @@ -54,20 +60,8 @@ private[spark] class GraphiteSink( private val host = propertyToOption(GRAPHITE_KEY_HOST).get private val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt - private val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match { - case Some(s) => s.toInt - case None => GRAPHITE_DEFAULT_PERIOD - } - - private val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) - case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) - } - private val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase(Locale.ROOT)) match { case Some("udp") => new GraphiteUDP(host, port) case Some("tcp") | None => new Graphite(host, port) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 6ace6e921fc5..fe194621e941 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -23,6 +23,14 @@ import com.codahale.metrics.{JmxReporter, MetricRegistry} import org.apache.spark.SecurityManager +/** + * A metrics [[Sink]] which will output registered metrics via JMX, so users can use + * jconsole and other tools to attach to the Spark process and get the metrics report. + * + * @param property [[JmxSink]] specific properties + * @param registry A [[MetricRegistry]] holding the metrics for this sink to report + * @param securityMgr A [[SecurityManager]] used for security-related checks. 
+ */ private[spark] class JmxSink( property: Properties, registry: MetricRegistry, diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 1d4e78de64e4..d4296671db05 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -29,6 +29,14 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.ui.JettyUtils._ +/** + * A metrics [[Sink]] which will dump registered metrics in JSON format on the web UI. This is + * a sink enabled by default for MetricsSystem. + * + * @param property [[MetricsServlet]] specific properties + * @param registry A [[MetricRegistry]] holding the metrics for this sink to report + * @param securityMgr A [[SecurityManager]] used for security-related checks. + */ private[spark] class MetricsServlet( property: Properties, registry: MetricRegistry, diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 6eb24a27741b..8446e59ead13 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -17,11 +17,13 @@ package org.apache.spark.metrics.sink -import java.util.Properties +import java.util.{Locale, Properties} +import java.util.concurrent.TimeUnit import com.codahale.metrics.MetricRegistry import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.MetricsSystem /** * :: DeveloperApi :: @@ -36,8 +38,17 @@ import org.apache.spark.annotation.DeveloperApi @DeveloperApi abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) { + protected val pollPeriod = properties.getProperty("period", "10").toInt + + protected val pollUnit = Option(properties.getProperty("unit")) + .map(s => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))) + .getOrElse(TimeUnit.SECONDS) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + /** - * Start this metrics Sink, this will be called by MetricsSystem + * Start this metrics Sink. This will be called by MetricsSystem. If this [[Sink]] fails to + * start, MetricsSystem will unregister and remove it. */ def start(): Unit diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala index a3574efa4e25..0b0639d54dbe 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala @@ -17,36 +17,25 @@ package org.apache.spark.metrics.sink -import java.util.{Locale, Properties} +import java.util.Properties import java.util.concurrent.TimeUnit import com.codahale.metrics.{MetricRegistry, Slf4jReporter} import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem +/** + * A metrics [[Sink]] which will output registered metrics through SLF4J. + * + * @param property [[Slf4jSink]] specific properties + * @param registry A [[MetricRegistry]] holding the metrics for this sink to report + * @param securityMgr A [[SecurityManager]] used for security-related checks. 
+ */ private[spark] class Slf4jSink( property: Properties, registry: MetricRegistry, securityMgr: SecurityManager) extends Sink(property, registry) { - val SLF4J_DEFAULT_PERIOD = 10 - val SLF4J_DEFAULT_UNIT = "SECONDS" - - val SLF4J_KEY_PERIOD = "period" - val SLF4J_KEY_UNIT = "unit" - - private val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match { - case Some(s) => s.toInt - case None => SLF4J_DEFAULT_PERIOD - } - - private val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { - case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT)) - case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT) - } - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) private val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala index 39835e914c51..e6426bf7cc66 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala @@ -18,28 +18,29 @@ package org.apache.spark.metrics.sink import java.util.Properties -import java.util.concurrent.TimeUnit import com.codahale.metrics.MetricRegistry import org.apache.spark.SecurityManager import org.apache.spark.internal.Logging -import org.apache.spark.metrics.MetricsSystem private[spark] object StatsdSink { val STATSD_KEY_HOST = "host" val STATSD_KEY_PORT = "port" - val STATSD_KEY_PERIOD = "period" - val STATSD_KEY_UNIT = "unit" val STATSD_KEY_PREFIX = "prefix" val STATSD_DEFAULT_HOST = "127.0.0.1" val STATSD_DEFAULT_PORT = "8125" - val STATSD_DEFAULT_PERIOD = "10" - val STATSD_DEFAULT_UNIT = "SECONDS" val STATSD_DEFAULT_PREFIX = "" } +/** + * A metrics [[Sink]] which reports registered metrics to StatsD. + * + * @param property [[StatsdSink]]-specific properties + * @param registry The [[MetricRegistry]] whose metrics this sink reports + * @param securityMgr A [[SecurityManager]] used for security-related checks.
+ */ private[spark] class StatsdSink( val property: Properties, val registry: MetricRegistry, @@ -47,18 +48,11 @@ private[spark] class StatsdSink( extends Sink(property, registry) with Logging { import StatsdSink._ - val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST) - val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt - - val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt - val pollUnit = - TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase) - - val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX) - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + private val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST) + private val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt + private val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX) - val reporter = new StatsdReporter(registry, host, port, prefix) + private val reporter = new StatsdReporter(registry, host, port, prefix) override def start(): Unit = { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala b/core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala index 515c5a1a5705..6a0a6307860f 100644 --- a/core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala +++ b/core/src/test/scala/org/apache/spark/metrics/CustomerMetricsBuilder.scala @@ -47,13 +47,19 @@ class CustomMetricsSink(properties: Properties, metricRegistry: MetricRegistry) private val prop1 = properties.getProperty("prop1") private val prop2 = properties.getProperty("prop2") + private val shouldFailed = properties.getProperty("fail", "false").toBoolean + assert(prop1 != null) assert(prop2 != null) assert(metricRegistry.getGauges.keySet().contains("fake.test1")) assert(metricRegistry.getGauges.keySet().contains("fake.test2")) - def start(): Unit = { } + def start(): Unit = { + if (shouldFailed) { + throw new Exception("Failed to start") + } + } def stop(): Unit = { } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index b73a19d916c2..8639f1c4c4a6 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -287,4 +287,21 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricsSystem.invokePrivate(sinks()).map(_.getClass.getCanonicalName) .contains(classOf[CustomMetricsSink].getCanonicalName)) } + + test("should unregister sink if failed to start") { + conf.set("spark.metrics.conf.test.source.fake.class", + classOf[CustomMetricsSource].getCanonicalName) + .set("spark.metrics.conf.test.sink.fake.class", classOf[CustomMetricsSink].getCanonicalName) + .set("spark.metrics.conf.test.sink.fake.prop1", "val1") + .set("spark.metrics.conf.test.sink.fake.prop2", "val2") + .set("spark.metrics.conf.test.sink.fake.fail", "true") + + val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr) + metricsSystem.start() + + val sinks = PrivateMethod[ArrayBuffer[Sink]]('sinks) + assert(metricsSystem.invokePrivate(sinks()).length === 1) + assert(!metricsSystem.invokePrivate(sinks()).map(_.getClass.getCanonicalName) + .contains(classOf[CustomMetricsSink].getCanonicalName)) + } } diff --git 
a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala index 0e21a36071c4..06cca661810d 100644 --- a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala @@ -31,8 +31,8 @@ class StatsdSinkSuite extends SparkFunSuite { private val securityMgr = new SecurityManager(new SparkConf(false)) private val defaultProps = Map( STATSD_KEY_PREFIX -> "spark", - STATSD_KEY_PERIOD -> "1", - STATSD_KEY_UNIT -> "seconds", + "period" -> "1", + "unit" -> "seconds", STATSD_KEY_HOST -> "127.0.0.1" ) private val socketTimeout = 30000 // milliseconds diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index d5cd834d6b67..5b05bdb93dfd 100644 --- a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -32,12 +32,6 @@ class GangliaSink( property: Properties, registry: MetricRegistry, securityMgr: SecurityManager) extends Sink(property, registry) { - val GANGLIA_KEY_PERIOD = "period" - val GANGLIA_DEFAULT_PERIOD = 10 - - val GANGLIA_KEY_UNIT = "unit" - val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS - val GANGLIA_KEY_MODE = "mode" val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST @@ -68,13 +62,6 @@ class GangliaSink( private val mode = propertyToOption(GANGLIA_KEY_MODE) .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase(Locale.ROOT))) .getOrElse(GANGLIA_DEFAULT_MODE) - private val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) - .getOrElse(GANGLIA_DEFAULT_PERIOD) - private val pollUnit = propertyToOption(GANGLIA_KEY_UNIT) - .map(u => TimeUnit.valueOf(u.toUpperCase(Locale.ROOT))) - .getOrElse(GANGLIA_DEFAULT_UNIT) - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) private val ganglia = new GMetric(host, port, mode, ttl) private val reporter = GangliaReporter.forRegistry(registry) From 6728d52f40839936e32112d4447b95d684f2fe38 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 28 Nov 2017 16:23:53 +0800 Subject: [PATCH 11/11] Minor style changes --- .../scala/org/apache/spark/metrics/sink/ConsoleSink.scala | 6 +++--- .../org/apache/spark/metrics/sink/GraphiteSink.scala | 8 ++++---- .../main/scala/org/apache/spark/metrics/sink/Sink.scala | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index dc180f2b22f6..9e7442968d95 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -37,9 +37,9 @@ private[spark] class ConsoleSink( securityMgr: SecurityManager) extends Sink(property, registry) { private val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build() + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build() override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index d05bb11914f9..0d98ed79773c 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -69,10 +69,10 @@ private[spark] class GraphiteSink( } private val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .prefixedWith(prefix) - .build(graphite) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .prefixedWith(prefix) + .build(graphite) override def start() { reporter.start(pollPeriod, pollUnit) diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 8446e59ead13..1657d639fba6 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -47,7 +47,7 @@ abstract class Sink(properties: Properties, metricRegistry: MetricRegistry) { MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) /** - * Start this metrics Sink, this will be called by MetricsSystem. If this [[Sink]] is failed to + * Start this metrics Sink; this will be called by MetricsSystem. If this [[Sink]] fails to * start, Metrics system will unregister and remove it. */ def start(): Unit
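
For illustration, a third-party sink written against the refactored base class could look like the sketch below. This is a minimal sketch and not part of the patch: the class name and the Slf4j-based reporter are made up, it assumes the final Sink API declares abstract stop() and report() alongside start() (only start() is visible in the hunks above), and it relies on the protected pollPeriod/pollUnit members introduced in the new base class.

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{MetricRegistry, Slf4jReporter}

import org.apache.spark.metrics.sink.Sink

// Illustrative sink that periodically logs all registered metrics via Slf4j.
class LoggingSink(properties: Properties, registry: MetricRegistry)
  extends Sink(properties, registry) {

  // pollPeriod and pollUnit are inherited from Sink, which reads the "period" and
  // "unit" properties and validates the minimal polling period.
  private val reporter = Slf4jReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  // If start() throws, the metrics system unregisters and removes this sink.
  override def start(): Unit = reporter.start(pollPeriod, pollUnit)

  override def stop(): Unit = reporter.stop()

  override def report(): Unit = reporter.report()
}

Such a sink would then be enabled through the usual metrics properties, for example spark.metrics.conf.*.sink.logging.class=org.example.LoggingSink together with the matching .period and .unit keys; the instance name "logging" and the package org.example are hypothetical.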