diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 57a15bae4b..02dc8e385a 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -153,7 +153,15 @@ public void put(Collection sinkRecords) { public void stop() { LOG.info("Stopping CamelSinkTask connector task"); try { - cms.stop(); + if (cms != null) { + /* + If the CamelMainSupport instance fails to be instantiated (ie.: due to missing classes or similar + issues) then it won't be assigned can de-referencing it could cause an NPE. + */ + cms.stop(); + } else { + LOG.warn("A fatal exception may have occurred and the Camel main was not created"); + } } catch (Exception e) { throw new ConnectException("Failed to stop Camel context", e); } finally { diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index acdba59c6f..10f13133b2 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -179,12 +179,24 @@ public synchronized List poll() { public void stop() { LOG.info("Stopping CamelSourceTask connector task"); try { - consumer.stop(); + if (consumer != null) { + consumer.stop(); + } else { + LOG.warn("A critical error may have occurred and there is no consumer to stop"); + } } catch (Exception e) { LOG.error("Error stopping camel consumer: {}", e.getMessage()); } try { - cms.stop(); + /* + If the CamelMainSupport instance fails to be instantiated (ie.: due to missing classes or similar + issues) then it won't be assigned can de-referencing it could cause an NPE. + */ + if (cms != null) { + cms.stop(); + } else { + LOG.warn("A fatal exception may have occurred and the Camel main was not created"); + } } catch (Exception e) { throw new ConnectException("Failed to stop Camel context", e); } finally { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index 6b7abf3bf7..679d554818 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -72,7 +72,7 @@ public void testDataFormatNotFound() { CamelSinkTask camelsinkTask = new CamelSinkTask(); assertThrows(ConnectException.class, () -> camelsinkTask.start(props)); - assertThrows(ConnectException.class, () -> camelsinkTask.stop()); + // No need to check the stop method. The error is already thrown/caught during startup. } @Test