[ServiceBus][7.0.0][Bug] get_queue_receiver will lock 1 message #15555

Closed
DzcleisureCN opened this issue Nov 26, 2020 · 7 comments
Assignees: yunhaoling
Labels: Client, customer-reported, question, Service Bus

Comments

@DzcleisureCN

  • Package Name: azure-servicebus
  • Package Version: 7.0.0
  • Operating System: CentOS
  • Python Version: 3.7

Describe the bug
get_queue_receiver locks 1 message in the queue.

To Reproduce
Steps to reproduce the behavior:

  1. With 5 messages in the queue, the code below receives all 5 messages.
with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10)
  2. The code below, however, receives only 4 messages. It seems that get_queue_receiver locks 1 message.
# There are 5 messages in the queue, but this code receives only 4.
with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        pass
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10)
  3. I ask because I want to separate receiving a message from completing it. Completing a message currently requires a receiver, so I open one with get_queue_receiver only to complete a message, without calling receive_messages. However, get_queue_receiver then locks 1 message in the queue.
# Like SQS, I want to be able to delete a message from anywhere, not only right after receive_messages.
with ServiceBusClient.from_connection_string(connstr) as client:
    # Open a receiver only to complete the message (receive_messages is not called); this seems to lock 1 message in the queue.
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        receiver.complete_message(message)  # message was received earlier and is passed in

Expected behavior
The code below should also receive all 5 messages.

# There are 5 messages in the queue; this code should receive 5 but currently receives only 4.
with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        pass
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        received_message_array = receiver.receive_messages(max_message_count=5, max_wait_time=10)


@ghost added the needs-triage, customer-reported, and question labels Nov 26, 2020
@DzcleisureCN
Author

One more question:

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        # do something
  1. How long can the "client" and "receiver" connections be used? If I never call close, can I keep reusing them in a daemon?
  2. If a network error or other error occurs, do the "client" and "receiver" reconnect automatically?
    Thanks.

@ghost removed the needs-triage label Nov 30, 2020
@yunhaoling added the Client and needs-triage labels Nov 30, 2020
@ghost removed the needs-triage label Nov 30, 2020
@yunhaoling self-assigned this Dec 1, 2020
@yunhaoling
Contributor

yunhaoling commented Dec 2, 2020

hey @DzcleisureCN, thanks for reaching out.

I'm investigating the issue of 1 message being locked.

To your third point:

"I want separate receive the message and complete message"

Are you saying that you want to receive a message with one receiver but complete it with another receiver?
I'm afraid this is not a supported operation yet, because under the hood each message is bound to the receiver that received it. For example, if you receive msg1 with receiver1 and then try to settle it with a new receiver2 by calling receiver2.complete_message(msg1), the SDK still uses receiver1 to settle the message as long as receiver1 remains open. If receiver1 is closed, you will get a ValueError.

Sorry, we should make this clearer; we will add some notes or make it raise an error in a bug fix release.

For now, we do not recommend settling a message with a receiver other than the one that received it, because settlement actually happens on that receiver's specific AMQP receiver link.

I will bring this to the team to see how we could ultimately achieve your goal.
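
To illustrate the constraint described above, here is a minimal sketch that models a message as bound to the receiver that delivered it. It is not SDK code: `FakeReceiver`, `FakeMessage`, and `process_queue` are hypothetical stand-ins, and only the method names `receive_messages` and `complete_message` and the `ValueError` on a closed receiver are taken from this thread.

```python
class FakeMessage:
    """Hypothetical stand-in for a received message; remembers its receiver."""

    def __init__(self, body, receiver):
        self.body = body
        self.receiver = receiver
        self.settled = False


class FakeReceiver:
    """Hypothetical stand-in for a queue receiver (not the SDK class)."""

    def __init__(self, queue):
        self.queue = queue
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Leaving the with block closes the receiver.
        self.closed = True

    def receive_messages(self, max_message_count=1):
        if self.closed:
            raise ValueError("receiver is closed")
        batch = []
        while self.queue and len(batch) < max_message_count:
            batch.append(FakeMessage(self.queue.pop(0), self))
        return batch

    def complete_message(self, message):
        # Settlement goes through the receiver that delivered the message,
        # no matter which receiver the caller invoked.
        if message.receiver.closed:
            raise ValueError("the receiver that received this message is closed")
        message.settled = True


def process_queue(queue, handler):
    """Receive and settle on one receiver that stays open for both steps."""
    done = []
    with FakeReceiver(queue) as receiver:
        for message in receiver.receive_messages(max_message_count=10):
            handler(message.body)
            receiver.complete_message(message)
            done.append(message.body)
    return done
```

The point of the sketch is the shape of `process_queue`: the message is handled and completed while the receiver that delivered it is still open, which is the pattern recommended in this comment.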


Re: the other question:

  1. If the receiver is inactive for 10 minutes, the service proactively sends a request telling the client to shut down the connection. Otherwise the client/receiver remains open as long as no non-recoverable error occurs.

  2. Yes and no; it depends on the error. For retryable errors, we retry (reconnect) according to the retry config:

    :keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
    Default value is 3.
    :keyword float retry_backoff_factor: Delta back-off interval in the unit of second between retries.
    Default value is 0.8.
    :keyword float retry_backoff_max: Maximum back-off interval in the unit of second. Default value is 120.

    For non-recoverable errors, we raise the error and shut down the receiver.

Please check the exceptions section of our README for more information.
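
To get a feel for how the three retry keywords interact, here is a rough sketch that computes the wait before each retry. The exponential formula (factor times 2 to the power of the attempt number, capped at the maximum) is an assumption for illustration; this thread does not state the exact formula the SDK uses, and `backoff_schedule` is a hypothetical helper, not part of azure-servicebus.

```python
def backoff_schedule(retry_total=3, retry_backoff_factor=0.8, retry_backoff_max=120.0):
    """Return the wait in seconds before each retry attempt.

    ASSUMPTION: exponential growth (factor * 2**attempt) capped at
    retry_backoff_max. The real SDK behaviour may differ.
    """
    return [
        min(retry_backoff_factor * (2 ** attempt), retry_backoff_max)
        for attempt in range(retry_total)
    ]


# With the default values quoted above:
print(backoff_schedule())  # [0.8, 1.6, 3.2]
```

Under that assumption, the defaults add only a few seconds of waiting in total, so a daemon that must ride out longer outages would likely want a larger retry_total, passed as a keyword argument when the client is created.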

@DzcleisureCN
Author

Hi Yunhaoling,
Thanks for your help!

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
        # do something

You said that if the receiver is inactive for 10 minutes, the service proactively sends a request telling the client to shut down the connection.
How can I check whether the receiver has been closed or shut down by the server?
Is there a method or property of the receiver I can use, or should I just catch the exception when I reuse the receiver?

I ask because getting a receiver usually takes some time, so we want to keep the receiver and reuse it. If the server closes the receiver, we need a way to detect that and open a new one.

# this usually takes nearly 1 second
with client.get_queue_receiver(queue_name, prefetch=10) as receiver:
    pass  # use the receiver here

Thanks

@yunhaoling
Contributor

yunhaoling commented Dec 4, 2020

hey @DzcleisureCN, in general you would need to do the latter: catch the exception.

TL;DR: you can reuse the receiver as long as it has not been explicitly closed; the SDK will try to reconnect to the service.

  • An explicit close happens when you call receiver.close() or when execution leaves the receiver's context manager block. A bit of code might help explain this better:
import logging
from azure.servicebus.exceptions import ServiceBusConnectionError, ServiceBusError

log = logging.getLogger(__name__)

# some initialization code (creates servicebus_receiver)
work_flag = True
with servicebus_receiver:
    while work_flag:
        try:
            servicebus_receiver.receive_messages(max_message_count=10)
        except (ServiceBusConnectionError, ServiceBusError):
            # if the sdk has encountered a connection error that closed the connection, an error bubbles up
            # but program execution is still inside the context manager block -- with servicebus_receiver:
            # so you can choose to ignore the connection error, and on the next service request the sdk tries to reestablish the connection.
            log.info('error occurred')
            continue

# exiting the with block closes servicebus_receiver, and it can no longer be used
servicebus_receiver.receive_messages(max_message_count=10)  # this call would raise ValueError

We also have some sample code on failure and recovery; you can check it here:
https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/servicebus/azure-servicebus/samples/sync_samples/failure_and_recovery.py
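
Building on the loop above, here is a minimal sketch of a wrapper that keeps one receiver for reuse and opens a new one only when the old one has been closed. `ReusableReceiver`, `open_receiver`, and `StubReceiver` are hypothetical names; the sketch assumes, as noted above, that calling a closed receiver raises ValueError, and it uses a stub instead of the real SDK so that it runs standalone.

```python
class ReusableReceiver:
    """Hypothetical helper: keep one receiver and reopen it only when closed."""

    def __init__(self, open_receiver):
        # open_receiver: zero-argument callable that returns a new receiver.
        # In real code this might wrap client.get_queue_receiver(queue_name).
        self._open_receiver = open_receiver
        self._receiver = None
        self.opened = 0  # number of receivers created, for illustration only

    def _reopen(self):
        self._receiver = self._open_receiver()
        self.opened += 1

    def receive(self, **kwargs):
        if self._receiver is None:
            self._reopen()
        try:
            return self._receiver.receive_messages(**kwargs)
        except ValueError:
            # Assumption from this thread: a closed receiver raises ValueError.
            # Open a fresh receiver and try once more.
            self._reopen()
            return self._receiver.receive_messages(**kwargs)

    def close(self):
        if self._receiver is not None:
            self._receiver.close()
            self._receiver = None


class StubReceiver:
    """Stand-in for a real receiver so the sketch runs without the SDK."""

    def __init__(self):
        self.closed = False

    def receive_messages(self, **kwargs):
        if self.closed:
            raise ValueError("receiver is closed")
        return ["msg"]

    def close(self):
        self.closed = True
```

Two caveats with this design: catching ValueError broadly could hide unrelated errors, and messages received on the old receiver cannot be settled through the new one, so any unsettled messages should be treated as lost to this consumer until their locks expire.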

Let me know if this answers your question.

@DzcleisureCN
Author

Hi @yunhaoling

Thanks for your help. I have verified your PR, so I am closing the issue.
Thanks.

@yunhaoling
Contributor

hey @DzcleisureCN, are you referring to PR #15622?

It's still under construction; I need to do more testing to convince myself that it is a solid fix 😋

@DzcleisureCN
Author

hi @yunhaoling
Yes, I applied and tested the patch from PR #15622. It works for me.
