From 67c890c72d37827021f29e4628b6ee07d6afeb1c Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 25 Jan 2018 11:11:09 -0800 Subject: [PATCH 1/4] await termination --- .../scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 27dbb3f7a8f31..2b56d721c1159 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -1116,6 +1116,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared } query.stop() + query.awaitTermination() // `failOnDataLoss` is `false`, we should not fail the query if (query.exception.nonEmpty) { throw query.exception.get From e81e1748652bd2f6aa95769ca05eed31753473bd Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 25 Jan 2018 11:55:40 -0800 Subject: [PATCH 2/4] full fix --- .../spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 6 ++++++ .../org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 1 + 2 files changed, 7 insertions(+) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index a7083fa4e3417..5ccd2849362fc 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -87,6 +87,12 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { class KafkaContinuousSourceStressForDontFailOnDataLossSuite extends KafkaSourceStressForDontFailOnDataLossSuite { + + override def createSparkSession(): TestSparkSession = { + // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic + new TestSparkSession(new SparkContext("local[10,3]", "test-sql-context", sparkConf)) + } + override protected def startStream(ds: Dataset[Int]) = { ds.writeStream .format("memory") diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 2b56d721c1159..2fd325c543f29 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -1115,6 +1115,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared } } + query.processAllAvailable() query.stop() query.awaitTermination() // `failOnDataLoss` is `false`, we should not fail the query From 3093d919ada739fca10b1fcc73e9b2c620eeeb16 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Thu, 25 Jan 2018 12:26:24 -0800 Subject: [PATCH 3/4] more descriptive test spark session names --- .../spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 5 ++++- .../org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 5ccd2849362fc..b77bb9cfcf287 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -90,7 +90,10 @@ class KafkaContinuousSourceStressForDontFailOnDataLossSuite override def createSparkSession(): TestSparkSession = { // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic - new TestSparkSession(new SparkContext("local[10,3]", "test-sql-context", sparkConf)) + new TestSparkSession(new SparkContext( + "local[10,3]", + "test-KafkaContinuousSourceStressForDontFailOnDataLossSuite", + sparkConf)) } override protected def startStream(ds: Dataset[Int]) = { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 2fd325c543f29..e855c5ce46673 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -1006,7 +1006,10 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared override def createSparkSession(): TestSparkSession = { // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic - new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) + new TestSparkSession(new SparkContext( + "local[2,3]", + "test-KafkaSourceStressForDontFailOnDataLossSuite", + sparkConf)) } override def beforeAll(): Unit = { From 66bdcea6c18770fa339b03b3765a4724325c2cb3 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 13 Feb 2018 09:30:23 -0800 Subject: [PATCH 4/4] add imports --- .../apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 4bf7d05a08d39..c9c8f34c6b2ff 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.kafka010 +import org.apache.spark.SparkContext import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.test.TestSparkSession // Run tests in KafkaSourceSuiteBase in continuous execution mode. class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest