-
Notifications
You must be signed in to change notification settings - Fork 1.5k
AMQ-9813 - fix wrong QueueSize for non-persistent message with TTL #1551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -206,7 +206,19 @@ public synchronized LinkedList<MessageReference> pageInList(int maxItems) { | |
| * @throws Exception | ||
| */ | ||
| @Override | ||
| public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { | ||
| public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { | ||
| // Discarding expired message should be done outside of synchronized section (deadlock, see AMQ-5785) | ||
| List<MessageReference> expiredMessages = new ArrayList<>(); | ||
| boolean isExpiration = tryAddMessageLastInternal(node, maxWaitTime, expiredMessages); | ||
| for (MessageReference expiredMessage : expiredMessages) { | ||
| discardExpiredMessage(expiredMessage); | ||
| } | ||
| return isExpiration; | ||
| } | ||
|
|
||
| private synchronized boolean tryAddMessageLastInternal( | ||
| MessageReference node, long maxWaitTime, List<MessageReference> expiredMessages | ||
| ) { | ||
| if (!node.isExpired()) { | ||
| try { | ||
| regionDestination = (Destination) node.getMessage().getRegionDestination(); | ||
|
|
@@ -220,7 +232,7 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai | |
| } | ||
| if (!hasSpace()) { | ||
| if (isDiskListEmpty()) { | ||
| expireOldMessages(); | ||
| expiredMessages.addAll(expireOldMessages()); | ||
| if (hasSpace()) { | ||
| memoryList.addMessageLast(node); | ||
| node.incrementReferenceCount(); | ||
|
|
@@ -242,7 +254,7 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai | |
| throw new RuntimeException(e); | ||
| } | ||
| } else { | ||
| discardExpiredMessage(node); | ||
| expiredMessages.add(node); | ||
| } | ||
| //message expired | ||
| return true; | ||
|
|
@@ -254,7 +266,16 @@ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWai | |
| * @param node | ||
| */ | ||
| @Override | ||
| public synchronized void addMessageFirst(MessageReference node) { | ||
| public void addMessageFirst(MessageReference node) { | ||
| // Discarding expired message should be done outside of synchronized section (deadlock, see AMQ-5785) | ||
| List<MessageReference> expiredMessages = addMessageFirstInternal(node); | ||
| for (MessageReference expiredMessage : expiredMessages) { | ||
| discardExpiredMessage(expiredMessage); | ||
| } | ||
| } | ||
|
|
||
| private synchronized List<MessageReference> addMessageFirstInternal(MessageReference node) { | ||
| List<MessageReference> expiredMessages = new ArrayList<>(); | ||
| if (!node.isExpired()) { | ||
| try { | ||
| regionDestination = (Destination) node.getMessage().getRegionDestination(); | ||
|
|
@@ -263,16 +284,16 @@ public synchronized void addMessageFirst(MessageReference node) { | |
| memoryList.addMessageFirst(node); | ||
| node.incrementReferenceCount(); | ||
| setCacheEnabled(true); | ||
| return; | ||
| return expiredMessages; | ||
| } | ||
| } | ||
| if (!hasSpace()) { | ||
| if (isDiskListEmpty()) { | ||
| expireOldMessages(); | ||
| expiredMessages = expireOldMessages(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I just realized that this is actually could be issue because the expireOldMessages() method is going to decrement the reference on each message, but the messages get copied to that temporary list. The problem is that now that we are hanging onto the list reference (to process later outside of the lock) we are more likely to blow memory because the hasSpace() method will think there is free space but those messages are still held in the heap until later. We are only adding one message but if that message happens to be very large it could be an issue.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for very quick feedback and analyze. I think, I got the point, which you mean. But unfortunately I have no idea, how to change/improve it, except the way you already mentioned - put new message to disk (temp storage) immediately without attempt to "dispose" expired messages before switch to disk (if I really got the point). Maybe it will be better to close this PR without merge to give you more comfortable way to make fix/changes/improvements. I am happy, you got the problem, which I tried to report. |
||
| if (hasSpace()) { | ||
| memoryList.addMessageFirst(node); | ||
| node.incrementReferenceCount(); | ||
| return; | ||
| return expiredMessages; | ||
| } else { | ||
| flushToDisk(); | ||
| } | ||
|
|
@@ -289,8 +310,9 @@ public synchronized void addMessageFirst(MessageReference node) { | |
| throw new RuntimeException(e); | ||
| } | ||
| } else { | ||
| discardExpiredMessage(node); | ||
| expiredMessages.add(node); | ||
| } | ||
| return expiredMessages; | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as in addMessageFirst