Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Oct 12, 2016
1 parent 38ac35e commit 42bc7bf
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,20 +285,20 @@ class KafkaSourceSuite extends KafkaSourceTest {
val listener = new QueryStatusCollector
spark.streams.addListener(listener)
try {
testStream(mapped)(
testStream(mapped)(
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
AssertOnQuery { query =>
eventually(timeout(streamingTimeout)) {
assert(listener.lastTriggerStatus.nonEmpty)
}
val status = listener.lastTriggerStatus.get
assert(status.triggerStatus.get("numRows.input.total").toInt > 0)
assert(status.sourceStatuses(0).processingRate > 0.0)
true
AssertOnQuery { query =>
eventually(timeout(streamingTimeout)) {
assert(listener.lastTriggerStatus.nonEmpty)
}
)
val status = listener.lastTriggerStatus.get
assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
assert(status.sourceStatuses(0).processingRate > 0.0)
true
}
)
} finally {
spark.streams.removeListener(listener)
}
Expand Down
43 changes: 22 additions & 21 deletions python/pyspark/sql/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,22 +212,22 @@ def __str__(self):
Input rate: 15.5 rows/sec
Processing rate 23.5 rows/sec
Latency: 345.0 ms
Trigger status:
isActive: true
isDataAvailable: true
latency.getBatch: 20
latency.getOffset: 10
Trigger details:
isDataPresentInTrigger: true
isTriggerActive: true
latency.getBatch.total: 20
latency.getOffset.total: 10
numRows.input.total: 100
triggerId: 5
Source statuses [1 source]:
Source 1: MySource1
Available offset: #0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger status:
Trigger details:
numRows.input.source: 100
latency.sourceGetOffset: 10
latency.sourceGetBatch: 20
latency.getOffset.source: 10
latency.getBatch.source: 20
Sink status: MySink
Committed offsets: [#1, -]
"""
Expand Down Expand Up @@ -334,18 +334,19 @@ def sinkStatus(self):
@property
@ignore_unicode_prefix
@since(2.1)
def triggerStatus(self):
def triggerDetails(self):
"""
Low-level detailed status of the currently active trigger (e.g. number of rows processed
in trigger, latency of intermediate steps, etc.).
If no trigger is currently active, then it will have details of the last completed trigger.
>>> sqs.triggerStatus
{u'latency.getOffset': u'10', u'triggerId': u'5', u'isDataAvailable': u'true',
u'numRows.input.total': u'100', u'latency.getBatch': u'20', u'isActive': u'true'}
>>> sqs.triggerDetails
{u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
u'isDataPresentInTrigger': u'true'}
"""
return self._jsqs.triggerStatus()
return self._jsqs.triggerDetails()


class SourceStatus(object):
Expand All @@ -369,10 +370,10 @@ def __str__(self):
Available offset: #0
Input rate: 15.5 rows/sec
Processing rate: 23.5 rows/sec
Trigger status:
Trigger details:
numRows.input.source: 100
latency.sourceGetOffset: 10
latency.sourceGetBatch: 20
latency.getOffset.source: 10
latency.getBatch.source: 20
"""
return self._jss.toString()

Expand Down Expand Up @@ -425,18 +426,18 @@ def processingRate(self):
@property
@ignore_unicode_prefix
@since(2.1)
def triggerStatus(self):
def triggerDetails(self):
"""
Low-level detailed status of the currently active trigger (e.g. number of rows processed
in trigger, latency of intermediate steps, etc.).
If no trigger is currently active, then it will have details of the last completed trigger.
>>> sqs.sourceStatuses[0].triggerStatus
{u'numRows.input.source': u'100', u'latency.sourceGetOffset': u'10',
u'latency.sourceGetBatch': u'20'}
>>> sqs.sourceStatuses[0].triggerDetails
{u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
u'latency.getBatch.source': u'20'}
"""
return self._jss.triggerStatus()
return self._jss.triggerDetails()


class SinkStatus(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class StreamExecution(

triggerExecutor.execute(() => {
streamMetrics.reportTriggerStarted(currentBatchId)
streamMetrics.reportTriggerStatus(STATUS_MESSAGE, "Finding new data from sources")
streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Finding new data from sources")
updateStatus()
val isTerminated = reportTimeTaken(TRIGGER_LATENCY) {
if (isActive) {
Expand All @@ -212,15 +212,15 @@ class StreamExecution(
constructNextBatch()
}
if (dataAvailable) {
streamMetrics.reportTriggerStatus(IS_DATA_PRESENT_IN_TRIGGER, true)
streamMetrics.reportTriggerStatus(STATUS_MESSAGE, "Processing new data")
streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, true)
streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Processing new data")
updateStatus()
runBatch()
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
streamMetrics.reportTriggerStatus(IS_DATA_PRESENT_IN_TRIGGER, false)
streamMetrics.reportTriggerStatus(STATUS_MESSAGE, "No new data")
streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, false)
streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "No new data")
updateStatus()
Thread.sleep(pollingDelayMs)
}
Expand Down Expand Up @@ -624,41 +624,41 @@ class StreamExecution(

streamMetrics.reportNumInputRows(sourceToNumInputRows)
stateNodes.zipWithIndex.foreach { case (s, i) =>
streamMetrics.reportTriggerStatus(
streamMetrics.reportTriggerDetail(
NUM_TOTAL_STATE_ROWS(i + 1),
s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L))
streamMetrics.reportTriggerStatus(
streamMetrics.reportTriggerDetail(
NUM_UPDATED_STATE_ROWS(i + 1),
s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
}
updateStatus()
}

private def reportTimeTaken[T](triggerStatusKey: String)(body: => T): T = {
private def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
val startTime = triggerClock.getTimeMillis()
val result = body
val endTime = triggerClock.getTimeMillis()
val timeTaken = math.max(endTime - startTime, 0)
streamMetrics.reportTriggerStatus(triggerStatusKey, timeTaken)
streamMetrics.reportTriggerDetail(triggerDetailKey, timeTaken)
updateStatus()
if (triggerStatusKey == TRIGGER_LATENCY) {
if (triggerDetailKey == TRIGGER_LATENCY) {
logInfo(s"Completed up to $availableOffsets in $timeTaken ms")
}
result
}

private def reportTimeTaken[T](source: Source, triggerStatusKey: String)(body: => T): T = {
private def reportTimeTaken[T](source: Source, triggerDetailKey: String)(body: => T): T = {
val startTime = triggerClock.getTimeMillis()
val result = body
val endTime = triggerClock.getTimeMillis()
streamMetrics.reportSourceTriggerStatus(
source, triggerStatusKey, math.max(endTime - startTime, 0))
streamMetrics.reportSourceTriggerDetail(
source, triggerDetailKey, math.max(endTime - startTime, 0))
updateStatus()
result
}

private def reportTimestamp(triggerStatusKey: String): Unit = {
streamMetrics.reportTriggerStatus(triggerStatusKey, triggerClock.getTimeMillis)
private def reportTimestamp(triggerDetailKey: String): Unit = {
streamMetrics.reportTriggerDetail(triggerDetailKey, triggerClock.getTimeMillis)
updateStatus()
}

Expand All @@ -667,10 +667,10 @@ class StreamExecution(
val sourceStatuses = sources.map { s =>
SourceStatus(
s.toString,
localAvailableOffsets.get(s).map(_.toString).getOrElse("-"),
localAvailableOffsets.get(s).map(_.toString).getOrElse("-"), // TODO: use json if available
streamMetrics.currentSourceInputRate(s),
streamMetrics.currentSourceProcessingRate(s),
streamMetrics.currentSourceTriggerStatus(s))
streamMetrics.currentSourceTriggerDetails(s))
}.toArray
val sinkStatus = SinkStatus(
sink.toString,
Expand All @@ -686,7 +686,7 @@ class StreamExecution(
latency = streamMetrics.currentLatency(),
sourceStatuses = sourceStatuses,
sinkStatus = sinkStatus,
triggerStatus = streamMetrics.currentTriggerStatus())
triggerDetails = streamMetrics.currentTriggerDetails())
}

trait State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
import StreamMetrics._

// Trigger infos
private val triggerStatus = new mutable.HashMap[String, String]
private val sourceTriggerStatus = new mutable.HashMap[Source, mutable.HashMap[String, String]]
private val triggerDetails = new mutable.HashMap[String, String]
private val sourceTriggerDetails = new mutable.HashMap[Source, mutable.HashMap[String, String]]

// Rate estimators for sources and sinks
private val inputRates = new mutable.HashMap[Source, RateCalculator]
Expand All @@ -70,7 +70,7 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
sources.foreach { s =>
inputRates.put(s, new RateCalculator)
processingRates.put(s, new RateCalculator)
sourceTriggerStatus.put(s, new mutable.HashMap[String, String])
sourceTriggerDetails.put(s, new mutable.HashMap[String, String])

registerGauge(s"inputRate-${s.toString}", () => currentSourceInputRate(s))
registerGauge(s"processingRate-${s.toString}", () => currentSourceProcessingRate(s))
Expand All @@ -80,22 +80,22 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam

def reportTriggerStarted(triggerId: Long): Unit = synchronized {
numInputRows.clear()
triggerStatus.clear()
sourceTriggerStatus.values.foreach(_.clear())
triggerDetails.clear()
sourceTriggerDetails.values.foreach(_.clear())

reportTriggerStatus(TRIGGER_ID, triggerId)
sources.foreach(s => reportSourceTriggerStatus(s, TRIGGER_ID, triggerId))
reportTriggerStatus(IS_TRIGGER_ACTIVE, true)
reportTriggerDetail(TRIGGER_ID, triggerId)
sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
currentTriggerStartTimestamp = triggerClock.getTimeMillis()
reportTriggerStatus(START_TIMESTAMP, currentTriggerStartTimestamp)
reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
}

def reportTriggerStatus[T](key: String, value: T): Unit = synchronized {
triggerStatus.put(key, value.toString)
def reportTriggerDetail[T](key: String, value: T): Unit = synchronized {
triggerDetails.put(key, value.toString)
}

def reportSourceTriggerStatus[T](source: Source, key: String, value: T): Unit = synchronized {
sourceTriggerStatus(source).put(key, value.toString)
def reportSourceTriggerDetail[T](source: Source, key: String, value: T): Unit = synchronized {
sourceTriggerDetails(source).put(key, value.toString)
}

def reportNumInputRows(inputRows: Map[Source, Long]): Unit = synchronized {
Expand All @@ -105,15 +105,15 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
def reportTriggerFinished(): Unit = synchronized {
require(currentTriggerStartTimestamp >= 0)
val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
reportTriggerStatus(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
reportTriggerStatus(STATUS_MESSAGE, "")
reportTriggerStatus(IS_TRIGGER_ACTIVE, false)
reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
reportTriggerDetail(STATUS_MESSAGE, "")
reportTriggerDetail(IS_TRIGGER_ACTIVE, false)

// Report number of rows
val totalNumInputRows = numInputRows.values.sum
reportTriggerStatus(NUM_INPUT_ROWS, totalNumInputRows)
reportTriggerDetail(NUM_INPUT_ROWS, totalNumInputRows)
numInputRows.foreach { case (s, r) =>
reportSourceTriggerStatus(s, NUM_SOURCE_INPUT_ROWS, r)
reportSourceTriggerDetail(s, NUM_SOURCE_INPUT_ROWS, r)
}

val currentTriggerDuration = currentTriggerFinishTimestamp - currentTriggerStartTimestamp
Expand Down Expand Up @@ -173,10 +173,10 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam

def currentLatency(): Option[Double] = synchronized { latency }

def currentTriggerStatus(): Map[String, String] = synchronized { triggerStatus.toMap }
def currentTriggerDetails(): Map[String, String] = synchronized { triggerDetails.toMap }

def currentSourceTriggerStatus(source: Source): Map[String, String] = synchronized {
sourceTriggerStatus(source).toMap
def currentSourceTriggerDetails(source: Source): Map[String, String] = synchronized {
sourceTriggerDetails(source).toMap
}

// =========== Other methods ===========
Expand Down Expand Up @@ -226,9 +226,9 @@ object StreamMetrics extends Logging {
val GET_BATCH_TIMESTAMP = "timestamp.afterGetBatch"
val FINISH_TIMESTAMP = "timestamp.triggerFinish"

val GET_OFFSET_LATENCY = "latency.getOffset"
val GET_OFFSET_LATENCY = "latency.getOffset.total"
val GET_BATCH_LATENCY = "latency.getBatch.total"
val OFFSET_WAL_WRITE_LATENCY = "latency.offsetLogWrite"
val GET_BATCH_LATENCY = "latency.getBatch"
val OPTIMIZER_LATENCY = "latency.optimizer"
val TRIGGER_LATENCY = "latency.fullTrigger"
val SOURCE_GET_OFFSET_LATENCY = "latency.getOffset.source"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
* @param inputRate Current rate (rows/sec) at which data is being generated by the source.
* @param processingRate Current rate (rows/sec) at which the query is processing data from
* the source.
* @param triggerStatus Low-level detailed status of the currently active trigger (e.g. number of
* @param triggerDetails Low-level details of the currently active trigger (e.g. number of
* rows processed in trigger, latency of intermediate steps, etc.).
* If no trigger is active, then it will have details of the last completed
* trigger.
Expand All @@ -45,20 +45,20 @@ class SourceStatus private(
val offsetDesc: String,
val inputRate: Double,
val processingRate: Double,
val triggerStatus: ju.Map[String, String]) {
val triggerDetails: ju.Map[String, String]) {

override def toString: String =
"SourceStatus:" + indent(prettyString)

private[sql] def prettyString: String = {
val triggerStatusStrings =
triggerStatus.asScala.map { case (k, v) => s"$k: $v" }
val triggerDetailsLines =
triggerDetails.asScala.map { case (k, v) => s"$k: $v" }
s"""$description
|Available offset: $offsetDesc
|Input rate: $inputRate rows/sec
|Processing rate: $processingRate rows/sec
|Trigger status:
|""".stripMargin + indent(triggerStatusStrings)
|Trigger details:
|""".stripMargin + indent(triggerDetailsLines)

}
}
Expand All @@ -70,7 +70,7 @@ private[sql] object SourceStatus {
offsetDesc: String,
inputRate: Double,
processingRate: Double,
triggerStatus: Map[String, String]): SourceStatus = {
new SourceStatus(desc, offsetDesc, inputRate, processingRate, triggerStatus.asJava)
triggerDetails: Map[String, String]): SourceStatus = {
new SourceStatus(desc, offsetDesc, inputRate, processingRate, triggerDetails.asJava)
}
}
Loading

0 comments on commit 42bc7bf

Please sign in to comment.