From f36ec155c0efd0a99b0e53a53dff15f52b7d849b Mon Sep 17 00:00:00 2001 From: Otavio Rodolfo Piske Date: Mon, 12 Dec 2022 15:25:12 +0100 Subject: [PATCH] camel-kafka: prevent exceptions in close from leaking (CAMEL-18796) --- .../resume/kafka/SingleNodeKafkaResumeStrategy.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 9a60cce760fd2..9224ffa129bbb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -190,7 +190,11 @@ private void refresh(CountDownLatch latch) { } finally { if (consumer != null) { consumer.unsubscribe(); - consumer.close(Duration.ofSeconds(5)); + try { + consumer.close(Duration.ofSeconds(5)); + } catch (Exception e) { + LOG.warn("Error closing the consumer: {} (this error will be ignored)", e.getMessage(), e); + } } } } @@ -373,6 +377,8 @@ public void stop() { IOHelper.close(producer, "Kafka producer", LOG); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn("Error closing the Kafka producer: {} (this error will be ignored)", e.getMessage(), e); } finally { lock.unlock(); }