From 976116bf151160a31ab8f26317170b46b046a7b0 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 16 Apr 2014 15:22:25 +0800 Subject: [PATCH 1/3] Add StreamSource in StreamingContext for better monitoring through metrics system --- .../spark/streaming/StreamingContext.scala | 4 + .../spark/streaming/StreamingSource.scala | 73 +++++++++++++++++++ .../ui/StreamingJobProgressListener.scala | 3 +- 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1c89543058211..e0677b795cb94 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -154,6 +154,10 @@ class StreamingContext private[streaming] ( private[streaming] val uiTab = new StreamingTab(this) + /** Register streaming source to metrics system */ + private val streamingSource = new StreamingSource(this) + SparkEnv.get.metricsSystem.registerSource(streamingSource) + /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { type CheckpointState = Value diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala new file mode 100644 index 0000000000000..3015f0411817e --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { + val metricRegistry = new MetricRegistry + val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName) + + val streamingListener = ssc.uiTab.listener + + private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, + defaultValue: T) { + metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { + override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) + }) + } + + // Gauge for number of network receivers + registerGauge("networkReceivers", _.numNetworkReceivers, 0) + + // Gauge for number of total completed batches + registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0l) + + // Gauge for number of unprocessed batches + registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0l) + + // Gauge for number of waiting batches + registerGauge("waitingBatches", _.waitingBatches.size, 0l) + + // Gauge for number of running batches + registerGauge("runningBatches", _.runningBatches.size, 0l) + + // Gauge for number of retained completed batches + registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0l) + + // Gauge for last completed batch, useful for monitoring the streaming job's running status, + // displayed data -1 for any abnormal condition. + registerGauge("lastCompletedBatch_submissionTime", + _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1l), -1l) + registerGauge("lastCompletedBatch_processStartTime", + _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1l), -1l) + registerGauge("lastCompletedBatch_processEndTime", + _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1l), -1l) + + // Gauge for last received batch, useful for monitoring the streaming job's running status, + // displayed data -1 for any abnormal condition. + registerGauge("lastReceivedBatch_submissionTime", + _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1l), -1l) + registerGauge("lastReceivedBatch_processStartTime", + _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1l), -1l) + registerGauge("lastReceivedBatch_processEndTime", + _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1l), -1l) +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index bf637c1446314..14c33c728bfe1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -28,7 +28,8 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted import org.apache.spark.util.Distribution -private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener { +private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) + extends StreamingListener { private val waitingBatchInfos = new HashMap[Time, BatchInfo] private val runningBatchInfos = new HashMap[Time, BatchInfo] From 21939f53d9438ed458847bb5223329c005826d57 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 17 Apr 2014 08:57:22 +0800 Subject: [PATCH 2/3] Style changes according to style check error --- .../spark/streaming/StreamingSource.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index 3015f0411817e..fdd7f91e681f7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -39,35 +39,35 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { registerGauge("networkReceivers", _.numNetworkReceivers, 0) // Gauge for number of total completed batches - registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0l) + registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L) // Gauge for number of unprocessed batches - registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0l) + registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L) // Gauge for number of waiting batches - registerGauge("waitingBatches", _.waitingBatches.size, 0l) + registerGauge("waitingBatches", _.waitingBatches.size, 0L) // Gauge for number of running batches - registerGauge("runningBatches", _.runningBatches.size, 0l) + registerGauge("runningBatches", _.runningBatches.size, 0L) // Gauge for number of retained completed batches - registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0l) + registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L) // Gauge for last completed batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. registerGauge("lastCompletedBatch_submissionTime", - _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1l), -1l) + _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L) registerGauge("lastCompletedBatch_processStartTime", - _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1l), -1l) + _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) registerGauge("lastCompletedBatch_processEndTime", - _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1l), -1l) + _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) // Gauge for last received batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. registerGauge("lastReceivedBatch_submissionTime", - _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1l), -1l) + _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L) registerGauge("lastReceivedBatch_processStartTime", - _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1l), -1l) + _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) registerGauge("lastReceivedBatch_processEndTime", - _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1l), -1l) + _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) } From fb3b0a5881323caf151721f85f351c36b53ec6b8 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 24 Apr 2014 16:21:40 +0800 Subject: [PATCH 3/3] Modify according master update --- .../main/scala/org/apache/spark/streaming/StreamingSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index fdd7f91e681f7..774adc3c23c21 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -36,7 +36,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { } // Gauge for number of network receivers - registerGauge("networkReceivers", _.numNetworkReceivers, 0) + registerGauge("receivers", _.numReceivers, 0) // Gauge for number of total completed batches registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)