Skip to content

Commit

Permalink
Close checks
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronp committed Oct 8, 2020
1 parent 0b9f014 commit 7c04b3f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
9 changes: 6 additions & 3 deletions src/main/scala/kafka4m/admin/RichKafkaAdmin.scala
Expand Up @@ -106,9 +106,12 @@ final class RichKafkaAdmin(val admin: AdminClient) extends AutoCloseable with St
def isClosed() = closed

override def close(): Unit = {
logger.warn("Closing the admin client")
closed = true
admin.close()
if (!closed) {
logger.warn("Closing the admin client")
closed = true
admin.close()
}
logger.warn("Closed the admin client")
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/main/scala/kafka4m/consumer/RichKafkaConsumer.scala
Expand Up @@ -105,7 +105,7 @@ final class RichKafkaConsumer[K, V] private (val consumer: KafkaConsumer[K, V],
val poll: Task[Observable[ConsumerRecord[K, V]]] = {
commandQueue.tryPoll.flatMap {
// try and handle any explicit commands, but if none are queued, then fall-back to polling kafka
case None => nextBatch
case None => nextBatch
case Some(exec: ExecOnConsumer[K, V, _]) =>
Task(exec.run(self)).executeOn(kafkaScheduler).map(_ => NoResults)
}
Expand Down Expand Up @@ -310,11 +310,16 @@ final class RichKafkaConsumer[K, V] private (val consumer: KafkaConsumer[K, V],
def isClosed() = closed

override def close(): Unit = {
withConsumer { c =>
closed = true
c.close()
Schedulers.close(kafkaScheduler)
if (!closed) {
logger.warn("Closing the consumer")
withConsumer { c =>
logger.warn("Closing the consumer on kafka thread")
closed = true
c.close()
Schedulers.close(kafkaScheduler)
}
}
logger.warn("Closed the consumer")
}

override def withConsumer[A](withConsumer: RichKafkaConsumer[K, V] => A): Future[A] = {
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/kafka4m/producer/RichKafkaProducer.scala
Expand Up @@ -60,8 +60,9 @@ final class RichKafkaProducer[K, V](val publisher: KafkaProducer[K, V]) extends
}

override def close(): Unit = {
logger.info("Closing producer")
logger.warn("Closing producer")
publisher.close()
logger.warn("Closed the producer")
}
}

Expand Down

0 comments on commit 7c04b3f

Please sign in to comment.