python/pyspark/sql/streaming.py (3 additions, 3 deletions)
@@ -212,12 +212,12 @@ def __str__(self):
Processing rate 23.5 rows/sec
Latency: 345.0 ms
Trigger details:
+ batchId: 5
isDataPresentInTrigger: true
isTriggerActive: true
latency.getBatch.total: 20
latency.getOffset.total: 10
numRows.input.total: 100
- triggerId: 5
Source statuses [1 source]:
Source 1 - MySource1
Available offset: 0
@@ -341,8 +341,8 @@ def triggerDetails(self):
If no trigger is currently active, then it will have details of the last completed trigger.

>>> sqs.triggerDetails
- {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
- u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
+ {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
+ u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
u'isDataPresentInTrigger': u'true'}
"""
return self._jsqs.triggerDetails()
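Since this Python property simply delegates to the JVM side (the getter above returns self._jsqs.triggerDetails(), a java.util.Map[String, String]), callers on either side read the renamed key the same way. A minimal, self-contained Scala sketch of that lookup, with a plain HashMap standing in for a real status map (names and values here are illustrative only):

```scala
import java.util.{HashMap => JHashMap}

object BatchIdLookupSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for status.triggerDetails, which is a java.util.Map[String, String].
    val triggerDetails = new JHashMap[String, String]()
    triggerDetails.put("batchId", "5")            // this key was "triggerId" before the rename
    triggerDetails.put("isTriggerActive", "true")

    // Callers that previously looked up "triggerId" must switch to "batchId".
    println(triggerDetails.get("batchId"))           // 5
    println(triggerDetails.containsKey("triggerId")) // false after the rename
  }
}
```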
@@ -78,13 +78,13 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam

// =========== Setter methods ===========

- def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+ def reportTriggerStarted(batchId: Long): Unit = synchronized {
numInputRows.clear()
triggerDetails.clear()
sourceTriggerDetails.values.foreach(_.clear())

- reportTriggerDetail(TRIGGER_ID, triggerId)
- sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
+ reportTriggerDetail(BATCH_ID, batchId)
+ sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
currentTriggerStartTimestamp = triggerClock.getTimeMillis()
reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
@@ -217,7 +217,7 @@ object StreamMetrics extends Logging {
}


- val TRIGGER_ID = "triggerId"
+ val BATCH_ID = "batchId"
val IS_TRIGGER_ACTIVE = "isTriggerActive"
val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
val STATUS_MESSAGE = "statusMessage"
@@ -102,7 +102,7 @@ class StreamingQueryStatus private(
("inputRate" -> JDouble(inputRate)) ~
("processingRate" -> JDouble(processingRate)) ~
("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
Contributor Author: This is the fix for triggerDetails not being in the status.json.
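For context on why the trailing ~ matters: in the json4s DSL, ~ joins (name -> value) pairs into a single JObject. Without it, the chain ends at triggerDetails as a discarded statement and only the expression that follows becomes the method's result, which is consistent with the old expected JSON in the test below containing only sourceStatuses and sinkStatus. A minimal sketch under that assumption, with illustrative field names, assuming json4s on the classpath:

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object TildeSketch {
  def main(args: Array[String]): Unit = {
    // Each ~ joins one more (name -> value) pair into a single JObject.
    val complete =
      ("inputRate" -> 15.5) ~
      ("triggerDetails" -> ("batchId" -> "5")) ~
      ("sinkStatus" -> "MySink")
    println(compact(render(complete)))
    // {"inputRate":15.5,"triggerDetails":{"batchId":"5"},"sinkStatus":"MySink"}

    // With the ~ missing, the first chain is a discarded statement; only the
    // last expression is the block's value, so earlier fields silently vanish.
    val broken = {
      ("inputRate" -> 15.5) ~
      ("triggerDetails" -> ("batchId" -> "5")) // no trailing ~ : chain ends here
      ("sinkStatus" -> "MySink")
    }
    println(compact(render(broken)))
    // {"sinkStatus":"MySink"}
  }
}
```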

("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
("sinkStatus" -> sinkStatus.jsonValue)
}
@@ -151,7 +151,7 @@ private[sql] object StreamingQueryStatus {
desc = "MySink",
offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
triggerDetails = Map(
- TRIGGER_ID -> "5",
+ BATCH_ID -> "5",
IS_TRIGGER_ACTIVE -> "true",
IS_DATA_PRESENT_IN_TRIGGER -> "true",
GET_OFFSET_LATENCY -> "10",
@@ -50,10 +50,10 @@ class StreamMetricsSuite extends SparkFunSuite {
assert(sm.currentSourceProcessingRate(source) === 0.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails() ===
- Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
+ Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
START_TIMESTAMP -> "0", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
- Map(TRIGGER_ID -> "1", "key2" -> "value2"))
+ Map(BATCH_ID -> "1", "key2" -> "value2"))

// Finishing the trigger should calculate the rates, except input rate which needs
// to have another trigger interval
@@ -66,11 +66,11 @@
assert(sm.currentSourceProcessingRate(source) === 100.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails() ===
- Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
+ Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
NUM_INPUT_ROWS -> "100", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))

// After another trigger starts, the rates and latencies should not change until
// new rows are reported
@@ -84,7 +84,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AssertOnLastQueryStatus { status: StreamingQueryStatus =>
// Check the correctness of the trigger info of the last completed batch reported by
// onQueryProgress
- assert(status.triggerDetails.containsKey("triggerId"))
+ assert(status.triggerDetails.containsKey("batchId"))
assert(status.triggerDetails.get("isTriggerActive") === "false")
assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")

@@ -104,7 +104,7 @@
assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")

assert(status.sourceStatuses.length === 1)
- assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
+ assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
@@ -48,12 +48,12 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
| Processing rate 23.5 rows/sec
| Latency: 345.0 ms
| Trigger details:
+ | batchId: 5
| isDataPresentInTrigger: true
| isTriggerActive: true
| latency.getBatch.total: 20
| latency.getOffset.total: 10
| numRows.input.total: 100
- | triggerId: 5
| Source statuses [1 source]:
| Source 1 - MySource1
| Available offset: 0
@@ -72,7 +72,11 @@
test("json") {
assert(StreamingQueryStatus.testStatus.json ===
"""
|{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
|{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
|"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
|"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
|"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
|"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
|"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
|"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
|"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
@@ -84,6 +88,20 @@
StreamingQueryStatus.testStatus.prettyJson ===
"""
|{
| "name" : "query",
| "id" : 1,
| "timestamp" : 123,
| "inputRate" : 15.5,
| "processingRate" : 23.5,
| "latency" : 345.0,
| "triggerDetails" : {
| "latency.getBatch.total" : "20",
| "numRows.input.total" : "100",
| "isTriggerActive" : "true",
| "batchId" : "5",
| "latency.getOffset.total" : "10",
| "isDataPresentInTrigger" : "true"
| },
| "sourceStatuses" : [ {
| "description" : "MySource1",
| "offsetDesc" : "0",
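As an end-to-end check of the serialization fix, a small hedged sketch (again assuming json4s on the classpath) that parses a trimmed-down version of the expected status JSON above and extracts the renamed key:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object StatusJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    // Trimmed-down stand-in for the expected status JSON in the test above.
    val status = parse(
      """{"name":"query","triggerDetails":{"batchId":"5","isTriggerActive":"true"}}""")

    // After the fix, triggerDetails is present and carries "batchId", not "triggerId".
    val JString(batchId) = status \ "triggerDetails" \ "batchId"
    println(batchId)                                 // 5
    println(status \ "triggerDetails" \ "triggerId") // JNothing
  }
}
```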