From 5ab7fee7cc1eb5314e814fc1d23f218ea5e4555a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 10 Mar 2016 13:03:17 -0800 Subject: [PATCH] Increased timeouts to reduce flakiness --- .../ContinuousQueryManagerSuite.scala | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 35bb9fdbfdd16..45e824ad6353e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -148,9 +148,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with // awaitAnyTermination should be blocking or non-blocking depending on timeout values testAwaitAnyTermination( ExpectBlocked, - awaitTimeout = 2 seconds, + awaitTimeout = 4 seconds, expectedReturnedValue = false, - testBehaviorFor = 1 second) + testBehaviorFor = 2 seconds) testAwaitAnyTermination( ExpectNotBlocked, @@ -162,20 +162,20 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false) testAwaitAnyTermination( ExpectNotBlocked, - awaitTimeout = 1 second, + awaitTimeout = 2 seconds, expectedReturnedValue = true, - testBehaviorFor = 2 seconds) + testBehaviorFor = 4 seconds) require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned // All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high testAwaitAnyTermination( - ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true) + ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true) // Resetting termination should make awaitAnyTermination() blocking again sqlContext.streams.resetTerminated() testAwaitAnyTermination( ExpectBlocked, - awaitTimeout = 2 seconds, + awaitTimeout = 4 seconds, expectedReturnedValue = false, testBehaviorFor = 1 second) @@ -184,31 +184,31 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with val q2 = stopRandomQueryAsync(100 milliseconds, withError = true) testAwaitAnyTermination( ExpectException[SparkException], - awaitTimeout = 1 second, + awaitTimeout = 1 seconds, testBehaviorFor = 2 seconds) require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned // All subsequent calls to awaitAnyTermination should throw the exception testAwaitAnyTermination( ExpectException[SparkException], - awaitTimeout = 1 second, - testBehaviorFor = 2 seconds) + awaitTimeout = 2 seconds, + testBehaviorFor = 4 seconds) // Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked sqlContext.streams.resetTerminated() - val q3 = stopRandomQueryAsync(1 second, withError = true) + val q3 = stopRandomQueryAsync(2 seconds, withError = true) testAwaitAnyTermination( ExpectNotBlocked, awaitTimeout = 100 milliseconds, expectedReturnedValue = false, - testBehaviorFor = 2 seconds) + testBehaviorFor = 4 seconds) // After that query is stopped, awaitAnyTerm should throw exception eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop testAwaitAnyTermination( ExpectException[SparkException], awaitTimeout = 100 milliseconds, - testBehaviorFor = 2 seconds) + testBehaviorFor = 4 seconds) // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws @@ -217,12 +217,12 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with val q4 = stopRandomQueryAsync(10 milliseconds, withError = false) testAwaitAnyTermination( - ExpectNotBlocked, awaitTimeout = 1 second, expectedReturnedValue = true) + ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true) require(!q4.isActive) val q5 = stopRandomQueryAsync(10 milliseconds, withError = true) eventually(Timeout(streamingTimeout)) { require(!q5.isActive) } // After q5 terminates with exception, awaitAnyTerm should start throwing exception - testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 100 milliseconds) + testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds) } } @@ -260,7 +260,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with expectedBehavior: ExpectedBehavior, expectedReturnedValue: Boolean = false, awaitTimeout: Span = null, - testBehaviorFor: Span = 2 seconds + testBehaviorFor: Span = 4 seconds ): Unit = { def awaitTermFunc(): Unit = {