From cedaafdff23ad99e4be06077c5b5cc3bee6ebf07 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 14 Dec 2016 16:18:27 -0800
Subject: [PATCH 1/8] test possible fix

---
 .../StreamingQueryListenerSuite.scala         | 147 ++++++++++--------
 1 file changed, 79 insertions(+), 68 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 7c6745ac8285a..ef49cbee67842 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -52,79 +52,89 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
-  testQuietly("single listener, check trigger events are generated correctly") {
-    val clock = new StreamManualClock
-    val inputData = new MemoryStream[Int](0, sqlContext)
-    val df = inputData.toDS().as[Long].map { 10 / _ }
-    val listener = new EventCollector
-    try {
-      // No events until started
-      spark.streams.addListener(listener)
-      assert(listener.startEvent === null)
-      assert(listener.progressEvents.isEmpty)
-      assert(listener.terminationEvent === null)
-
-      testStream(df, OutputMode.Append)(
-
-        // Start event generated when query started
-        StartStream(ProcessingTime(100), triggerClock = clock),
-        AssertOnQuery { query =>
-          assert(listener.startEvent !== null)
-          assert(listener.startEvent.id === query.id)
-          assert(listener.startEvent.runId === query.runId)
-          assert(listener.startEvent.name === query.name)
-          assert(listener.progressEvents.isEmpty)
-          assert(listener.terminationEvent === null)
-          true
-        },
-
-        // Progress event generated when data processed
-        AddData(inputData, 1, 2),
-        AdvanceManualClock(100),
-        CheckAnswer(10, 5),
-        AssertOnQuery { query =>
-          assert(listener.progressEvents.nonEmpty)
-          assert(listener.progressEvents.last.json === query.lastProgress.json)
-          assert(listener.terminationEvent === null)
-          true
-        },
-
-        // Termination event generated when stopped cleanly
-        StopStream,
-        AssertOnQuery { query =>
-          eventually(Timeout(streamingTimeout)) {
+  for (i <- 0 to 10000) {
+    testQuietly(s"single listener, check trigger events are generated correctly- $i") {
+      val clock = new StreamManualClock
+      val inputData = new MemoryStream[Int](0, sqlContext)
+      val df = inputData.toDS().as[Long].map {
+        10 / _
+      }
+      val listener = new EventCollector
+      try {
+        // No events until started
+        spark.streams.addListener(listener)
+        assert(listener.startEvent === null)
+        assert(listener.progressEvents.isEmpty)
+        assert(listener.terminationEvent === null)
+
+        testStream(df, OutputMode.Append)(
+
+          // Start event generated when query started
+          StartStream(ProcessingTime(100), triggerClock = clock),
+          AssertOnQuery { query =>
+            assert(listener.startEvent !== null)
+            assert(listener.startEvent.id === query.id)
+            assert(listener.startEvent.runId === query.runId)
+            assert(listener.startEvent.name === query.name)
+            assert(listener.progressEvents.isEmpty)
+            assert(listener.terminationEvent === null)
+            true
+          },
+
+          // Progress event generated when data processed
+          AddData(inputData, 1, 2),
+          AdvanceManualClock(100),
+          CheckAnswer(10, 5),
+          AssertOnQuery { query =>
+            assert(listener.progressEvents.nonEmpty)
+            // SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter
+            // out zero input rows, but the lastProgress may be a zero input row trigger
+            val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
+              .getOrElse(fail("No progress updates received in StreamingQuery!"))
+            assert(listener.progressEvents.last.json === lastNonZeroProgress.json)
+            assert(listener.terminationEvent === null)
+            true
+          },
+
+          // Termination event generated when stopped cleanly
+          StopStream,
+          AssertOnQuery { query =>
+            eventually(Timeout(streamingTimeout)) {
+              assert(listener.terminationEvent !== null)
+              assert(listener.terminationEvent.id === query.id)
+              assert(listener.terminationEvent.runId === query.runId)
+              assert(listener.terminationEvent.exception === None)
+            }
+            listener.checkAsyncErrors()
+            listener.reset()
+            true
+          },
+
+          // Termination event generated with exception message when stopped with error
+          StartStream(ProcessingTime(100), triggerClock = clock),
+          AddData(inputData, 0),
+          AdvanceManualClock(100),
+          ExpectFailure[SparkException],
+          AssertOnQuery { query =>
             assert(listener.terminationEvent !== null)
             assert(listener.terminationEvent.id === query.id)
-            assert(listener.terminationEvent.runId === query.runId)
-            assert(listener.terminationEvent.exception === None)
+            assert(listener.terminationEvent.exception.nonEmpty)
+            // Make sure that the exception message reported through listener
+            // contains the actual exception and relevant stack trace
+            assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+            assert(
+              listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+            assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+            listener.checkAsyncErrors()
+            true
           }
-          listener.checkAsyncErrors()
-          listener.reset()
-          true
-        },
-
-        // Termination event generated with exception message when stopped with error
-        StartStream(ProcessingTime(100), triggerClock = clock),
-        AddData(inputData, 0),
-        AdvanceManualClock(100),
-        ExpectFailure[SparkException],
-        AssertOnQuery { query =>
-          assert(listener.terminationEvent !== null)
-          assert(listener.terminationEvent.id === query.id)
-          assert(listener.terminationEvent.exception.nonEmpty)
-          // Make sure that the exception message reported through listener
-          // contains the actual exception and relevant stack trace
-          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
-          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
-          listener.checkAsyncErrors()
-          true
-        }
-      )
-    } finally {
-      spark.streams.removeListener(listener)
+        )
+      } finally {
+        spark.streams.removeListener(listener)
+      }
     }
   }
 
+  /*
 
   test("adding and removing listener") {
     def isListenerActive(listener: EventCollector): Boolean = {
@@ -330,6 +340,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // to verify that we can skip broken jsons generated by Structured Streaming.
     testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
   }
+  */
 
   private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
     val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")

From 32d1ca8a7ed3c2e548b6c7fdceeddaa754c0b477 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 15 Dec 2016 11:48:35 -0800
Subject: [PATCH 2/8] hack tests

---
 dev/run-tests.py                                           | 2 ++
 .../spark/sql/streaming/StreamingQueryListenerSuite.scala  | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/dev/run-tests.py b/dev/run-tests.py
index ab285ac96af7e..9435df3d5b0f3 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -395,6 +395,8 @@ def run_scala_tests_sbt(test_modules, test_profiles):
     if not sbt_test_goals:
         return
 
+    sbt_test_goals = ['sql/testOnly *StreamingQueryListenerSuite']
+
     profiles_and_goals = test_profiles + sbt_test_goals
 
     print("[info] Running Spark tests using SBT with these arguments: ",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index ef49cbee67842..efbd937b9f954 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -52,7 +52,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
-  for (i <- 0 to 10000) {
+  for (i <- 0 to 1000) {
     testQuietly(s"single listener, check trigger events are generated correctly- $i") {
       val clock = new StreamManualClock
       val inputData = new MemoryStream[Int](0, sqlContext)

From d4bbb459ffd48edddfff2967fcc67aefe0b35ce0 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 15 Dec 2016 11:52:47 -0800
Subject: [PATCH 3/8] fix second flakiness

---
 .../StreamingQueryListenerSuite.scala         | 21 +++++++++++--------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index efbd937b9f954..96ea619737649 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -116,15 +116,18 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
           AdvanceManualClock(100),
           ExpectFailure[SparkException],
           AssertOnQuery { query =>
-            assert(listener.terminationEvent !== null)
-            assert(listener.terminationEvent.id === query.id)
-            assert(listener.terminationEvent.exception.nonEmpty)
-            // Make sure that the exception message reported through listener
-            // contains the actual exception and relevant stack trace
-            assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
-            assert(
-              listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
-            assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+            eventually(Timeout(streamingTimeout)) {
+              assert(listener.terminationEvent !== null)
+              assert(listener.terminationEvent.id === query.id)
+              assert(listener.terminationEvent.exception.nonEmpty)
+              // Make sure that the exception message reported through listener
+              // contains the actual exception and relevant stack trace
+              assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+              assert(
+                listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+              assert(
+                listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+            }
             listener.checkAsyncErrors()
             true
           }

From 50d1e442044df24401f5644d133a643434e374da Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 15 Dec 2016 12:33:00 -0800
Subject: [PATCH 4/8] try 10,000 times

---
 .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 96ea619737649..5b91f20680271 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -52,7 +52,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
-  for (i <- 0 to 1000) {
+  for (i <- 0 to 10000) {
     testQuietly(s"single listener, check trigger events are generated correctly- $i") {
       val clock = new StreamManualClock
       val inputData = new MemoryStream[Int](0, sqlContext)

From d8ca90d85acecc9dc55ccc819770a5783feee3e2 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 15 Dec 2016 13:19:13 -0800
Subject: [PATCH 5/8] passed 1,000 times

---
 .../StreamingQueryListenerSuite.scala         | 163 +++++++++---------
 1 file changed, 79 insertions(+), 84 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 5b91f20680271..4f19cbd0b5aef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -52,92 +52,88 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
-  for (i <- 0 to 10000) {
-    testQuietly(s"single listener, check trigger events are generated correctly- $i") {
-      val clock = new StreamManualClock
-      val inputData = new MemoryStream[Int](0, sqlContext)
-      val df = inputData.toDS().as[Long].map {
-        10 / _
-      }
-      val listener = new EventCollector
-      try {
-        // No events until started
-        spark.streams.addListener(listener)
-        assert(listener.startEvent === null)
-        assert(listener.progressEvents.isEmpty)
-        assert(listener.terminationEvent === null)
-
-        testStream(df, OutputMode.Append)(
-
-          // Start event generated when query started
-          StartStream(ProcessingTime(100), triggerClock = clock),
-          AssertOnQuery { query =>
-            assert(listener.startEvent !== null)
-            assert(listener.startEvent.id === query.id)
-            assert(listener.startEvent.runId === query.runId)
-            assert(listener.startEvent.name === query.name)
-            assert(listener.progressEvents.isEmpty)
-            assert(listener.terminationEvent === null)
-            true
-          },
-
-          // Progress event generated when data processed
-          AddData(inputData, 1, 2),
-          AdvanceManualClock(100),
-          CheckAnswer(10, 5),
-          AssertOnQuery { query =>
-            assert(listener.progressEvents.nonEmpty)
-            // SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter
-            // out zero input rows, but the lastProgress may be a zero input row trigger
-            val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
-              .getOrElse(fail("No progress updates received in StreamingQuery!"))
-            assert(listener.progressEvents.last.json === lastNonZeroProgress.json)
-            assert(listener.terminationEvent === null)
-            true
-          },
-
-          // Termination event generated when stopped cleanly
-          StopStream,
-          AssertOnQuery { query =>
-            eventually(Timeout(streamingTimeout)) {
-              assert(listener.terminationEvent !== null)
-              assert(listener.terminationEvent.id === query.id)
-              assert(listener.terminationEvent.runId === query.runId)
-              assert(listener.terminationEvent.exception === None)
-            }
-            listener.checkAsyncErrors()
-            listener.reset()
-            true
-          },
-
-          // Termination event generated with exception message when stopped with error
-          StartStream(ProcessingTime(100), triggerClock = clock),
-          AddData(inputData, 0),
-          AdvanceManualClock(100),
-          ExpectFailure[SparkException],
-          AssertOnQuery { query =>
-            eventually(Timeout(streamingTimeout)) {
-              assert(listener.terminationEvent !== null)
-              assert(listener.terminationEvent.id === query.id)
-              assert(listener.terminationEvent.exception.nonEmpty)
-              // Make sure that the exception message reported through listener
-              // contains the actual exception and relevant stack trace
-              assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
-              assert(
-                listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
-              assert(
-                listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
-            }
-            listener.checkAsyncErrors()
-            true
-          }
-        )
-      } finally {
-        spark.streams.removeListener(listener)
-      }
-    }
-  }
+  testQuietly(s"single listener, check trigger events are generated correctly- $i") {
+    val clock = new StreamManualClock
+    val inputData = new MemoryStream[Int](0, sqlContext)
+    val df = inputData.toDS().as[Long].map {
+      10 / _
+    }
+    val listener = new EventCollector
+    try {
+      // No events until started
+      spark.streams.addListener(listener)
+      assert(listener.startEvent === null)
+      assert(listener.progressEvents.isEmpty)
+      assert(listener.terminationEvent === null)
+
+      testStream(df, OutputMode.Append)(
+
+        // Start event generated when query started
+        StartStream(ProcessingTime(100), triggerClock = clock),
+        AssertOnQuery { query =>
+          assert(listener.startEvent !== null)
+          assert(listener.startEvent.id === query.id)
+          assert(listener.startEvent.runId === query.runId)
+          assert(listener.startEvent.name === query.name)
+          assert(listener.progressEvents.isEmpty)
+          assert(listener.terminationEvent === null)
+          true
+        },
+
+        // Progress event generated when data processed
+        AddData(inputData, 1, 2),
+        AdvanceManualClock(100),
+        CheckAnswer(10, 5),
+        AssertOnQuery { query =>
+          assert(listener.progressEvents.nonEmpty)
+          // SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter
+          // out zero input rows, but the lastProgress may be a zero input row trigger
+          val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
+            .getOrElse(fail("No progress updates received in StreamingQuery!"))
+          assert(listener.progressEvents.last.json === lastNonZeroProgress.json)
+          assert(listener.terminationEvent === null)
+          true
+        },
+
+        // Termination event generated when stopped cleanly
+        StopStream,
+        AssertOnQuery { query =>
+          eventually(Timeout(streamingTimeout)) {
+            assert(listener.terminationEvent !== null)
+            assert(listener.terminationEvent.id === query.id)
+            assert(listener.terminationEvent.runId === query.runId)
+            assert(listener.terminationEvent.exception === None)
+          }
+          listener.checkAsyncErrors()
+          listener.reset()
+          true
+        },
+
+        // Termination event generated with exception message when stopped with error
+        StartStream(ProcessingTime(100), triggerClock = clock),
+        AddData(inputData, 0),
+        AdvanceManualClock(100),
+        ExpectFailure[SparkException],
+        AssertOnQuery { query =>
+          eventually(Timeout(streamingTimeout)) {
+            assert(listener.terminationEvent !== null)
+            assert(listener.terminationEvent.id === query.id)
+            assert(listener.terminationEvent.exception.nonEmpty)
+            // Make sure that the exception message reported through listener
+            // contains the actual exception and relevant stack trace
+            assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+            assert(
+              listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+            assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
+          }
+          listener.checkAsyncErrors()
+          true
+        }
+      )
+    } finally {
+      spark.streams.removeListener(listener)
+    }
+  }
 
-  /*
 
   test("adding and removing listener") {
     def isListenerActive(listener: EventCollector): Boolean = {
@@ -343,7 +339,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // to verify that we can skip broken jsons generated by Structured Streaming.
     testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
   }
-  */
 
   private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
     val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")

From fc2bc34b70559d9e52837ff05a898b7000389c8c Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 15 Dec 2016 13:20:52 -0800
Subject: [PATCH 6/8] passed 1,000 times

---
 .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4f19cbd0b5aef..4e6ed2d40db2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -52,7 +52,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
-  testQuietly(s"single listener, check trigger events are generated correctly- $i") {
+  testQuietly(s"single listener, check trigger events are generated correctly") {
     val clock = new StreamManualClock
     val inputData = new MemoryStream[Int](0, sqlContext)
     val df = inputData.toDS().as[Long].map {

From ea87b0694a38074ac1e8985c5048d286111785a5 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 15 Dec 2016 13:32:16 -0800
Subject: [PATCH 7/8] remove test hack

---
 dev/run-tests.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/dev/run-tests.py b/dev/run-tests.py
index 9435df3d5b0f3..ab285ac96af7e 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -395,8 +395,6 @@ def run_scala_tests_sbt(test_modules, test_profiles):
     if not sbt_test_goals:
         return
 
-    sbt_test_goals = ['sql/testOnly *StreamingQueryListenerSuite']
-
     profiles_and_goals = test_profiles + sbt_test_goals
 
     print("[info] Running Spark tests using SBT with these arguments: ",

From 95824f3e5d15ef057d6ef1b665450053cfee586a Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 15 Dec 2016 13:32:57 -0800
Subject: [PATCH 8/8] minimize changes

---
 .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 4e6ed2d40db2c..a057d1d36c5a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -52,12 +52,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
-  testQuietly(s"single listener, check trigger events are generated correctly") {
+  testQuietly("single listener, check trigger events are generated correctly") {
     val clock = new StreamManualClock
     val inputData = new MemoryStream[Int](0, sqlContext)
-    val df = inputData.toDS().as[Long].map {
-      10 / _
-    }
+    val df = inputData.toDS().as[Long].map { 10 / _ }
    val listener = new EventCollector
    try {
      // No events until started
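
Editor's note: as a recap of the two stabilization patterns this series converges on, a
minimal sketch follows. It is illustrative only and not part of the patches above; it
assumes the scope of StreamingQueryListenerSuite, where `query` is the running
StreamingQuery, `listener` is the suite's EventCollector, `streamingTimeout` comes from
StreamTest, and `fail`/`assert` come from ScalaTest's Assertions.

    // Sketch only; assumes the enclosing StreamingQueryListenerSuite scope.
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.concurrent.PatienceConfiguration.Timeout

    // Pattern 1 (SPARK-18868): a ProcessingTime trigger can fire with no new data, so
    // query.lastProgress may report zero input rows, while listener.progressEvents keeps
    // only updates that processed rows. Compare against the last non-zero progress update.
    val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
      .getOrElse(fail("No progress updates received in StreamingQuery!"))
    assert(listener.progressEvents.last.json === lastNonZeroProgress.json)

    // Pattern 2: listener events are delivered asynchronously on the listener bus and may
    // lag the query's state, so poll with eventually() rather than asserting immediately.
    eventually(Timeout(streamingTimeout)) {
      assert(listener.terminationEvent !== null)
      assert(listener.terminationEvent.id === query.id)
    }

The run-tests.py change in patches 2 and 7 is only a temporary harness hack that restricts
the sbt goals to this one suite so the loop-amplified test can be rerun cheaply; it is
reverted before the final state of the branch.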