
Add deregistering of metrics
Upon job end, the driver may call a method to both clear the tracking
hashmap and deregister the source from the MetricsSystem.

The new method will remove all metrics belonging to a given job UUID.
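A minimal sketch of the intended call pattern (not from this commit; the namespace and UUID below are placeholders, and the calls only work on a live Spark driver since registration goes through SparkEnv):

```scala
import org.apache.spark.datasource.MetricsSource

// Placeholder values; the datasource derives these from the job configuration.
val metricNamespace = "cdf"
val jobId = "6f1d7c2e-0000-0000-0000-000000000000"

// Counters are created lazily and registered with Spark's MetricsSystem.
val rowsRead = MetricsSource.getOrCreateCounter(metricNamespace, s"$jobId.assets.read")
rowsRead.inc()

// On job end, the driver drops every metric whose key starts with the job UUID:
MetricsSource.removeJobMetrics(metricNamespace, jobId)
```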
mr-celo committed May 2, 2024
1 parent 0b3eb30 commit d6679ad
Showing 3 changed files with 56 additions and 19 deletions.
build.sbt (6 additions, 1 deletion)
@@ -46,7 +46,12 @@ lazy val commonSettings = Seq(
   description := "Spark data source for the Cognite Data Platform.",
   licenses := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt")),
   homepage := Some(url("https://github.com/cognitedata/cdp-spark-datasource")),
-  scalacOptions ++= Seq("-Xlint:unused", "-language:higherKinds", "-deprecation", "-feature"),
+  scalacOptions ++= Seq("-Xlint:unused", "-language:higherKinds", "-deprecation", "-feature") ++ (CrossVersion.partialVersion(scalaVersion.value) match {
+    // We use JavaConverters to remain backwards compatible with Scala 2.12,
+    // and to avoid a dependency on scala-collection-compat
+    case Some((2, 13)) => Seq("-Wconf:src=src/main/scala/cognite/spark/v1/MetricsSource.scala&cat=deprecation:i")
+    case _ => Seq.empty
+  }),
   resolvers ++= Resolver.sonatypeOssRepos("releases"),
   developers := List(
     Developer(
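The version switch above exists because scala.collection.JavaConverters is deprecated on Scala 2.13 (in favour of scala.jdk.CollectionConverters), so the -Wconf filter downgrades deprecation warnings originating from MetricsSource.scala to info level on 2.13 only. A sketch of the two equivalent imports (illustrative, not part of the commit):

```scala
// Works on both 2.12 and 2.13, but deprecated since 2.13:
import scala.collection.JavaConverters._
// 2.13-only replacement (unused here, to stay 2.12-compatible):
// import scala.jdk.CollectionConverters._

import java.util.concurrent.ConcurrentHashMap

val map = new ConcurrentHashMap[String, Int]()
map.put("a", 1)
// asScala converts the Java key set into a Scala collection.
val keys: Iterable[String] = map.keySet().asScala
```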
src/main/scala/cognite/spark/v1/MetricsSource.scala (48 additions, 17 deletions)
@@ -2,24 +2,28 @@ package org.apache.spark.datasource

 import cats.Eval
 import com.codahale.metrics._
-import org.apache.spark._

 import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkEnv
+import org.log4s.getLogger
+import scala.collection.JavaConverters._

 class MetricsSource {
   // Add metricNamespace to differentiate with spark system metrics.
   // Keeps track of all the Metric instances that are being published
-  val metricsMap = new ConcurrentHashMap[String, Eval[Counter]]
+  val metricsMap = new ConcurrentHashMap[String, Eval[SourceCounter]]

   def getOrCreateCounter(metricNamespace: String, metricName: String): Counter = {
+    val key = s"$metricNamespace.$metricName"
+
     val wrapped = Eval.later {
+      getLogger.info(s"Creating and registering counter for $key")
       val counter = new Counter
-      registerMetricSource(metricNamespace, metricName, counter)
-      counter
+      val source = registerMetricSource(metricNamespace, metricName, counter)
+      SourceCounter(source, counter)
     }
-    val key = s"$metricNamespace.$metricName"

     metricsMap.putIfAbsent(key, wrapped)
-    metricsMap.get(key).value
+    metricsMap.get(key).value.counter
   }

   /**
@@ -32,20 +36,47 @@ class MetricsSource {
    * @param metricName name of the Metric
    * @param metric com.codahale.metrics.Metric instance to be published
    */
-  def registerMetricSource(metricNamespace: String, metricName: String, metric: Metric): Unit = {
+  def registerMetricSource(metricNamespace: String, metricName: String, metric: Metric): Source = {
     val env = SparkEnv.get
-    env.metricsSystem.registerSource(
-      new Source {
-        override val sourceName = s"${metricNamespace}"
-        override def metricRegistry: MetricRegistry = {
-          val metrics = new MetricRegistry
-          metrics.register(metricName, metric)
-          metrics
-        }
-      }
-    )
+    val source = new Source {
+      override val sourceName = s"${metricNamespace}"
+      override def metricRegistry: MetricRegistry = {
+        val metrics = new MetricRegistry
+        metrics.register(metricName, metric)
+        metrics
+      }
+    }
+    env.metricsSystem.registerSource(source)
+    source
   }

+  /**
+    * Remove the metrics from a job.
+    *
+    * This method will deregister the metrics from Spark's org.apache.spark.metrics.MetricsSystem
+    * and stop tracking that they were published.
+    */
+  def removeJobMetrics(metricNamespace: String, jobId: String): Unit = {
+    val key = s"$metricNamespace.$jobId"
+    val removed =
+      metricsMap.keys().asScala.filter(k => k.startsWith(key)).map(k => (k, metricsMap.remove(k)))
+
+    if (removed.nonEmpty) {
+      val logger = getLogger
+      val metricsSystem = SparkEnv.get.metricsSystem
+      removed
+        .map({ case (k, v) => (k, v.value.source) })
+        .foreach({
+          case (key, source) => {
+            logger.info(s"Deleting and removing counter for $key")
+            metricsSystem.removeSource(source)
+          }
+        })
+    }
+  }
 }

 // Singleton to make sure each metric is only registered once.
 object MetricsSource extends MetricsSource
+
+case class SourceCounter(source: Source, counter: Counter)
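A note on the data-structure choice (illustration, not from the commit): storing Eval[SourceCounter] instead of an eagerly built SourceCounter keeps the registration side effect race-safe under putIfAbsent. Two threads may each build an Eval, but only the one stored in the map is ever forced, and Eval.later memoizes, so the Source is registered at most once per key:

```scala
import cats.Eval
import java.util.concurrent.ConcurrentHashMap

val cache = new ConcurrentHashMap[String, Eval[Long]]

def getOrCreate(key: String): Long = {
  val candidate = Eval.later {
    println(s"side effect for $key") // stands in for registering a Source
    System.nanoTime()
  }
  // If two threads race, only one candidate is stored; the loser's Eval
  // is discarded without ever being forced.
  cache.putIfAbsent(key, candidate)
  cache.get(key).value // memoized, so every caller sees the same value
}
```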
src/test/scala/cognite/spark/v1/SparkTest.scala (2 additions, 1 deletion)
@@ -271,12 +271,13 @@ trait SparkTest {
   private def getCounterSafe(metricName: String): Option[Long] =
     Option(MetricsSource.metricsMap
       .get(metricName))
-      .map(_.value.getCount)
+      .map(_.value.counter.getCount)

   private def getCounter(metricName: String): Long =
     MetricsSource.metricsMap
       .get(metricName)
       .value
+      .counter
       .getCount

   def getNumberOfRowsRead(metricsPrefix: String, resourceType: String): Long =
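For completeness, a sketch of how a test can read a counter through the new wrapper (the metric key is illustrative; real tests build it from their metricsPrefix):

```scala
import org.apache.spark.datasource.MetricsSource

val count: Option[Long] =
  Option(MetricsSource.metricsMap.get("test.job.assets.read"))
    .map(_.value.counter.getCount)
```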
