Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SQL][TEST] Increased timeouts to reduce flakiness in ContinuousQueryManagerSuite #11638

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
// awaitAnyTermination should be blocking or non-blocking depending on timeout values
testAwaitAnyTermination(
ExpectBlocked,
awaitTimeout = 2 seconds,
awaitTimeout = 4 seconds,
expectedReturnedValue = false,
testBehaviorFor = 1 second)
testBehaviorFor = 2 seconds)

testAwaitAnyTermination(
ExpectNotBlocked,
Expand All @@ -162,20 +162,20 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 1 second,
awaitTimeout = 2 seconds,
expectedReturnedValue = true,
testBehaviorFor = 2 seconds)
testBehaviorFor = 4 seconds)
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
testAwaitAnyTermination(
ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)

// Resetting termination should make awaitAnyTermination() blocking again
sqlContext.streams.resetTerminated()
testAwaitAnyTermination(
ExpectBlocked,
awaitTimeout = 2 seconds,
awaitTimeout = 4 seconds,
expectedReturnedValue = false,
testBehaviorFor = 1 second)

Expand All @@ -184,31 +184,31 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 1 second,
awaitTimeout = 1 seconds,
testBehaviorFor = 2 seconds)
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned

// All subsequent calls to awaitAnyTermination should throw the exception
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 1 second,
testBehaviorFor = 2 seconds)
awaitTimeout = 2 seconds,
testBehaviorFor = 4 seconds)

// Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked
sqlContext.streams.resetTerminated()
val q3 = stopRandomQueryAsync(1 second, withError = true)
val q3 = stopRandomQueryAsync(2 seconds, withError = true)
testAwaitAnyTermination(
ExpectNotBlocked,
awaitTimeout = 100 milliseconds,
expectedReturnedValue = false,
testBehaviorFor = 2 seconds)
testBehaviorFor = 4 seconds)

// After that query is stopped, awaitAnyTerm should throw exception
eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 100 milliseconds,
testBehaviorFor = 2 seconds)
testBehaviorFor = 4 seconds)


// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
Expand All @@ -217,12 +217,12 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with

val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked, awaitTimeout = 1 second, expectedReturnedValue = true)
ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
require(!q4.isActive)
val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
// After q5 terminates with exception, awaitAnyTerm should start throwing exception
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 100 milliseconds)
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
}
}

Expand Down Expand Up @@ -260,7 +260,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
expectedBehavior: ExpectedBehavior,
expectedReturnedValue: Boolean = false,
awaitTimeout: Span = null,
testBehaviorFor: Span = 2 seconds
testBehaviorFor: Span = 4 seconds
): Unit = {

def awaitTermFunc(): Unit = {
Expand Down