-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-4537][Streaming] Expand StreamingSource to add more metrics #3466
Conversation
Test build #23868 has started for PR 3466 at commit
|
Test build #23868 has finished for PR 3466 at commit
|
Test PASSed. |
// Gauge for last received batch records and total received batch records. | ||
private var totalReceivedBatchRecords: Long = 0L | ||
def getTotalReceivedBatchRecords(listener: StreamingJobProgressListener): Long = { | ||
totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this counter work? I think that gauges are collected only on request of a source, so if nobody is consuming the metric or consuming it too often, we will have a wrong count.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, you're right, so it is hard to collect the total batch records without modifying the StreamingJobProgressListener
code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix this, thanks a lot.
Test build #23913 has started for PR 3466 at commit
|
Test build #23913 has finished for PR 3466 at commit
|
Test PASSed. |
@@ -62,6 +62,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { | |||
registerGauge("lastCompletedBatch_processEndTime", | |||
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) | |||
|
|||
// Gauge for last completed batch's delay information. | |||
registerGauge("lastCompletedBatch_processTime", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its better to name this "processingTime"
Thanks a lot TD for your comments, I will factor out the old code and fix the above issues you mentioned. |
Test build #24751 has started for PR 3466 at commit
|
Hey TD, I've addressed the problem you mentioned in this way, I'm not is this what you want, would you mind taking a look at it ? Thanks a lot. |
defaultValue: T) { | ||
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { | ||
override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its better to keep the Option here (and document that defaultValue
is used when f
returns null
. And other places should not have to use Option. This is safer for any one to use and also minimizes the changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will revert it back and try a better way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi TD, what's your meaning of "And other places should not have to use Option", If here as an example, change to
registerGauge("lastCompletedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime).get, -1L)
get
will throw exception when there's no completed batch. I'm not sure what's actual meaning, sorry if I misunderstand anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi TD, sorry to bother you again, I'm not if there's a better way to address this problem, would you mind giving me some hints, thanks a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand the problem. Good catch, I did not realize that. How about this. Lets make two versions of registerGauge, one that takes f: StreamingProgressListener => T
without any default value, another that takes f: StreamingProgressListener => Option[T]
and the default value. Each version will be used accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, got it, I will change the code as you suggested.
Test build #24751 has finished for PR 3466 at commit
|
Test PASSed. |
Test build #24808 has started for PR 3466 at commit
|
Test build #24808 has finished for PR 3466 at commit
|
Test PASSed. |
@@ -35,6 +35,15 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { | |||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: The existing version of registerGauge could have used the new version. Not a big deal, very small amount of duplicate code.
Just a couple of more comments for making the name more consistent with existing ones. Otherwise I approve of the how the |
Test build #24827 has started for PR 3466 at commit
|
Hi TD, thanks a lot for your comments, I just change the code style as you suggested, also add one more metrics |
LGTM. Merging this. Thanks @jerryshao |
Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` to the received records counting. Author: jerryshao <saisai.shao@intel.com> Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits: 00f5f7f [jerryshao] Change the code style and add totalProcessedRecords 44721a6 [jerryshao] Further address the comments c097ddc [jerryshao] Address the comments 02dd44f [jerryshao] Fix the addressed comments c7a9376 [jerryshao] Expand StreamingSource to add more metrics (cherry picked from commit f205fe4) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Add `processingDelay`, `schedulingDelay` and `totalDelay` for the last completed batch. Add `lastReceivedBatchRecords` and `totalReceivedBatchRecords` to the received records counting. Author: jerryshao <saisai.shao@intel.com> Closes #3466 from jerryshao/SPARK-4537 and squashes the following commits: 00f5f7f [jerryshao] Change the code style and add totalProcessedRecords 44721a6 [jerryshao] Further address the comments c097ddc [jerryshao] Address the comments 02dd44f [jerryshao] Fix the addressed comments c7a9376 [jerryshao] Expand StreamingSource to add more metrics (cherry picked from commit f205fe4) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Test build #24827 has finished for PR 3466 at commit
|
Test PASSed. |
Add
processingDelay
,schedulingDelay
andtotalDelay
for the last completed batch. AddlastReceivedBatchRecords
andtotalReceivedBatchRecords
to the received records counting.