Skip to content

Commit

Permalink
Update KafkaRecord to retrieve records from multiple partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
AbeleMM committed Oct 8, 2020
1 parent d791761 commit 4ec1913
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

/**
* Retrieves records from a specified Kafka topic in reverse order.
* @param topicName name of the topic to retrieve records from
* @param kafkaAddress address of Kafka instance to connect to
*/
class KafkaRecordRetriever(topicName: String, kafkaAddress: String) {

var n = 1

val props = new Properties()
private val props = new Properties()
props.put("bootstrap.servers", kafkaAddress)
props.put(
"key.deserializer",
Expand All @@ -22,35 +25,39 @@ class KafkaRecordRetriever(topicName: String, kafkaAddress: String) {
"value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"
)
props.put("max.poll.records", "1")

val kafkaConsumer = new KafkaConsumer[String, String](props)
private val kafkaConsumer = new KafkaConsumer[String, String](props)

val partitions: Seq[TopicPartition] = kafkaConsumer
private val partitions = kafkaConsumer
.partitionsFor(topicName)
.asScala
.map(x => new TopicPartition(x.topic(), x.partition()))
.asJava

kafkaConsumer.assign(partitions)

kafkaConsumer.assign(partitions.asJava)
private val partEndOffsetMap = kafkaConsumer.endOffsets(partitions).asScala
private val partBegOffsetMap =
kafkaConsumer.beginningOffsets(partitions).asScala

/**
* Seeks the consumer to a computed offset, polls once, and returns the value of
* the first record in the polled batch.
*
* NOTE(review): this listing comes from a commit view ("20 additions and 13
* deletions") that shows removed and added lines together without +/- markers.
* Statements built on `n` / `latestPartAndPos` and statements built on
* `partEndOffsetMap` / `partBegOffsetMap` appear to be two alternative versions
* of the same steps - confirm which ones are live before relying on the
* comments below.
*
* @return the value of the first record yielded by the poll
* @throws IllegalArgumentException when the offset guard below fails, i.e. the
*         topic does not hold enough messages
*/
def getNextRecord: String = {

// Called with an empty partition collection.
// NOTE(review): presumably this moves every assigned partition to its end
// offset - confirm against the KafkaConsumer documentation.
kafkaConsumer.seekToEnd(List().asJava)
// Pick the partition whose (stored end offset - beginning offset) is largest.
val maxOffsetKey =
partEndOffsetMap.maxBy(x => x._2 - partBegOffsetMap(x._1))._1

// Pair each partition with the consumer's current position and keep the pair
// with the highest position.
val latestPartAndPos: (TopicPartition, Long) =
partitions.map(x => (x, kafkaConsumer.position(x))).maxBy(_._2)
// Step the stored end offset of the chosen partition back by one; the map is
// mutated, so the change is visible to later calls.
partEndOffsetMap(maxOffsetKey) -= 1

// Guard: refuse to seek when there are not enough messages.
// NOTE(review): as listed, the two `if` lines nest, so the throw only happens
// when both conditions hold - likely an artifact of the interleaved diff.
if (latestPartAndPos._2 < n)
if (partEndOffsetMap(maxOffsetKey) < partBegOffsetMap(maxOffsetKey))
throw new IllegalArgumentException(
"Topic does not contain enough messages to be inferred properly."
)

// Seek `n` positions back from the highest current position.
kafkaConsumer.seek(latestPartAndPos._1, latestPartAndPos._2 - n)
// Seek the chosen partition to its (already decremented) stored end offset.
kafkaConsumer.seek(maxOffsetKey, partEndOffsetMap(maxOffsetKey))

// Single poll with a 100 ms timeout.
val records: ConsumerRecords[String, String] =
kafkaConsumer.poll(Duration.ofMillis(100))

// Advance the look-back counter used by the `n`-based seek above.
n += 1

// NOTE(review): `next()` is called without checking that the poll returned
// anything; an empty batch presumably fails here - consider guarding.
records.iterator().next().value()
}

Expand Down

0 comments on commit 4ec1913

Please sign in to comment.