Skip to content

Commit

Permalink
Merge pull request #377 from orpiske/fix-camel-stop-npe
Browse files Browse the repository at this point in the history
Prevent NPEs on shutdown when critical errors occur
  • Loading branch information
oscerd committed Aug 18, 2020
2 parents d365f29 + 4e83421 commit 9b5d415
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
Expand Up @@ -153,7 +153,15 @@ public void put(Collection<SinkRecord> sinkRecords) {
public void stop() {
LOG.info("Stopping CamelSinkTask connector task");
try {
cms.stop();
if (cms != null) {
/*
If the CamelMainSupport instance fails to be instantiated (ie.: due to missing classes or similar
issues) then it won't be assigned and de-referencing it could cause an NPE.
*/
cms.stop();
} else {
LOG.warn("A fatal exception may have occurred and the Camel main was not created");
}
} catch (Exception e) {
throw new ConnectException("Failed to stop Camel context", e);
} finally {
Expand Down
Expand Up @@ -179,12 +179,24 @@ public synchronized List<SourceRecord> poll() {
public void stop() {
LOG.info("Stopping CamelSourceTask connector task");
try {
consumer.stop();
if (consumer != null) {
consumer.stop();
} else {
LOG.warn("A critical error may have occurred and there is no consumer to stop");
}
} catch (Exception e) {
LOG.error("Error stopping camel consumer: {}", e.getMessage());
}
try {
cms.stop();
/*
If the CamelMainSupport instance fails to be instantiated (ie.: due to missing classes or similar
issues) then it won't be assigned and de-referencing it could cause an NPE.
*/
if (cms != null) {
cms.stop();
} else {
LOG.warn("A fatal exception may have occurred and the Camel main was not created");
}
} catch (Exception e) {
throw new ConnectException("Failed to stop Camel context", e);
} finally {
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void testDataFormatNotFound() {

CamelSinkTask camelsinkTask = new CamelSinkTask();
assertThrows(ConnectException.class, () -> camelsinkTask.start(props));
assertThrows(ConnectException.class, () -> camelsinkTask.stop());
// No need to check the stop method. The error is already thrown/caught during startup.
}

@Test
Expand Down

0 comments on commit 9b5d415

Please sign in to comment.