Skip to content

Commit

Permalink
Issue #1273: Moved logic for escalating connection failure to avoid u…
Browse files Browse the repository at this point in the history
…nwanted side effects.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Jan 18, 2022
1 parent fc7a047 commit b8f87db
Showing 1 changed file with 3 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,6 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
.withHeader("ditto-connection-id", connection.getId().toString());

return producerStream.publish(publishTarget, messageWithConnectionIdHeader)
.whenComplete((recordMetadata, throwable) -> {
if (null != throwable) {
final var context = getContext();
final var parent = context.getParent();
final var self = getSelf();
parent.tell(ConnectionFailure.of(self,
throwable,
ConnectionFailure.determineFailureDescription(Instant.now(),
throwable,
"Broker may not be available.")), self);
}
})
.thenApply(callback);
}

Expand Down Expand Up @@ -387,6 +375,9 @@ private void handleSendResult(
logger.debug("Failed to send kafka record: [{}] {}", exception.getClass().getName(),
exception.getMessage());
resultFuture.completeExceptionally(exception);
escalate(exception, ConnectionFailure.determineFailureDescription(Instant.now(),
exception,
"Broker may not be available."));
}
}

Expand Down

0 comments on commit b8f87db

Please sign in to comment.