
Commit

current timestamp test
jose-torres committed Dec 15, 2017
1 parent 1416882 commit 25a23d1
Showing 2 changed files with 42 additions and 40 deletions.
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, CurrentDate, CurrentTimestamp, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
@@ -342,12 +342,23 @@ object UnsupportedOperationChecker {
  def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {
    checkForStreaming(plan, outputMode)

-    plan.foreachUp {
-      case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
-          _: DeserializeToObject | _: SerializeFromObject) =>
-      case node if node.nodeName == "StreamingRelationV2" =>
-      case node =>
-        throwError(s"Continuous processing does not support ${node.nodeName} operations.")(node)
+    plan.foreachUp { implicit subPlan =>
+      subPlan match {
+        case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
+            _: DeserializeToObject | _: SerializeFromObject) =>
+        case node if node.nodeName == "StreamingRelationV2" =>
+        case node =>
+          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
+      }
+
+      subPlan.expressions.foreach { e =>
+        if (e.collectLeaves().exists {
+          case (_: CurrentTimestamp | _: CurrentDate) => true
+          case _ => false
+        }) {
+          throwError(s"Continuous processing does not support current time operations.")
+        }
+      }
    }
  }

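Note on the new check (not part of the diff itself): checkForContinuous now visits every node of the plan with foreachUp and, besides whitelisting the supported operators, scans each node's expressions for time-dependent leaves. A minimal standalone sketch of that scan, assuming the Catalyst expression API is on the classpath; the helper name referencesCurrentTime is illustrative and does not appear in the commit:

// Illustrative helper, not part of the commit: the same collectLeaves() scan
// the checker runs over each node's expressions.
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp, Expression}

def referencesCurrentTime(e: Expression): Boolean =
  e.collectLeaves().exists {
    case _: CurrentTimestamp | _: CurrentDate => true
    case _ => false
  }

// A plan is rejected as soon as any of its expressions satisfies this predicate,
// with the error "Continuous processing does not support current time operations."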
@@ -47,17 +47,15 @@ import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

-class ContinuousSuite extends StreamTest {
-  import testImplicits._
-
+class ContinuousSuiteBase extends StreamTest {
  // We need more than the default local[2] to be able to schedule all partitions simultaneously.
  override protected def createSparkSession = new TestSparkSession(
    new SparkContext(
      "local[10]",
      "continuous-stream-test-sql-context",
      sparkConf.set("spark.sql.testkey", "true")))

-  private def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
+  protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
    query match {
      case s: ContinuousExecution =>
        assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
@@ -74,7 +72,11 @@ class ContinuousSuite extends StreamTest {

  // A continuous trigger that will only fire the initial time for the duration of a test.
  // This allows clean testing with manual epoch advancement.
-  private val longContinuousTrigger = Trigger.Continuous("1 hour")
+  protected val longContinuousTrigger = Trigger.Continuous("1 hour")
+}
+
+class ContinuousSuite extends ContinuousSuiteBase {
+  import testImplicits._

  test("basic rate source") {
    val df = spark.readStream
@@ -167,6 +169,22 @@ class ContinuousSuite extends StreamTest {
"Continuous processing does not support Deduplicate operations."))
}

test("timestamp") {
val df = spark.readStream
.format("rate")
.option("numPartitions", "5")
.option("rowsPerSecond", "5")
.load()
.select(current_timestamp())

val except = intercept[AnalysisException] {
testStream(df, useV2Sink = true)(StartStream(longContinuousTrigger))
}

assert(except.message.contains(
"Continuous processing does not support current time operations."))
}

test("repeatedly restart") {
val df = spark.readStream
.format("rate")
@@ -217,36 +235,9 @@
  }
}

-class ContinuousStressSuite extends StreamTest {
-
+class ContinuousStressSuite extends ContinuousSuiteBase {
  import testImplicits._

-  // We need more than the default local[2] to be able to schedule all partitions simultaneously.
-  override protected def createSparkSession = new TestSparkSession(
-    new SparkContext(
-      "local[10]",
-      "continuous-stream-test-sql-context",
-      sparkConf.set("spark.sql.testkey", "true")))
-
-  private def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
-    query match {
-      case s: ContinuousExecution =>
-        assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
-        val reader = s.lastExecution.executedPlan.collectFirst {
-          case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r
-        }.get
-
-        val deltaMs = (numTriggers - 1) * 1000 + 300
-        while (System.currentTimeMillis < reader.creationTime + deltaMs) {
-          Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
-        }
-    }
-  }
-
-  // A continuous trigger that will only fire the initial time for the duration of a test.
-  // This allows clean testing with manual epoch advancement.
-  private val longContinuousTrigger = Trigger.Continuous("1 hour")
-
  test("only one epoch") {
    val df = spark.readStream
      .format("rate")

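For context, the analysis error that the new test asserts on also surfaces outside the test harness whenever a continuous query selects a current-time expression. A hedged user-level sketch (the rate-source options, console sink, and trigger interval are illustrative; which sinks accept a continuous trigger depends on the Spark build):

// Hypothetical user-level example, not part of the commit.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[4]").appName("ts-check").getOrCreate()

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .select(current_timestamp())

// start() runs the unsupported-operation check for continuous plans, so this is
// expected to fail with:
//   "Continuous processing does not support current time operations."
val query = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()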
0 comments on commit 25a23d1
