Skip to content

Commit

Permalink
fix kafka test
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Mar 6, 2019
1 parent 3261ed5 commit f835a5c
Showing 1 changed file with 24 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import java.util.Locale
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._
import scala.collection.JavaConverters._

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{BinaryType, DataType}
Expand Down Expand Up @@ -227,39 +226,23 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)

/* No topic field or topic option */
var writer: StreamingQuery = null
var ex: Exception = null
try {
writer = createKafkaWriter(input.toDF())(
val ex = intercept[AnalysisException] {
/* No topic field or topic option */
createKafkaWriter(input.toDF())(
withSelectExpr = "value as key", "value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage
.toLowerCase(Locale.ROOT)
.contains("topic option required when no 'topic' attribute is present"))

try {
val ex2 = intercept[AnalysisException] {
/* No value field */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value as key"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"required attribute 'value' not found"))
}

Expand All @@ -278,53 +261,30 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)

var writer: StreamingQuery = null
var ex: Exception = null
try {
val ex = intercept[AnalysisException] {
/* topic field wrong type */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"CAST('1' as INT) as topic", "value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))

try {
val ex2 = intercept[AnalysisException] {
/* value field wrong type */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"value attribute type must be a string or binary"))

try {
val ex3 = intercept[AnalysisException] {
/* key field wrong type */
writer = createKafkaWriter(input.toDF())(
createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
)
testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
"key attribute type must be a string or binary"))
}

Expand Down Expand Up @@ -369,35 +329,22 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
var writer: StreamingQuery = null
var ex: Exception = null
try {
writer = createKafkaWriter(

val ex = intercept[IllegalArgumentException] {
createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'key.serializer' is not supported"))

try {
writer = createKafkaWriter(
val ex2 = intercept[IllegalArgumentException] {
createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
eventually(timeout(streamingTimeout)) {
assert(writer.exception.isDefined)
ex = writer.exception.get
}
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
} finally {
writer.stop()
}
assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
"kafka option 'value.serializer' is not supported"))
}

test("generic - write big data with small producer buffer") {
Expand Down

0 comments on commit f835a5c

Please sign in to comment.