Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-27494][SS] Null keys/values don't work in Kafka source v2 #24441

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {

def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
rowWriter.reset()
rowWriter.zeroOutNullBytes()

if (record.key == null) {
rowWriter.setNullAt(0)
} else {
rowWriter.write(0, record.key)
}
rowWriter.write(1, record.value)
if (record.value == null) {
rowWriter.setNullAt(1)
} else {
rowWriter.write(1, record.value)
}
This conversation was marked as resolved by uncleGen

This comment has been minimized.

Copy link
@dongjoon-hyun

dongjoon-hyun Apr 23, 2019

Member

Hi, @uncleGen . To prevent a future regression, could you add a unit test case, too?

This comment has been minimized.

Copy link
@dongjoon-hyun

dongjoon-hyun Apr 23, 2019

Member

Please update the PR description. If the existing unit tests already cover your case, Apache Spark doesn't need this PR. :)

This comment has been minimized.

Copy link
@uncleGen

uncleGen Apr 23, 2019

Author Contributor

ok

This comment has been minimized.

Copy link
@dongjoon-hyun
rowWriter.write(2, UTF8String.fromString(record.topic))
This conversation was marked as resolved by uncleGen

This comment has been minimized.

Copy link
@cloud-fan

cloud-fan Apr 24, 2019

Contributor

is topic also nullable?

This comment has been minimized.

Copy link
@uncleGen

uncleGen Apr 25, 2019

Author Contributor

topic should not be null

rowWriter.write(3, record.partition)
rowWriter.write(4, record.offset)
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
}
}
}

// Regression test for SPARK-27494: records with null keys and/or null values
// must be readable through the continuous-processing Kafka source path.
// Delegates to the shared testNullableKeyValue helper with a continuous trigger.
test("SPARK-27494: read kafka record containing null key/values.") {
testNullableKeyValue(ContinuousTrigger(100))
}
}

class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
q.stop()
}
}

// Regression test for SPARK-27494: records with null keys and/or null values
// must be readable through the micro-batch Kafka source path.
// Delegates to the shared testNullableKeyValue helper with a processing-time trigger.
test("SPARK-27494: read kafka record containing null key/values.") {
testNullableKeyValue(Trigger.ProcessingTime(100))
}
}


@@ -1511,6 +1515,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
)
}

/**
 * Shared body for the SPARK-27494 regression tests: produces Kafka records with
 * (null, null), (non-null, null) and (null, non-null) key/value pairs inside a
 * committed transaction, streams them into an in-memory table with the given
 * trigger, and verifies all fifteen rows arrive with their nulls intact.
 *
 * @param trigger the streaming trigger to run the query with (micro-batch or
 *                continuous), supplied by the concrete suite.
 */
protected def testNullableKeyValue(trigger: Trigger): Unit = {
  val table = "kafka_null_key_value_source_test"
  withTable(table) {
    val topic = newTopic()
    testUtils.createTopic(topic)
    // NOTE: "Transcational" is the (misspelled) name of the existing test helper.
    testUtils.withTranscationalProducer { producer =>
      val df = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
        // read_committed so only the committed transaction below is visible
        .option("kafka.isolation.level", "read_committed")
        .option("startingOffsets", "earliest")
        .option("subscribe", topic)
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as[(String, String)]

      val q = df
        .writeStream
        .format("memory")
        .queryName(table)
        .trigger(trigger)
        .start()
      try {
        producer.beginTransaction()
        // Explicit type parameters let (String, String) tuples hold nulls
        // directly, avoiding the unchecked asInstanceOf cast over (Null, Null).
        val expected1 = Seq.tabulate[(String, String)](5) { _ =>
          producer.send(new ProducerRecord[String, String](topic, null, null)).get()
          (null, null)
        }

        // Keys 1..5 with null values.
        val expected2 = Seq.tabulate[(String, String)](5) { i =>
          val key = (i + 1).toString
          producer.send(new ProducerRecord[String, String](topic, key, null)).get()
          (key, null)
        }

        // Null keys with values 6..10, continuing the counter used above.
        val expected3 = Seq.tabulate[(String, String)](5) { i =>
          val value = (i + 6).toString
          producer.send(new ProducerRecord[String, String](topic, null, value)).get()
          (null, value)
        }

        producer.commitTransaction()
        eventually(timeout(streamingTimeout)) {
          checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
        }
      } finally {
        q.stop()
      }
    }
  }
}
}

object KafkaSourceSuite {
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.