From 729ecedab4b61804a9717cadbd4f2c7b6aa50176 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Fri, 18 Nov 2016 15:26:25 -0800 Subject: [PATCH 01/21] added CurrentBatchTimestamp --- .../expressions/datetimeExpressions.scala | 22 +++++++++++++++++++ .../streaming/IncrementalExecution.scala | 14 +++++++++++- .../execution/streaming/StreamExecution.scala | 16 +++++++++++--- .../execution/streaming/StreamProgress.scala | 4 ++-- 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 9cec6be841de0..f7a83b455c528 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -71,6 +71,28 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback { override def prettyName: String = "current_timestamp" } +/** + * Expression representing the current batch time, which is used by StreamExecution to + * 1. prevent optimizer from pushing this expression below a stateful operator + * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp) + * + * There is no code generation since this expression should be replaced with a literal. + */ +@ExpressionDescription( + usage = "_FUNC_() - Returns the current timestamp at the start of batch evaluation.") +case class CurrentBatchTimestamp(timestamp: Long) extends LeafExpression + with CodegenFallback with Nondeterministic { + override def nullable: Boolean = false + + override def dataType: DataType = TimestampType + + override def prettyName: String = "current_batch_timestamp" + + override protected def initializeInternal(partitionIndex: Int): Unit = {} + + override protected def evalInternal(input: InternalRow): Any = timestamp +} + /** * Adds a number of days to startdate. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index e9d072f8a98b0..f9d9424f3599e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.{InternalOutputModes, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Literal} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} @@ -49,6 +50,17 @@ class IncrementalExecution( sparkSession.sessionState.conf, stateStrategy) + /** + * See [SPARK-18339] + * Walk the optimized logical plan and replace CurrentBatchTimestamp + * with the desired literal + */ + override lazy val optimizedPlan: LogicalPlan = { + sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions { + case CurrentBatchTimestamp(timestamp) => Literal(timestamp) + } + } + /** * Records the current id for a given stateful operator in the query plan as the `state` * preparation walks the query plan. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3ca6feac05cef..1e07bdd4bffe9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} @@ -95,6 +95,9 @@ class StreamExecution( /** The current eventTime watermark, used to bound the lateness of data that will processed. */ private var currentEventTimeWatermark: Long = 0 + /** The current batch processing timestamp */ + private var currentBatchTimestamp: Long = 0 + /** All stream sources present in the query plan. */ private val sources = logicalPlan.collect { case s: StreamingExecutionRelation => s.source } @@ -288,7 +291,11 @@ class StreamExecution( logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - logDebug(s"Found possibly uncommitted offsets $availableOffsets") + currentBatchTimestamp = nextOffsets.metadata.getOrElse( + throw new IllegalStateException("OffsetLog does not contain current batch timestamp!") + ).toLong + logDebug(s"Found possibly uncommitted offsets $availableOffsets " + + s"at batch timestamp $currentBatchTimestamp") offsetLog.get(batchId - 1).foreach { case lastOffsets => @@ -344,8 +351,10 @@ class StreamExecution( } } if (hasNewData) { + currentBatchTimestamp = triggerClock.getTimeMillis() * 1000L reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { - assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources)), + assert(offsetLog.add(currentBatchId, + availableOffsets.toOffsetSeq(sources, Some(currentBatchTimestamp.toString))), s"Concurrent update to the log. 
Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") @@ -422,6 +431,7 @@ class StreamExecution( val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) + case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp) } val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 05a65476709cd..57ca7a578574b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -26,8 +26,8 @@ class StreamProgress( val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) extends scala.collection.immutable.Map[Source, Offset] { - def toOffsetSeq(source: Seq[Source]): OffsetSeq = { - OffsetSeq(source.map(get)) + def toOffsetSeq(source: Seq[Source], metadata: Option[String] = None): OffsetSeq = { + OffsetSeq(source.map(get), metadata) } override def toString: String = From b8a1f71bef2aa4c11c08178b2250bd995e952601 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Fri, 18 Nov 2016 15:35:27 -0800 Subject: [PATCH 02/21] update comment --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 1e07bdd4bffe9..56bf545a4c5b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -351,7 +351,8 @@ class StreamExecution( } } if (hasNewData) { - currentBatchTimestamp = triggerClock.getTimeMillis() * 1000L + // Current batch timestamp in seconds + currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources, Some(currentBatchTimestamp.toString))), From 8f0a27329ea711d1936c2df11a310129e22eb9b5 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Sun, 20 Nov 2016 12:41:49 -0800 Subject: [PATCH 03/21] add test for filtering time-based aggregation --- .../streaming/IncrementalExecution.scala | 7 ++- .../streaming/StreamingAggregationSuite.scala | 46 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index f9d9424f3599e..6520ed511790b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Literal} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical._ @@ -35,7 +36,7 @@ class IncrementalExecution( val 
checkpointLocation: String,
val currentBatchId: Long,
val currentEventTimeWatermark: Long)
- extends QueryExecution(sparkSession, logicalPlan) {
+ extends QueryExecution(sparkSession, logicalPlan) with Logging {
// TODO: make this always part of planning.
val stateStrategy =
@@ -57,7 +58,9 @@ class IncrementalExecution(
*/
override lazy val optimizedPlan: LogicalPlan = {
sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
- case CurrentBatchTimestamp(timestamp) => Literal(timestamp)
+ case CurrentBatchTimestamp(timestamp) =>
+ logInfo(s"Current batch timestamp = $timestamp")
+ Literal(timestamp)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index e59b5491f90b6..31fdaabbb4ed9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
+import org.apache.spark.util.ManualClock
object FailureSinglton {
var firstTime = true
@@ -235,4 +236,49 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
CheckLastBatch(("a", 30), ("b", 3), ("c", 1))
)
}
+
+ test("prune results by time, complete mode") {
+ import testImplicits._
+ import StreamingAggregationSuite._
+ clock = new StreamManualClock
+
+ val inputData = MemoryStream[Long]
+
+ val aggregated =
+ inputData.toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .as[(Long, Long)]
+ .where('value >= current_timestamp().cast("long") - 10L)
+
+ testStream(aggregated, Complete)(
+ StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+
+ // advance clock to 10 seconds
+ AddData(inputData, 0L, 5L, 5L, 10L),
+ AdvanceManualClock(10 * 1000),
+ AssertOnQuery { _ => clock.getTimeMillis() === 10 * 1000 },
+ CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+
+ // advance clock to 20 seconds
+ AddData(inputData, 15L, 15L, 20L),
+ AdvanceManualClock(10 * 1000),
+ CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+
+ // advance clock to 30 seconds
+ AddData(inputData, 0L),
+ AdvanceManualClock(10 * 1000),
+ CheckLastBatch((20L, 1)),
+
+ // advance clock to 40 seconds
+ AddData(inputData, 25L, 30L, 40L, 45L),
+ AdvanceManualClock(10 * 1000),
+ CheckLastBatch((30L, 1), (40L, 1), (45L, 1))
+ )
+ }
+}
+
+object StreamingAggregationSuite {
+ // Singleton reference to clock that does not get serialized in task closures
+ @volatile var clock: ManualClock = null }
From ce6335a694cf73ff0dae9c25c470c8edac1357d1 Mon Sep 17 00:00:00 2001
From: Tyson Condie
Date: Mon, 21 Nov 2016 14:09:59 -0800
Subject: [PATCH 04/21] addressing feedback from Ryan
---
.../execution/streaming/StreamExecution.scala | 58 ++++++++++++++-----
1 file changed, 43 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 56bf545a4c5b4..50c5f67a41d79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -25,6 +25,8 @@ import
scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.hadoop.fs.Path +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -37,6 +39,26 @@ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} +/** + * Contains metadata associated with a stream execution. This information is + * persisted to the offset log via the OffsetSeq metadata field. Current + * information contained in this object includes: + * + * 1. currentEventTimeWatermark: The current eventTime watermark, used to + * bound the lateness of data that will processed. + * 2. currentBatchTimestamp: The current batch processing timestamp + */ +case class StreamExecutionMetadata( + var currentEventTimeWatermark: Long = 0, + var currentBatchTimestamp: Long = 0) { + private implicit val formats = Serialization.formats(NoTypeHints) + + /** + * JSON string representation of this object. + */ + def json: String = Serialization.write(this) +} + /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any @@ -92,11 +114,8 @@ class StreamExecution( /** The current batchId or -1 if execution has not yet been initialized. */ private var currentBatchId: Long = -1 - /** The current eventTime watermark, used to bound the lateness of data that will processed. */ - private var currentEventTimeWatermark: Long = 0 - - /** The current batch processing timestamp */ - private var currentBatchTimestamp: Long = 0 + /** stream execution metadata */ + private var streamExecutionMetadata = StreamExecutionMetadata() /** All stream sources present in the query plan. */ private val sources = @@ -291,11 +310,11 @@ class StreamExecution( logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - currentBatchTimestamp = nextOffsets.metadata.getOrElse( + streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse( throw new IllegalStateException("OffsetLog does not contain current batch timestamp!") - ).toLong + )) logDebug(s"Found possibly uncommitted offsets $availableOffsets " + - s"at batch timestamp $currentBatchTimestamp") + s"at batch timestamp ${streamExecutionMetadata.currentBatchTimestamp}") offsetLog.get(batchId - 1).foreach { case lastOffsets => @@ -352,10 +371,10 @@ class StreamExecution( } if (hasNewData) { // Current batch timestamp in seconds - currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L + streamExecutionMetadata.currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { assert(offsetLog.add(currentBatchId, - availableOffsets.toOffsetSeq(sources, Some(currentBatchTimestamp.toString))), + availableOffsets.toOffsetSeq(sources, Some(streamExecutionMetadata.json))), s"Concurrent update to the log. 
Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") @@ -432,7 +451,8 @@ class StreamExecution( val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) - case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp) + case t: CurrentTimestamp => + CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestamp) } val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) { @@ -442,7 +462,7 @@ class StreamExecution( outputMode, checkpointFile("state"), currentBatchId, - currentEventTimeWatermark) + streamExecutionMetadata.currentEventTimeWatermark) lastExecution.executedPlan // Force the lazy generation of execution plan } @@ -458,11 +478,12 @@ class StreamExecution( logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") (e.maxEventTime.value / 1000) - e.delay.milliseconds() }.headOption.foreach { newWatermark => - if (newWatermark > currentEventTimeWatermark) { + if (newWatermark > streamExecutionMetadata.currentEventTimeWatermark) { logInfo(s"Updating eventTime watermark to: $newWatermark ms") - currentEventTimeWatermark = newWatermark + streamExecutionMetadata.currentEventTimeWatermark = newWatermark } else { - logTrace(s"Event time didn't move: $newWatermark < $currentEventTimeWatermark") + logTrace(s"Event time didn't move: $newWatermark < " + + s"$streamExecutionMetadata.currentEventTimeWatermark") } if (newWatermark != 0) { @@ -745,6 +766,13 @@ class StreamExecution( case object TERMINATED extends State } +object StreamExecutionMetadata { + private implicit val formats = Serialization.formats(NoTypeHints) + + def apply(json: String): StreamExecutionMetadata = + Serialization.read[StreamExecutionMetadata](json) +} + object StreamExecution { private val _nextId = new AtomicLong(0) From 88ddbc2aa613bc6ff2771ef0168c30c60d0c8abc Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Mon, 21 Nov 2016 14:18:32 -0800 Subject: [PATCH 05/21] update --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 50c5f67a41d79..f38a1ee60eb31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -51,7 +51,7 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} case class StreamExecutionMetadata( var currentEventTimeWatermark: Long = 0, var currentBatchTimestamp: Long = 0) { - private implicit val formats = Serialization.formats(NoTypeHints) + private implicit val formats = StreamExecutionMetadata.formats /** * JSON string representation of this object. 
From c1b3e601d9feaa38a2fe6eef4d20dab5181e5eda Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Mon, 21 Nov 2016 16:53:53 -0800 Subject: [PATCH 06/21] updates based on feedback from Ryan and Burak --- .../expressions/datetimeExpressions.scala | 8 ++--- .../execution/streaming/StreamExecution.scala | 28 +++++++-------- .../sql/streaming/StreamExecutionSuite.scala | 36 +++++++++++++++++++ 3 files changed, 53 insertions(+), 19 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index f7a83b455c528..3792eb5514e29 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -23,9 +23,9 @@ import java.util.{Calendar, Locale, TimeZone} import scala.util.Try import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, - ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -78,9 +78,7 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback { * * There is no code generation since this expression should be replaced with a literal. */ -@ExpressionDescription( - usage = "_FUNC_() - Returns the current timestamp at the start of batch evaluation.") -case class CurrentBatchTimestamp(timestamp: Long) extends LeafExpression +case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression with CodegenFallback with Nondeterministic { override def nullable: Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index f38a1ee60eb31..ab7befc5a1cfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -31,9 +31,10 @@ import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentTimestamp} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.streaming._ @@ -45,12 +46,13 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} * information contained in this object includes: * * 1. 
currentEventTimeWatermark: The current eventTime watermark, used to - * bound the lateness of data that will processed. - * 2. currentBatchTimestamp: The current batch processing timestamp + * bound the lateness of data that will processed. Time unit: milliseconds + * 2. currentBatchTimestamp: The current batch processing timestamp. + * Time unit: microseconds */ case class StreamExecutionMetadata( - var currentEventTimeWatermark: Long = 0, - var currentBatchTimestamp: Long = 0) { + var currentEventTimeWatermarkMillis: Long = 0, + var currentBatchTimestamp: SQLTimestamp = 0) { private implicit val formats = StreamExecutionMetadata.formats /** @@ -310,9 +312,7 @@ class StreamExecution( logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse( - throw new IllegalStateException("OffsetLog does not contain current batch timestamp!") - )) + streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}")) logDebug(s"Found possibly uncommitted offsets $availableOffsets " + s"at batch timestamp ${streamExecutionMetadata.currentBatchTimestamp}") @@ -371,7 +371,7 @@ class StreamExecution( } if (hasNewData) { // Current batch timestamp in seconds - streamExecutionMetadata.currentBatchTimestamp = triggerClock.getTimeMillis() / 1000L + streamExecutionMetadata.currentBatchTimestamp = triggerClock.getTimeMillis() * 1000L reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources, Some(streamExecutionMetadata.json))), @@ -451,8 +451,8 @@ class StreamExecution( val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) - case t: CurrentTimestamp => - CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestamp) + case _: CurrentTimestamp | _: CurrentDate => + CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestamp / 1000000L) } val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) { @@ -462,7 +462,7 @@ class StreamExecution( outputMode, checkpointFile("state"), currentBatchId, - streamExecutionMetadata.currentEventTimeWatermark) + streamExecutionMetadata.currentEventTimeWatermarkMillis) lastExecution.executedPlan // Force the lazy generation of execution plan } @@ -478,9 +478,9 @@ class StreamExecution( logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") (e.maxEventTime.value / 1000) - e.delay.milliseconds() }.headOption.foreach { newWatermark => - if (newWatermark > streamExecutionMetadata.currentEventTimeWatermark) { + if (newWatermark > streamExecutionMetadata.currentEventTimeWatermarkMillis) { logInfo(s"Updating eventTime watermark to: $newWatermark ms") - streamExecutionMetadata.currentEventTimeWatermark = newWatermark + streamExecutionMetadata.currentEventTimeWatermarkMillis = newWatermark } else { logTrace(s"Event time didn't move: $newWatermark < " + s"$streamExecutionMetadata.currentEventTimeWatermark") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionSuite.scala new file mode 100644 index 0000000000000..a40802155dc19 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata + +class StreamExecutionSuite extends SparkFunSuite { + + test("stream execution metadata") { + assert(StreamExecutionMetadata(0, 0) === + StreamExecutionMetadata("""{}""")) + assert(StreamExecutionMetadata(1, 0) === + StreamExecutionMetadata("""{"currentEventTimeWatermarkMillis":1}""")) + assert(StreamExecutionMetadata(0, 2) === + StreamExecutionMetadata("""{"currentBatchTimestamp":2}""")) + assert(StreamExecutionMetadata(1, 2) === + StreamExecutionMetadata( + """{"currentEventTimeWatermarkMillis":1,"currentBatchTimestamp":2}""")) + } +} From 0c876c6eaebb8d5f9d6ea07184f80640472cb09c Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Tue, 22 Nov 2016 13:05:40 -0800 Subject: [PATCH 07/21] address feedback from Burak and Ryan --- .../expressions/datetimeExpressions.scala | 15 ++++++++------ .../streaming/IncrementalExecution.scala | 4 ++-- .../execution/streaming/StreamExecution.scala | 20 +++++++++++-------- ...ala => StreamExecutionMetadataSuite.scala} | 6 +++--- 4 files changed, 26 insertions(+), 19 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/streaming/{StreamExecutionSuite.scala => StreamExecutionMetadataSuite.scala} (90%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 3792eb5514e29..bc04fb6f76d86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -25,7 +25,6 @@ import scala.util.Try import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -78,17 +77,21 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback { * * There is no code generation since this expression should be replaced with a literal. 
*/ -case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression - with CodegenFallback with Nondeterministic { - override def nullable: Boolean = false +case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType) + extends LeafExpression with Nondeterministic with CodegenFallback { - override def dataType: DataType = TimestampType + override def nullable: Boolean = false override def prettyName: String = "current_batch_timestamp" override protected def initializeInternal(partitionIndex: Int): Unit = {} - override protected def evalInternal(input: InternalRow): Any = timestamp + override protected def evalInternal(input: InternalRow): Any = timestampMs + + def toLiteral: Literal = dataType match { + case _: TimestampType => Literal(timestampMs * 1000L, TimestampType) + case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs), DateType) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 6520ed511790b..6ab6fa61dc200 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -58,9 +58,9 @@ class IncrementalExecution( */ override lazy val optimizedPlan: LogicalPlan = { sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions { - case CurrentBatchTimestamp(timestamp) => + case ts @ CurrentBatchTimestamp(timestamp, _) => logInfo(s"Current batch timestamp = $timestamp") - Literal(timestamp) + ts.toLiteral } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ab7befc5a1cfe..9c4c3a3bddfe6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -34,10 +34,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.types.{DateType, TimestampType} import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} /** @@ -48,11 +48,11 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} * 1. currentEventTimeWatermark: The current eventTime watermark, used to * bound the lateness of data that will processed. Time unit: milliseconds * 2. currentBatchTimestamp: The current batch processing timestamp. 
- * Time unit: microseconds + * Time unit: milliseconds */ case class StreamExecutionMetadata( var currentEventTimeWatermarkMillis: Long = 0, - var currentBatchTimestamp: SQLTimestamp = 0) { + var currentBatchTimestampMillis: Long = 0) { private implicit val formats = StreamExecutionMetadata.formats /** @@ -314,7 +314,7 @@ class StreamExecution( availableOffsets = nextOffsets.toStreamProgress(sources) streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}")) logDebug(s"Found possibly uncommitted offsets $availableOffsets " + - s"at batch timestamp ${streamExecutionMetadata.currentBatchTimestamp}") + s"at batch timestamp ${streamExecutionMetadata.currentBatchTimestampMillis}") offsetLog.get(batchId - 1).foreach { case lastOffsets => @@ -370,8 +370,8 @@ class StreamExecution( } } if (hasNewData) { - // Current batch timestamp in seconds - streamExecutionMetadata.currentBatchTimestamp = triggerClock.getTimeMillis() * 1000L + // Current batch timestamp in milliseconds + streamExecutionMetadata.currentBatchTimestampMillis = triggerClock.getTimeMillis() reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources, Some(streamExecutionMetadata.json))), @@ -451,8 +451,12 @@ class StreamExecution( val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) - case _: CurrentTimestamp | _: CurrentDate => - CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestamp / 1000000L) + case ct: CurrentTimestamp => + CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis, + ct.dataType) + case cd: CurrentDate => + CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis, + cd.dataType) } val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala similarity index 90% rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala index a40802155dc19..432dc7bfaad22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming import org.apache.spark.SparkFunSuite import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata -class StreamExecutionSuite extends SparkFunSuite { +class StreamExecutionMetadataSuite extends SparkFunSuite { test("stream execution metadata") { assert(StreamExecutionMetadata(0, 0) === @@ -28,9 +28,9 @@ class StreamExecutionSuite extends SparkFunSuite { assert(StreamExecutionMetadata(1, 0) === StreamExecutionMetadata("""{"currentEventTimeWatermarkMillis":1}""")) assert(StreamExecutionMetadata(0, 2) === - StreamExecutionMetadata("""{"currentBatchTimestamp":2}""")) + StreamExecutionMetadata("""{"currentBatchTimestampMillis":2}""")) assert(StreamExecutionMetadata(1, 2) === StreamExecutionMetadata( - """{"currentEventTimeWatermarkMillis":1,"currentBatchTimestamp":2}""")) + """{"currentEventTimeWatermarkMillis":1,"currentBatchTimestampMillis":2}""")) } } From afeac9734d3d4422af69c941710b8dbe9de7f386 Mon Sep 17 00:00:00 2001 From: Tyson Condie 
Date: Tue, 22 Nov 2016 15:24:59 -0800 Subject: [PATCH 08/21] address feedback from Ryan --- .../sql/catalyst/expressions/datetimeExpressions.scala | 7 +++++-- .../spark/sql/execution/streaming/StreamExecution.scala | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index bc04fb6f76d86..800be54dd4ecb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.{Calendar, Locale, TimeZone} @@ -86,10 +87,12 @@ case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType) override protected def initializeInternal(partitionIndex: Int): Unit = {} - override protected def evalInternal(input: InternalRow): Any = timestampMs + override protected def evalInternal(input: InternalRow): Any = + throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") def toLiteral: Literal = dataType match { - case _: TimestampType => Literal(timestampMs * 1000L, TimestampType) + case _: TimestampType => + Literal(DateTimeUtils.fromJavaTimestamp(new Timestamp(timestampMs)), TimestampType) case _: DateType => Literal(DateTimeUtils.millisToDays(timestampMs), DateType) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9c4c3a3bddfe6..55bd965f0b590 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -45,9 +45,9 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} * persisted to the offset log via the OffsetSeq metadata field. Current * information contained in this object includes: * - * 1. currentEventTimeWatermark: The current eventTime watermark, used to + * @param currentEventTimeWatermarkMillis: The current eventTime watermark, used to * bound the lateness of data that will processed. Time unit: milliseconds - * 2. currentBatchTimestamp: The current batch processing timestamp. + * @param currentBatchTimestampMillis: The current batch processing timestamp. 
* Time unit: milliseconds */ case class StreamExecutionMetadata( From ae8012b87cd6d03598df66200242c7a2ee6bea13 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Tue, 22 Nov 2016 15:52:48 -0800 Subject: [PATCH 09/21] address feedback from TD --- .../apache/spark/sql/execution/streaming/StreamExecution.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 55bd965f0b590..947b29b20c591 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.streaming._ -import org.apache.spark.sql.types.{DateType, TimestampType} import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} /** @@ -116,7 +115,7 @@ class StreamExecution( /** The current batchId or -1 if execution has not yet been initialized. */ private var currentBatchId: Long = -1 - /** stream execution metadata */ + /** Stream execution metadata */ private var streamExecutionMetadata = StreamExecutionMetadata() /** All stream sources present in the query plan. */ From f647653bdeb630da6b7e96c4c9ec6f105dea35ab Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 23 Nov 2016 13:33:03 -0800 Subject: [PATCH 10/21] update --- .../expressions/datetimeExpressions.scala | 7 +- .../execution/streaming/StreamExecution.scala | 76 +++++++++---------- .../execution/streaming/StreamProgress.scala | 4 +- .../streaming/StreamingAggregationSuite.scala | 48 ++++++++++-- .../sql/streaming/StreamingQuerySuite.scala | 4 +- 5 files changed, 90 insertions(+), 49 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 800be54dd4ecb..a48fc62b19ea1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -87,8 +87,11 @@ case class CurrentBatchTimestamp(timestampMs: Long, dataType: DataType) override protected def initializeInternal(partitionIndex: Int): Unit = {} - override protected def evalInternal(input: InternalRow): Any = - throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") + /** + * Need to return literal value in order to support compile time expression evaluation + * e.g., select(current_date()) + */ + override protected def evalInternal(input: InternalRow): Any = toLiteral.value def toLiteral: Literal = dataType match { case _: TimestampType => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 947b29b20c591..1c69141e8ea84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -39,27 +39,6 @@ import org.apache.spark.sql.execution.command.ExplainCommand import 
org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} -/** - * Contains metadata associated with a stream execution. This information is - * persisted to the offset log via the OffsetSeq metadata field. Current - * information contained in this object includes: - * - * @param currentEventTimeWatermarkMillis: The current eventTime watermark, used to - * bound the lateness of data that will processed. Time unit: milliseconds - * @param currentBatchTimestampMillis: The current batch processing timestamp. - * Time unit: milliseconds - */ -case class StreamExecutionMetadata( - var currentEventTimeWatermarkMillis: Long = 0, - var currentBatchTimestampMillis: Long = 0) { - private implicit val formats = StreamExecutionMetadata.formats - - /** - * JSON string representation of this object. - */ - def json: String = Serialization.write(this) -} - /** * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread. * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any @@ -274,7 +253,7 @@ class StreamExecution( this, s"Query $name terminated with exception: ${e.getMessage}", e, - Some(committedOffsets.toOffsetSeq(sources))) + Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json))) logError(s"Query $name terminated with error", e) // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to // handle them @@ -312,8 +291,8 @@ class StreamExecution( currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}")) - logDebug(s"Found possibly uncommitted offsets $availableOffsets " + - s"at batch timestamp ${streamExecutionMetadata.currentBatchTimestampMillis}") + logDebug(s"Found possibly unprocessed offsets $availableOffsets " + + s"at batch timestamp ${streamExecutionMetadata.batchTimestampMs}") offsetLog.get(batchId - 1).foreach { case lastOffsets => @@ -370,10 +349,10 @@ class StreamExecution( } if (hasNewData) { // Current batch timestamp in milliseconds - streamExecutionMetadata.currentBatchTimestampMillis = triggerClock.getTimeMillis() + streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis() reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) { assert(offsetLog.add(currentBatchId, - availableOffsets.toOffsetSeq(sources, Some(streamExecutionMetadata.json))), + availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)), s"Concurrent update to the log. 
Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") @@ -451,10 +430,10 @@ class StreamExecution( val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) case ct: CurrentTimestamp => - CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis, + CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs, ct.dataType) case cd: CurrentDate => - CurrentBatchTimestamp(streamExecutionMetadata.currentBatchTimestampMillis, + CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs, cd.dataType) } @@ -465,7 +444,7 @@ class StreamExecution( outputMode, checkpointFile("state"), currentBatchId, - streamExecutionMetadata.currentEventTimeWatermarkMillis) + streamExecutionMetadata.batchWatermarkMs) lastExecution.executedPlan // Force the lazy generation of execution plan } @@ -481,9 +460,9 @@ class StreamExecution( logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") (e.maxEventTime.value / 1000) - e.delay.milliseconds() }.headOption.foreach { newWatermark => - if (newWatermark > streamExecutionMetadata.currentEventTimeWatermarkMillis) { + if (newWatermark > streamExecutionMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermark ms") - streamExecutionMetadata.currentEventTimeWatermarkMillis = newWatermark + streamExecutionMetadata.batchWatermarkMs = newWatermark } else { logTrace(s"Event time didn't move: $newWatermark < " + s"$streamExecutionMetadata.currentEventTimeWatermark") @@ -748,7 +727,7 @@ class StreamExecution( }.toArray val sinkStatus = SinkStatus( sink.toString, - committedOffsets.toOffsetSeq(sources).toString) + committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString) currentStatus = StreamingQueryStatus( @@ -769,6 +748,33 @@ class StreamExecution( case object TERMINATED extends State } +object StreamExecution { + private val _nextId = new AtomicLong(0) + + def nextId: Long = _nextId.getAndIncrement() +} + +/** + * Contains metadata associated with a stream execution. This information is + * persisted to the offset log via the OffsetSeq metadata field. Current + * information contained in this object includes: + * + * @param batchWatermarkMs: The current eventTime watermark, used to + * bound the lateness of data that will processed. Time unit: milliseconds + * @param batchTimestampMs: The current batch processing timestamp. + * Time unit: milliseconds + */ +case class StreamExecutionMetadata( + var batchWatermarkMs: Long = 0, + var batchTimestampMs: Long = 0) { + private implicit val formats = StreamExecutionMetadata.formats + + /** + * JSON string representation of this object. + */ + def json: String = Serialization.write(this) +} + object StreamExecutionMetadata { private implicit val formats = Serialization.formats(NoTypeHints) @@ -776,12 +782,6 @@ object StreamExecutionMetadata { Serialization.read[StreamExecutionMetadata](json) } -object StreamExecution { - private val _nextId = new AtomicLong(0) - - def nextId: Long = _nextId.getAndIncrement() -} - /** * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread * and will use `classOf[StreamExecutionThread]` to check. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 57ca7a578574b..21b8750ca913d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -26,8 +26,8 @@ class StreamProgress( val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) extends scala.collection.immutable.Map[Source, Offset] { - def toOffsetSeq(source: Seq[Source], metadata: Option[String] = None): OffsetSeq = { - OffsetSeq(source.map(get), metadata) + def toOffsetSeq(source: Seq[Source], metadata: String): OffsetSeq = { + OffsetSeq(source.map(get), Some(metadata)) } override def toString: String = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 31fdaabbb4ed9..ffc21931bcb40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.InternalOutputModes._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore @@ -237,7 +238,7 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { ) } - test("prune results by time, complete mode") { + test("prune results by current_time, complete mode") { import testImplicits._ import StreamingAggregationSuite._ clock = new StreamManualClock @@ -257,25 +258,62 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { // advance clock to 10 seconds AddData(inputData, 0L, 5L, 5L, 10L), AdvanceManualClock(10 * 1000), - AssertOnQuery { _ => clock.getTimeMillis() === 10 * 1000 }, CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), - // advance clock to 20 seconds + // advance clock to 20 seconds, should retain keys >= 10 AddData(inputData, 15L, 15L, 20L), AdvanceManualClock(10 * 1000), CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), - // advance clock to 30 seconds + // advance clock to 30 seconds, should retain keys >= 20 AddData(inputData, 0L), AdvanceManualClock(10 * 1000), CheckLastBatch((20L, 1)), - // advance clock to 40 seconds + // advance clock to 40 seconds, should retain keys >= 30 AddData(inputData, 25L, 30L, 40L, 45L), AdvanceManualClock(10 * 1000), CheckLastBatch((30L, 1), (40L, 1), (45L, 1)) ) } + + test("prune results by date_time, complete mode") { + import testImplicits._ + import StreamingAggregationSuite._ + clock = new StreamManualClock + + val inputData = MemoryStream[Long] + val aggregated = + inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Long, Long)] + .where(from_unixtime('value + 10L) >= current_date()) + + testStream(aggregated, Complete)( + StartStream(ProcessingTime("10 days"), triggerClock = clock), + + // advance clock to 10 seconds, should retain all keys + AddData(inputData, 0L, 5L, 5L, 10L), + AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), + + // advance clock to 
20 seconds, should retain keys >= 10 + AddData(inputData, 15L, 15L, 20L), + AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), + + // advance clock to 30 seconds, should retain keys >= 20 + AddData(inputData, 0L), + AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + CheckLastBatch((20L, 1)), + + // advance clock to 40 seconds, should retain keys >= 30 + AddData(inputData, 25L, 30L, 40L, 45L), + AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + CheckLastBatch((30L, 1), (40L, 1), (45L, 1)) + ) + } } object StreamingAggregationSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index e2e66d6663e19..8ecb33cf9d266 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -103,8 +103,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery( - q => - q.exception.get.startOffset.get === q.committedOffsets.toOffsetSeq(Seq(inputData)), + q => q.exception.get.startOffset.get.offsets === + q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets, "incorrect start offset on exception") ) } From 92fe93dd785b115185f31e3037a544911d2e522c Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 23 Nov 2016 15:03:37 -0800 Subject: [PATCH 11/21] add recovery test for batch timestamp --- .../StreamExecutionMetadataSuite.scala | 72 +++++++++++++++++-- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala index 432dc7bfaad22..cf300a2a5ecb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala @@ -17,20 +17,80 @@ package org.apache.spark.sql.streaming -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata +import java.io.File -class StreamExecutionMetadataSuite extends SparkFunSuite { +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata} +import org.apache.spark.sql.functions._ +import org.apache.spark.util.{SystemClock, Utils} + +class StreamExecutionMetadataSuite extends StreamTest { + + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath test("stream execution metadata") { assert(StreamExecutionMetadata(0, 0) === StreamExecutionMetadata("""{}""")) assert(StreamExecutionMetadata(1, 0) === - StreamExecutionMetadata("""{"currentEventTimeWatermarkMillis":1}""")) + StreamExecutionMetadata("""{"batchWatermarkMs":1}""")) assert(StreamExecutionMetadata(0, 2) === - StreamExecutionMetadata("""{"currentBatchTimestampMillis":2}""")) + StreamExecutionMetadata("""{"batchTimestampMs":2}""")) assert(StreamExecutionMetadata(1, 2) === StreamExecutionMetadata( - """{"currentEventTimeWatermarkMillis":1,"currentBatchTimestampMillis":2}""")) + """{"batchWatermarkMs":1,"batchTimestampMs":2}""")) + } + + test("ensure 
consistent results across batch executions") { + import testImplicits._ + val clock = new SystemClock() + val ms = new MemoryStream[Long](0, sqlContext) + val df = ms.toDF().toDF("a") + val checkpointLoc = newMetadataDir + val checkpointDir = new File(checkpointLoc, "complete") + checkpointDir.mkdirs() + assert(checkpointDir.exists()) + val tableName = "test" + def startQuery: StreamingQuery = { + df.groupBy("a") + .count() + .where('a >= current_timestamp().cast("long")) + .writeStream + .format("memory") + .queryName(tableName) + .option("checkpointLocation", checkpointLoc) + .outputMode("complete") + .start() + } + // no exception here + val t1 = clock.getTimeMillis() + 5000L + val t2 = clock.getTimeMillis() + 10000L + val q = startQuery + ms.addData(t1, t2) + q.processAllAvailable() + + checkAnswer( + spark.table(tableName), + Seq(Row(t1, 1), Row(t2, 1)) + ) + + q.stop() + Thread.sleep(20000L) // Expire t1 and t2 + assert(t1 < clock.getTimeMillis()) + assert(t2 < clock.getTimeMillis()) + + spark.sql(s"drop table $tableName") + + // verify table is dropped + intercept[AnalysisException](spark.table(tableName).collect()) + val q2 = startQuery + q2.processAllAvailable() + checkAnswer( + spark.table(tableName), + Seq(Row(t1, 1), Row(t2, 1)) + ) + + q2.stop() + } } From baafe4ad9d49d78f8f96eccf07642f456fe21635 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 23 Nov 2016 16:38:37 -0800 Subject: [PATCH 12/21] add watermark recovery test --- .../execution/streaming/StreamExecution.scala | 3 +- .../spark/sql/streaming/WatermarkSuite.scala | 62 ++++++++++++++----- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 1c69141e8ea84..21664d7fd0381 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -354,7 +354,8 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") - logInfo(s"Committed offsets for batch $currentBatchId.") + logInfo(s"Committed offsets for batch $currentBatchId. " + + s"Metadata ${streamExecutionMetadata.toString}") // NOTE: The following code is correct because runBatches() processes exactly one // batch at a time. 
If we add pipeline parallelism (multiple batches in flight at diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 3617ec0f564c1..3bc10f7870878 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming import org.scalatest.BeforeAndAfter import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} @@ -96,28 +96,58 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { ) } - ignore("recovery") { - val inputData = MemoryStream[Int] - - val windowedAggregation = inputData.toDF() + test("recovery") { + val ms = new MemoryStream[Int](0, sqlContext) + val df = ms.toDF().toDF("a") + val tableName = "recovery" + def startQuery: StreamingQuery = { + ms.toDF() .withColumn("eventTime", $"value".cast("timestamp")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + .writeStream + .format("memory") + .queryName(tableName) + .outputMode("append") + .start() + } - testStream(windowedAggregation)( - AddData(inputData, 10, 11, 12, 13, 14, 15), - CheckAnswer(), - AddData(inputData, 25), // Advance watermark to 15 seconds - StopStream, - StartStream(), - CheckAnswer(), - AddData(inputData, 25), // Evict items less than previous watermark. 
- StopStream, - StartStream(), - CheckAnswer((10, 5)) + var q = startQuery + ms.addData(10, 11, 12, 13, 14, 15) + q.processAllAvailable() + + checkAnswer( + spark.table(tableName), Seq() + ) + + // Advance watermark to 15 seconds, + // but do not process batch + ms.addData(25) + q.stop() + + q = startQuery + q.processAllAvailable() + checkAnswer( + spark.table(tableName), Seq() + ) + + // Evict items less than previous watermark + ms.addData(25) + q.processAllAvailable() + checkAnswer( + spark.table(tableName), Seq(Row(10, 5)) + ) + q.stop() + + // Ensure we do not send again + q = startQuery + q.processAllAvailable() + checkAnswer( + spark.table(tableName), Seq() ) + q.stop() } test("dropping old data") { From 1547b63b56199a3b0c7a221dc57b3a88577e3eb0 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 23 Nov 2016 17:09:05 -0800 Subject: [PATCH 13/21] update --- .../StreamExecutionMetadataSuite.scala | 11 +++++--- .../streaming/StreamingAggregationSuite.scala | 28 +++++++++---------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala index cf300a2a5ecb2..cde1eee364b39 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala @@ -41,7 +41,7 @@ class StreamExecutionMetadataSuite extends StreamTest { """{"batchWatermarkMs":1,"batchTimestampMs":2}""")) } - test("ensure consistent results across batch executions") { + test("metadata is recovered from log when query is restarted") { import testImplicits._ val clock = new SystemClock() val ms = new MemoryStream[Long](0, sqlContext) @@ -51,6 +51,9 @@ class StreamExecutionMetadataSuite extends StreamTest { checkpointDir.mkdirs() assert(checkpointDir.exists()) val tableName = "test" + // Query that prunes timestamps less than current_timestamp, making + // it easy to use for ensuring that a batch is re-processed with the + // timestamp used when it was first proccessed. 
def startQuery: StreamingQuery = { df.groupBy("a") .count() @@ -63,8 +66,8 @@ class StreamExecutionMetadataSuite extends StreamTest { .start() } // no exception here - val t1 = clock.getTimeMillis() + 5000L - val t2 = clock.getTimeMillis() + 10000L + val t1 = clock.getTimeMillis() + 60L * 1000L + val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L val q = startQuery ms.addData(t1, t2) q.processAllAvailable() @@ -75,7 +78,7 @@ class StreamExecutionMetadataSuite extends StreamTest { ) q.stop() - Thread.sleep(20000L) // Expire t1 and t2 + Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2 assert(t1 < clock.getTimeMillis()) assert(t2 < clock.getTimeMillis()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index ffc21931bcb40..ff73cb0c2e725 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.streaming +import java.util.TimeZone + import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException @@ -249,7 +251,6 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { inputData.toDF() .groupBy($"value") .agg(count("*")) - .as[(Long, Long)] .where('value >= current_timestamp().cast("long") - 10L) testStream(aggregated, Complete)( @@ -277,37 +278,36 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { ) } - test("prune results by date_time, complete mode") { + test("prune results by current_date, complete mode") { import testImplicits._ import StreamingAggregationSuite._ clock = new StreamManualClock - + val tz = TimeZone.getDefault.getID val inputData = MemoryStream[Long] val aggregated = inputData.toDF() + .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz)) + .toDF("value") .groupBy($"value") .agg(count("*")) - .as[(Long, Long)] - .where(from_unixtime('value + 10L) >= current_date()) - + // .select('value, date_sub(current_date(), 10).cast("timestamp").alias("t")) + // .select('value, 't, 'value >= 't) + .where($"value" >= date_sub(current_date(), 10).cast("timestamp")) + .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)") testStream(aggregated, Complete)( - StartStream(ProcessingTime("10 days"), triggerClock = clock), - - // advance clock to 10 seconds, should retain all keys + StartStream(ProcessingTime("10 day"), triggerClock = clock), + // advance clock to 10 days, should retain all keys AddData(inputData, 0L, 5L, 5L, 10L), AdvanceManualClock(DateTimeUtils.daysToMillis(10)), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), - - // advance clock to 20 seconds, should retain keys >= 10 + // advance clock to 20 days, should retain keys >= 10 AddData(inputData, 15L, 15L, 20L), AdvanceManualClock(DateTimeUtils.daysToMillis(10)), CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), - - // advance clock to 30 seconds, should retain keys >= 20 + // advance clock to 30 days, should retain keys >= 20 AddData(inputData, 0L), AdvanceManualClock(DateTimeUtils.daysToMillis(10)), CheckLastBatch((20L, 1)), - // advance clock to 40 seconds, should retain keys >= 30 AddData(inputData, 25L, 30L, 40L, 45L), AdvanceManualClock(DateTimeUtils.daysToMillis(10)), From 13d1367fd6ac553d7692298b62823e94553a32f3 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 23 Nov 
2016 17:13:43 -0800 Subject: [PATCH 14/21] added fix from @tdas --- .../sql/streaming/StreamingAggregationSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index ff73cb0c2e725..372dfe08d9feb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -278,6 +278,7 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { ) } + test("prune results by current_date, complete mode") { import testImplicits._ import StreamingAggregationSuite._ @@ -292,25 +293,25 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { .agg(count("*")) // .select('value, date_sub(current_date(), 10).cast("timestamp").alias("t")) // .select('value, 't, 'value >= 't) - .where($"value" >= date_sub(current_date(), 10).cast("timestamp")) + .where($"value".cast("date") >= date_sub(current_date(), 10)) .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)") testStream(aggregated, Complete)( StartStream(ProcessingTime("10 day"), triggerClock = clock), // advance clock to 10 days, should retain all keys AddData(inputData, 0L, 5L, 5L, 10L), - AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), // advance clock to 20 days, should retain keys >= 10 AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), // advance clock to 30 days, should retain keys >= 20 AddData(inputData, 0L), - AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((20L, 1)), // advance clock to 40 seconds, should retain keys >= 30 AddData(inputData, 25L, 30L, 40L, 45L), - AdvanceManualClock(DateTimeUtils.daysToMillis(10)), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((30L, 1), (40L, 1), (45L, 1)) ) } From 3b403bd4377388c74fd1ff62282d57200eb094e8 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 23 Nov 2016 18:57:12 -0800 Subject: [PATCH 15/21] added watermark recovery test extension --- .../sql/execution/streaming/memory.scala | 4 + .../StreamExecutionMetadataSuite.scala | 2 +- .../spark/sql/streaming/WatermarkSuite.scala | 82 ++++++++----------- 3 files changed, 37 insertions(+), 51 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 582b5481220da..adf6963577f49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -206,6 +206,10 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi } } + def clear(): Unit = { + batches.clear() + } + override def toString(): String = "MemorySink" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala index cde1eee364b39..458d664a3735c 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala @@ -53,7 +53,7 @@ class StreamExecutionMetadataSuite extends StreamTest { val tableName = "test" // Query that prunes timestamps less than current_timestamp, making // it easy to use for ensuring that a batch is re-processed with the - // timestamp used when it was first proccessed. + // timestamp used when it was first processed. def startQuery: StreamingQuery = { df.groupBy("a") .count() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 3bc10f7870878..675650da05935 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -97,57 +97,39 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { } test("recovery") { - val ms = new MemoryStream[Int](0, sqlContext) - val df = ms.toDF().toDF("a") - val tableName = "recovery" - def startQuery: StreamingQuery = { - ms.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) - .withWatermark("eventTime", "10 seconds") - .groupBy(window($"eventTime", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - .writeStream - .format("memory") - .queryName(tableName) - .outputMode("append") - .start() - } - - var q = startQuery - ms.addData(10, 11, 12, 13, 14, 15) - q.processAllAvailable() - - checkAnswer( - spark.table(tableName), Seq() - ) - - // Advance watermark to 15 seconds, - // but do not process batch - ms.addData(25) - q.stop() - - q = startQuery - q.processAllAvailable() - checkAnswer( - spark.table(tableName), Seq() - ) - - // Evict items less than previous watermark - ms.addData(25) - q.processAllAvailable() - checkAnswer( - spark.table(tableName), Seq(Row(10, 5)) - ) - q.stop() - - // Ensure we do not send again - q = startQuery - q.processAllAvailable() - checkAnswer( - spark.table(tableName), Seq() + val inputData = MemoryStream[Int] + val df = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + val outputMode = OutputMode.Append + val memorySink = new MemorySink(df.schema, outputMode) + testStream(df)( + AddData(inputData, 10, 11, 12, 13, 14, 15), + CheckAnswer(), + AddData(inputData, 25), // Advance watermark to 15 seconds + StopStream, + StartStream(), + CheckLastBatch(), + AddData(inputData, 25), // Evict items less than previous watermark. 
+ CheckLastBatch((10, 5)), + StopStream, + AssertOnQuery { q => // clear the sink + q.sink.asInstanceOf[MemorySink].clear() + true + }, + StartStream(), + CheckLastBatch((10, 5)), + AddData(inputData, 30), + StopStream, + StartStream(), // Watermark should still be 15 seconds + AddData(inputData, 17), + CheckLastBatch(), // We still do not see next batch + AddData(inputData, 30), // Move watermark to 20 seconds + CheckLastBatch((15, 2)) // Ensure we see next window ) - q.stop() } test("dropping old data") { From 4df56754b45247d73ec2e560d012f6b47a1d4853 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Wed, 23 Nov 2016 19:14:26 -0800 Subject: [PATCH 16/21] update --- .../org/apache/spark/sql/streaming/WatermarkSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 675650da05935..ecd3ebf898ece 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -122,12 +122,15 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { }, StartStream(), CheckLastBatch((10, 5)), - AddData(inputData, 30), + AddData(inputData, 30), // Advance watermark to 20 seconds + CheckLastBatch(), StopStream, StartStream(), // Watermark should still be 15 seconds AddData(inputData, 17), CheckLastBatch(), // We still do not see next batch - AddData(inputData, 30), // Move watermark to 20 seconds + AddData(inputData, 30), // Advance watermark to 20 seconds + CheckLastBatch(), + AddData(inputData, 30), // Evict items less than previous watermark. CheckLastBatch((15, 2)) // Ensure we see next window ) } From 7374d21cdf22fe21f858b80b21414c2e7694358e Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Mon, 28 Nov 2016 12:07:19 -0800 Subject: [PATCH 17/21] addressing feedback from @tdas --- .../StreamExecutionMetadataSuite.scala | 40 ++++++++++++------- .../streaming/StreamingAggregationSuite.scala | 5 +-- .../spark/sql/streaming/WatermarkSuite.scala | 4 +- 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala index 458d664a3735c..46ea169ff6496 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala @@ -19,7 +19,11 @@ package org.apache.spark.sql.streaming import java.io.File -import org.apache.spark.sql.{AnalysisException, Row} +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.Row import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata} import org.apache.spark.sql.functions._ import org.apache.spark.util.{SystemClock, Utils} @@ -54,7 +58,7 @@ class StreamExecutionMetadataSuite extends StreamTest { // Query that prunes timestamps less than current_timestamp, making // it easy to use for ensuring that a batch is re-processed with the // timestamp used when it was first processed. 
- def startQuery: StreamingQuery = { + def startQuery(): StreamingQuery = { df.groupBy("a") .count() .where('a >= current_timestamp().cast("long")) @@ -65,10 +69,11 @@ class StreamExecutionMetadataSuite extends StreamTest { .outputMode("complete") .start() } - // no exception here - val t1 = clock.getTimeMillis() + 60L * 1000L - val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L - val q = startQuery + // Create two timestamps that are far enough out into the future + // so that the query can finish processing i.e., within 10 seconds + val t1 = clock.getTimeMillis() + 10000L + val t2 = clock.getTimeMillis() + 11000L + val q = startQuery() ms.addData(t1, t2) q.processAllAvailable() @@ -77,23 +82,28 @@ class StreamExecutionMetadataSuite extends StreamTest { Seq(Row(t1, 1), Row(t2, 1)) ) + // Stop the query and wait for the timestamps to expire + // i.e., timestamp < clock.getTimeMillis() q.stop() - Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2 - assert(t1 < clock.getTimeMillis()) - assert(t2 < clock.getTimeMillis()) + // Expire t1 and t2 + Eventually.eventually(Timeout(11.seconds)) { + assert(t1 < clock.getTimeMillis()) + assert(t2 < clock.getTimeMillis()) + true + } + // Drop the output, so that it is recreated when we start spark.sql(s"drop table $tableName") - - // verify table is dropped - intercept[AnalysisException](spark.table(tableName).collect()) - val q2 = startQuery + // Verify table is dropped + assert(false == spark.catalog.tableExists(tableName)) + // Restart query and ensure that previous batch timestamp + // is used to derive the same result. + val q2 = startQuery() q2.processAllAvailable() checkAnswer( spark.table(tableName), Seq(Row(t1, 1), Row(t2, 1)) ) - q2.stop() - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 372dfe08d9feb..76b6a96011cca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -278,7 +278,6 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { ) } - test("prune results by current_date, complete mode") { import testImplicits._ import StreamingAggregationSuite._ @@ -291,8 +290,6 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { .toDF("value") .groupBy($"value") .agg(count("*")) - // .select('value, date_sub(current_date(), 10).cast("timestamp").alias("t")) - // .select('value, 't, 'value >= 't) .where($"value".cast("date") >= date_sub(current_date(), 10)) .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)") testStream(aggregated, Complete)( @@ -309,7 +306,7 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { AddData(inputData, 0L), AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((20L, 1)), - // advance clock to 40 seconds, should retain keys >= 30 + // advance clock to 40 days, should retain keys >= 30 AddData(inputData, 25L, 30L, 40L, 45L), AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((30L, 1), (40L, 1), (45L, 1)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index ecd3ebf898ece..5a7dfcdf4e2fe 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -108,7 +108,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { val memorySink = new MemorySink(df.schema, outputMode) testStream(df)( AddData(inputData, 10, 11, 12, 13, 14, 15), - CheckAnswer(), + CheckLastBatch(), AddData(inputData, 25), // Advance watermark to 15 seconds StopStream, StartStream(), @@ -121,7 +121,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { true }, StartStream(), - CheckLastBatch((10, 5)), + CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10 AddData(inputData, 30), // Advance watermark to 20 seconds CheckLastBatch(), StopStream, From 8558abcd22e5fd67c05d14da52885fac526917f7 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Mon, 28 Nov 2016 12:58:46 -0800 Subject: [PATCH 18/21] add recovery tests for current_date and current_timestamp @tdas --- .../streaming/StreamingAggregationSuite.scala | 26 +++++++++++++++++-- .../spark/sql/streaming/WatermarkSuite.scala | 2 -- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 76b6a96011cca..231b44ce583a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -260,10 +260,21 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { AddData(inputData, 0L, 5L, 5L, 10L), AdvanceManualClock(10 * 1000), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), + AdvanceManualClock(5 * 1000), // advance by 5 seconds i.e., 15 seconds total + + // bounce stream and ensure correct batch timestamp is used + // i.e., we don't take it from the clock, which is at 15 seconds. + StopStream, + AssertOnQuery { q => // clear the sink + q.sink.asInstanceOf[MemorySink].clear() + true + }, + StartStream(ProcessingTime("10 seconds"), triggerClock = clock), + CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), // advance clock to 20 seconds, should retain keys >= 10 AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(10 * 1000), + AdvanceManualClock(5 * 1000), CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), // advance clock to 30 seconds, should retain keys >= 20 @@ -298,9 +309,20 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { AddData(inputData, 0L, 5L, 5L, 10L), AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), + // advance by 5 days i.e., 15 days total + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 5), + // bounce stream and ensure correct batch timestamp is used + // i.e., we don't take it from the clock, which is at 15 seconds. 
+ StopStream, + AssertOnQuery { q => // clear the sink + q.sink.asInstanceOf[MemorySink].clear() + true + }, + StartStream(ProcessingTime("10 day"), triggerClock = clock), + CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), // advance clock to 20 days, should retain keys >= 10 AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 5), CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), // advance clock to 30 days, should retain keys >= 20 AddData(inputData, 0L), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 5a7dfcdf4e2fe..ae57b1d3677ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -104,8 +104,6 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - val outputMode = OutputMode.Append - val memorySink = new MemorySink(df.schema, outputMode) testStream(df)( AddData(inputData, 10, 11, 12, 13, 14, 15), CheckLastBatch(), From ccb573c3c85059dca5d6dbc571f84f8cbc747a50 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Mon, 28 Nov 2016 14:06:20 -0800 Subject: [PATCH 19/21] revise recovery tests --- .../StreamExecutionMetadataSuite.scala | 74 +------------------ .../streaming/StreamingAggregationSuite.scala | 70 ++++++++++-------- 2 files changed, 40 insertions(+), 104 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala index 46ea169ff6496..9b1c3f52f681a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala @@ -17,16 +17,8 @@ package org.apache.spark.sql.streaming -import java.io.File - -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata} -import org.apache.spark.sql.functions._ -import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata +import org.apache.spark.util.Utils class StreamExecutionMetadataSuite extends StreamTest { @@ -44,66 +36,4 @@ class StreamExecutionMetadataSuite extends StreamTest { StreamExecutionMetadata( """{"batchWatermarkMs":1,"batchTimestampMs":2}""")) } - - test("metadata is recovered from log when query is restarted") { - import testImplicits._ - val clock = new SystemClock() - val ms = new MemoryStream[Long](0, sqlContext) - val df = ms.toDF().toDF("a") - val checkpointLoc = newMetadataDir - val checkpointDir = new File(checkpointLoc, "complete") - checkpointDir.mkdirs() - assert(checkpointDir.exists()) - val tableName = "test" - // Query that prunes timestamps less than current_timestamp, making - // it easy to use for ensuring that a batch is re-processed with the - // timestamp used when it was first processed. 
- def startQuery(): StreamingQuery = { - df.groupBy("a") - .count() - .where('a >= current_timestamp().cast("long")) - .writeStream - .format("memory") - .queryName(tableName) - .option("checkpointLocation", checkpointLoc) - .outputMode("complete") - .start() - } - // Create two timestamps that are far enough out into the future - // so that the query can finish processing i.e., within 10 seconds - val t1 = clock.getTimeMillis() + 10000L - val t2 = clock.getTimeMillis() + 11000L - val q = startQuery() - ms.addData(t1, t2) - q.processAllAvailable() - - checkAnswer( - spark.table(tableName), - Seq(Row(t1, 1), Row(t2, 1)) - ) - - // Stop the query and wait for the timestamps to expire - // i.e., timestamp < clock.getTimeMillis() - q.stop() - // Expire t1 and t2 - Eventually.eventually(Timeout(11.seconds)) { - assert(t1 < clock.getTimeMillis()) - assert(t2 < clock.getTimeMillis()) - true - } - - // Drop the output, so that it is recreated when we start - spark.sql(s"drop table $tableName") - // Verify table is dropped - assert(false == spark.catalog.tableExists(tableName)) - // Restart query and ensure that previous batch timestamp - // is used to derive the same result. - val q2 = startQuery() - q2.processAllAvailable() - checkAnswer( - spark.table(tableName), - Seq(Row(t1, 1), Row(t2, 1)) - ) - q2.stop() - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 231b44ce583a9..feff8c2b547a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -256,36 +256,40 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { testStream(aggregated, Complete)( StartStream(ProcessingTime("10 seconds"), triggerClock = clock), - // advance clock to 10 seconds + // advance clock to 10 seconds, all keys retained AddData(inputData, 0L, 5L, 5L, 10L), AdvanceManualClock(10 * 1000), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), - AdvanceManualClock(5 * 1000), // advance by 5 seconds i.e., 15 seconds total + + // advance clock to 20 seconds, should retain keys >= 10 + AddData(inputData, 15L, 15L, 20L), + AdvanceManualClock(10 * 1000), + CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), + + // advance clock to 30 seconds, should retain keys >= 20 + AddData(inputData, 0L, 85L), + AdvanceManualClock(10 * 1000), + CheckLastBatch((20L, 1), (85L, 1)), // bounce stream and ensure correct batch timestamp is used - // i.e., we don't take it from the clock, which is at 15 seconds. + // i.e., we don't take it from the clock, which is at 90 seconds. 
StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() + // advance by a minute i.e., 90 seconds total + clock.advance(60 * 1000L) true }, StartStream(ProcessingTime("10 seconds"), triggerClock = clock), - CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), - - // advance clock to 20 seconds, should retain keys >= 10 - AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(5 * 1000), - CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), - - // advance clock to 30 seconds, should retain keys >= 20 - AddData(inputData, 0L), - AdvanceManualClock(10 * 1000), - CheckLastBatch((20L, 1)), + CheckLastBatch((20L, 1), (85L, 1)), + AssertOnQuery { q => + clock.getTimeMillis() == 90000L + }, - // advance clock to 40 seconds, should retain keys >= 30 - AddData(inputData, 25L, 30L, 40L, 45L), + // advance clock to 100 seconds, should retain keys >= 90 + AddData(inputData, 85L, 90L, 100L, 105L), AdvanceManualClock(10 * 1000), - CheckLastBatch((30L, 1), (40L, 1), (45L, 1)) + CheckLastBatch((90L, 1), (100L, 1), (105L, 1)) ) } @@ -309,29 +313,31 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { AddData(inputData, 0L, 5L, 5L, 10L), AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), - // advance by 5 days i.e., 15 days total - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 5), + // advance clock to 20 days, should retain keys >= 10 + AddData(inputData, 15L, 15L, 20L), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), + CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), + // advance clock to 30 days, should retain keys >= 20 + AddData(inputData, 85L), + AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), + CheckLastBatch((20L, 1), (85L, 1)), + // bounce stream and ensure correct batch timestamp is used - // i.e., we don't take it from the clock, which is at 15 seconds. + // i.e., we don't take it from the clock, which is at 90 days. 
StopStream, AssertOnQuery { q => // clear the sink q.sink.asInstanceOf[MemorySink].clear() + // advance by 60 days i.e., 90 days total + clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) true }, StartStream(ProcessingTime("10 day"), triggerClock = clock), - CheckLastBatch((0L, 1), (5L, 2), (10L, 1)), - // advance clock to 20 days, should retain keys >= 10 - AddData(inputData, 15L, 15L, 20L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 5), - CheckLastBatch((10L, 1), (15L, 2), (20L, 1)), - // advance clock to 30 days, should retain keys >= 20 - AddData(inputData, 0L), - AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), - CheckLastBatch((20L, 1)), - // advance clock to 40 days, should retain keys >= 30 - AddData(inputData, 25L, 30L, 40L, 45L), + CheckLastBatch((20L, 1), (85L, 1)), + + // advance clock to 100 days, should retain keys >= 90 + AddData(inputData, 85L, 90L, 100L, 105L), AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10), - CheckLastBatch((30L, 1), (40L, 1), (45L, 1)) + CheckLastBatch((90L, 1), (100L, 1), (105L, 1)) ) } } From ff2538dae7d51e3ba69c56b733ac1919218f9858 Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Mon, 28 Nov 2016 15:08:26 -0800 Subject: [PATCH 20/21] address comments @zsxwing --- .../streaming/StreamExecutionMetadataSuite.scala | 4 ---- .../sql/streaming/StreamingAggregationSuite.scala | 14 ++------------ 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala index 9b1c3f52f681a..c7139c588d1d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala @@ -18,13 +18,9 @@ package org.apache.spark.sql.streaming import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata -import org.apache.spark.util.Utils class StreamExecutionMetadataSuite extends StreamTest { - private def newMetadataDir = - Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath - test("stream execution metadata") { assert(StreamExecutionMetadata(0, 0) === StreamExecutionMetadata("""{}""")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index feff8c2b547a0..fbe560e8d9181 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ -import org.apache.spark.util.ManualClock object FailureSinglton { var firstTime = true @@ -242,11 +241,8 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { test("prune results by current_time, complete mode") { import testImplicits._ - import StreamingAggregationSuite._ - clock = new StreamManualClock - + val clock = new StreamManualClock val inputData = MemoryStream[Long] - val aggregated = inputData.toDF() .groupBy($"value") @@ -295,8 +291,7 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { test("prune results by current_date, complete mode") { import 
testImplicits._ - import StreamingAggregationSuite._ - clock = new StreamManualClock + val clock = new StreamManualClock val tz = TimeZone.getDefault.getID val inputData = MemoryStream[Long] val aggregated = @@ -341,8 +336,3 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { ) } } - -object StreamingAggregationSuite { - // Singleton reference to clock that does not get serialized in task closures - @volatile var clock: ManualClock = null -} From 0ea579695050ae9c1dd7f68cc8df2dc7fbfc833e Mon Sep 17 00:00:00 2001 From: Tyson Condie Date: Mon, 28 Nov 2016 16:31:55 -0800 Subject: [PATCH 21/21] update --- .../scala/org/apache/spark/sql/streaming/WatermarkSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index ae57b1d3677ce..3e9488c7dc9af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -104,6 +104,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + testStream(df)( AddData(inputData, 10, 11, 12, 13, 14, 15), CheckLastBatch(),