From eec0ad08e390831717203f6d002e3b1218de6d36 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 22 Aug 2018 15:10:32 -0700 Subject: [PATCH 1/2] fix test --- .../sources/RateStreamProviderSuite.scala | 39 +++++++++++++++++-- .../spark/sql/streaming/StreamTest.scala | 5 ++- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index 9c1756d68ccc..abf7218b4ae4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -81,12 +81,43 @@ class RateSourceSuite extends StreamTest { .load() testStream(input)( AdvanceRateManualClock(seconds = 1), - CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*), + CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(v * 100L) -> v): _*) + ) + } + + test("microbatch - restart") { + val input = spark.readStream + .format("rate") + .option("rowsPerSecond", "10") + .load() + .select('value) + + var streamDuration = 0 + + // Microbatch rate stream offsets contain the number of seconds since the beginning of + // the stream. + def updateStreamDurationFromOffset(s: StreamExecution, expectedMin: Int): Unit = { + streamDuration = s.lastProgress.sources(0).endOffset.toInt + assert(streamDuration >= expectedMin) + } + + // We have to use the lambda version of CheckAnswer because we don't know the right range + // until we see the last offset. + def expectedResultsFromDuration(rows: Seq[Row]): Unit = { + assert(rows.map(_.getLong(0)).sorted == (0 until (streamDuration * 10))) + } + + testStream(input)( + StartStream(), + Execute(_.awaitOffset(0, LongOffset(2))), StopStream, + Execute(updateStreamDurationFromOffset(_, 2)), + CheckAnswer(expectedResultsFromDuration _), StartStream(), - // Advance 2 seconds because creating a new RateSource will also create a new ManualClock - AdvanceRateManualClock(seconds = 2), - CheckLastBatch((10 until 20).map(v => new java.sql.Timestamp(v * 100L) -> v): _*) + Execute(_.awaitOffset(0, LongOffset(4))), + StopStream, + Execute(updateStreamDurationFromOffset(_, 4)), + CheckAnswer(expectedResultsFromDuration _) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index cd9b892eca1f..491dc34afa14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -735,7 +735,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be } case CheckAnswerRowsByFunc(globalCheckFunction, lastOnly) => - val sparkAnswer = fetchStreamAnswer(currentStream, lastOnly) + val sparkAnswer = currentStream match { + case null => fetchStreamAnswer(lastStream, lastOnly) + case s => fetchStreamAnswer(s, lastOnly) + } try { globalCheckFunction(sparkAnswer) } catch { From ade656a3cf3edb32d80e66d03276584c0c86c4d0 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Wed, 22 Aug 2018 15:50:02 -0700 Subject: [PATCH 2/2] fix build --- .../streaming/sources/RateStreamProviderSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index abf7218b4ae4..dd74af873c2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ @@ -109,12 +110,12 @@ class RateSourceSuite extends StreamTest { testStream(input)( StartStream(), - Execute(_.awaitOffset(0, LongOffset(2))), + Execute(_.awaitOffset(0, LongOffset(2), streamingTimeout.toMillis)), StopStream, Execute(updateStreamDurationFromOffset(_, 2)), CheckAnswer(expectedResultsFromDuration _), StartStream(), - Execute(_.awaitOffset(0, LongOffset(4))), + Execute(_.awaitOffset(0, LongOffset(4), streamingTimeout.toMillis)), StopStream, Execute(updateStreamDurationFromOffset(_, 4)), CheckAnswer(expectedResultsFromDuration _)