Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] ASB: ordered delivery with maxConcurrentSessions > 1 only possible single-threaded #24487

Closed
3 tasks done
p4p4 opened this issue Oct 1, 2021 · 5 comments
Closed
3 tasks done
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. issue-addressed Workflow: The Azure SDK team believes it to be addressed and ready to close. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus
Milestone

Comments

@p4p4
Copy link

p4p4 commented Oct 1, 2021

Describe the bug
With a ServiceBusProcessorClient and maxConcurrentSessions > 1, still only one message is being processed simultaneously. The received order of the messages for any session is correct though.

When setting maxConcurrentSessions > 1 AND also maxConcurrentCalls > 1, the throughput is increased, but the order of the messages is not maintained any more.

Expected behaviour
Two (or more) sessions can be processed completely concurrently, while also maintaining the order of messages per session.

To Reproduce
I slightly modified ServiceBusSessionProcessorSample.java, full code can be found below.

First send 20 messages:

  • session A, payload: A-0
  • session A, payload: A-1
  • ...
  • session A, payload: A-9
  • session B, payload: B-0
  • ...
  • session B, payload: B-9

The message processing consumer takes 1 second per message to consume.

Scenario 1: When setting maxConcurrentSessions(2), the consumption of all events still takes 20 seconds.
Scenario 2: When setting maxConcurrentSessions(2) and maxConcurrentCalls(2), it only takes 10 seconds, but the order is not correct any more

Scenario 1 logs:

2021-10-01T11:33:18.498241Z Processing message. 8f4ae8ea4f214c2bb5f69bf1a1c97bd5 Session: A, Sequence #: 201. Contents: A-0
2021-10-01T11:33:19.565135Z Processing message. e0e9925de54244a8a77a8b5f380034ae Session: B, Sequence #: 211. Contents: B-0
2021-10-01T11:33:20.696463Z Processing message. a637a7b7bf294df683195d8858cd365b Session: A, Sequence #: 202. Contents: A-1
2021-10-01T11:33:21.866021Z Processing message. c669ddfed3a0442db9181c6018adb192 Session: A, Sequence #: 203. Contents: A-2
2021-10-01T11:33:23.006618Z Processing message. 479de17c4ab549f89d902b4099580c37 Session: A, Sequence #: 204. Contents: A-3
2021-10-01T11:33:24.113851Z Processing message. 18e31e646a764d0683090457305b1d25 Session: A, Sequence #: 205. Contents: A-4
2021-10-01T11:33:25.174357Z Processing message. 944064029153474392b7247c7da6fcb9 Session: A, Sequence #: 206. Contents: A-5
2021-10-01T11:33:26.230849Z Processing message. 6f2298e922794064bf0eb5e35f8773c7 Session: A, Sequence #: 207. Contents: A-6
2021-10-01T11:33:27.292331Z Processing message. 14f582915ecf4a93bde464d1d9e3a2ab Session: A, Sequence #: 208. Contents: A-7
2021-10-01T11:33:28.476362Z Processing message. 67d61910bea34bc6b8be7a37db93bb5a Session: A, Sequence #: 209. Contents: A-8
2021-10-01T11:33:29.606050Z Processing message. 200e94b8d3fa442c90557eb219ef656a Session: A, Sequence #: 210. Contents: A-9
2021-10-01T11:33:30.717297Z Processing message. c822898b0b7249f6b401d05ba931c98d Session: B, Sequence #: 212. Contents: B-1
2021-10-01T11:33:31.811627Z Processing message. 470549d6fdf540088716ea741373a7d5 Session: B, Sequence #: 213. Contents: B-2
2021-10-01T11:33:32.979258Z Processing message. fc3a4df59bf046609408ee81af4a85ad Session: B, Sequence #: 214. Contents: B-3
2021-10-01T11:33:34.105473Z Processing message. 3911e41b14ae4d5daccb5decb4a3ad9c Session: B, Sequence #: 215. Contents: B-4
2021-10-01T11:33:35.155853Z Processing message. 31ca3a2720f941c28f713b93d32808d7 Session: B, Sequence #: 216. Contents: B-5
2021-10-01T11:33:36.251573Z Processing message. 6bb0c71952ec487fb82a532c55173407 Session: B, Sequence #: 217. Contents: B-6
2021-10-01T11:33:37.411050Z Processing message. 609faacf70a54fa5af586b223ca82851 Session: B, Sequence #: 218. Contents: B-7
2021-10-01T11:33:38.415045Z Processing message. 1a7b746aeee04ed890c66bd0c3849688 Session: B, Sequence #: 219. Contents: B-8
2021-10-01T11:33:39.420912Z Processing message. 13a17bacbbc24e2e996a9973829606df Session: B, Sequence #: 220. Contents: B-9

Scenario 2 logs:

Especially notice the wrong order of those 2 lines:

2021-10-01T12:15:31.177621Z Processing message. b09598ae11de4f26997aed16193ebf90 Session: A, Sequence #: 268. Contents: A-7
2021-10-01T12:15:31.265148Z Processing message. 49aec11f1d554bf88ea7423e319b1283 Session: A, Sequence #: 267. Contents: A-6
2021-10-01T12:15:22.194543Z Processing message. 452ad69739bf4860961eb4ba814503d5 Session: A, Sequence #: 261. Contents: A-0
2021-10-01T12:15:22.343255Z Processing message. 4a2a6ebba8964eae8823a4ceee81be9a Session: B, Sequence #: 271. Contents: B-0
2021-10-01T12:15:23.346715Z Processing message. 07bc737f20a6444ca5d8fbbb35078fbd Session: B, Sequence #: 272. Contents: B-1
2021-10-01T12:15:23.462585Z Processing message. cb7e61816b1e4a04aed302689b16eb28 Session: A, Sequence #: 262. Contents: A-1
2021-10-01T12:15:24.461149Z Processing message. 958c3510067f44a19fdaf3924b4f14ff Session: B, Sequence #: 273. Contents: B-2
2021-10-01T12:15:24.593921Z Processing message. 2fac107a6d9a4ef59c30a94002cd7586 Session: B, Sequence #: 274. Contents: B-3
2021-10-01T12:15:25.561341Z Processing message. 58b6ecc293cc4bffb80564bcb6f6153a Session: B, Sequence #: 275. Contents: B-4
2021-10-01T12:15:25.751417Z Processing message. 49ab2d8cb1ed4634937027bfc1d0229d Session: B, Sequence #: 276. Contents: B-5
2021-10-01T12:15:26.667184Z Processing message. a2eee98f8ad94f6e8be6f26ac0de720e Session: B, Sequence #: 277. Contents: B-6
2021-10-01T12:15:26.809998Z Processing message. ecc33172880246a7bf900f1c09b4c5f1 Session: B, Sequence #: 278. Contents: B-7
2021-10-01T12:15:27.792291Z Processing message. 448d71954f4e49759176ba27235120ec Session: B, Sequence #: 279. Contents: B-8
2021-10-01T12:15:27.898315Z Processing message. ff66b64d475143e586dda739667af64d Session: B, Sequence #: 280. Contents: B-9
2021-10-01T12:15:28.854356Z Processing message. e7571706322b456098bf8feac9fe8639 Session: A, Sequence #: 263. Contents: A-2
2021-10-01T12:15:28.991704Z Processing message. c5500fa8628a42249ead4e805036d134 Session: A, Sequence #: 264. Contents: A-3
2021-10-01T12:15:29.912469Z Processing message. 61afbe1a8a1b40c094280ae1c3ac9795 Session: A, Sequence #: 265. Contents: A-4
2021-10-01T12:15:30.125805Z Processing message. 552502321d4f4f1191d3948cf82601bc Session: A, Sequence #: 266. Contents: A-5
2021-10-01T12:15:31.177621Z Processing message. b09598ae11de4f26997aed16193ebf90 Session: A, Sequence #: 268. Contents: A-7
2021-10-01T12:15:31.265148Z Processing message. 49aec11f1d554bf88ea7423e319b1283 Session: A, Sequence #: 267. Contents: A-6
2021-10-01T12:15:32.178924Z Processing message. 2be5edc84b7244efbb9cd136253d077b Session: A, Sequence #: 269. Contents: A-8
2021-10-01T12:15:32.269651Z Processing message. 0ed0691aa62649eab1aca2509d9ab077 Session: A, Sequence #: 270. Contents: A-9

Code

package asb.demo;// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.


import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusSenderClient;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


public class SessionExample {

    String connectionString =
          "Endpoint=sb://........";
    String sessionQueueName = "testSessionQueue";


    public static void main(String[] args) throws InterruptedException {

        SessionExample e = new SessionExample();
        e.produceTestData();
        e.runConsumers();

        TimeUnit.MINUTES.sleep(20);
    }


    void runConsumers() throws InterruptedException {

        // Create an instance of session-enabled processor through the ServiceBusClientBuilder that processes
        // two sessions concurrently.
        ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
              .connectionString(connectionString)
              .sessionProcessor()
              .queueName(sessionQueueName)
              .maxConcurrentSessions(10)
              .maxConcurrentCalls(2)
              .processMessage(SessionExample::processMessage)
              .processError(SessionExample::processError)
              .buildProcessorClient();

        System.out.println("Starting the processor");
        processorClient.start();
        TimeUnit.MINUTES.sleep(20);
        processorClient.close();
    }


    private static void processMessage(ServiceBusReceivedMessageContext context) {
        ServiceBusReceivedMessage message = context.getMessage();
        System.out.printf(Instant.now() + " Processing message. %s Session: %s, Sequence #: %s. Contents: %s%n",
              message.getMessageId(),
              message.getSessionId(),
              message.getSequenceNumber(),
              message.getBody());

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    private static void processError(ServiceBusErrorContext context) {
        System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
              context.getFullyQualifiedNamespace(), context.getEntityPath());

        if (!(context.getException() instanceof ServiceBusException)) {
            System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException());
            return;
        }

        ServiceBusException exception = (ServiceBusException) context.getException();
        System.out.printf("ServiceBusException source: %s. Reason: %s. Is transient? %s%n", context.getErrorSource(),
              exception.getReason(), exception.isTransient());
    }


    void produceTestData() {
        ServiceBusSenderClient sender = new ServiceBusClientBuilder()
              .connectionString(connectionString)
              .sender()
              .queueName(sessionQueueName)
              .buildClient();

        int messagesPerSession = 10;
        generateMessages("A", messagesPerSession, sender);
        generateMessages("B", messagesPerSession, sender);

    }

    List<ServiceBusMessage> generateMessages(String sessionId, int amount, ServiceBusSenderClient sender) {
        List<ServiceBusMessage> messages = new ArrayList<>();
        for (int i = 0; i < amount; i++) {
            String payload = sessionId + "-" + i;
            System.out.print(payload + ", ");
            messages.add(new ServiceBusMessage(payload).setSessionId(sessionId));
            sender.sendMessage(new ServiceBusMessage(payload).setSessionId(sessionId));
        }

        return messages;
    }
}

Setup (please complete the following information):

  • Java 11
  • 'com.azure', name: 'azure-core', version: '1.20.0'
  • 'com.azure', name: 'azure-messaging-servicebus', version: '7.4.1'

Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Oct 1, 2021
@joshfree joshfree added Client This issue points to a problem in the data-plane of the library. Service Bus labels Oct 4, 2021
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Oct 4, 2021
@joshfree
Copy link
Member

joshfree commented Oct 4, 2021

@ki1729 could you follow up with @p4p4 on their github issue?

@p4p4
Copy link
Author

p4p4 commented Oct 28, 2021

This seems to be the same issue as described here:
#24047

@ramya-rao-a ramya-rao-a added this to the [2022] March milestone Feb 11, 2022
@ramya-rao-a
Copy link
Contributor

Hey @p4p4

Apologies for the late reply.

This behavior is by design.
Though the client gets the messages in order within the session from the service, choosing maxConcurrentCalls > 1 results in having parallel threads. The scheduling of these threads would be out of control of the client.

@ramya-rao-a ramya-rao-a added the issue-addressed Workflow: The Azure SDK team believes it to be addressed and ready to close. label Feb 14, 2022
@ghost
Copy link

ghost commented Feb 14, 2022

Hi @p4p4. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

@ghost
Copy link

ghost commented Feb 21, 2022

Hi @p4p4, since you haven’t asked that we “/unresolve” the issue, we’ll close this out. If you believe further discussion is needed, please add a comment “/unresolve” to reopen the issue.

@ghost ghost closed this as completed Feb 21, 2022
@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023
This issue was closed.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. issue-addressed Workflow: The Azure SDK team believes it to be addressed and ready to close. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus
Projects
None yet
Development

No branches or pull requests

4 participants