Fixed metrics reported by MicroBatchExecution
tdas committed Jan 31, 2018
1 parent 35b8854 commit e66d809
Showing 2 changed files with 49 additions and 34 deletions.
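The change: for v2 MicroBatchReader sources, MicroBatchExecution used to time the whole offset fetch under a single "getOffset" key in durationMs; it now times setOffsetRange and getEndOffset separately, and the updated test also checks the resulting addBatch and triggerExecution values.

To illustrate the reporting mechanism, here is a minimal sketch of how a reportTimeTaken-style helper can accumulate per-phase durations. This is an assumption-level simplification for readers, not Spark's actual ProgressReporter implementation:

import scala.collection.mutable

// Simplified stand-in for ProgressReporter.reportTimeTaken: run `body`,
// add its wall-clock duration to the entry for `key`, and return the result.
class DurationTracker {
  val durationsMs = mutable.Map[String, Long]().withDefaultValue(0L)

  def reportTimeTaken[T](key: String)(body: => T): T = {
    val start = System.currentTimeMillis()
    try body finally {
      durationsMs(key) += System.currentTimeMillis() - start
    }
  }
}

With this commit, one trigger records separate "setOffsetRange" and "getEndOffset" entries instead of lumping both calls under "getOffset".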
MicroBatchExecution.scala

@@ -268,16 +268,17 @@ class MicroBatchExecution(
          }
        case s: MicroBatchReader =>
          updateStatusMessage(s"Getting offsets from $s")
-         reportTimeTaken("getOffset") {
-           // Once v1 streaming source execution is gone, we can refactor this away.
-           // For now, we set the range here to get the source to infer the available end offset,
-           // get that offset, and then set the range again when we later execute.
-           s.setOffsetRange(
-             toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
-             Optional.empty())
-
-           (s, Option(s.getEndOffset))
-         }
+         reportTimeTaken("setOffsetRange") {
+           // Once v1 streaming source execution is gone, we can refactor this away.
+           // For now, we set the range here to get the source to infer the available end offset,
+           // get that offset, and then set the range again when we later execute.
+           s.setOffsetRange(
+             toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
+             Optional.empty())
+         }
+
+         val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
+         (s, Option(currentOffset))
      }.toMap
      availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)

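The toJava helper used above bridges the Scala Option produced by availableOffsets into the java.util.Optional that the v2 setOffsetRange API expects. Only the name toJava appears in the diff; a plausible one-line implementation would be:

import java.util.Optional

// Convert a Scala Option to a Java Optional for the DataSourceV2 reader API.
def toJava[T](option: Option[T]): Optional[T] =
  option.map(t => Optional.of(t)).getOrElse(Optional.empty[T]())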
StreamingQuerySuite.scala

@@ -18,13 +18,12 @@
package org.apache.spark.sql.streaming

import java.{util => ju}
import java.util.Optional
import java.util.concurrent.CountDownLatch

import org.apache.commons.lang3.RandomStringUtils
import org.mockito.Mockito._
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.mockito.MockitoSugar

@@ -38,7 +37,6 @@ import org.apache.spark.sql.sources.v2.reader.ReadTask
import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2}
import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.ManualClock

class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {

@@ -209,18 +207,28 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {

    /** Custom MemoryStream that waits for manual clock to reach a time */
    val inputData = new MemoryStream[Int](0, sqlContext) {
-     // getOffset should take 50 ms the first time it is called
-     override def getEndOffset: OffsetV2 = {
-       val offset = super.getEndOffset
-       if (offset != null) {
-         clock.waitTillTime(1050)
+
+     private def dataAdded: Boolean = currentOffset.offset != -1
+
+     // setOffsetRange should take 50 ms the first time it is called after data is added
+     override def setOffsetRange(start: Optional[OffsetV2], end: Optional[OffsetV2]): Unit = {
+       synchronized {
+         if (dataAdded) clock.waitTillTime(1050)
+         super.setOffsetRange(start, end)
+       }
+     }
+
+     // getEndOffset should take 100 ms the first time it is called after data is added
+     override def getEndOffset(): OffsetV2 = synchronized {
+       if (currentOffset.offset != -1) { // data has been added
+         clock.waitTillTime(1150)
       }
-       offset
+       super.getEndOffset
     }

     // createReadTasks should take 200 ms the first time it is called
-     override def createReadTasks(): ju.List[ReadTask[Row]] = {
-       if (getStartOffset.asInstanceOf[LongOffset].offset == -1L) clock.waitTillTime(1150)
+     override def createReadTasks(): ju.List[ReadTask[Row]] = synchronized {
+       clock.waitTillTime(1350)
       super.createReadTasks()
     }
   }
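Each override above blocks on clock.waitTillTime(t), so the test can deterministically hold the stream execution thread inside a chosen phase until it advances the manual clock. A minimal sketch of that blocking behavior (assumed semantics of StreamManualClock, reduced to the essentials):

// A manual clock whose waitTillTime blocks callers until the test advances it.
class SimpleManualClock(private var timeMs: Long = 0L) {
  def advance(deltaMs: Long): Unit = synchronized {
    timeMs += deltaMs
    notifyAll() // wake threads parked in waitTillTime
  }

  def getTimeMillis(): Long = synchronized { timeMs }

  // Park the calling thread until the clock reaches targetMs.
  def waitTillTime(targetMs: Long): Long = synchronized {
    while (timeMs < targetMs) wait()
    timeMs
  }
}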
@@ -261,39 +269,44 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {
      AssertOnQuery(_.status.message === "Waiting for next trigger"),
      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

-     // Test status and progress while offset is being fetched
+     // Test status and progress when setOffsetRange is being called
      AddData(inputData, 1, 2),
-     AdvanceManualClock(1000), // time = 1000 to start new trigger, will block on getOffset
+     AdvanceManualClock(1000), // time = 1000 to start new trigger, will block on setOffsetRange
      AssertStreamExecThreadIsWaitingForTime(1050),
      AssertOnQuery(_.status.isDataAvailable === false),
      AssertOnQuery(_.status.isTriggerActive === true),
      AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

      // Test status and progress while batch is being fetched
-     AdvanceManualClock(50), // time = 1050 to unblock getOffset
+     AdvanceManualClock(50), // time = 1050 to unblock setOffsetRange
      AssertClockTime(1050),
-     AssertStreamExecThreadIsWaitingForTime(1150), // will block on getBatch that needs 1150
+     AssertStreamExecThreadIsWaitingForTime(1150), // will block on getEndOffset that needs 1150
      AssertOnQuery(_.status.isDataAvailable === false),
      AssertOnQuery(_.status.isTriggerActive === true),
      AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

+     AdvanceManualClock(100), // time = 1150 to unblock getEndOffset
+     AssertClockTime(1150),
+     AssertStreamExecThreadIsWaitingForTime(1350), // will block on createReadTasks that needs 1350
+     AssertOnQuery(_.status.isDataAvailable === true),
+     AssertOnQuery(_.status.isTriggerActive === true),
+     AssertOnQuery(_.status.message === "Processing new data"),
+     AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
+
      // Test status and progress while batch is being processed
-     AdvanceManualClock(100), // time = 1150 to unblock getBatch
-     AssertClockTime(1150),
-     AssertStreamExecThreadIsWaitingForTime(1500), // will block in Spark job that needs 1500
+     AdvanceManualClock(200), // time = 1350 to unblock createReadTasks
+     AssertClockTime(1350),
+     AssertStreamExecThreadIsWaitingForTime(1500), // will block on map task that needs 1500
      AssertOnQuery(_.status.isDataAvailable === true),
      AssertOnQuery(_.status.isTriggerActive === true),
      AssertOnQuery(_.status.message === "Processing new data"),
      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),

      // Test status and progress while batch processing has completed
-     AssertOnQuery { _ => clock.getTimeMillis() === 1150 },
-     AdvanceManualClock(350), // time = 1500 to unblock job
+     AdvanceManualClock(150), // time = 1500 to unblock map task
      AssertClockTime(1500),
      CheckAnswer(2),
-     AssertStreamExecThreadIsWaitingForTime(2000),
+     AssertStreamExecThreadIsWaitingForTime(2000), // will block until the next trigger
      AssertOnQuery(_.status.isDataAvailable === true),
      AssertOnQuery(_.status.isTriggerActive === false),
      AssertOnQuery(_.status.message === "Waiting for next trigger"),
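Each AdvanceManualClock step unblocks exactly one phase, so the durations the test asserts later fall directly out of this timeline. In Scala terms, with all values in manual-clock milliseconds as used above:

val triggerStart  = 1000L // trigger fires, thread blocks in setOffsetRange
val offsetRangeAt = 1050L // setOffsetRange returns
val endOffsetAt   = 1150L // getEndOffset returns
val jobDoneAt     = 1500L // createReadTasks (1350) and the map task finish

val expectedDurationsMs = Map(
  "setOffsetRange"   -> (offsetRangeAt - triggerStart), // 50
  "getEndOffset"     -> (endOffsetAt - offsetRangeAt),  // 100
  "addBatch"         -> (jobDoneAt - endOffsetAt),      // 350
  "triggerExecution" -> (jobDoneAt - triggerStart)      // 500
)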
@@ -310,10 +323,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {
      assert(progress.numInputRows === 2)
      assert(progress.processedRowsPerSecond === 4.0)

-     assert(progress.durationMs.get("getOffset") === 50)
-     assert(progress.durationMs.get("getBatch") === 100)
+     assert(progress.durationMs.get("setOffsetRange") === 50)
+     assert(progress.durationMs.get("getEndOffset") === 100)
      assert(progress.durationMs.get("queryPlanning") === 0)
      assert(progress.durationMs.get("walCommit") === 0)
+     assert(progress.durationMs.get("addBatch") === 350)
      assert(progress.durationMs.get("triggerExecution") === 500)

      assert(progress.sources.length === 1)
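Outside of tests, the renamed keys surface through the standard progress APIs, e.g. StreamingQuery.lastProgress. A small usage sketch (the helper itself is illustrative; the API calls are standard):

import org.apache.spark.sql.streaming.StreamingQuery

// Print the per-phase durations from a query's most recent progress.
// durationMs is a java.util.Map[String, java.lang.Long] on StreamingQueryProgress.
def printPhaseDurations(query: StreamingQuery): Unit = {
  val progress = query.lastProgress
  if (progress != null) {
    for (key <- Seq("setOffsetRange", "getEndOffset", "addBatch", "triggerExecution")) {
      println(s"$key: ${progress.durationMs.get(key)} ms")
    }
  }
}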
