Skip to content

Commit

Permalink
Increased timeouts to reduce flakiness
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Mar 10, 2016
1 parent 4896411 commit 5ab7fee
Showing 1 changed file with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
// awaitAnyTermination should be blocking or non-blocking depending on timeout values
testAwaitAnyTermination(
ExpectBlocked,
awaitTimeout = 2 seconds,
awaitTimeout = 4 seconds,
expectedReturnedValue = false,
testBehaviorFor = 1 second)
testBehaviorFor = 2 seconds)

testAwaitAnyTermination(
ExpectNotBlocked,
Expand All @@ -162,20 +162,20 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 1 second,
awaitTimeout = 2 seconds,
expectedReturnedValue = true,
testBehaviorFor = 2 seconds)
testBehaviorFor = 4 seconds)
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
testAwaitAnyTermination(
ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)

// Resetting termination should make awaitAnyTermination() blocking again
sqlContext.streams.resetTerminated()
testAwaitAnyTermination(
ExpectBlocked,
awaitTimeout = 2 seconds,
awaitTimeout = 4 seconds,
expectedReturnedValue = false,
testBehaviorFor = 1 second)

Expand All @@ -184,31 +184,31 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 1 second,
awaitTimeout = 1 seconds,
testBehaviorFor = 2 seconds)
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should throw the exception
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 1 second,
testBehaviorFor = 2 seconds)
awaitTimeout = 2 seconds,
testBehaviorFor = 4 seconds)

// Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked
sqlContext.streams.resetTerminated()
val q3 = stopRandomQueryAsync(1 second, withError = true)
val q3 = stopRandomQueryAsync(2 seconds, withError = true)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 100 milliseconds,
expectedReturnedValue = false,
testBehaviorFor = 2 seconds)
testBehaviorFor = 4 seconds)

// After that query is stopped, awaitAnyTerm should throw exception
eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 100 milliseconds,
testBehaviorFor = 2 seconds)
testBehaviorFor = 4 seconds)


// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
Expand All @@ -217,12 +217,12 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with

val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked, awaitTimeout = 1 second, expectedReturnedValue = true)
ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
require(!q4.isActive)
val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
// After q5 terminates with exception, awaitAnyTerm should start throwing exception
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 100 milliseconds)
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
}
}

Expand Down Expand Up @@ -260,7 +260,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
expectedBehavior: ExpectedBehavior,
expectedReturnedValue: Boolean = false,
awaitTimeout: Span = null,
testBehaviorFor: Span = 2 seconds
testBehaviorFor: Span = 4 seconds
): Unit = {

def awaitTermFunc(): Unit = {
Expand Down

0 comments on commit 5ab7fee

Please sign in to comment.