Skip to content

Commit

Permalink
door,pool: handle multiple possible KafkaExceptions
Browse files Browse the repository at this point in the history
Motivation:
KafaTemplate class might throw

  org.springframework.kafka.KafkaException
as well as
  org.apache.kafka.common.KafkaException

Thus KafkaTemplate#send should be wrapped ti try-catch block that
handles both.

Result:
proper handling of multiple KafkaException.

Acked-by: Lea Morschel
Target: master, 9.2, 9.1, 9.0, 8.2
Require-book: no
Require-notes: yes
(cherry picked from commit 2ddd74a)
Signed-off-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
  • Loading branch information
kofemann authored and mksahakyan committed Oct 27, 2023
1 parent c9fe2f6 commit af96f5c
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 23 deletions.
Expand Up @@ -1231,9 +1231,8 @@ private void sendRemoveInfoToBilling(FileAttributes attributes, FsPath path) {

try {
_kafkaSender.accept(infoRemove);
} catch (KafkaException e) {
LOGGER.warn(Throwables.getRootCause(e).getMessage());

} catch (KafkaException | org.apache.kafka.common.KafkaException e) {
LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
}
}

Expand Down
@@ -1,6 +1,6 @@
/* dCache - http://www.dcache.org/
*
* Copyright (C) 2014 - 2022 Deutsches Elektronen-Synchrotron
* Copyright (C) 2014 - 2023 Deutsches Elektronen-Synchrotron
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down Expand Up @@ -731,9 +731,8 @@ private void sendRemoveInfoToBilling(PnfsId pnfsId, FsPath path, Subject subject

try {
_kafkaSender.accept(infoRemove);
} catch (KafkaException e) {
_log.warn(Throwables.getRootCause(e).getMessage());

} catch (KafkaException | org.apache.kafka.common.KafkaException e) {
_log.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
}
}

Expand Down
Expand Up @@ -157,9 +157,8 @@ private void sendBillingInfo(MoverInfoMessage moverInfoMessage) {

try {
_kafkaSender.accept(moverInfoMessage);
} catch (KafkaException e) {
LOGGER.warn(Throwables.getRootCause(e).getMessage());

} catch (KafkaException | org.apache.kafka.common.KafkaException e) {
LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
}
}

Expand Down
Expand Up @@ -610,9 +610,8 @@ public void stateChanged(StateChangeEvent event) {

try {
_kafkaSender.accept(msg);
} catch (KafkaException e) {
LOGGER.warn(Throwables.getRootCause(e).getMessage());

} catch (KafkaException | org.apache.kafka.common.KafkaException e) {
LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
}
}
}
Expand Down
Expand Up @@ -1197,12 +1197,10 @@ private void done(@Nullable Throwable cause) {
addFromNearlineStorage(infoMsg, storage);

billingStub.notify(infoMsg);

try {
_kafkaSender.accept(infoMsg);
} catch (KafkaException e) {
LOGGER.warn(Throwables.getRootCause(e).getMessage());

} catch (KafkaException | org.apache.kafka.common.KafkaException e) {
LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
}
flushRequests.removeAndCallback(pnfsId, cause);
}
Expand Down Expand Up @@ -1435,9 +1433,8 @@ private void done(@Nullable Throwable cause) {
billingStub.notify(infoMsg);
try {
_kafkaSender.accept(infoMsg);
} catch (KafkaException e) {
LOGGER.warn(Throwables.getRootCause(e).getMessage());

} catch (KafkaException | org.apache.kafka.common.KafkaException e) {
LOGGER.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
}
stageRequests.removeAndCallback(pnfsId, cause);
}
Expand Down
5 changes: 2 additions & 3 deletions modules/dcache/src/main/java/org/dcache/util/Transfer.java
Expand Up @@ -1192,9 +1192,8 @@ public synchronized void notifyBilling(int code, String error) {

try {
_kafkaSender.accept(msg);
} catch (KafkaException e) {
_log.warn(Throwables.getRootCause(e).getMessage());

} catch (KafkaException | org.apache.kafka.common.KafkaException e) {
_log.warn("Failed to send message to kafka: {} ", Throwables.getRootCause(e).getMessage());
}
}

Expand Down

0 comments on commit af96f5c

Please sign in to comment.