# Writing a priority queue in Python?

Today I was talking with a friend of mine about an interesting problem. They created an internal service for viewing aggregate data from several backend systems. (At least that is how I understood it.) Their idea was to have several threads poll the backends for data and cache it in an internal database. On a cache miss, e.g. a page view of something that hasn't been refreshed yet, they would fetch the data right away (because the user probably wouldn't want to wait 3 to 5 minutes for the background cache update to finish).

Because they wanted a unified update interface, they do the background updates through a priority queue. If a cache miss happens, they just add a high-priority request at the front of the queue, and it gets refreshed as soon as possible. This keeps the updating nice and organized.
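To make the idea concrete, here is a minimal sketch of how such an update loop might work. It is only my guess at the shape of their service: the priority levels `BACKGROUND` and `URGENT`, the `request` helper, and the `worker` function are all hypothetical names, and the "refresh" is just an append to a list.

```python
import itertools
import queue
import threading

BACKGROUND, URGENT = 10, 1   # hypothetical priority levels; lower runs first
_seq = itertools.count()     # tie-breaker so equal priorities stay FIFO


def request(priority, entity):
    """Build a queue entry; the sequence number keeps entries comparable."""
    return (priority, next(_seq), entity)


def worker(q, refreshed):
    """Drain the queue until the sentinel (entity None) arrives."""
    while True:
        _priority, _number, entity = q.get()
        if entity is None:
            break
        refreshed.append(entity)  # stand-in for the real backend fetch


q = queue.PriorityQueue()
refreshed = []

# periodic background refreshes
for name in ["entity 1", "entity 2", "entity 3"]:
    q.put(request(BACKGROUND, name))

# a page view hits a cache miss and jumps the line
q.put(request(URGENT, "entity 3"))

# sentinel with the worst possible priority, so it is processed last
q.put((float("inf"), next(_seq), None))

t = threading.Thread(target=worker, args=(q, refreshed))
t.start()
t.join()
print(refreshed)  # ['entity 3', 'entity 1', 'entity 2', 'entity 3']
```

Note that "entity 3" is refreshed twice: once for the urgent request and once more for the background request that was already waiting.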

I imagine their "update request" looks something like this:

In [1]:
class Update:
    count = 0  # class-wide counter; gives every request a unique, increasing number

    def __init__(self, priority, entity):
        Update.count += 1
        self._number = Update.count
        self._priority = priority
        self._entity = entity

    def __repr__(self):
        return "%s_%s_%r" % (self._priority, self._number, self._entity)

    def __hash__(self):
        return self._number

    def __str__(self):
        return str(self._entity)

    # order by priority first, then by creation order
    def __lt__(self, other):
        return (self._priority, self._number) < (other._priority, other._number)

    def __eq__(self, other):
        return (self._priority, self._number) == (other._priority, other._number)

But this design had one problem: a request for the same entity could be pushed into the queue several times, which put strain on the system (mostly on memory, if I recall correctly).

On their side, the problem might look like this (imagine a priority queue where the lowest priority value goes first):

In [7]:
from queue import Queue, PriorityQueue

q = PriorityQueue()
q.put(Update(10,"Update entity 1"))
q.put(Update(10,"Update entity 2"))
q.put(Update(10,"Update entity 3"))
q.put(Update(10,"Update entity 4"))
q.put(Update(1, "Update entity 2"))
q.put(Update(10,"Update entity 5"))
q.put(Update(1, "Update entity 2"))


def getAll(q):
    while not q.empty():
        yield q.get_nowait()
        
def putAll(q, items):
    for i in items:
        q.put(i)
    return q

def pipeThrough(q,items):
    return list(getAll(putAll(q,items)))
        
[str(x) for x in getAll(q)]

['Update entity 2',
 'Update entity 2',
 'Update entity 1',
 'Update entity 2',
 'Update entity 3',
 'Update entity 4',
 'Update entity 5']

This would result in the execution of:
```
["Update entity 2","Update entity 2","Update entity 1","Update entity 2","Update entity 3","Update entity 4","Update entity 5"]
```

but what they would like to see is only execution of:

```
["Update entity 2","Update entity 1","Update entity 3","Update entity 4","Update entity 5"]
```

I thought to myself, "hey, this might be fun to play around with" and tried to do this:

In [3]:
from blist import sorteddict

class PriorityDeduplicatedQueue(Queue):
    # Queue handles the locking; these hooks only manage the storage
    def _init(self, maxsize):
        self.que = sorteddict()   # prioritized item -> deduplication key
        self.items = {}           # deduplication key -> prioritized item
        self.equalityMetric = lambda x: x

    def setEqualityMetric(self, newEqualityMetric):
        self.equalityMetric = newEqualityMetric

    def _put(self, prioritizedItem):
        item = self.equalityMetric(prioritizedItem)
        if item in self.items:
            oldPrioritizedItem = self.items[item]
            # keep only the request that sorts first for this key
            if oldPrioritizedItem > prioritizedItem:
                del self.que[oldPrioritizedItem]
                self.que[prioritizedItem] = item
                self.items[item] = prioritizedItem
        else:
            self.que[prioritizedItem] = item
            self.items[item] = prioritizedItem

    def _get(self):
        key, item = self.que.popitem()
        del self.items[item]
        return key

    def _qsize(self):
        return len(self.items)

The core of this is `sorteddict` from the `blist` package, which gives me a data structure that:
* is sorted and has `popitem`, making it a decent heap
* supports fast removal of keys
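For readers who would rather avoid a third-party dependency, the same two properties can be approximated with the standard library. The sketch below is not the approach used in this post: it swaps `sorteddict` for `heapq` plus "lazy deletion", where a superseded entry is only marked dead and gets skipped when it reaches the top of the heap. The class name `LazyDedupQueue` and its `put(priority, item)` signature are made up for illustration, and it does no locking.

```python
import heapq
import itertools


class LazyDedupQueue:
    """Hypothetical stdlib-only variant: a heap plus a dict of live entries."""

    def __init__(self, key=lambda item: item):
        self._heap = []                # entries: [priority, seq, item, alive]
        self._live = {}                # key(item) -> its live heap entry
        self._key = key
        self._seq = itertools.count()  # tie-breaker, keeps equal priorities FIFO

    def put(self, priority, item):
        k = self._key(item)
        old = self._live.get(k)
        if old is not None:
            if old[0] <= priority:
                return                 # an equal or better request is already queued
            old[3] = False             # mark as dead; skipped later in get()
        entry = [priority, next(self._seq), item, True]
        self._live[k] = entry
        heapq.heappush(self._heap, entry)

    def get(self):
        while self._heap:
            priority, _seq, item, alive = heapq.heappop(self._heap)
            if alive:
                del self._live[self._key(item)]
                return priority, item
        raise IndexError("get from an empty queue")

    def __len__(self):
        return len(self._live)


lazy = LazyDedupQueue()
for priority, entity in [(10, "entity 1"), (10, "entity 2"), (10, "entity 3"),
                         (1, "entity 2"), (1, "entity 2")]:
    lazy.put(priority, entity)

order = []
while len(lazy):
    order.append(lazy.get()[1])
print(order)  # ['entity 2', 'entity 1', 'entity 3']
```

The trade-off is that dead entries stay in the heap until they are popped, so a burst of re-prioritized requests still costs some memory, whereas `sorteddict` removes the old key immediately.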


## Testing

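First, let's check that this actually works. Without an equality metric every `Update` counts as distinct, so the queue should behave like a plain `PriorityQueue`; with the metric set to the entity, duplicates should collapse: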

In [4]:
q = PriorityDeduplicatedQueue()
q.put(Update(10,"Update entity 1"))
q.put(Update(10,"Update entity 2"))
q.put(Update(10,"Update entity 3"))
q.put(Update(10,"Update entity 4"))
q.put(Update(1, "Update entity 2"))
q.put(Update(10,"Update entity 5"))
q.put(Update(1, "Update entity 2"))
        
[str(x) for x in getAll(q)]

['Update entity 2',
 'Update entity 2',
 'Update entity 1',
 'Update entity 2',
 'Update entity 3',
 'Update entity 4',
 'Update entity 5']

In [5]:
dq = PriorityDeduplicatedQueue()
dq.setEqualityMetric(lambda x: x._entity)

dq.put(Update(10,"Update entity 1"))
dq.put(Update(10,"Update entity 2"))
dq.put(Update(10,"Update entity 3"))
dq.put(Update(10,"Update entity 4"))
dq.put(Update(1, "Update entity 2"))
dq.put(Update(10,"Update entity 5"))
dq.put(Update(1, "Update entity 2"))
        
[str(x) for x in getAll(dq)]

['Update entity 2',
 'Update entity 1',
 'Update entity 3',
 'Update entity 4',
 'Update entity 5']

This matches the desired result from the beginning of the post:

```
['Update entity 2', 'Update entity 1', 'Update entity 3', 'Update entity 4', 'Update entity 5']
```

Now I would like to use the property-based testing framework from the `hypothesis` package to verify that my new queue works as intended:

1. If I only push unique items, it should behave the same as `queue.PriorityQueue`.
2. If I only push equal items with different priorities, the queue size should always be 1, and the remaining item should have the smallest priority.
3. If I push random items with some duplication into both the deduplicated and the standard priority queue, and then put the results of both into ordered sets, the two sets should be equal.

For the testing itself I will use the `pipeThrough` utility function, which accepts a queue and an iterable, fills the queue from the iterable, and returns the drained queue as a list.

First I will define the test for the unique items:

In [34]:
from hypothesis import given
from hypothesis.strategies import integers, text, lists, tuples, sampled_from

hexadecimal = text(alphabet="0123456789abcdef")
# (priority, payload) pairs whose payloads are all distinct
uniqueItems = lists(elements=tuples(integers(-10, 10), hexadecimal), unique_by=lambda x: x[1])

@given(uniqueItems)
def test1(setsToTest):
    pq = PriorityQueue()
    dq = PriorityDeduplicatedQueue()
    pqResult = pipeThrough(pq, setsToTest)
    dqResult = pipeThrough(dq, setsToTest)
    assert pqResult == dqResult, \
        "Priority queue contained %s but deduplicated queue had %s" % (pqResult, dqResult)

test1()


Now I will define the test for a single item pushed with different priorities.
As I stated previously, only one item should remain in the queue, and it should have the smallest priority.

In [35]:
priorities = lists(elements=integers(),min_size=2)
@given(priorities)
def test2(p):
    dq = PriorityDeduplicatedQueue()
    dq.setEqualityMetric(lambda x: x._entity)
    
    result = pipeThrough(dq,(Update(x,"entity") for x in p))
    assert len(result) == 1, "There should only be one entry"
    assert result[0]._priority == min(p), "It should have the smallest priority value"

test2()

Finally, I will define the test for random items with some duplication.
Both queues receive the same items; once duplicates are dropped from the standard queue's output (keeping the first occurrence of each entity), it should match the output of the deduplicated queue.

In [33]:
import collections

entities = sampled_from(['Update entity 2',
 'Update entity 1',
 'Update entity 3',
 'Update entity 4',
 'Update entity 5'])

# no unique_by here: repeated entities are the point of this test
items = lists(elements=tuples(integers(-10, 10), entities))

@given(items)
def test3(i):
    updates = [Update(x, e) for x, e in i]

    pq = PriorityQueue()
    dq = PriorityDeduplicatedQueue()
    dq.setEqualityMetric(lambda x: x._entity)

    deduplicated = [k._entity for k in pipeThrough(dq, updates)]
    ordinary = pipeThrough(pq, updates)
    # keep only the first occurrence of each entity, preserving order
    deduplicateOrdinary = list(collections.OrderedDict((k._entity, 0) for k in ordinary))
    assert deduplicated == deduplicateOrdinary, \
        "Priority queue contained %s but deduplicated queue had %s" % (deduplicateOrdinary, deduplicated)

test3()

Beware that I cheated in my test cases: `pipeThrough` first fills and then empties the whole queue, which is not really representative of the usual use case of a `Queue`. If I ever create a package out of this, I will add more representative tests as well. Now I wonder if I can convince my friend to try my `PriorityDeduplicatedQueue` in his project :-)
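One way such a more representative test might look is to interleave `put` and `get` at random and compare every step against a deliberately naive reference model. The sketch below is only an illustration under my own assumptions: `ModelQueue` and `check_interleaved` are hypothetical helpers, and to stay free of `blist` the demo at the bottom runs the check against the standard `queue.PriorityQueue` with unique items (property 1 from the list above).

```python
import queue
import random


class ModelQueue:
    """Naive reference: best item per key in a dict, linear scan on get."""

    def __init__(self, key=lambda item: item):
        self.key = key
        self.best = {}

    def put(self, item):
        k = self.key(item)
        if k not in self.best or item < self.best[k]:
            self.best[k] = item

    def get(self):
        item = min(self.best.values())
        del self.best[self.key(item)]
        return item


def check_interleaved(make_queue, items, seed=0, key=lambda item: item):
    """Randomly mix put and get, comparing the real queue with the model."""
    rng = random.Random(seed)
    real, model = make_queue(), ModelQueue(key)
    pending = list(items)
    while pending or model.best:
        if pending and (not model.best or rng.random() < 0.6):
            item = pending.pop()
            real.put(item)
            model.put(item)
        else:
            assert real.get_nowait() == model.get()
        assert real.qsize() == len(model.best)
    return True


# unique (priority, id) pairs, so no deduplication is involved here
rng = random.Random(42)
sample = [(rng.randint(-10, 10), n) for n in range(200)]
ok = check_interleaved(queue.PriorityQueue, sample)
print(ok)  # True
```

For the deduplicated queue, the same helper could presumably be called with a factory that builds a `PriorityDeduplicatedQueue` and sets its equality metric, a list of `Update` objects, and `key=lambda u: u._entity`.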