Skip to content

Commit

Permalink
#1619 adjusted monitor to use in OutboundMappingProcessorActor#messag…
Browse files Browse the repository at this point in the history
…eDiscarded

* to not use `responseDispatchedMonitor`, but getMonitorsForOutboundSignal() with using `responseMappedMonitor` instead for responses

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle authored and Stanchev Aleksandar committed May 4, 2023
1 parent 316edf6 commit f57e41f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,21 @@ protected OutboundSignalWithSender mapMessage(final OutboundSignal message) {
}

@Override
protected void messageDiscarded(OutboundSignal message, QueueOfferResult result) {
protected void messageDiscarded(final OutboundSignal message, final QueueOfferResult result) {
final Set<ConnectionMonitor> monitorsForOutboundSignal =
getMonitorsForOutboundSignal(message, MAPPED, LogType.MAPPED, responseMappedMonitor);
if (QueueOfferResult.dropped().equals(result)) {
responseDispatchedMonitor.failure(message.getSource(), "Message is dropped as a result of backpressure strategy!");
monitorsForOutboundSignal.forEach(monitor ->
monitor.failure(message.getSource(), "Message is dropped as a result of backpressure strategy!")
);
} else if (result instanceof final QueueOfferResult.Failure failure) {
responseDispatchedMonitor.failure(message.getSource(), "Enqueue failed! - failure: {}", failure.cause());
monitorsForOutboundSignal.forEach(monitor ->
monitor.failure(message.getSource(), "Enqueue failed! - failure: {}", failure.cause())
);
} else {
responseDispatchedMonitor.failure(message.getSource(), "Enqueue failed without acknowledgement!");
monitorsForOutboundSignal.forEach(monitor ->
monitor.failure(message.getSource(), "Enqueue failed without acknowledgement!")
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;

@FixMethodOrder(MethodSorters.DEFAULT)
/**
* Tests in addition to {@link MessageMappingProcessorActorTest}
* for {@link OutboundMappingProcessorActor} only.
*/
@FixMethodOrder(MethodSorters.DEFAULT)
public final class OutboundMappingProcessorActorTest {

@ClassRule
Expand Down

0 comments on commit f57e41f

Please sign in to comment.