Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] ServiceBusReceiverClient stops consuming messages after some time though messages are present in subscription. #26465

Closed
3 tasks done
abhijitkushwaha1998 opened this issue Jan 13, 2022 · 25 comments
Assignees
Labels
bug This issue requires a change to an existing behavior in the product in order to be resolved. Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention This issue needs attention from Azure service team or SDK team pillar-reliability The issue is related to reliability, one of our core engineering pillars. (includes stress testing) Service Bus
Milestone

Comments

@abhijitkushwaha1998
Copy link

abhijitkushwaha1998 commented Jan 13, 2022

Describe the bug
We are using ServiceBusReceiverClient as receiver but we have observed that though the services were running and messages were present in subscription it has stopped consuming the messages .

Exception or Stack Trace

2022-01-06` 15:14:25,169 ERROR [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Error in SendLinkHandler. Disposing unconfirmed sends.
The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96] com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:110)
at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:61)
at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:29)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:92)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2022-01-06 15:14:25,196 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Closing request/response channel.
2022-01-06 15:14:25,196 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Closing request/response channel.
2022-01-06 15:14:25,220 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel:$cbs - namespace[MF_7b61df_1641477262829] entityPath[$cbs]: Retry #1. Transient error occurred. Retrying after 4964 ms. com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] terminating 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] terminating 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] completed the termination of 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] completed the termination of 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Channel already closed.
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Channel already closed.
2022-01-06 15:14:25,222 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] SendLinkHandler disposed. Remaining: 1
2022-01-06 15:14:25,222 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] SendLinkHandler disposed. Remaining: 1
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver], errorCondition[amqp:connection:forced] errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25]
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver], errorCondition[amqp:connection:forced] errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25]
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver] state[ACTIVE] Local link state is not closed.
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver] state[ACTIVE] Local link state is not closed.
2022-01-06 15:14:25,224 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] ReceiveLinkHandler disposed. Remaining: 0

To Reproduce
This issue is coming randomly.

Expected behavior
ServiceBusReceiverClient should consume the messages available in subscription.

Setup (please complete the following information):

  • OS: Linux
  • IDE: IntelliJ
  • Library/Libraries: com.azure:azure-messaging-servicebus-7.5.1.jar
  • Java version: Java 8

Additional context
Similar to issue https://github.com/Azure/azure-sdk-for-java/issues/26138

Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost ghost added needs-triage This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Jan 13, 2022
@abhijitkushwaha1998 abhijitkushwaha1998 changed the title [BUG] ESB Service Bus Receiver stops consuming messages after some time although messages are present in subscription. [BUG] ESB Service Bus Receiver stops consuming messages after some time though messages are present in subscription. Jan 13, 2022
@abhijitkushwaha1998 abhijitkushwaha1998 changed the title [BUG] ESB Service Bus Receiver stops consuming messages after some time though messages are present in subscription. [BUG] ServiceBusReceiverClient stops consuming messages after some time though messages are present in subscription. Jan 13, 2022
@renst
Copy link

renst commented Jan 13, 2022

We have experienced the exact same issue but we are using the spring cloud stream binder. From our analysis the issue occurs when we receive a non retriable amqp exception. Which we have been able to correlate with internal server errors on the servicebus, last time this occurred there was a server downtime of over 1 hour on azure side. The disconnect happens across all applications and pods at the exact same time when these errors occurs.

Currently we are trying to setup a health check towards the binder in order to restart the pods when this occurs as a temporary workaround while the issue persists. From what we can see there is no adequate way of attaching our own logic to the health check. The ServiceBusQueueHealthIndicator that was added in recent release only verifies that the binder managed to connect once and returns "UP" status even though we lose connection to the servicebus.

Is there any way of overriding this healthIndicator class to create a more elaborate healthcheck?

@joshfree joshfree added Client This issue points to a problem in the data-plane of the library. pillar-reliability The issue is related to reliability, one of our core engineering pillars. (includes stress testing) Service Bus labels Jan 13, 2022
@ghost ghost removed the needs-triage This is a new issue that needs to be triaged to the appropriate team. label Jan 13, 2022
@joshfree
Copy link
Member

@anuchandy @ki1729 can you please follow up?

@conniey conniey added this to To do in Azure AMQP - Nickel Jan 14, 2022
@nomisRev
Copy link

nomisRev commented Mar 1, 2022

I think we have found a fix for this, but not sure where to report. So replying on this issue report, but happy to elaborate else where.

We're also seeing this issue when using ServiceBusReceiverAsyncClient with ServiceBusReceiverAsyncClient#receiveMessages, after some time (random) the client stops receiving messages even though there is messages in the subscription.

We also witnessed that this issue does not occur with ServiceBusProcessorClient, so I went to investigate and found that ProcessorClient has a workaround on top of ServiceBusReceiverAsyncClient to fix this issue by monitoring the Connection. The fix can be found here on line 209, it checks every 30 seconds if the ServiceBusConnectionProcessor of the ServiceBusReceiverAsyncClient#isConnectionClosed. If the connection is closed, it restarts the ServiceBusReceiverAsyncClient solving the issue.

In our own project we have fixed this, in a very dirty way using reflection but we also check the ServiceBusConnectionProcessor of the ServiceBusReceiverAsyncClient and close/recreate the ServiceBusReceiverAsyncClient when the channel is closed.

We're now not seeing this issue any more in production.

Happy to discuss further, or elaborate more if there are any doubts or questions. If I have any more details I'll report back.
If desired I can attempt to make a PR, but it's very hard to tests. We just ran the fix in production to see if it had effect since sometimes this issue doesn't come up for serveral days (especially with continous delivery).

EDIT: sad to report we're still seeing it going down :( even with this fix.

@anuchandy
Copy link
Member

Hello @nomisRev, underneath the ServiceBusReceiverAsyncClient also designed to recover from endpoint (connection, session, link) closure. If for some reason, ServiceBusReceiverAsyncClient object cannot recover, then it is designed to notify the application-subscriber about termination (by calling onComplete or onError). I assume in your case, the application-subscriber is not receiving one of these terminal notifications - Right? Do you happen to have the logging enabled?

  1. If logs are available, could you share those?
  2. If not, would you enable the VERBOSE|DEBUG log and share the logs when this happens next time? I would like to understand what caused the receiver to stop, and fix it

The logging instruction is here: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/docs/troubleshooting.md

@nomisRev
Copy link

nomisRev commented Mar 11, 2022

Hey @anuchandy,

underneath the ServiceBusReceiverAsyncClient also designed to recover from endpoint (connection, session, link) closure.

Could you point me to this code? I could not find it in the SDK, and as I mentioned in the comment above porting the fix from ProcessorClient seems to have solve the issue.

If for some reason, ServiceBusReceiverAsyncClient object cannot recover, then it is designed to notify the application-subscriber about termination (by calling onComplete or onError). I assume in your case, the application-subscriber is not receiving one of these terminal notifications - Right?

At no point an error is being received in onError as far as we do not see it coming up in the logs, and I'm curious to know in what case onComplete would be called for receiveMessages which should run indefinitely (right?).

The onComplete case might not be handled correctly in this case, since when we detect onComplete I guess we should manually restart the receiver since we want to receiving of messages to never stop.

Do you happen to have the logging enabled?

We do have logging enabled. Both custom logging, and Azure AppInsights but comparing to the other issues that reported this issue we cannot find any additional logging that points in the direction of any other issues. I will check again next week, and share logs here.

I'm also going to add additional logging to track onComplete and onError.

@conniey conniey added the bug This issue requires a change to an existing behavior in the product in order to be resolved. label Apr 26, 2022
@ghost ghost added the needs-team-attention This issue needs attention from Azure service team or SDK team label Apr 26, 2022
@conniey conniey removed the question The issue doesn't require a change to the product in order to be resolved. Most issues start as that label Apr 26, 2022
@maksatbolatov
Copy link

Hi @nomisRev,

Can you please share with code snippet where you check the ServiceBusConnectionProcessor of the ServiceBusReceiverAsyncClient and close/recreate the ServiceBusReceiverAsyncClient when the channel is closed?

We are facing the same issue. After some time ServiceBusReceiverAsyncClient stops receiving messages.

@nomisRev
Copy link

Hey @maksatbolatov,

Of course. I have the code written in Kotlin with some custom things, but here is a translation of the code to Project Reactor in Java. if you have any questions I'd be happy to help :)

import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.lang.reflect.Field;
import java.time.Duration;

class WorkAround {
    public static Flux<ServiceBusReceivedMessage> receiveMessagesForever(ServiceBusReceiverAsyncClient client) {
        return client.receiveMessages()
            .takeUntilOther(connectionClosed(client))
            .concatWith(receiveMessagesForever(client));
    }

    public static Mono<ServiceBusConnectionProcessor> conn(ServiceBusReceiverAsyncClient client) {
        return Mono.fromCallable(() -> {
            Field field = client.getClass().getDeclaredField("connectionProcessor");
            field.setAccessible(true);
            return (ServiceBusConnectionProcessor) field.get(client);
        });
    }

    public static Mono<Void> connectionClosed(ServiceBusReceiverAsyncClient client) {
        return conn(client).flatMap((connection) ->
            Flux.interval(Duration.ofSeconds(30))
                .map((ignored) -> connection.isChannelClosed())
                .takeUntil((isChannelClosed) -> !isChannelClosed)
                .then()
        );
    }
}

@mseiler90
Copy link

mseiler90 commented Jun 24, 2022

We are seeing similar issues in our production environment where we have an Azure App Service connected to Service Bus with multiple queues either sending to, receiving from, or both depending on the queue and business flow. We are using the ServiceBusSenderAsyncClient and ServiceBusReceiverAsyncClient in this App Service. We have 4 randomly occurring occasions where a queue just starts filling up with messages and the receiver is no longer picking up messages. We get a alerted as we have monitoring alerts setup, but at this time, the quick "fix" is to restart the App Service and it will start picking up the messages again. We haven't been able to quite figure out what exactly is causing the connection to randomly be lost, but regardless, we would expect that even if there were a network issue or Service Bus server error, the ServiceBusReceiverAsyncClient would be able to reestablish a connection.

I have found a way to replicate this in our nonprod environment by doing the following:

  1. The App Service is up and running and connected to Service Bus
  2. Go to the Service Bus in the Azure Portal and to a queue that you are sending and receiving from and "disable" the queue
  3. Wait a few minutes and then make it "active" again
  4. Run whatever logic in your App Service that will send a message to the queue and then expect that it would be picked up from the queue

At this point, the sending of the message to the queue works as expected without issues. The connection on the send was reestablished, however, the message just sits in the queue and will never get picked up by the receiver because it has not reconnected.

This may not be exactly what is happening in production to cause the lost connection to begin with, but we should expect that after making the queue "active" again in this test that our ServiceBusReceiverAsyncClient would have noticed the lost connection and made a new one. Or at least attempt to make a new connection until it succeeds. Even setting AmqpRetryOptions on the ServiceBusClientBuilder doesn't help.

It seems like there are a number of both open and closed issues that are similar to this issue (though not specifically this and seeing some similar ones that were for eventhubs and blob storage).

@anuchandy
Copy link
Member

Hi @mseiler90, thank you for sharing the observations. I will take a look to understand what is happening when we disable queue.

Could you take a look at these recommendations - #28573 (comment) and #28573 (comment).

I will be moving these recommendations to Java doc.

@mseiler90
Copy link

@anuchandy Thanks for the response. I should have mentioned that we are using Camel in this service and using Camel routes for all of our sending/receiving with Service Bus. Camel is using the ServiceBusSenderAsyncClient and ServiceBusReceiverAsyncClient and perhaps I am just not familiar with it enough, but our expectation would be that we would get this connection retry out of the box and not have to build out custom logic for this. We tried creating our own ServiceBusSenderAsyncClient and ServiceBusReceiverAsyncClient to set AMQPRetryOptions and such in an effort to hopefully resolve the issue until a fix is in place in the azure sdk, but that didn't help and also defeats the purpose of what we thought we would be getting out of the box. It's looking like to get this to work as expected would likely mean a complete rewrite and potentially skip out on Camel since it is relying on these async clients.

@anuchandy
Copy link
Member

anuchandy commented Jun 26, 2022

Hi @mseiler90 thanks for the response.

The ServiceBusReceiverAsyncClient (Aka LowLevelClient) indeed performs connection recoveries underneath so that the Flux (aka receive-flux) from receiveMessages() can transparently notify messages to the application (subscriber).

The cases where receive-flux notify a terminal error (hence no longer emit messages) to the application (subscriber) are -

  1. When the connection encounters a non-retriable error.
  2. Or a series of connection recovery attempts fail in a row which exhausts the max-retry.

A few examples of non-retriable errors are - the app attempting to connect to a queue that does not exist, someone deleting the queue in the middle of receiving, the user explicitly initiating Geo-DR, user disabling the queue (yes the test you did :)). These are certain events where Service Bus service communicates to the SDK that a non-retriable error occurred.

An example of locally initiated non-retriable errors is SDK threads receiving interrupted exceptions from outside the application.

  • "It's By design" that receive-flux (of LowLevelClient) emits such non-retriable error to the application to take appropriate action (most of the time application logs it for auditing and recreates the client).

  • On the other hand, "It's a bug" that if receive-flux (of LowLevelClient) hangs without ever emitting any event (message events, terminal error, or completion event).

I've created a custom class ServiceBusIndefiniteRetryReceiverAsyncClient for reference showing the normal application pattern dealing with non-retriable errors when using LowLevelClient -

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public final class ServiceBusIndefiniteRetryReceiverAsyncClient implements AutoCloseable {
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusIndefiniteRetryReceiverAsyncClient.class);
    // On rare cases when Retry exhausts or a non-retryable error occurs back-off for 4 sec matching
    // default server busy time.
    private static final Duration RETRY_WAIT_TIME = Duration.ofSeconds(4);

    private final String connectionString;
    private final String queueName;
    private final AtomicReference<ServiceBusReceiverAsyncClient> currentLowLevelClient = new AtomicReference<>();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicBoolean isInitial = new AtomicBoolean(true);

    public ServiceBusIndefiniteRetryReceiverAsyncClient(String connectionString,
                                                        String queueName) {
        this.connectionString = connectionString;
        this.queueName = queueName;
        this.currentLowLevelClient.set(createLowLevelClient());
    }

    public Flux<ServiceBusReceivedMessage> receiveMessages() {
        return Flux.using(
                () -> {
                    if (isClosed.get()) {
                        throw new IllegalStateException("Cannot perform receive on the closed client.");
                    }
                    if (!isInitial.getAndSet(false)) {
                        LOGGER.verbose("Creating a new LowLevelClient");
                        currentLowLevelClient.set(createLowLevelClient());
                    }
                    return currentLowLevelClient.get();
                },
                client ->  {
                    return client.receiveMessages();
                },
                client -> {
                    LOGGER.verbose("Disposing current LowLevelClient");
                    client.close();
                })
            .retryWhen(
                Retry.fixedDelay(Long.MAX_VALUE, RETRY_WAIT_TIME)
                    .filter(throwable -> {
                        if (isClosed.get()) {
                            return false;
                        }
                        LOGGER.warning("Current LowLevelClient's retry exhausted or a non-retryable error occurred.",
                            throwable);
                        return true;
                    }));
    }

    public Mono<Void> complete(ServiceBusReceivedMessage message) {
        final ServiceBusReceiverAsyncClient lowLevelClient = currentLowLevelClient.get();
        return lowLevelClient.complete(message);

    }

    public Mono<Void> abandon(ServiceBusReceivedMessage message) {
        final ServiceBusReceiverAsyncClient lowLevelClient = currentLowLevelClient.get();
        return lowLevelClient.abandon(message);
    }

    public Mono<Void> deadLetter(ServiceBusReceivedMessage message) {
        final ServiceBusReceiverAsyncClient lowLevelClient = currentLowLevelClient.get();
        return lowLevelClient.deadLetter(message);
    }

    @Override
    public void close() throws Exception {
        if (!isClosed.getAndSet(true)) {
            this.currentLowLevelClient.get().close();
        }
    }

    private ServiceBusReceiverAsyncClient createLowLevelClient() {
        return new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .receiver()
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .queueName(queueName)
            .disableAutoComplete()
            .maxAutoLockRenewDuration(Duration.ZERO)
            .prefetchCount(0)
            .buildAsyncClient();
    }
}

One last case where receive-flux terminates (hence no longer emits messages) is if the application throws an error from within the SDK callback. You should try-catch and log any application exception rather than bubbling it to the SDK. For e.g. it means, app will need to handle any exception coming from complete/abandon calls as well. Here is an example, showing how to use the above class along with handling exception -

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public final class MessageConsumeExample {
    private static final ClientLogger LOGGER = new ClientLogger(MessageConsumeExample.class);

    private final ServiceBusIndefiniteRetryReceiverAsyncClient client;

    public MessageConsumeExample() {
        String connectionString = System.getenv("CON_STR");
        String queueName = System.getenv("Q_NAME");
        client = new ServiceBusIndefiniteRetryReceiverAsyncClient(connectionString, queueName);
    }

    public void handleMessages() {
        client.receiveMessages()
            .flatMapSequential(new Function<ServiceBusReceivedMessage, Publisher<State>>() {
                @Override
                public Publisher<State> apply(ServiceBusReceivedMessage message) {
                    return handleMessage(message)
                        .onErrorResume(new Function<Throwable, Mono<State>>() {
                            @Override
                            public Mono<State> apply(Throwable businessError) {
                                try {
                                    client.abandon(message).block();
                                    return Mono.just(State.MESSAGE_ABANDONED);
                                } catch (Throwable abandonError) {
                                    LOGGER.warning("Couldn't abandon message {}", message.getMessageId(), abandonError);
                                    return Mono.just(State.MESSAGE_ABANDON_FAILED);
                                }
                            }
                        })
                        .flatMap(state -> {
                            if (state == State.HANDLING_SUCCEEDED) {
                                try {
                                    client.complete(message).block();
                                    return Mono.just(State.MESSAGE_COMPLETED);
                                } catch (Throwable completionError) {
                                    LOGGER.warning("Couldn't complete message {}", message.getMessageId(), completionError);
                                    return Mono.just(State.MESSAGE_COMPLETION_FAILED);
                                }
                            } else {
                                return Mono.just(state);
                            }
                        });
                }
            }, 1, 1)
            .then()
            .subscribe();
    }

    private Mono<State> handleMessage(ServiceBusReceivedMessage message) {
        // A business logic taking 5 seconds to process the message which randomly fails.
        return  Mono.fromCallable(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 1;
            })
            .flatMap(ignored -> {
                LOGGER.info("Handling message: " + message.getMessageId());
                final boolean handlingSucceeded = Math.random() < 0.5;
                if (handlingSucceeded) {
                    return Mono.just(State.HANDLING_SUCCEEDED);
                } else {
                    return Mono.error(
                        new RuntimeException("Business logic failed to handle message: " + 
                            message.getMessageId()));
                }
            });
    }

    private enum State {
        HANDLING_SUCCEEDED,
        MESSAGE_COMPLETED,
        MESSAGE_ABANDONED,
        MESSAGE_COMPLETION_FAILED,
        MESSAGE_ABANDON_FAILED
    }
}

I'm unfamiliar with the camel framework and don't know how they do the bridging with Azure SDK. Do you have some cycles to create a git-repo with a sample runnable Camel project showing the following? -

  1. Setting Service Bus configurations (e.g., AmqpRetryOptions, etc.) in the Camel project.
  2. Receive messages from Service Bus using Camel.
  3. Listening for terminal error from Receive using Camel.
  4. "Explicitly" completing / abandoning the Service Bus message using Camel.

Also, a minimal README to run the code locally.

@mseiler90
Copy link

@anuchandy thank you for such a detailed response. I can try to get a camel example in the next couple days. I also tried the ServiceBusProcessorClient in a simple test service (not using Camel) and did the same "disable" the Service Bus queue test like I mentioned previously and with that it had no problems reconnecting and picking up messages again. Seems to align with what @nomisRev was describing.

I did want to add some log messages that I got after enabling WARN level logs in prod and then this issue occurred again yesterday. Fortunately I have a monitor/alert in place watching queue depths as a way to alert me when this happens. I looked through logs that Application Insights was able to give me and pulled out any of the relevant logs that I could find for when this happened again yesterday. I will post them below to see if that maybe gives any additional insight for you or anyone else. Apologies for the format, but I didn't find a good way to export them in a nice looking log stream format out of Application Insights for just the specific logs I wanted to share here:

1:45:23 PM 
{"az.sdk.message":"Transient error occurred.","exception":"The link 'G15:47625:decision-response-queue_765c20_1656006008739' is force detached. Code: consumer(link64). Details: AmqpMessageConsumer.IdleTimerExpired: Idle timeout: 00:10:00. TrackingId:5892c658000007e90000004062b6c3b7_G15_B31, SystemTracker:name-of-servicebus-suppressed:Queue:decision-response-queue, Timestamp:2022-06-25T17:45:18, errorContext[NAMESPACE: name-of-servicebus-suppressed.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: decision-response-queue, REFERENCE_ID: decision-response-queue_765c20_1656006008739, LINK_CREDIT: 0]","linkName":"n/a","entityPath":"n/a","attempt":1,"retryAfter":4511}

All remaining logs are 1:46:23PM 
{"az.sdk.message":"signalWorkQueue failed before reactor closed.","exception":null,"connectionId":"MF_9d93a5_1656006008658"}

{"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_9d93a5_1656006008658"} *** This log is repeated 7 times

{"az.sdk.message":"Unable to open send and receive link.","exception":"Unable to open send and receive link.","connectionId":"MF_9d93a5_1656006008658","linkName":"decision-response-queue-mgmt"}

{"az.sdk.message":"Retry attempts exhausted or exception was not retriable.","exception":"Unable to open send and receive link.","connectionId":"MF_9d93a5_1656006008658","entityPath":"decision-response-queue/$management","retry":1}

{"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_9d93a5_1656006008658"}

{"az.sdk.message":"Unable to open send and receive link.","exception":"Unable to open send and receive link.","connectionId":"MF_9d93a5_1656006008658","linkName":"cbs"}

{"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_9d93a5_1656006008658"}

{"az.sdk.message":"Retry attempts exhausted or exception was not retriable.","exception":"Unable to open send and receive link.","connectionId":"MF_9d93a5_1656006008658","entityPath":"$cbs","retry":1}

{"az.sdk.message":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","exception":"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor.","connectionId":"MF_9d93a5_1656006008658"}

java.lang.RuntimeException: Unable to open send and receive link.

Error occurred. Passing downstream.
Cannot perform operations on a disposed receiver.

Unable to 'complete' message.
Cannot perform operations on a disposed receiver.

{"az.sdk.message":"Cannot perform operations on a disposed receiver.","exception":"Cannot perform operations on a disposed receiver.","entityPath":"decision-response-queue","linkName":"decision-response-queue_765c20_1656006008739"}

{"az.sdk.message":"Unable to emit shutdown signal.","connectionId":"MF_9d93a5_1656006008658","entityPath":"decision-response-queue","linkName":"decision-response-queue_765c20_1656006008739","signalType":"onComplete","emitResult":"FAIL_TERMINATED"}

@anuchandy
Copy link
Member

anuchandy commented Jun 27, 2022

Hi @mseiler90, I should have mentioned it in my last comment; The ServiceBusProcessorClient (aka HighLevelClient) internally uses ServiceBusReceiverAsyncClient (aka LowLevelClient), such that it listens for any error from the LowLevelClient (i.e., non-retriable, max-retry exhausted) and recreate the internal LowLevelClient. The logic is somewhat similar to the custom class ServiceBusIndefiniteRetryReceiverAsyncClient I shared in the previous comment.

Now, coming to the "disable" test you did with HighLevelClient, it recovered because "disable" results in a non-retriable "terminal" error (from the service) that the current LowLevelClient threw asynchronously, and HighLevelClient recreated a new LowLevelClient.

This is slightly different from Simon's case; from his comment, it appears that his app listens to "terminal" error from LowLevelClient, but he seems to run into a situation where the LowLevelClient never threw a "terminal" error hence app never get to take action.

Looking at the log you shared - It seems there was a failure in the complete attempt (I don't understand the whole picture here), but the gist is - an exception happened on the complete call, either the application or Camel didn't handle it hence error thrown back to SDK from callback, which results in receive-flux to terminate. I explained this in the previous comment above the MessageConsumeExample code.

This is why I requested the Camel sample project; I don't know how Camel internally wires complete calls (or other disposition calls), but a sample project should help find it.

I hope it helps to clarify.

@nomisRev
Copy link

Hey @anuchandy,

I can confirm that we're not throwing an exception from our own code, all our code is wrapped in try/catch where it interleaves code with the Azure SDK. So we have try/catch around our processing of the messages, as well as around the settlement of the messages. We have added logs everywhere in our Azure interaction and cannot see errors coming from Azure nor from our own code. We're seeing this on Topics, and we don't seem to be having issues with Queues btw.

As mentioned in my comments above, HighLevelClient doesn't seem to have this issue because it implements a patch around LowLevelClient. I am curious why this patch is placed inside the HighLevelClient rather than the LowLevelClient, the LowLevelClient should also act on changes in the ServiceBusConnectionProcessor#isChannelClosed no?

Currently I can also see that HighLevelClient is polling the public boolean isChannelClosed() state, but it seems this can also be implemented in a reactive manner to improve perf and reaction time.

I'm happy to discuss further, or share any more details if you need more information. I can sadly not share our production code, but I can rewrite them as I've done above to share a patch. We seem to have success with this patch, but more time / battle testing in production will confirm that.

@mseiler90
Copy link

Hey @anuchandy,

I made a quick demo camel repo which can be found here https://github.com/mseiler90/camel-demo

You asked for a number of configurations which I did not include because Camel is taking care of everything for us by default. We have the ability to set certain configurations as seen in the Camel documentation here https://camel.apache.org/components/3.17.x/azure-servicebus-component.html, but I wanted to create this using the "out of the box" and default settings which we are currently using in production. For example, we are not disabling auto-complete, we aren't setting our own retry options, and we don't have any custom logic built around the Service Bus clients. We have tried tests with things like setting retry options, but nothing has helped. As you can see by the example repo, Camel with the help of the Azure SDK is taking care of all of our configurations for us as it should.

For testing, if you just update the application.yml file with a Service Bus connection string and a queue name, then you should be able to just start it up in your IDE and it will automatically put a message on the queue every 10 seconds. There is a Camel route that is listening on that same queue and just logs out the body of the message. Once that is up and running, if you do the "disable" on the queue and after several seconds or so, reactivate the queue. You will see that messages still get sent to the queue, but will no longer get picked up until restarting the application.

As we already have discussed, the High Level Client would successfully reconnect on the receiver, but the Low Level Client doesn't. We just would like to see the same "isChannelClosed" logic in the Low Level Client and it seems everything should work as we would hope for.

I do agree that we have an issue causing the connection to get lost to begin with, but regardless of it losing the connection to begin with, we should be able to automatically reconnect on the receiver without writing custom logic to do so. I did find some additional information on our production issue. It seems that this has only been happening a specific queue where we are picking up the message and then sending a REST API call to another service. I have found logs in Application Insights showing that there was a java.net.SocketException: Connection timed out (Read failed) for the API call at around the same time the Service Bus issue occurs. Looking back at all occurrences (which is like 8 times now in the past couple weeks), we see this connection timeout error at the same time. So it seems that may be causing the lost connection on the queue somehow and we'll need to investigate why that connection timeout is occurring

@anuchandy
Copy link
Member

anuchandy commented Jun 28, 2022


Hi @nomisRev, to give you some background, the messages arrive via an amqp-link that has the life cycle "open-event -> message-delivery-events -> close-event". The LowLevelClient creates a new amqp-link when the current amqp-link emits a close-event. You asked me in the past where this link recovery logic is, which I missed to follow up; the logic is in the class "ServiceBusReceiveLinkProcessor" here.

An amqp-session owns the amqp-link, and multiple amqp-session are owned by an amqp-connection. Both of these ancestors have the life-cycle, and has recovery logic. The amqp-connection recovery logic is in the generic class "AmqpChannelProcessor" here .

As you can see, the underlying types "react" to close (successful or error) events by creating a new amqp-link / amqp-connection (collectively called channels).

The expectation is that these "reactive" logic should take care of recovery, but there were edge cases (which are very hard to repro) where these "reactive" handlers never get to kick off recovery due to signal missing. If you go through the SDK changelog, you can find multiple signal loss root causing via DEBUG logs. A signal loss bug fix for the "reactive" recovery is coming in mid-July release (details here).

The historical reason for "HighLevelClient" using a timer to "proactive" channel check (in addition to "LowLevelClient"'s "reactive" channel check) is something I need to check.

Interestingly, you're not (so far) seem to have recovery problems with the queue entity but with the topic entity. I'll kick off a long-running topic test with DEBUG enabled to see if we can get some helpful log.


Hi @mseiler90, thanks for the Camel project; I'll use it to debug to understand how Camel is doing wiring with SDK.

But your observation "It seems that this has only been happening a specific queue where we are picking up the message and then sending a REST API call to another service" is giving some hint. I think what could be happening here is -

  1. A subset of REST API call blocks a long time, eventually throwing read-timeout.
  2. Let's say those problematic API call blocks for 90 seconds, which will block the SDK callback for that period. I.e., no message will be read from the queue for 90 sec, leading to an increase in queue depth.
  3. Then eventually, API calls throw read timeout; if the app doesn't handle it, it gets propagated to SDK callback which stops the receive-flux (refer my previous comment on exception handling) that further increased the queue depth.

As you investigate the REST API call connection (read) timeout, see if app was running into the above flow.

I'll find sometime go over the Camel sample to understand the Camel's bridging with the SDK and see if there is any issue in bridging.

@mseiler90
Copy link

Thanks @anuchandy. In case it helps, here is a link to the Service Bus component in the Camel repo https://github.com/apache/camel/tree/main/components/camel-azure/camel-azure-servicebus

@jsquire jsquire added this to the Backlog milestone Jun 30, 2022
@mseiler90
Copy link

Hey @anuchandy. Curious if you have had a chance to look into this any further? I see this was added to the Backlog milestone 12 days ago. What exactly does this mean? Is there work that is planned for this?

We have been able to prevent the connection timeout errors from happening on that API call as I had explained above, so we haven't seen this issue happening anymore. However, we are still cautious that if something were to cause the disconnection, we don't have something to automatically health check and reconnect unless we write custom logic which would also mean not using the Camel Service Bus component.

@tux4ever
Copy link

tux4ever commented Feb 2, 2023

Hi, is this issue still in progress? We have switched from the old azure lib to the new one and since then we face the same issue, that the threads are waiting and do not consume any message any longer. Unfortunately this stop occurs quite often, at least once a day, and we do not find a possible solution on our side. We have carefully checked our implementation and all exceptions are catched in our message handler. The bad part is, that we cannot reproduce the unexpected stop easily but we have some ideas. If we set the maxConcurrentCalls property of the service bus processor to 1 everything is working well and the processing of messages never stops. With a value of example: 10 we can force the stop of the processor only if we randomly cut the network connection for a short period of time (< 5 seconds). The threads will stop working and stay in the state "waiting" and never start working again. Our assumption is, that the .block() calls in the ServiceBusReceivedMessageContext line 81 do not have a timeout and due to this missing timeout the thread is eternally blocked. The thread dump reveals following state

java.base@11.0.8/jdk.internal.misc.Unsafe.park(NativeMethod)
java.base@11.0.8/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
java.base@11.0.8/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
java.base@11.0.8/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
java.base@11.0.8/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
java.base@11.0.8/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)app//reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)app//reactor.core.publisher.Mono.block(Mono.java:1707)app//com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.complete(ServiceBusReceivedMessageContext.java:81)

Would it be an option to set a timeout to improve the unexpected stops of messaging processings? Additionally we tried to do some exception handling to recognize this stop of message processing, but unfortunately we could not find a suitable place to receive the exception. Our processor configuration is created with

new ServiceBusClientBuilder()
    .connectionString("connectionString")    
    .processor()
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .topicName("topicName") 
    .subscriptionName("subscriptionName")
    .prefetchCount(0)
    .maxConcurrentCalls(5)
    .maxAutoLockRenewDuration(Duration.ofMinutes(1L))    
    .disableAutoComplete()
    .processMessage(serviceBusMessageHandler::receiveMessage)    
    .processError(serviceBusMessageHandler::handleError);
public void receiveMessage(final ServiceBusReceivedMessageContext context){
    try    {
        final ServiceBusReceivedMessage message = context.getMessage();
        logDebug(() -> String.format("Message received: Header: %s", message.getApplicationProperties())); 
       final MessageStatus messageStatus = azureMessageProcessor.processMessage(message);        
    }   catch (Exception ex)    {
        LOGGER.error("CRITICAL: error in messaging processing!", ex);
    }
   context.complete();  
}
public void handleError(final ServiceBusErrorContext context) {
    if (LOGGER.isErrorEnabled())    {
        LOGGER.error(String.format("Exception '%s' occurred during '%s' action for entity '%s'", context.getException().getMessage(), context.getErrorSource(), context.getEntityPath()),context.getException());
    }}

But the error callback is never called. Thank your for the support.

@TheDevOps
Copy link

Hi, with regards to #26465 (comment) above we've done a test on a local fork and found that indeed calling the complete() action with a timeout would see the thread recovered after the timeout expired and process messages again as normal. Would be great if someone could provides thoughts about the proposed change in #33298, it has been created in a non breaking and invasive fashion by only offering the option to use an additional method with a timeout input but not changing any current behavior. Thanks!

@liukun-msft
Copy link
Contributor

Hi @tux4ever @TheDevOps
Thanks for your information and contribution! Consider you are using ServiceBusProcessorClient, which is not related to this issue, I created a seperate github issue #33493. Please check my comments there and use that channel to discuss your problem.

@mseiler90
Copy link

@liukun-msft Do you know if there is any further investigation into this issue (not the separate issue you created, but this one)?

@liukun-msft
Copy link
Contributor

Hi @mseiler90, the original question was followed up by @anuchandy. I'll talk to him to know the current stage.

@liukun-msft liukun-msft self-assigned this Feb 20, 2023
@liukun-msft
Copy link
Contributor

Hi @mseiler90

The design of the ServiceBusReceiverAsyncClient (LowLevelClient) is to emit terminal-error to the user if it encounters an non-retriable error or exhaustion of retry.

The Apache Camel plugin is not taking care of re-creating the ServiceBusReceiverAsyncClient when this terminal error happens.

From the log shared by you, we can see that the ServiceBusReceiverAsyncClient emits a terminal error, and the Apache Camel plugin logs the error and assumes the same ServiceBusReceiverAsyncClient will work. This assumption that plugin making is not correct. Please see @anuchandy's previous explanation in this thread.

As we don't own or have expertise in the Apache Camel plugin, we suggest you to open a ticket in the Apache Camel plugin repo to use ServiceBusReceiverAsyncClient correctly or use the ServiceBusProcessorClient (HighLevelClient).

azure-sdk pushed a commit to azure-sdk/azure-sdk-for-java that referenced this issue Nov 2, 2023
@anuchandy
Copy link
Member

As explained in the previous comments (here and here), this is external to the SDK. Closing this.

@github-actions github-actions bot locked and limited conversation to collaborators Apr 16, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
bug This issue requires a change to an existing behavior in the product in order to be resolved. Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention This issue needs attention from Azure service team or SDK team pillar-reliability The issue is related to reliability, one of our core engineering pillars. (includes stress testing) Service Bus
Projects
Status: Done
Development

No branches or pull requests