deal with ProcessAllAvailable
jose-torres committed Jan 13, 2018
1 parent 71bfbcf commit 4bb9c3f
Showing 2 changed files with 118 additions and 113 deletions.
@@ -86,15 +86,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 
     try {
       testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
       }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
       testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
       }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
     } finally {
       writer.stop()
     }
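Every hunk in this file makes the same substitution: the bounded wait on writer.processAllAvailable() becomes ScalaTest's eventually, which keeps re-running the sink check until it passes or streamingTimeout expires. For a continuous query there is no point at which "all available" input has been drained, so processAllAvailable() is not a usable synchronization point. A minimal standalone sketch of the retry pattern follows; the sinkContents stub and the 30-second timeout are hypothetical, while eventually, timeout and the Span-based timeout value mirror the suite.

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  // Hypothetical stand-in for the sink contents that checkDatasetUnorderly reads in the real tests.
  @volatile var sinkContents: Seq[Int] = Seq.empty

  def main(args: Array[String]): Unit = {
    // Simulate results arriving asynchronously, as a continuous query would deliver them.
    new Thread(() => { Thread.sleep(500); sinkContents = Seq(1, 2, 3, 4, 5) }).start()

    // The block is re-evaluated on each retry; a failed assertion triggers another
    // attempt until the timeout (streamingTimeout in the suite) is exhausted.
    eventually(timeout(30.seconds)) {
      assert(sinkContents.sorted == Seq(1, 2, 3, 4, 5))
    }
  }
}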
@@ -128,15 +126,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 
     try {
       testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
       }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
       testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
       }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
     } finally {
       writer.stop()
     }
@@ -178,15 +174,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 
     try {
       testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
       }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
       testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
-      failAfter(streamingTimeout) {
-        writer.processAllAvailable()
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
       }
-      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
     } finally {
       writer.stop()
     }
@@ -210,12 +204,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     var writer: StreamingQuery = null
     var ex: Exception = null
     try {
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = "CAST(null as STRING) as topic", "value"
-        )
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        writer.processAllAvailable()
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = "CAST(null as STRING) as topic", "value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
     } finally {
       writer.stop()
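The failure tests below change shape in the same spirit: rather than wrapping processAllAvailable() in intercept[StreamingQueryException] and relying on the blocking call to surface the error, the query is started and StreamingQuery.exception, which is None while the query runs and Some(StreamingQueryException) once it dies, is polled until the failure shows up. A hedged sketch of that polling step as a helper; the helper name is hypothetical and only the public StreamingQuery API is assumed.

import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}
import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.Span

object FailurePollingSketch {
  // Wait until a query that is expected to fail actually records its terminal error,
  // then hand the exception back for message assertions.
  def awaitExpectedFailure(writer: StreamingQuery, streamingTimeout: Span): StreamingQueryException = {
    var ex: StreamingQueryException = null
    eventually(timeout(streamingTimeout)) {
      assert(writer.exception.isDefined)
      ex = writer.exception.get
    }
    ex
  }
}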
@@ -243,12 +238,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     var writer: StreamingQuery = null
     var ex: Exception = null
     try {
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = "value as key", "value"
-        )
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        writer.processAllAvailable()
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = "value as key", "value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
     } finally {
       writer.stop()
@@ -259,12 +255,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 
     try {
       /* No value field */
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"'$topic' as topic", "value as key"
-        )
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        writer.processAllAvailable()
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"'$topic' as topic", "value as key"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
     } finally {
       writer.stop()
@@ -292,12 +289,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     var ex: Exception = null
     try {
       /* topic field wrong type */
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"CAST('1' as INT) as topic", "value"
-        )
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        writer.processAllAvailable()
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"CAST('1' as INT) as topic", "value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
     } finally {
       writer.stop()
@@ -306,12 +304,13 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 
     try {
       /* value field wrong type */
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
-        )
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        writer.processAllAvailable()
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
     } finally {
       writer.stop()
@@ -320,13 +319,14 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       "value attribute type must be a string or binarytype"))
 
     try {
-      ex = intercept[StreamingQueryException] {
-        /* key field wrong type */
-        writer = createKafkaWriter(input.toDF())(
-          withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
-        )
-        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-        writer.processAllAvailable()
+      /* key field wrong type */
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
     } finally {
       writer.stop()
@@ -379,11 +379,12 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     var writer: StreamingQuery = null
     var ex: Exception = null
    try {
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(
-          input.toDF(),
-          withOptions = Map("kafka.key.serializer" -> "foo"))()
-        writer.processAllAvailable()
+      writer = createKafkaWriter(
+        input.toDF(),
+        withOptions = Map("kafka.key.serializer" -> "foo"))()
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
       assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
         "kafka option 'key.serializer' is not supported"))
@@ -392,11 +393,12 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     }
 
     try {
-      ex = intercept[StreamingQueryException] {
-        writer = createKafkaWriter(
-          input.toDF(),
-          withOptions = Map("kafka.value.serializer" -> "foo"))()
-        writer.processAllAvailable()
+      writer = createKafkaWriter(
+        input.toDF(),
+        withOptions = Map("kafka.value.serializer" -> "foo"))()
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
       }
       assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
         "kafka option 'value.serializer' is not supported"))
@@ -95,9 +95,10 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
       topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
 
     override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
-      if (query.get.isActive) {
+      query match {
         // Make sure no Spark job is running when deleting a topic
-        query.get.processAllAvailable()
+        case Some(m: MicroBatchExecution) => m.processAllAvailable()
+        case _ =>
       }
 
       val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
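This helper feeds data in both the micro-batch and continuous suites, so it now quiesces the query only when it is a MicroBatchExecution and falls through otherwise: a continuous execution has no "all available" point to wait for. A standalone sketch of that dispatch; the wrapper function is hypothetical, while MicroBatchExecution, StreamExecution and processAllAvailable() are the Spark internals the hunk itself uses.

import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}

object QuiesceSketch {
  // Drain outstanding input only for micro-batch queries; a continuous query never
  // finishes processAllAvailable(), so it is deliberately skipped.
  def quiesceIfMicroBatch(query: Option[StreamExecution]): Unit = query match {
    case Some(m: MicroBatchExecution) => m.processAllAvailable()
    case _ => // continuous execution or no query: nothing to drain
  }
}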
@@ -435,6 +436,49 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
     assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
     query.stop()
   }
+
+  test("delete a topic when a Spark job is running") {
+    KafkaSourceSuite.collectedData.clear()
+
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      // If a topic is deleted and we try to poll data starting from offset 0,
+      // the Kafka consumer will just block until timeout and return an empty result.
+      // So set the timeout to 1 second to make this test fast.
+      .option("kafkaConsumer.pollTimeoutMs", "1000")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    KafkaSourceSuite.globalTestUtils = testUtils
+    // The following ForeachWriter will delete the topic before fetching data from Kafka
+    // in executors.
+    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
+      override def open(partitionId: Long, version: Long): Boolean = {
+        KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
+        true
+      }
+
+      override def process(value: Int): Unit = {
+        KafkaSourceSuite.collectedData.add(value)
+      }
+
+      override def close(errorOrNull: Throwable): Unit = {}
+    }).start()
+    query.processAllAvailable()
+    query.stop()
+    // `failOnDataLoss` is `false`, we should not fail the query
+    assert(query.exception.isEmpty)
+  }
 }
 
 class KafkaSourceSuiteBase extends KafkaSourceTest {
@@ -604,49 +648,6 @@ class KafkaSourceSuiteBase extends KafkaSourceTest {
     testUnsupportedConfig("kafka.auto.offset.reset", "latest")
   }
 
-  test("delete a topic when a Spark job is running") {
-    KafkaSourceSuite.collectedData.clear()
-
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 1)
-    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribe", topic)
-      // If a topic is deleted and we try to poll data starting from offset 0,
-      // the Kafka consumer will just block until timeout and return an empty result.
-      // So set the timeout to 1 second to make this test fast.
-      .option("kafkaConsumer.pollTimeoutMs", "1000")
-      .option("startingOffsets", "earliest")
-      .option("failOnDataLoss", "false")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    KafkaSourceSuite.globalTestUtils = testUtils
-    // The following ForeachWriter will delete the topic before fetching data from Kafka
-    // in executors.
-    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
-      override def open(partitionId: Long, version: Long): Boolean = {
-        KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
-        true
-      }
-
-      override def process(value: Int): Unit = {
-        KafkaSourceSuite.collectedData.add(value)
-      }
-
-      override def close(errorOrNull: Throwable): Unit = {}
-    }).start()
-    query.processAllAvailable()
-    query.stop()
-    // `failOnDataLoss` is `false`, we should not fail the query
-    assert(query.exception.isEmpty)
-  }
-
   test("get offsets from case insensitive parameters") {
     for ((optionKey, optionValue, answer) <- Seq(
       (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
@@ -746,9 +747,11 @@ class KafkaSourceSuiteBase extends KafkaSourceTest {
       .queryName("kafkaColumnTypes")
       .trigger(defaultTrigger)
       .start()
-    query.processAllAvailable()
-    val rows = spark.table("kafkaColumnTypes").collect()
-    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+    var rows: Array[Row] = Array()
+    eventually(timeout(streamingTimeout)) {
+      rows = spark.table("kafkaColumnTypes").collect()
+      assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+    }
     val row = rows(0)
     assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
     assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")