[*ERS] AMQP concurrent_requests basic.ack exception #4146

Closed
Integration-IT opened this issue Oct 12, 2023 · 4 comments
@Integration-IT

Hi Teams,

I configured the consumer following AMQPER / amqp_it_test.go, and the TCP connection between the MQ server and CGRateS gets closed unexpectedly.

[*ERS] *amqp_json_map TO [*ESS] *els : ERROR.
[*ERS] *amqp_json_map queue A TO [*ESS] *amqp_json_map queue B : ERROR.

The goal is to consume the CDRs in queue A with *ERS and use *ESS to push the valuable CDRs, in *rated mode with cost details, to queue B for a third party.
OR
Consume the CDRs in queue A with *ERS and use *ESS to push the valuable CDRs with cost details to ELK for third-party analysis.

Both scenarios work well only as long as concurrent_requests is set to a very low value.
It is currently not possible to replicate the amqp_it_test.go configuration: the MQ server closes the TCP connection after a few seconds.

To reproduce, change only the "concurrent_requests" parameter and keep all other settings as in amqp_it_test.go.


CGRateS@v0.11.0~dev


test:

  • ers "concurrent_requests" : 1 --> STABLE
  • ers "concurrent_requests" : 2 --> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 1
  • ers "concurrent_requests" : 10 --> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 1, operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 5, operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 3.
  • ers "concurrent_requests" : 100 --> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 10, operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 19
@Integration-IT
Author

Delivery Identifiers: Delivery Tags
Before we proceed to discuss other topics it is important to explain how deliveries are identified (and acknowledgements indicate their respective deliveries). When a consumer (subscription) is registered, messages will be delivered (pushed) by RabbitMQ using the basic.deliver method. The method carries a delivery tag, which uniquely identifies the delivery on a channel. Delivery tags are therefore scoped per channel.

Delivery tags are monotonically growing positive integers and are presented as such by client libraries. Client library methods that acknowledge deliveries take a delivery tag as an argument.

Because delivery tags are scoped per channel, deliveries must be acknowledged on the same channel they were received on. Acknowledging on a different channel will result in an "unknown delivery tag" protocol exception and close the channel.
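
As a rough illustration of the scoping rule quoted above, here is a minimal Go sketch. It does not talk to RabbitMQ: the `channel` type and its `deliver` and `ack` methods are hypothetical names that only model the bookkeeping described in the quote, under the assumption that each channel tracks its own set of unacknowledged tags.

```go
package main

import "fmt"

// channel is a hypothetical, simplified model of one AMQP channel: it hands
// out its own increasing delivery tags and remembers which are unacked.
type channel struct {
	name    string
	nextTag uint64
	unacked map[uint64]bool
}

func newChannel(name string) *channel {
	return &channel{name: name, unacked: make(map[uint64]bool)}
}

// deliver mimics basic.deliver: the tag is unique only within this channel.
func (c *channel) deliver() uint64 {
	c.nextTag++
	c.unacked[c.nextTag] = true
	return c.nextTag
}

// ack mimics basic.ack for a single delivery on this channel.
func (c *channel) ack(tag uint64) error {
	if !c.unacked[tag] {
		return fmt.Errorf("channel %s: unknown delivery tag %d", c.name, tag)
	}
	delete(c.unacked, tag)
	return nil
}

func main() {
	a, b := newChannel("A"), newChannel("B")
	tag := a.deliver() // first delivery on channel A

	fmt.Println(b.ack(tag)) // wrong channel: B never delivered this tag
	fmt.Println(a.ack(tag)) // same channel: the ack is accepted
}
```

In the model the failed ack only returns an error; on a real server the same mistake raises a channel exception and closes the channel, as the quoted text says.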

@Integration-IT
Author

@danbogos please bump 2 000 000 CDR in stress test are locked

ionutboangiu added a commit to ionutboangiu/cgrates that referenced this issue Oct 16, 2023
Previously, msg.Ack(true) was used, which is mostly used for batch
processing. It mistakenly acknowledged all previously unacknowledged
messages, causing errors from the AMQP server. Now, messages are
acknowledged individually after each one is processed.

Messages that ERs failed to process are now rejected and requeued
for future processing attempts.

The reader is now closed immediately if the message delivery
channel closes. Therefore, it prevents an endless loop by avoiding
continuous consumption from empty or closed channels.

Addresses: cgrates#4146
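
The first paragraph of the commit message can be illustrated with a small simulation. This is a sketch under stated assumptions, not the CGRateS code: `broker`, `newBroker` and `ackInOrder` are hypothetical names, and the model only mimics how a server settles deliveries when the `multiple` flag of basic.ack is set. With two workers finishing out of order, it produces the same kind of error as the `concurrent_requests: 2` report, which would be consistent with the explanation in the commit message.

```go
package main

import "fmt"

// broker is a hypothetical, simplified model of the unacked deliveries that
// a server tracks for one channel.
type broker struct {
	unacked map[uint64]bool
}

// newBroker starts with deliveries 1..n outstanding.
func newBroker(n uint64) *broker {
	b := &broker{unacked: make(map[uint64]bool)}
	for tag := uint64(1); tag <= n; tag++ {
		b.unacked[tag] = true
	}
	return b
}

// ack mimics basic.ack. With multiple=true it also settles every
// outstanding delivery whose tag is lower than the given one.
func (b *broker) ack(tag uint64, multiple bool) error {
	if !b.unacked[tag] {
		return fmt.Errorf("precondition_failed: unknown delivery tag %d", tag)
	}
	for t := range b.unacked {
		if t == tag || (multiple && t < tag) {
			delete(b.unacked, t)
		}
	}
	return nil
}

// ackInOrder acknowledges tags in the order the workers finished and
// returns the errors the modelled server would raise.
func ackInOrder(b *broker, order []uint64, multiple bool) []error {
	var errs []error
	for _, tag := range order {
		if err := b.ack(tag, multiple); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	// Two concurrent workers: the one holding tag 2 finishes first.
	order := []uint64{2, 1}
	fmt.Println(ackInOrder(newBroker(2), order, true))  // tag 1 is already settled
	fmt.Println(ackInOrder(newBroker(2), order, false)) // each tag settled once
}
```

With a single worker the acks arrive in tag order, so the multiple flag never settles a delivery that is still being processed, which may explain why `concurrent_requests: 1` was stable.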
danbogos pushed a commit that referenced this issue Oct 16, 2023
Previously, msg.Ack(true) was used, which is mostly used for batch
processing. It mistakenly acknowledged all previously unacknowledged
messages, causing errors from the AMQP server. Now, messages are
acknowledged individually after each one is processed.

Messages that ERs failed to process are now rejected and requeued
for future processing attempts.

The reader is now closed immediately if the message delivery
channel closes. Therefore, it prevents an endless loop by avoiding
continuous consumption from empty or closed channels.

Addresses: #4146
@ionutboangiu
Collaborator

Hi,

Thanks for the find and for providing detailed info. This issue should be solved by the latest commit on master. Let us know if you have any other issues.

Also, regarding your latest comment:

@danbogos please bump 2 000 000 CDR in stress test are locked

Could you please clarify what you meant by this?

Thanks,
Ionuț

@Integration-IT
Author

Hi @ionutboangiu ,

Tested with concurrent_requests from 1 to 1024.
Thank you, no error, tags are managed, no exception.
