[SPARK-4537][Streaming] Expand StreamingSource to add more metrics #3466

Closed. Wants to merge 5 commits.
Changes from 2 commits
@@ -62,6 +62,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
registerGauge("lastCompletedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)

// Gauge for last completed batch's delay information.
registerGauge("lastCompletedBatch_processTime",
Review comment (Contributor): It's better to name this "processingTime".

_.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(0L), -1L)
Review comment (Contributor): Why is the default value inside getOrElse different from the explicit default value? In fact, the default should not have to be specified twice: since the registerGauge function already handles missing values via Option and a default value, we should not need Options here.
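The reviewer's point can be illustrated with a minimal sketch of the pattern being suggested: the helper applies a single default whenever the Option chain is empty, so the caller never calls getOrElse and the default appears exactly once. The names below are illustrative stand-ins, not Spark's actual StreamingSource API.

```scala
// Sketch of an Option-aware gauge helper, assuming simplified stand-in types.
object OptionGaugeSketch {
  final case class BatchInfo(processingDelay: Option[Long], schedulingDelay: Option[Long])
  final case class ListenerSnapshot(lastCompletedBatch: Option[BatchInfo])

  // The default value is written once, here; None at either level falls back to it.
  def gaugeValue(snapshot: ListenerSnapshot,
                 f: BatchInfo => Option[Long],
                 defaultValue: Long): Long =
    snapshot.lastCompletedBatch.flatMap(f).getOrElse(defaultValue)

  def main(args: Array[String]): Unit = {
    val completed = ListenerSnapshot(Some(BatchInfo(Some(42L), None)))
    val empty = ListenerSnapshot(None)
    assert(gaugeValue(completed, _.processingDelay, -1L) == 42L) // value present
    assert(gaugeValue(completed, _.schedulingDelay, -1L) == -1L) // inner None -> default
    assert(gaugeValue(empty, _.processingDelay, -1L) == -1L)     // no batch -> default
    println("ok")
  }
}
```

With this shape, a caller registers a gauge by passing only the extractor and one default, avoiding the duplicated `getOrElse(0L), -1L` the reviewer flags.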

registerGauge("lastCompletedBatch_schedulingDelay",
_.lastCompletedBatch.flatMap(_.schedulingDelay).getOrElse(0L), -1L)
Review comment (Contributor): Same comment as above.

registerGauge("lastCompletedBatch_totalDelay",
_.lastCompletedBatch.flatMap(_.totalDelay).getOrElse(0L), -1L)

// Gauge for last received batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
registerGauge("lastReceivedBatch_submissionTime",
@@ -70,4 +78,8 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
Review comment (Contributor): I know that this is old code, but could you fix this as well? The default value should not have to be specified twice.

registerGauge("lastReceivedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)

// Gauge for last received batch records and total received batch records.
registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
Review comment (Contributor): Isn't it more consistent to name this lastReceivedBatch_records?

registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L)
Review comment (Contributor): Since this is more related to the global streaming metrics like totalCompletedBatches, it might be more consistent to put it near them and to name it totalReceivedRecords (please update the corresponding field in the listener as well if you change this).

Review comment (Contributor): And if it's not too much work, could you add totalProcessedRecords? That seems useful. If it is too complicated, then don't worry about it for this PR.

}
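A totalProcessedRecords counter of the kind the reviewer requests could follow the same shape as the totalReceivedBatchRecords accumulation in this PR's listener changes. The class, field, and method names below are assumptions for illustration, not the PR's actual code.

```scala
// Hypothetical sketch: accumulate processed records when a batch completes,
// mirroring how totalReceivedBatchRecords is accumulated on batch start.
class ProcessedRecordCounter {
  private var totalProcessedRecords = 0L

  // Would be called from onBatchCompleted with the per-receiver record
  // counts of the batch that just finished.
  def recordBatchCompleted(recordsPerReceiver: Map[Int, Long]): Unit = synchronized {
    totalProcessedRecords += recordsPerReceiver.values.sum
  }

  def numTotalProcessedRecords: Long = synchronized { totalProcessedRecords }
}

object ProcessedRecordCounterDemo {
  def main(args: Array[String]): Unit = {
    val c = new ProcessedRecordCounter
    c.recordBatchCompleted(Map(0 -> 10L, 1 -> 5L))
    c.recordBatchCompleted(Map(0 -> 7L))
    assert(c.numTotalProcessedRecords == 22L)
    println("ok")
  }
}
```

The synchronized accessor matches the listener's existing convention for numTotalCompletedBatches, since gauges read these counters from a different thread than the one updating them.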
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
import org.apache.spark.Logging


private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -37,6 +36,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
private var totalReceivedBatchRecords = 0L

val batchDuration = ssc.graph.batchDuration.milliseconds

@@ -65,6 +65,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)

batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
totalReceivedBatchRecords += infos.map(_.numRecords).sum
}
}

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
@@ -83,6 +87,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
totalCompletedBatches
}

def numTotalReceivedBatchRecords: Long = synchronized {
totalReceivedBatchRecords
}

def numUnprocessedBatches: Long = synchronized {
waitingBatchInfos.size + runningBatchInfos.size
}