[SPARK-29973][SS] Make processedRowsPerSecond calculated more accurately and meaningfully

### What changes were proposed in this pull request?

Set `processingTimeSec` to 0.001 when a micro-batch completes in under 1 ms.

### Why are the changes needed?

The processing time of a micro-batch may be less than 1 ms. Because the trigger timestamps are measured in whole milliseconds, the elapsed time is then `0L` and `processingTimeSec` is `0.0`. If the batch contains no data, `processedRowsPerSecond` is `0/0.0d`, i.e. `Double.NaN`; if the batch does contain data, it is `N/0.0d`, i.e. `Double.PositiveInfinity`.
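A minimal standalone sketch of the arithmetic, before and after the fix (plain Scala; `rateBefore` and `rateAfter` are hypothetical helper names, not Spark APIs):

```scala
object ProcessedRowsPerSecondSketch extends App {
  val MILLIS_PER_SECOND = 1000

  // Before the fix: a sub-millisecond batch yields elapsedMs == 0,
  // so the divisor is 0.0 and the rate degenerates.
  def rateBefore(numRows: Long, elapsedMs: Long): Double = {
    val processingTimeSec = elapsedMs.toDouble / MILLIS_PER_SECOND
    numRows / processingTimeSec
  }

  // After the fix: clamp the elapsed time to at least 1 ms.
  def rateAfter(numRows: Long, elapsedMs: Long): Double = {
    val processingTimeSec = Math.max(1L, elapsedMs).toDouble / MILLIS_PER_SECOND
    numRows / processingTimeSec
  }

  println(rateBefore(0, 0)) // NaN
  println(rateBefore(2, 0)) // Infinity
  println(rateAfter(0, 0))  // 0.0
  println(rateAfter(2, 0))  // 2000.0
}
```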

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added a new unit test.

Closes #26610 from uncleGen/SPARK-29973.

Authored-by: uncleGen <hustyugm@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
uncleGen authored and srowen committed Nov 24, 2019 · 1 parent a60da23 · commit 3d74090
Showing 2 changed files with 44 additions and 2 deletions.
ProgressReporter.scala:

```diff
@@ -148,8 +148,8 @@ trait ProgressReporter extends Logging {
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
     val executionStats = extractExecutionStats(hasNewData)
-    val processingTimeSec =
-      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
+    val processingTimeSec = Math.max(1L,
+      currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
 
     val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
       (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
```
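The clamp treats any batch that finishes within the same millisecond as having taken exactly 1 ms (0.001 s), so `processedRowsPerSecond` stays finite: an empty sub-millisecond batch now reports 0.0 instead of `NaN`, and a non-empty one reports `numRows / 0.001` instead of infinity.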
StreamingQueryStatusAndProgressSuite.scala:

```diff
@@ -24,12 +24,15 @@ import scala.collection.JavaConverters._
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
+import org.apache.spark.sql.streaming.StreamingQuerySuite.clock
+import org.apache.spark.sql.streaming.util.StreamManualClock
 
 class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
   test("StreamingQueryProgress - prettyJson") {
```
```diff
@@ -215,6 +218,45 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     }
   }
 
+  test("SPARK-29973: Make `processedRowsPerSecond` calculated more accurately and meaningfully") {
+    import testImplicits._
+
+    clock = new StreamManualClock
+    val inputData = MemoryStream[Int]
+    val query = inputData.toDS()
+
+    testStream(query)(
+      StartStream(Trigger.ProcessingTime(1000), triggerClock = clock),
+      AdvanceManualClock(1000),
+      waitUntilBatchProcessed,
+      AssertOnQuery(query => {
+        assert(query.lastProgress.numInputRows == 0)
+        assert(query.lastProgress.processedRowsPerSecond == 0.0d)
+        true
+      }),
+      AddData(inputData, 1, 2),
+      AdvanceManualClock(1000),
+      waitUntilBatchProcessed,
+      AssertOnQuery(query => {
+        assert(query.lastProgress.numInputRows == 2)
+        assert(query.lastProgress.processedRowsPerSecond == 2000d)
+        true
+      }),
+      StopStream
+    )
+  }
+
+  def waitUntilBatchProcessed: AssertOnQuery = Execute { q =>
+    eventually(Timeout(streamingTimeout)) {
+      if (q.exception.isEmpty) {
+        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+      }
+    }
+    if (q.exception.isDefined) {
+      throw q.exception.get
+    }
+  }
+
   def assertJson(source: String, expected: String): Unit = {
     assert(
       source.replaceAll("\r\n|\r|\n", System.lineSeparator) ===
```
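A note on the expected values: `StreamManualClock` keeps `getTimeMillis()` fixed while a batch runs, so the trigger start and end timestamps are equal, and the 0 ms elapsed time is clamped to 1 ms. The empty first batch therefore reports 0 / 0.001 = 0.0 rows per second (previously `NaN`), and the second batch with two rows reports 2 / 0.001 = 2000.0 (previously infinity).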
