Skip to content

Commit

Permalink
Fix new Sonar smells
Browse files Browse the repository at this point in the history
* Use `tryEmitNext` on Reactor `Sink` since `emitNext` is deprecated
* Add `MessageDeliveryException` emission when `send()` returns `false`
in the `FluxMessageChannel` for `subscribeTo` provided `Publisher`
  • Loading branch information
artembilan committed Sep 11, 2020
1 parent 383af8c commit 481d5eb
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 8 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.reactivestreams.Subscriber;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.util.Assert;

import reactor.core.Disposable;
Expand Down Expand Up @@ -71,7 +72,8 @@ protected boolean doSend(Message<?> message, long timeout) {
long parkTimeout = 10; // NOSONAR
long parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(parkTimeout);
while (this.active && !tryEmitMessage(message)) {
if (timeout >= 0 && (remainingTime -= parkTimeout) <= 0) {
remainingTime -= parkTimeout;
if (timeout >= 0 && remainingTime <= 0) {
return false;
}
LockSupport.parkNanos(parkTimeoutNs);
Expand All @@ -95,9 +97,9 @@ private boolean tryEmitMessage(Message<?> message) {
@Override
public void subscribe(Subscriber<? super Message<?>> subscriber) {
this.processor
.doFinally((s) -> this.subscribedSignal.emitNext(this.processor.hasDownstreams()))
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams()))
.subscribe(subscriber);
this.subscribedSignal.emitNext(this.processor.hasDownstreams());
this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams());
}

@Override
Expand All @@ -108,7 +110,10 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
.publishOn(Schedulers.boundedElastic())
.doOnNext((message) -> {
try {
send(message);
if (!send(message)) {
throw new MessageDeliveryException(message,
"Failed to send message to channel '" + this);
}
}
catch (Exception ex) {
logger.warn("Error during processing event: " + message, ex);
Expand All @@ -120,7 +125,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
@Override
public void destroy() {
this.active = false;
this.subscribedSignal.emitNext(false);
this.subscribedSignal.tryEmitNext(false);
this.upstreamSubscriptions.dispose();
this.processor.onComplete();
super.destroy();
Expand Down
Expand Up @@ -100,7 +100,7 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
* is returned as is because it is already a {@link Publisher};
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
* for the {@link Sinks.Many#emitNext(Object)} which is returned from this method;
* for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method;
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
* {@link #messageSourceToFlux(MessageSource)}.
* @param messageChannel the {@link MessageChannel} to adapt.
Expand Down
Expand Up @@ -301,7 +301,7 @@ public void testReactiveStreamsConsumerFluxMessageChannelReactiveMessageHandler(

ReactiveMessageHandler messageHandler =
m -> {
sink.emitNext(m);
sink.tryEmitNext(m);
return Mono.empty();
};

Expand Down
Expand Up @@ -544,7 +544,7 @@ static class TestController {

@MessageMapping("receive")
void receive(String payload) {
this.fireForgetPayloads.emitNext(payload);
this.fireForgetPayloads.tryEmitNext(payload);
}

@MessageMapping("echo")
Expand Down

0 comments on commit 481d5eb

Please sign in to comment.