Add deregistering of metrics #929

Closed · wants to merge 2 commits
7 changes: 6 additions & 1 deletion build.sbt
@@ -46,7 +46,12 @@ lazy val commonSettings = Seq(
   description := "Spark data source for the Cognite Data Platform.",
   licenses := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt")),
   homepage := Some(url("https://github.com/cognitedata/cdp-spark-datasource")),
-  scalacOptions ++= Seq("-Xlint:unused", "-language:higherKinds", "-deprecation", "-feature"),
+  scalacOptions ++= Seq("-Xlint:unused", "-language:higherKinds", "-deprecation", "-feature") ++ (CrossVersion.partialVersion(scalaVersion.value) match {
+    // We use JavaConverters to remain backwards compatible with Scala 2.12,
+    // and to avoid a dependency on scala-collection-compat
+    case Some((2, 13)) => Seq("-Wconf:src=src/main/scala/cognite/spark/v1/MetricsSource.scala&cat=deprecation:i")
+    case _ => Seq.empty
+  }),
   resolvers ++= Resolver.sonatypeOssRepos("releases"),
   developers := List(
     Developer(
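A note on the comment in this hunk: scala.collection.JavaConverters is deprecated on Scala 2.13, where scala.jdk.CollectionConverters is the replacement, but that replacement does not exist on 2.12 without the scala-collection-compat shim. A minimal sketch of the pattern being kept (names here are illustrative, not from the PR):

import java.util.concurrent.ConcurrentHashMap
// Deprecated on 2.13, but the only spelling that also compiles on 2.12
// without adding a dependency on scala-collection-compat.
import scala.collection.JavaConverters._

object ConvertersDemo extends App {
  val map = new ConcurrentHashMap[String, Int]
  map.put("a.one", 1)
  map.put("b.two", 2)
  // keys() returns a java.util.Enumeration; asScala wraps it as an Iterator.
  // This is the same pattern MetricsSource uses to scan keys for a prefix.
  val matching = map.keys().asScala.filter(_.startsWith("a.")).toList
  println(matching) // List(a.one)
}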
72 changes: 53 additions & 19 deletions src/main/scala/cognite/spark/v1/MetricsSource.scala
@@ -2,24 +2,28 @@ package org.apache.spark.datasource
 
 import cats.Eval
 import com.codahale.metrics._
-import org.apache.spark._
 
 import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkEnv
+import org.log4s.getLogger
+import scala.collection.JavaConverters._
 
 class MetricsSource {
-  // Add metricNamespace to differentiate with spark system metrics.
+  // Add metricPrefix to differentiate with spark system metrics.
   // Keeps track of all the Metric instances that are being published
-  val metricsMap = new ConcurrentHashMap[String, Eval[Counter]]
 
-  def getOrCreateCounter(metricNamespace: String, metricName: String): Counter = {
+  val metricsMap = new ConcurrentHashMap[String, Eval[SourceCounter]]
+
+  def getOrCreateCounter(metricPrefix: String, metricName: String): Counter = {
+    val key = s"$metricPrefix.$metricName"
+
     val wrapped = Eval.later {
+      getLogger.info(s"Creating and registering counter for $key")
       val counter = new Counter
-      registerMetricSource(metricNamespace, metricName, counter)
-      counter
+      val source = registerMetricSource(metricPrefix, metricName, counter)
+      SourceCounter(source, counter)
     }
-    val key = s"$metricNamespace.$metricName"
 
     metricsMap.putIfAbsent(key, wrapped)
-    metricsMap.get(key).value
+    metricsMap.get(key).value.counter
   }
 
   /**
@@ -32,20 +36,50 @@ class MetricsSource {
    * @param metricName name of the Metric
    * @param metric com.codahale.metrics.Metric instance to be published
    */
-  def registerMetricSource(metricNamespace: String, metricName: String, metric: Metric): Unit = {
+  def registerMetricSource(metricPrefix: String, metricName: String, metric: Metric): Source = {
     val env = SparkEnv.get
-    env.metricsSystem.registerSource(
-      new Source {
-        override val sourceName = s"${metricNamespace}"
-        override def metricRegistry: MetricRegistry = {
-          val metrics = new MetricRegistry
-          metrics.register(metricName, metric)
-          metrics
-        }
-      }
-    )
+    val source = new Source {
+      override val sourceName = s"${metricPrefix}"
+      override def metricRegistry: MetricRegistry = {
+        val metrics = new MetricRegistry
+        metrics.register(metricName, metric)
+        metrics
+      }
+    }
+    env.metricsSystem.registerSource(source)
+    source
   }
 
+  /**
+    * Remove the metrics from a job.
+    *
+    * This method deregisters the metrics from Spark's org.apache.spark.metrics.MetricsSystem
+    * and stops tracking that they were published.
+    */
+  def removeJobMetrics(metricPrefix: String): Unit = {
+    val removed =
+      metricsMap
+        .keys()
+        .asScala
+        .filter(k => k.startsWith(metricPrefix))
+        .map(k => (k, metricsMap.remove(k)))
+
+    if (removed.nonEmpty) {
+      val logger = getLogger
+      val metricsSystem = SparkEnv.get.metricsSystem
+      removed
+        .map({ case (k, v) => (k, v.value.source) })
+        .foreach({
+          case (key, source) => {
+            logger.info(s"Deleting and removing counter for $key")
+            metricsSystem.removeSource(source)
+          }
+        })
+    }
+  }
 }
 
 // Singleton to make sure each metric is only registered once.
 object MetricsSource extends MetricsSource
+
+case class SourceCounter(source: Source, counter: Counter)

Review thread on the metricsSystem.removeSource(source) line:

Contributor:

https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Dashboard/Spark_dropwizard_metrics_info.md

> Spark is instrumented with the Dropwizard/Codahale metrics library. Several components of Spark are instrumented; notably for this work, several metrics generated by the Spark driver and executor components can be instrumented.
>
> An important architectural detail is that the metrics are sent directly from the sources to the sink. This is important when running on a cluster, as each executor communicates directly with the sink, for better scalability.

So it seems like calling this from the driver will not clean up the metrics on the executors, but I didn't try to verify what actually happens when there is a driver and separate executors. It would also be good to double-check whether all registered metrics continue to be exported. I just found https://github.com/cognitedata/spark-jdbc/blob/master/src/main/scala/org/apache/spark/metrics/sink/JdbcSink.scala#L89, which seems to try pruning them a little (but only the Counter type).

Contributor (author):

I don't understand why this wouldn't remove them from the executors. Is it because the instance removing the sources and the counters will be a different one on the executor, meaning that none will be found in the map, and therefore none will be removed as a source?

Contributor:

I suspect that even though the metrics registry is placed in SparkEnv, it might still be isolated per JVM.
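One way to settle the per-JVM question raised in this thread would be a quick probe from a job. This is an illustrative sketch, not part of the PR; it assumes a running SparkSession named spark on a cluster with real executors:

import org.apache.spark.datasource.MetricsSource

// Size of the driver's map after some getOrCreateCounter calls on the driver.
val driverSize = MetricsSource.metricsMap.size()

// Each task resolves the MetricsSource singleton inside its own executor JVM,
// so these sizes reflect the executors' maps. If they stay at 0 while the
// driver's map is populated, the registry is isolated per JVM and a
// driver-side removeJobMetrics cannot deregister executor-side sources.
val executorSizes = spark.sparkContext
  .parallelize(1 to 8, 8)
  .mapPartitions(_ => Iterator(MetricsSource.metricsMap.size()))
  .collect()

println(s"driver=$driverSize, executors=${executorSizes.mkString(",")}")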
47 changes: 47 additions & 0 deletions src/test/scala/cognite/spark/v1/MetricsSourceTest.scala
@@ -0,0 +1,47 @@
package org.apache.spark.datasource

import org.scalatest.{FlatSpec, Matchers}
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession

class MetricsSourceTest extends FlatSpec with Matchers {
  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .config("spark.ui.enabled", "false") // comment this out to use the Spark UI during tests, at http://localhost:4040 by default
    // https://medium.com/@mrpowers/how-to-cut-the-run-time-of-a-spark-sbt-test-suite-by-40-52d71219773f
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.sql.storeAssignmentPolicy", "legacy")
    .config("spark.app.id", this.getClass.getName + math.floor(math.random() * 1000).toLong.toString)
    .getOrCreate()

  "A MetricsSource" should "register a new metric only once" in {
    val metric = MetricsSource.getOrCreateCounter("prefix", "name")
    val sameMetric = MetricsSource.getOrCreateCounter("prefix", "name")

    metric.getCount() should equal(0)
    sameMetric.getCount() should equal(0)

    metric.inc()

    metric.getCount() should equal(1)
    sameMetric.getCount() should equal(1)
  }

  it should "deregister only the metrics matching a prefix" in {
    MetricsSource.getOrCreateCounter("removePrefix.jobId", "name")
    MetricsSource.getOrCreateCounter("removePrefix.otherJobId", "name")

    Option(MetricsSource.metricsMap.get("removePrefix.jobId.name")).isDefined shouldBe true
    Option(MetricsSource.metricsMap.get("removePrefix.otherJobId.name")).isDefined shouldBe true
    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.jobId").size should equal(1)
    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.otherJobId").size should equal(1)

    MetricsSource.removeJobMetrics("removePrefix.jobId")

    Option(MetricsSource.metricsMap.get("removePrefix.jobId.name")).isDefined shouldBe false
    Option(MetricsSource.metricsMap.get("removePrefix.otherJobId.name")).isDefined shouldBe true
    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.jobId").size should equal(0)
    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.otherJobId").size should equal(1)
  }
}
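Outside the test suite, the intended lifecycle of the new API might look like this (a hypothetical sketch; the prefix scheme and job body are illustrative, not from the PR):

import org.apache.spark.datasource.MetricsSource

val metricPrefix = "cdf.myJobId" // hypothetical per-job prefix

// Registered once per JVM; later calls with the same key return the same counter.
val rowsRead = MetricsSource.getOrCreateCounter(metricPrefix, "read.rows")

try {
  // ... run the job, incrementing as rows are processed ...
  rowsRead.inc()
} finally {
  // Drops every tracked metric whose key starts with the prefix and
  // deregisters the matching Sources from Spark's MetricsSystem, so a
  // long-lived session does not accumulate stale per-job sources.
  MetricsSource.removeJobMetrics(metricPrefix)
}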
3 changes: 2 additions & 1 deletion src/test/scala/cognite/spark/v1/SparkTest.scala
@@ -271,12 +271,13 @@ trait SparkTest {
   private def getCounterSafe(metricName: String): Option[Long] =
     Option(MetricsSource.metricsMap
       .get(metricName))
-      .map(_.value.getCount)
+      .map(_.value.counter.getCount)
 
   private def getCounter(metricName: String): Long =
     MetricsSource.metricsMap
       .get(metricName)
       .value
+      .counter
       .getCount
 
   def getNumberOfRowsRead(metricsPrefix: String, resourceType: String): Long =