Add deregistering of metrics
Upon job end, the driver may call a method to both clean the tracking
hashmap and deregister the source from the MetricsSystem.

The new method will remove all metrics belonging to a given job UUID.
mr-celo committed May 2, 2024
1 parent 0b3eb30 commit 0f6d45e
Showing 4 changed files with 103 additions and 19 deletions.
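For context, the intended call pattern looks roughly like the sketch below. The namespace, metric name, and job UUID are illustrative assumptions (only getOrCreateCounter and removeJobMetrics come from this changeset), and a running Spark application is assumed so that SparkEnv.get is populated.

import org.apache.spark.datasource.MetricsSource

// Per-job counters live under a "<namespace>.<jobId>" prefix.
val jobId = "8f14e45f-ceea-467f-a9d2-91f7c0e5a3b1" // hypothetical job UUID
val rowsRead = MetricsSource.getOrCreateCounter(s"myNamespace.$jobId", "rows.read")
rowsRead.inc(100)

// Upon job end, the driver drops every tracked counter under the job's
// prefix and deregisters the matching Sources from Spark's MetricsSystem.
MetricsSource.removeJobMetrics("myNamespace", jobId)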
7 changes: 6 additions & 1 deletion build.sbt
@@ -46,7 +46,12 @@ lazy val commonSettings = Seq(
   description := "Spark data source for the Cognite Data Platform.",
   licenses := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt")),
   homepage := Some(url("https://github.com/cognitedata/cdp-spark-datasource")),
-  scalacOptions ++= Seq("-Xlint:unused", "-language:higherKinds", "-deprecation", "-feature"),
+  scalacOptions ++= Seq("-Xlint:unused", "-language:higherKinds", "-deprecation", "-feature") ++ (CrossVersion.partialVersion(scalaVersion.value) match {
+    // We use JavaConverters to remain backwards compatible with Scala 2.12,
+    // and to avoid a dependency on scala-collection-compat
+    case Some((2, 13)) => Seq("-Wconf:src=src/main/scala/cognite/spark/v1/MetricsSource.scala&cat=deprecation:i")
+    case _ => Seq.empty
+  }),
   resolvers ++= Resolver.sonatypeOssRepos("releases"),
   developers := List(
     Developer(
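The Scala 2.13-only -Wconf rule is scoped to a single file and the deprecation category because scala.collection.JavaConverters (used by MetricsSource.scala below) is deprecated on 2.13, while its replacement, scala.jdk.CollectionConverters, does not exist on 2.12 without the scala-collection-compat shim. A minimal sketch of the pattern being kept, with illustrative names:

// Compiles cleanly on Scala 2.12; deprecated on 2.13, where the -Wconf rule
// above downgrades the warning to info-level for MetricsSource.scala only.
import scala.collection.JavaConverters._

val map = new java.util.concurrent.ConcurrentHashMap[String, Int]()
val keys = map.keys().asScala // java.util.Enumeration[String] viewed as Iterator[String]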
65 changes: 48 additions & 17 deletions src/main/scala/cognite/spark/v1/MetricsSource.scala
@@ -2,24 +2,28 @@ package org.apache.spark.datasource
 
 import cats.Eval
 import com.codahale.metrics._
 import org.apache.spark._
 
 import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkEnv
+import org.log4s.getLogger
+import scala.collection.JavaConverters._
 
 class MetricsSource {
   // Add metricNamespace to differentiate with spark system metrics.
   // Keeps track of all the Metric instances that are being published
-  val metricsMap = new ConcurrentHashMap[String, Eval[Counter]]
+  val metricsMap = new ConcurrentHashMap[String, Eval[SourceCounter]]
 
   def getOrCreateCounter(metricNamespace: String, metricName: String): Counter = {
+    val key = s"$metricNamespace.$metricName"
 
     val wrapped = Eval.later {
+      getLogger.info(s"Creating and registering counter for $key")
       val counter = new Counter
-      registerMetricSource(metricNamespace, metricName, counter)
-      counter
+      val source = registerMetricSource(metricNamespace, metricName, counter)
+      SourceCounter(source, counter)
     }
-    val key = s"$metricNamespace.$metricName"
 
     metricsMap.putIfAbsent(key, wrapped)
-    metricsMap.get(key).value
+    metricsMap.get(key).value.counter
   }
@@ -32,20 +36,47 @@ class MetricsSource {
   * @param metricName name of the Metric
   * @param metric com.codahale.metrics.Metric instance to be published
   */
-  def registerMetricSource(metricNamespace: String, metricName: String, metric: Metric): Unit = {
+  def registerMetricSource(metricNamespace: String, metricName: String, metric: Metric): Source = {
     val env = SparkEnv.get
-    env.metricsSystem.registerSource(
-      new Source {
-        override val sourceName = s"${metricNamespace}"
-        override def metricRegistry: MetricRegistry = {
-          val metrics = new MetricRegistry
-          metrics.register(metricName, metric)
-          metrics
-        }
-      }
-    )
+    val source = new Source {
+      override val sourceName = s"${metricNamespace}"
+      override def metricRegistry: MetricRegistry = {
+        val metrics = new MetricRegistry
+        metrics.register(metricName, metric)
+        metrics
+      }
+    }
+    env.metricsSystem.registerSource(source)
+    source
   }
 
+  /**
+   * Remove the metrics from a job.
+   *
+   * This method will deregister the metric from Spark's org.apache.spark.metrics.MetricsSystem
+   * and stops tracking that it was published
+   */
+  def removeJobMetrics(metricNamespace: String, jobId: String): Unit = {
+    val key = s"$metricNamespace.$jobId"
+    val removed =
+      metricsMap.keys().asScala.filter(k => k.startsWith(key)).map(k => (k, metricsMap.remove(k)))
+
+    if (removed.nonEmpty) {
+      val logger = getLogger
+      val metricsSystem = SparkEnv.get.metricsSystem
+      removed
+        .map({ case (k, v) => (k, v.value.source) })
+        .foreach({
+          case (key, source) => {
+            logger.info(s"Deleting and removing counter for $key")
+            metricsSystem.removeSource(source)
+          }
+        })
+    }
+  }
 }
 
 // Singleton to make sure each metric is only registered once.
 object MetricsSource extends MetricsSource
+
+case class SourceCounter(source: Source, counter: Counter)
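A note on the concurrency idiom above: Eval.later defers the effectful registration, putIfAbsent elects exactly one Eval per key, and every caller forces that same winning instance, so registerMetricSource runs at most once per key; a losing thread's Eval is discarded unforced. A standalone sketch of the idiom under those assumptions (illustrative names, not part of this commit):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

import cats.Eval

object LazyRegistry {
  private val registrations = new AtomicInteger(0)
  private val cache = new ConcurrentHashMap[String, Eval[Int]]

  def getOrCreate(key: String): Int = {
    // Deferred side effect: runs only if this Eval wins the race and is forced.
    val wrapped = Eval.later(registrations.incrementAndGet())
    cache.putIfAbsent(key, wrapped)
    // All callers force the single stored Eval; Eval.later memoizes, so the
    // effect happens at most once per key.
    cache.get(key).value
  }
}

removeJobMetrics can safely force v.value on the entries it removes for the same reason: every entry that reaches the map through getOrCreateCounter has already been forced once.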
47 changes: 47 additions & 0 deletions src/test/scala/cognite/spark/v1/MetricsSourceTest.scala
@@ -0,0 +1,47 @@
+package org.apache.spark.datasource
+
+import org.scalatest.{FlatSpec, Matchers}
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.SparkSession
+
+class MetricsSourceTest extends FlatSpec with Matchers {
+  val spark: SparkSession = SparkSession
+    .builder()
+    .master("local[*]")
+    .config("spark.ui.enabled", "false") // comment this out to use Spark UI during tests, on https://localhost:4040 by default
+    // https://medium.com/@mrpowers/how-to-cut-the-run-time-of-a-spark-sbt-test-suite-by-40-52d71219773f
+    .config("spark.sql.shuffle.partitions", "1")
+    .config("spark.sql.storeAssignmentPolicy", "legacy")
+    .config("spark.app.id", this.getClass.getName + math.floor(math.random() * 1000).toLong.toString)
+    .getOrCreate()
+
+  "A MetricsSource" should "register a new metric only once" in {
+    val metric = MetricsSource.getOrCreateCounter("prefix", "name")
+    val sameMetric = MetricsSource.getOrCreateCounter("prefix", "name")
+
+    metric.getCount() should equal(0)
+    sameMetric.getCount() should equal(0)
+
+    metric.inc()
+
+    metric.getCount() should equal(1)
+    sameMetric.getCount() should equal(1)
+  }
+
+  it should "deregister a metric only once" in {
+    MetricsSource.getOrCreateCounter("removePrefix.jobId", "name")
+    MetricsSource.getOrCreateCounter("removePrefix.otherJobId", "name")
+
+    Option(MetricsSource.metricsMap.get("removePrefix.jobId.name")) shouldBe 'defined
+    Option(MetricsSource.metricsMap.get("removePrefix.otherJobId.name")) shouldBe 'defined
+    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.jobId").size should equal(1)
+    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.otherJobId").size should equal(1)
+
+    MetricsSource.removeJobMetrics("removePrefix", "jobId")
+
+    Option(MetricsSource.metricsMap.get("removePrefix.jobId.name")) shouldBe 'empty
+    Option(MetricsSource.metricsMap.get("removePrefix.otherJobId.name")) shouldBe 'defined
+    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.jobId") shouldBe 'empty
+    SparkEnv.get.metricsSystem.getSourcesByName("removePrefix.otherJobId").size should equal(1)
+  }
+}
3 changes: 2 additions & 1 deletion src/test/scala/cognite/spark/v1/SparkTest.scala
@@ -271,12 +271,13 @@ trait SparkTest {
   private def getCounterSafe(metricName: String): Option[Long] =
     Option(MetricsSource.metricsMap
      .get(metricName))
-      .map(_.value.getCount)
+      .map(_.value.counter.getCount)
 
   private def getCounter(metricName: String): Long =
     MetricsSource.metricsMap
       .get(metricName)
       .value
+      .counter
       .getCount
 
   def getNumberOfRowsRead(metricsPrefix: String, resourceType: String): Long =
