Skip to content

Commit

Permalink
[SPARK-28232][SS][SQL] Add groupIdPrefix for Kafka batch connector
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

According to the documentation, `groupIdPrefix` should be available for `streaming and batch`.
This is not the case, because the batch part is missing.

In this PR I've added:
* Structured Streaming test for v1 and v2 to cover `groupIdPrefix`
* Batch test for v1 and v2 to cover `groupIdPrefix`
* `groupIdPrefix` usage in batch

## How was this patch tested?

Additional + existing unit tests.

Closes #25030 from gaborgsomogyi/SPARK-28232.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
gaborgsomogyi authored and cloud-fan committed Jul 2, 2019
1 parent f148674 commit a006c85
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private[kafka010] class KafkaBatch(
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId()
val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId(sourceOptions)

val kafkaOffsetReader = new KafkaOffsetReader(
strategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private[kafka010] class KafkaRelation(
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId()
val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId(sourceOptions)

val kafkaOffsetReader = new KafkaOffsetReader(
strategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,10 @@ private[kafka010] object KafkaSourceProvider extends Logging {
* Returns a unique batch consumer group (group.id), allowing the user to set the prefix of
* the consumer group
*/
private[kafka010] def batchUniqueGroupId(): String = {
s"spark-kafka-relation-${UUID.randomUUID}"
private[kafka010] def batchUniqueGroupId(parameters: Map[String, String]): String = {
  // Use the caller-supplied prefix when present, otherwise fall back to the default one.
  val prefix = parameters.getOrElse(GROUP_ID_PREFIX, "spark-kafka-relation")
  // A random UUID suffix is appended to the prefix to form the returned group id.
  s"$prefix-${UUID.randomUUID}"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,23 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
)
}

test("allow group.id prefix") {
  // With `groupIdPrefix`, some valid consumer group id must start with the supplied value,
  // and no valid consumer group id may be exactly equal to it. The two conditions are
  // asserted separately so that a failure reports which one was violated.
  testGroupId("groupIdPrefix", (expected, actual) => {
    assert(actual.exists(_.startsWith(expected)),
      "Valid consumer groups don't contain a group id starting with the expected prefix - " +
      s"Valid consumer groups: $actual / expected group id prefix: $expected")
    assert(!actual.exists(_ === expected),
      "A valid consumer group id equals the prefix instead of merely starting with it - " +
      s"Valid consumer groups: $actual / expected group id prefix: $expected")
  })
}

test("allow group.id override") {
  // An explicit `kafka.group.id` must appear unchanged among the valid consumer groups.
  testGroupId("kafka.group.id", (wantedId, groupIds) =>
    assert(
      groupIds.exists(_ === wantedId),
      "Valid consumer groups don't contain the expected group id - " +
        s"Valid consumer groups: $groupIds / expected group id: $wantedId"))
}

private def testGroupId(groupIdKey: String, validateGroupId: (String, Iterable[String]) => Unit) {
// Tests code path KafkaSourceProvider.{sourceSchema(.), createSource(.)}
// as well as KafkaOffsetReader.createConsumer(.)
val topic = newTopic()
Expand All @@ -673,7 +689,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
val dsKafka = spark
.readStream
.format("kafka")
.option("kafka.group.id", customGroupId)
.option(groupIdKey, customGroupId)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
Expand All @@ -689,9 +705,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
val consumerGroups = testUtils.listConsumerGroups()
val validGroups = consumerGroups.valid().get()
val validGroupsId = validGroups.asScala.map(_.groupId())
assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
s"expected group id: $customGroupId")
validateGroupId(customGroupId, validGroupsId)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,23 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
}

test("allow group.id overriding") {
test("allow group.id prefix") {
  // With `groupIdPrefix`, some valid consumer group id must start with the supplied value,
  // and no valid consumer group id may be exactly equal to it. The two conditions are
  // asserted separately so that a failure reports which one was violated.
  testGroupId("groupIdPrefix", (expected, actual) => {
    assert(actual.exists(_.startsWith(expected)),
      "Valid consumer groups don't contain a group id starting with the expected prefix - " +
      s"Valid consumer groups: $actual / expected group id prefix: $expected")
    assert(!actual.exists(_ === expected),
      "A valid consumer group id equals the prefix instead of merely starting with it - " +
      s"Valid consumer groups: $actual / expected group id prefix: $expected")
  })
}

test("allow group.id override") {
  // An explicit `kafka.group.id` must appear unchanged among the valid consumer groups.
  testGroupId("kafka.group.id", (wantedId, groupIds) =>
    assert(
      groupIds.exists(_ === wantedId),
      "Valid consumer groups don't contain the expected group id - " +
        s"Valid consumer groups: $groupIds / expected group id: $wantedId"))
}

private def testGroupId(groupIdKey: String, validateGroupId: (String, Iterable[String]) => Unit) {
// Tests code path KafkaSourceProvider.createRelation(.)
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
Expand All @@ -256,15 +272,13 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSQLContext wi
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))

val customGroupId = "id-" + Random.nextInt()
val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
val df = createDF(topic, withOptions = Map(groupIdKey -> customGroupId))
checkAnswer(df, (1 to 30).map(_.toString).toDF())

val consumerGroups = testUtils.listConsumerGroups()
val validGroups = consumerGroups.valid().get()
val validGroupsId = validGroups.asScala.map(_.groupId())
assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
s"expected group id: $customGroupId")
validateGroupId(customGroupId, validGroupsId)
}

test("read Kafka transactional messages: read_committed") {
Expand Down

0 comments on commit a006c85

Please sign in to comment.