Fix: Priority Queues can return items out of order, duplicate/drop items, and have inconsistent replicas from calling IQueue.drainTo with a size limit #26344

Open
wants to merge 2 commits into
base: master

cyber-andy

I'm using priority queues with Hazelcast to send prioritized messages to users. Each user has a queue for incoming messages with various priorities. When I switched to dequeuing messages in bulk, I noticed that the drain methods didn't return messages in priority order. Digging into the cause turned up other issues as well, so I decided to fix them myself and open a PR.

A bug occurs when a priority queue has at least two items and is offered items that have higher priority than those already in the queue. The items returned by IQueue.drainTo(Collection,int) may be out of order, and they may not be the same items removed from the queue's primary replica if the drain size limit is less than the queue size. The returned items are removed from the backup replicas which desynchronizes them from the primary replica.

This issue comes from these two methods in QueueContainer using both iterator() and poll() to get items from the queue as part of the drain operation:

    public Map<Long, Data> drain(int maxSize) {
        int maxSizeParam = maxSize;
        if (maxSizeParam < 0 || maxSizeParam > getItemQueue().size()) {
            maxSizeParam = getItemQueue().size();
        }
        Map<Long, Data> map = createLinkedHashMap(maxSizeParam);
        mapDrainIterator(maxSizeParam, map);
        if (store.isEnabled() && maxSizeParam != 0) {
            try {
                store.deleteAll(map.keySet());
            } catch (Exception e) {
                throw new HazelcastException(e);
            }
        }
        long current = Clock.currentTimeMillis();
        for (int i = 0; i < maxSizeParam; i++) {
            QueueItem item = getItemQueue().poll();
            // for stats
            age(item, current);
        }
        if (maxSizeParam != 0) {
            scheduleEvictionIfEmpty();
        }
        return map;
    }

    public void mapDrainIterator(int maxSize, Map<Long, Data> map) {
        Iterator<QueueItem> iterator = getItemQueue().iterator();
        for (int i = 0; i < maxSize; i++) {
            QueueItem item = iterator.next();
            if (store.isEnabled() && item.getSerializedObject() == null) {
                try {
                    load(item);
                } catch (Exception e) {
                    throw new HazelcastException(e);
                }
            }
            map.put(item.getItemId(), item.getSerializedObject());
        }
    }

Elements of a PriorityQueue are ordered in the underlying array as a priority heap. Only the head of the queue is guaranteed to be an element with the highest priority.
The documentation for PriorityQueue.iterator() states:

Returns an iterator over the elements in this queue. The iterator does not return the elements in any particular order.

The iterator is used to determine which items to return from the drain operation, but the queue is polled to remove items and update stats. This implementation only works if items are offered to the queue in priority order, which happens to be a valid heap order, and that defeats the purpose of priority queues. As a side effect, an item that is polled from the queue but was not in the iterated portion is discarded without being returned, while a returned item that was not polled stays in the queue and can be dequeued again later. Backup replicas correctly reflect the outcome of the drain operation, so the missing items should come back if a backup is promoted before the queue is used again.
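The mismatch can be reproduced with a plain java.util.PriorityQueue, without Hazelcast. The following is a minimal sketch, not code from this PR: the class DrainOrderSketch and its helper methods are hypothetical names, and integers stand in for queue items (a smaller value means higher priority).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class DrainOrderSketch {

    // Builds a queue by offering values one at a time, as a producer would.
    static PriorityQueue<Integer> offerAll(int... values) {
        PriorityQueue<Integer> queue = new PriorityQueue<>();
        for (int value : values) {
            queue.offer(value);
        }
        return queue;
    }

    // Mirrors mapDrainIterator: reads the first n items in iteration order
    // without removing them. The iteration order is unspecified.
    static List<Integer> firstByIterator(PriorityQueue<Integer> queue, int n) {
        List<Integer> seen = new ArrayList<>();
        Iterator<Integer> iterator = queue.iterator();
        for (int i = 0; i < n && iterator.hasNext(); i++) {
            seen.add(iterator.next());
        }
        return seen;
    }

    // Mirrors the poll loop in drain: removes the n highest-priority items.
    static List<Integer> firstByPoll(PriorityQueue<Integer> queue, int n) {
        List<Integer> removed = new ArrayList<>();
        for (int i = 0; i < n && !queue.isEmpty(); i++) {
            removed.add(queue.poll());
        }
        return removed;
    }

    public static void main(String[] args) {
        // Offer in reverse priority order: each new item outranks the queued ones.
        PriorityQueue<Integer> queue = offerAll(5, 4, 3, 2, 1);
        List<Integer> returned = firstByIterator(queue, 3);
        List<Integer> removed = firstByPoll(queue, 3);
        System.out.println("returned to caller: " + returned);
        System.out.println("removed from queue: " + removed);
        System.out.println("left in queue:      " + queue);
    }
}
```

The polled list is always 1, 2, 3 because poll() is specified to return the head. The iterated list carries no such guarantee, so whenever the two lists differ, the caller receives an item that is still in the queue and loses one that was removed.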

This bug evaded existing tests because either items were offered to the queue in priority order or there were too few items to detect heap ordering from using the iterator.

This PR adds a test for priority queues that verifies IQueue.drainTo(Collection,int) returns the next items in priority order. It also updates QueueAdvancedTest.testDrainBackup() to offer messages in reverse priority order, then verify that the queue holds the same items after the drain operation completes and after the backup replica is promoted. A fix is provided for QueueContainer that passes these tests.
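For illustration, one way to keep the returned items and the removed items identical is to take both from poll(). This is a sketch of that general idea on a plain java.util.PriorityQueue, under the assumption that a single source of truth is the goal; it is not the QueueContainer change in this PR's commits, and PollOnlyDrainSketch and its drain method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class PollOnlyDrainSketch {

    // Drains up to maxSize items in priority order. A negative or oversized
    // maxSize drains everything, matching the clamping in the quoted drain method.
    static <T> List<T> drain(PriorityQueue<T> queue, int maxSize) {
        int limit = (maxSize < 0 || maxSize > queue.size()) ? queue.size() : maxSize;
        List<T> drained = new ArrayList<>(limit);
        for (int i = 0; i < limit; i++) {
            // The item returned to the caller is exactly the item removed.
            drained.add(queue.poll());
        }
        return drained;
    }

    public static void main(String[] args) {
        PriorityQueue<Integer> queue = new PriorityQueue<>();
        for (int value : new int[] {5, 4, 3, 2, 1}) {
            queue.offer(value);
        }
        System.out.println("first drain:  " + drain(queue, 3));
        System.out.println("second drain: " + drain(queue, -1));
    }
}
```

Because every returned item comes from poll(), a limited drain followed by a full drain yields each item exactly once, in priority order.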

Fixes #19533, I think; this is the only issue I could find that relates to this bug.

Backport of: N/A

Breaking changes (list specific methods/types/messages):

  • Both IQueue.drainTo methods return items in priority order for priority queues.
    • IQueue.drainTo(Collection,int) returns the next items in priority queue ordering instead of priority heap ordering, which is likely to be a different set of items from the queue.

Checklist:

  • Labels (Team:, Type:, Source:, Module:) and Milestone set
  • Add the "Add to Release Notes" label if changes should be mentioned in release notes, or the "Not Release Notes content" label if changes are not relevant for release notes
  • Request reviewers if possible
  • New public APIs have @Nonnull/@Nullable annotations
  • New public APIs have @since tags in Javadoc
  • Send backports/forwardports if fix needs to be applied to past/future releases

hz-devops-test added the "Source: Community" label (PR or issue was opened by a community user) on May 25, 2024
@devOpsHazelcast
Collaborator

Can one of the admins verify this patch?

@devOpsHazelcast
Collaborator

devOpsHazelcast commented May 25, 2024

CLA assistant check
All committers have signed the CLA.

@devOpsHazelcast
Collaborator

Internal PR hazelcast/hazelcast-mono#2073
Internal message only. Nothing to see here, move along

Successfully merging this pull request may close these issues.

IQueue.drainT(Collection,maxElemenet) not removed element from queue