[SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming

## What changes were proposed in this pull request?

Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the KafkaMicroBatchReader is created. However, we create a dummy KafkaMicroBatchReader just to get the schema and then immediately stop it. It's better to make the consumer creation lazy: the consumer will be created on the first attempt to fetch offsets using the KafkaOffsetReader.
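As a minimal sketch of the pattern this patch adopts (the class name `LazyConsumerHolder` and the group-id scheme are invented for illustration, and `baseParams` is assumed to already configure byte-array key/value deserializers):

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}

class LazyConsumerHolder(baseParams: ju.Map[String, Object]) {
  // No consumer is created at construction time; @volatile makes the handle
  // safely visible to whichever thread eventually closes it.
  @volatile private var _consumer: Consumer[Array[Byte], Array[Byte]] = null

  // The first call pays the creation cost; subsequent calls reuse the instance.
  def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
    if (_consumer == null) {
      val params = new ju.HashMap[String, Object](baseParams)
      // Fresh group id per consumer, mirroring the nextGroupId() call in the patch.
      params.put(ConsumerConfig.GROUP_ID_CONFIG, s"illustrative-group-${System.nanoTime()}")
      _consumer = new KafkaConsumer[Array[Byte], Array[Byte]](params)
    }
    _consumer
  }
}
```

With this shape, a reader created only to report the schema never touches Kafka, which is exactly the waste the patch removes.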

## How was this patch tested?
Existing unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21134 from tdas/SPARK-24056.
tdas committed Apr 24, 2018
1 parent 379bffa commit 7b1e652
Showing 1 changed file with 17 additions and 14 deletions.
```diff
@@ -75,7 +75,17 @@ private[kafka010] class KafkaOffsetReader(
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
    */
-  protected var consumer = createConsumer()
+  @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
+
+  protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    if (_consumer == null) {
+      val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
+      newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+      _consumer = consumerStrategy.createConsumer(newKafkaParams)
+    }
+    _consumer
+  }
 
   private val maxOffsetFetchAttempts =
     readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
```
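The new getter asserts it is called from an `UninterruptibleThread`, because Kafka's blocking consumer calls can wedge if the thread is interrupted mid-operation. A hedged sketch of the caller-side contract (the thread name and fetch body are illustrative; `runUninterruptibly` is the method on Spark's `UninterruptibleThread`, which must be invoked from that thread itself):

```scala
import org.apache.spark.util.UninterruptibleThread

// Run the offset fetch on an UninterruptibleThread so the assert in the
// lazy `consumer` getter holds and blocking Kafka calls are never interrupted.
val fetcher = new UninterruptibleThread("illustrative-offset-fetch") {
  override def run(): Unit = {
    runUninterruptibly {
      // ... e.g. poll/seek on the lazily created consumer ...
    }
  }
}
fetcher.start()
fetcher.join()
```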
```diff
@@ -95,9 +105,7 @@ private[kafka010] class KafkaOffsetReader(
   /**
    * Closes the connection to Kafka, and cleans up state.
    */
   def close(): Unit = {
-    runUninterruptibly {
-      consumer.close()
-    }
+    if (_consumer != null) runUninterruptibly { stopConsumer() }
     kafkaReaderThread.shutdown()
   }
```
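`runUninterruptibly` is the reader's existing helper for hopping onto its dedicated `kafkaReaderThread` when the caller is not already an `UninterruptibleThread`. A simplified reconstruction of that shape, sketched as members of the containing class and assuming a single-thread executor (the real helper differs in details, e.g. it may use Scala futures):

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory}
import org.apache.spark.util.UninterruptibleThread

// Single-thread executor whose worker is an UninterruptibleThread.
private val kafkaReaderThread = Executors.newSingleThreadExecutor(new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new UninterruptibleThread("Kafka Offset Reader") {
      override def run(): Unit = r.run()
    }
    t.setDaemon(true)
    t
  }
})

// Run inline if already uninterruptible; otherwise submit and block for the result.
private def runUninterruptibly[T](body: => T): T = {
  if (Thread.currentThread().isInstanceOf[UninterruptibleThread]) {
    body
  } else {
    kafkaReaderThread.submit(new Callable[T] { override def call(): T = body }).get()
  }
}
```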

```diff
@@ -304,19 +312,14 @@ private[kafka010] class KafkaOffsetReader(
     }
   }
 
-  /**
-   * Create a consumer using the new generated group id. We always use a new consumer to avoid
-   * just using a broken consumer to retry on Kafka errors, which likely will fail again.
-   */
-  private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
-    val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-    newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
-    consumerStrategy.createConsumer(newKafkaParams)
+  private def stopConsumer(): Unit = synchronized {
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    if (_consumer != null) _consumer.close()
   }
 
   private def resetConsumer(): Unit = synchronized {
-    consumer.close()
-    consumer = createConsumer()
+    stopConsumer()
+    _consumer = null // will automatically get reinitialized again
   }
 }
```
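`resetConsumer` now only discards the handle; the lazy getter rebuilds the consumer on the next access. That pairs naturally with the retry behavior implied by `maxOffsetFetchAttempts` (`fetchOffset.numRetries`): a broken consumer is never reused for a retry. A hedged sketch of such a loop (`withRetries` is invented for illustration, not the actual Spark method; `onFailure` stands in for `resetConsumer`):

```scala
import scala.util.control.NonFatal

// Illustrative retry wrapper: on a Kafka error, `onFailure` (e.g. resetConsumer())
// discards the possibly broken consumer; the lazy getter builds a fresh one on
// the next attempt.
def withRetries[T](maxAttempts: Int)(onFailure: () => Unit)(body: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  while (result.isEmpty) {
    attempt += 1
    try {
      result = Some(body)
    } catch {
      case NonFatal(_) if attempt < maxAttempts =>
        onFailure() // next access to `consumer` reinitializes it lazily
    }
  }
  result.get
}
```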
