Skip to content

Commit

Permalink
fix: PubSubPublishCallback handling of success and failure callbacks (#…
Browse files Browse the repository at this point in the history
…1800)

Fixes #1799.
  • Loading branch information
nathanpiper committed May 6, 2023
1 parent 1b04165 commit b134c92
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,25 @@ private class PubSubPublishCallback implements BiConsumer<String, Throwable> {
this.message = message;
}

@Override
public void accept(String messageId, Throwable throwable) {
private void handleSuccess(String messageId) {
if (PubSubMessageHandler.this.successCallback != null) {
PubSubMessageHandler.this.successCallback.onSuccess(messageId, message);
}
}

private void handleFailure(Throwable throwable) {
if (PubSubMessageHandler.this.failureCallback != null) {
PubSubMessageHandler.this.failureCallback.onFailure(throwable, message);
}
}

@Override
public void accept(String messageId, Throwable throwable) {
if (throwable == null) {
handleSuccess(messageId);
} else {
handleFailure(throwable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,25 @@ void publishWithSuccessCallback() {

AtomicReference<String> messageIdRef = new AtomicReference<>();
AtomicReference<String> ackIdRef = new AtomicReference<>();
AtomicReference<Throwable> failureCauseRef = new AtomicReference<>();

this.adapter.setSuccessCallback(
(ackId, message) -> {
messageIdRef.set(message.getHeaders().get("message_id", String.class));
ackIdRef.set(ackId);
});

this.adapter.setFailureCallback(
(exception, message) -> {
failureCauseRef.set(exception);
});

this.adapter.handleMessage(testMessage);
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAtomic(messageIdRef, notNullValue());

assertThat(messageIdRef).hasValue("123");
assertThat(ackIdRef).hasValue("published12345");
assertThat(failureCauseRef).hasValue(null);
}

@Test
Expand All @@ -249,9 +256,15 @@ void publishWithFailureCallback() {
Message<String> testMessage =
new GenericMessage<>("testPayload", Collections.singletonMap("message_id", "123"));

AtomicReference<String> ackIdRef = new AtomicReference<>();
AtomicReference<Throwable> failureCauseRef = new AtomicReference<>();
AtomicReference<String> messageIdRef = new AtomicReference<>();

this.adapter.setSuccessCallback(
(ackId, message) -> {
ackIdRef.set(ackId);
});

this.adapter.setFailureCallback(
(exception, message) -> {
failureCauseRef.set(exception);
Expand All @@ -264,5 +277,7 @@ void publishWithFailureCallback() {
assertThat(messageIdRef).hasValue("123");
Throwable cause = failureCauseRef.get();
assertThat(cause).isInstanceOf(RuntimeException.class).hasMessage("boom!");

assertThat(ackIdRef).hasValue(null);
}
}

0 comments on commit b134c92

Please sign in to comment.