Skip to content

Commit

Permalink
removed topic filtering from poll
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronp committed Mar 30, 2020
1 parent 1cc305b commit 1c8500f
Showing 1 changed file with 1 addition and 5 deletions.
6 changes: 1 addition & 5 deletions src/main/scala/kafka4m/consumer/RichKafkaConsumer.scala
Expand Up @@ -74,11 +74,7 @@ final class RichKafkaConsumer[K, V] private (val consumer: KafkaConsumer[K, V],
try {
val records: ConsumerRecords[K, V] = consumer.poll(timeout)
logger.debug(s"Got ${records.count()} records from ${records.partitions().asScala.mkString(s"[", ",", "]")}")
val forTopic: Iterable[ConsumerRecord[K, V]] = {
records.asScala.filter { record =>
topics.contains(record.topic())
}
}
val forTopic: Iterable[ConsumerRecord[K, V]] = records.asScala
logger.trace(s"Got ${forTopic.size} of ${records.count()} for topic '$topics' records from ${records.partitions().asScala.mkString(s"[", ",", "]")}")
forTopic
} catch {
Expand Down

0 comments on commit 1c8500f

Please sign in to comment.