-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-27641][CORE] Fix MetricsSystem to remove unregistered source correctly #24556
Conversation
val regName = buildRegistryName(source) | ||
registry.removeMatching((name: String, _: Metric) => name.startsWith(regName)) | ||
if (sources.contains(source)) { | ||
sources -= source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it really necessary to navigate sources
twice - the first during contains
, the 2nd - while removing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add existing check because original code removes the metrics no matter whether the source was registered or not. i will use index to avoid the twice navigating.thanks for your advice.
if (sources.contains(source)) { | ||
sources -= source | ||
val regName = buildRegistryName(source) | ||
registry.removeMatching((name: String, _: Metric) => name.startsWith(regName)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original JIRA is more generic than it seems to be.
AccumulatorSource
there is just for example. Just consider any other custom Source
and not just AccumulatorSource
.
There is a bug in the MetricsSystem that allows to add (supports) multiple sources with the same name, but does not allow to remove a single source with its metrics only. Removing a single source leads to removing all the registered metrics from all the sources of the same name.
Removing metrics from this shared registry by exact match seems to be more accurate than just by the corresponding prefix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies for my misunderstanding.I will correct it soon.
@@ -64,10 +64,11 @@ class DoubleAccumulatorSource extends AccumulatorSource | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes in this file seems to be not necessary if removing metrics by exact match from the shared registry as described above.
private[spark] def getRegisteredNames(source: Source): Seq[String] = { | ||
val sourceNamePrefix = buildRegistryName(source) | ||
val metricNames = source.metricRegistry.getMetrics.keySet() | ||
JavaConverters.asScalaSet(metricNames).map(k => s"${sourceNamePrefix}.${k}").toSeq |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- There is
MetricRegistry.name
method to create names instead of string interpolation. - It seems that JavaConverters are not used in such a way throughout the codebase
- The result of the method is
ArrayBuffer
, so every lookup isO(N)
. What is the reason of converting aSet
into aSeq
?
if (index != -1) { | ||
sources.remove(index) | ||
val regNames = getRegisteredNames(source) | ||
registry.removeMatching((name: String, _: Metric) => regNames.contains(name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it really necessary to iterate over all the metrics in the registry to remove the necessary metrics?
Isn't it more efficient just to remove the metrics from the registry by their names using MetricRegistry.remove(String)
?
@@ -64,10 +64,11 @@ class DoubleAccumulatorSource extends AccumulatorSource | |||
*/ | |||
@Experimental | |||
object LongAccumulatorSource { | |||
def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Unit = { | |||
def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Source = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason of changing this public API and exposing the package private Source
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it give a chance to remove the registered source of AccumulatorSource's subclass in spark
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AccumulatorSource
is package private as well. So to extend it, it will be necessary to place your code within the same package. So there is no need to expose package private Source
and make it a part of the public contract.
To implement tests without changing the API you could use something like the following
val source = new LongAccumulatorSource
source.register(accumulators)
sc.env.metricsSystem.registerSource(source)
... instead of
LongAccumulatorSource.register(mockContext, Map("laF" -> laFirst, "laS" -> laSecond))
So the necessary source to delete during the tests will be already stored in the local variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes sense. thanks for your advise.
val source = new LongAccumulatorSource | ||
source.register(accumulators) | ||
sc.env.metricsSystem.registerSource(source) | ||
source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason of changing this public API and exposing the package private Source
?
@@ -81,9 +82,10 @@ object LongAccumulatorSource { | |||
*/ | |||
@Experimental | |||
object DoubleAccumulatorSource { | |||
def register(sc: SparkContext, accumulators: Map[String, DoubleAccumulator]): Unit = { | |||
def register(sc: SparkContext, accumulators: Map[String, DoubleAccumulator]): Source = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason of changing this public API and exposing the package private Source
?
val source = new DoubleAccumulatorSource | ||
source.register(accumulators) | ||
sc.env.metricsSystem.registerSource(source) | ||
source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason of changing this public API and exposing the package private Source
?
assert(metricsSystem.invokePrivate(registry()).getNames.contains("AccumulatorSource.laT")) | ||
assert(!metricsSystem.invokePrivate(registry()).getNames.contains("AccumulatorSource.laF")) | ||
assert(!metricsSystem.invokePrivate(registry()).getNames.contains("AccumulatorSource.laS")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest to pay an attention to how the other test cases in this test suite are organised by grouping the related blocks of code with the empty lines.
val regName = buildRegistryName(source) | ||
registry.removeMatching((name: String, _: Metric) => name.startsWith(regName)) | ||
val index = sources.indexOf(source) | ||
if (index != -1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there use sources.contains(source)
, it looks more graceful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use index can avoid navigating sources twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using index can avoid navigating sources twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it, now.
@@ -166,9 +172,12 @@ private[spark] class MetricsSystem private ( | |||
} | |||
|
|||
def removeSource(source: Source) { | |||
sources -= source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think sources -= source is
is already looks good.
The method -=
of mutable.ArrayBuffer
as follows:
def -= (x: A): this.type = {
val i = indexOf(x)
if (i != -1) remove(i)
this
}
I think the original code works good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but the original code will remove the source's metrics even if it is not registered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand your meaning.If source not registered,The method -=
of mutable.ArrayBuffer
do nothing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
}
Even if the source
is not in sources
the original code will execute the code after sources -= source
which will remove the metrics of the source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I get it.
This change looks good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What metrics would it remove erroneously, if the source were never registered?
@@ -64,10 +64,11 @@ class DoubleAccumulatorSource extends AccumulatorSource | |||
*/ | |||
@Experimental | |||
object LongAccumulatorSource { | |||
def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Unit = { | |||
def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Source = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AccumulatorSource
is package private as well. So to extend it, it will be necessary to place your code within the same package. So there is no need to expose package private Source
and make it a part of the public contract.
To implement tests without changing the API you could use something like the following
val source = new LongAccumulatorSource
source.register(accumulators)
sc.env.metricsSystem.registerSource(source)
... instead of
LongAccumulatorSource.register(mockContext, Map("laF" -> laFirst, "laS" -> laSecond))
So the necessary source to delete during the tests will be already stored in the local variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the PR!
Can one of the admins verify this patch? |
too old to merge,so close it |
What changes were proposed in this pull request?
LongAccumulatorSource
andDoubleAccumulatorSource
's register method,then we can remove it usemetricsSystem.removeSource
How was this patch tested?
add new unit tests