Add deregistering of metrics #929
Conversation
def removeJobMetrics(metricNamespace: String, jobId: String): Unit = {
  val key = s"$metricNamespace.$jobId"
  val removed =
    metricsMap.keys().asScala.filter(k => k.startsWith(key)).map(k => (k, metricsMap.remove(k)))
This tuple here is created just for logging purposes. I intend to remove the logging (and simplify this) after a bit of validation.
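Once the logging goes away, the tuple is no longer needed and the removal can be a single pass. A minimal sketch of that simplification, using a hypothetical stand-in `metricsMap` (the real values would be Spark `Source` instances, not strings):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object MetricsCleanup {
  // Hypothetical stand-in for the PR's metricsMap; real values would be Spark Sources.
  val metricsMap = new ConcurrentHashMap[String, String]()

  // Simplified form without logging: no intermediate (key, source) tuples.
  def removeJobMetrics(metricNamespace: String, jobId: String): Unit = {
    val prefix = s"$metricNamespace.$jobId"
    metricsMap.keys().asScala
      .filter(_.startsWith(prefix))
      .foreach(k => metricsMap.remove(k))
  }
}
```

Iterating the key enumeration of a `ConcurrentHashMap` while removing is safe here, since its iterators are weakly consistent.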
Upon job end, the driver may call a method to both clean the tracking hashmap and deregister the source from the MetricsSystem. The new method will remove all metrics belonging to a given job UUID.
Updated from d6679ad to 0f6d45e
 * This method will deregister the metric from Spark's org.apache.spark.metrics.MetricsSystem
 * and stop tracking that it was published
 */
def removeJobMetrics(metricNamespace: String, jobId: String): Unit = {
Suggested change:
- def removeJobMetrics(metricNamespace: String, jobId: String): Unit = {
+ def removeJobMetrics(metricNamespace: String): Unit = {
I think that in the datasource and in this class, metricNamespace is actually the metricsPrefix Spark parameter and doesn't need jobId appended. A further hint is that jobId wasn't mentioned in this class before.
Yeah.. This is not super clear and it was a bit of a pain to stitch together (the JDBC sink adds a bunch of parameters, and it was hard to follow where each came from).
Essentially, it is true that from the point of view of the data source there is no jobId. But the metricNamespace already contains the jobId when called, so from the point of view of the driver, it does have it.
But this is a very good point. There is no reason to expose this to the datasource. I will remove it here, and let the backend do its thing with the knowledge of what the driver does. Will do 👍
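To make the agreed split concrete, here is a hypothetical sketch (names `Backend`, `Driver`, and `onJobEnd` are illustrative, not from the PR): the backend exposes only a namespace-based removal, and only the driver, which knows the jobId, builds the full prefix.

```scala
import scala.collection.mutable

object Backend {
  // Hypothetical flat store standing in for the tracked metrics.
  val metrics = mutable.Map[String, Long]()

  // Signature after the review: jobId is no longer exposed here.
  def removeJobMetrics(metricNamespace: String): Unit =
    metrics.keys.toList
      .filter(_.startsWith(metricNamespace))
      .foreach(k => metrics.remove(k))
}

object Driver {
  // Only the driver appends the jobId before calling the backend.
  def onJobEnd(metricNamespace: String, jobId: String): Unit =
    Backend.removeJobMetrics(s"$metricNamespace.$jobId")
}
```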
.foreach({
  case (key, source) => {
    logger.info(s"Deleting and removing counter for $key")
    metricsSystem.removeSource(source)
Spark is instrumented with the Dropwizard/Codahale metrics library. Several components of Spark are instrumented; notably for this work, metrics generated by the Spark driver and executor components can be instrumented.
An important architectural detail is that metrics are sent directly from the sources to the sink. This matters when running on a cluster, as each executor communicates directly with the sink, for better scalability.
So it seems that when we call this from the driver it will not clean up metrics on the executors, but I didn't try to verify what actually happens when there is a driver with separate executors. It would also be good to double-check whether all registered metrics continue to be exported. I just found https://github.com/cognitedata/spark-jdbc/blob/master/src/main/scala/org/apache/spark/metrics/sink/JdbcSink.scala#L89 which seems to try pruning them a little bit (but only the counter type).
I don't understand why it wouldn't remove from the executors based on this.
Is it because the instance that removes the sources and the counter will be different in the executor? Meaning that none will be found in the map, and therefore none will be removed as a source?
I suspect that even though metrics registry is placed in sparkenv, it might still be isolated per-jvm
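The per-JVM suspicion can be illustrated with a hypothetical sketch (`JvmLocalRegistry` is invented for illustration, not a Spark class): even if driver and executors construct their registry the same way (e.g. via SparkEnv), each process holds its own instance, so a driver-side removeSource cannot reach an executor's copy.

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for a registry that exists once per JVM.
class JvmLocalRegistry {
  private val sources = new ConcurrentHashMap[String, AnyRef]()
  def register(name: String, src: AnyRef): Unit = { sources.put(name, src); () }
  def remove(name: String): Unit = { sources.remove(name); () }
  def contains(name: String): Boolean = sources.containsKey(name)
}
```

With two instances (one per "JVM"), removing a source from the driver's registry leaves the executor's untouched, which would match the behavior suspected above.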
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@            Coverage Diff             @@
##           master     #929      +/-   ##
==========================================
+ Coverage   80.96%   81.01%   +0.05%
==========================================
  Files          46       46
  Lines        3057     3066       +9
  Branches      123      120       -3
==========================================
+ Hits         2475     2484       +9
  Misses        582      582
This pull request seems a bit stale.. Shall we invite more to the party?