In [1]:
import numpy as np

# Heap

In [2]:
class PriorityQueueBase:
    class _Item:
        __slots__ = "_key", "_value"

        def __init__(self, key, value):
            self._key = key
            self._value = value

        def __lt__(self, other):
            return self._key < other._key

    def is_empty(self):
        return len(self) == 0

In [3]:
class Heap(PriorityQueueBase):
    def __init__(self, contents=()):
        self._data = [self._Item(k, v) for k, v in contents]
        if len(self._data) > 1:
            self._heapify()
    
    def __len__(self):
        return len(self._data)
    
    def _parent(self, i):
        return (i - 1) // 2
    
    def _left(self, i):
        return 2 * i + 1
    
    def _right(self, i):
        return 2 * i + 2
    
    def _has_left(self, i):
        return self._left(i) < len(self._data)
    
    def _has_right(self, i):
        return self._right(i) < len(self._data)
    
    def _swap(self, i, j):
        self._data[i], self._data[j] = self._data[j], self._data[i]
    
    def _downheap(self, i):
        if self._has_left(i):
            child = left = self._left(i)
            if self._has_right(i):
                right = self._right(i)
                if self._data[right] < self._data[left]:
                    child = right
            if self._data[child] < self._data[i]:
                self._swap(i, child)
                self._downheap(child)
    
    def _upheap(self, i):
        parent = self._parent(i)
        if i > 0 and self._data[i] < self._data[parent]:
            self._swap(i, parent)
            self._upheap(parent)
    
    def _heapify(self):
        parent = self._parent(len(self._data) - 1)
        for i in range(parent, -1, -1):
            self._downheap(i)

    def add(self, k, v):
        self._data.append(self._Item(k, v))
        self._upheap(len(self._data) - 1)
    
    def min(self):
        if self.is_empty():
            raise ValueError("Priority queue is empty.")
        item = self._data[0]
        return (item._key, item._value)
    
    def remove_min(self):
        if self.is_empty():
            raise ValueError("Priority queue is empty.")
        self._swap(0, len(self._data) - 1)
        item = self._data.pop()
        self._downheap(0)
        return (item._key, item._value)

In [4]:
L = Heap([(k, k) for k in np.random.randint(0, 100, 10)])

In [5]:
for _ in range(len(L)):
    print(L.remove_min()[0])

9
9
21
23
26
35
58
66
73
93


The improvement that heap brings is by bringing the run-time of both add/remove from O(n) to O(logn). This is due to the fact that we have complete binary tree which has height __floor(logn)__. With bottom-up construction of the heap, we can improve the efficiency from __O(nlogn)__ to __O(n)__.

|Operation | Running Time |
| --- | --- |
| len | O(1) |
| is empty | O(1) |
| add | O(logn) |
| min | O(1) |
| remove_min | O(logn) |

# Heap Sort

In [9]:
class HeapSort:
    
    @staticmethod
    def sort(arr):
        n = len(arr)
        HeapSort._heapify(arr)
        for i in range(1, n):
            HeapSort._swap(arr, 0, n - i)
            HeapSort._downheap(arr, 0, n - i)

    @staticmethod
    def _swap(arr, i, j):
        arr[i], arr[j] = arr[j], arr[i]
    
    @staticmethod
    def _heapify(arr):
        parent = (len(arr) - 1) // 2
        for i in range(parent, -1, -1):
            HeapSort._downheap(arr, i, len(arr) - 1)

    @staticmethod
    def _downheap(arr, i, j):
        if (2 * i + 1) < j:
            child = left = 2 * i + 1
            if (2 * i + 2) < j:
                right = 2 * i + 2
                if arr[right] < arr[left]:
                    child = right
            if arr[child] < arr[i]:
                HeapSort._swap(arr, i, child)
                HeapSort._downheap(arr, child, j)

In [10]:
L = np.random.randint(-100, 100, 10)
L = [0, 3, -1, 4, 10]
HeapSort.sort(L)
L

[10, 4, 3, 0, -1]

In [11]:
for _ in range(100):
    L = np.random.randint(-100, 100, 10)
    sorted_L = np.sort(L)[::-1]
    HeapSort.sort(L)
    (L == sorted_L).all()

__Heap Sort__ sorts a collection of n items in O(nlogn). The reason for that is because for each item we may need to do either `downheap` or `upheap` which takes O(logn).

# Adaptable Priority Queue

In [None]:
class AdaptablePriorityQueue(HeapPriorityQueue):
    class Locater(HeapPriorityQueue._Item):
        __slots__ = "_index"

        def __init__(self, k, v, i):
            super().__init__(k, v)
            self._index = i

    def _swap(self, i, j):
        super()._swap(i, j)
        self._data[i]._index = i
        self._data[j]._index = j

    def _bubble(self, i):
        if i > 0 and self._data[i] < self._data[self._parent(i)]:
            self._upheap(i)
        else:
            self._downheap(i)

    def add(self, k, v):
        item = self.Locator(k, v, len(self._data))
        self._data.append(item)
        self._upheap(len(self._data) - 1)
        return item

    def remove(self, loc):
        i = loc._index
        if not (0 <= i < len(self._data) and self._data[i] is loc):
            raise ValueError("Invalid locator")
        if i == len(self._data) - 1:
            item = self._data.pop()
        else:
            self._swap(i, len(self._data) - 1)
            item = self._data.pop()
            self._bubble(i)
        return (loc._key, loc._value)

    def update(self, loc, k, v):
        i = loc._index
        if not (0 <= i < len(self._data) and self._data[i] is loc):
            raise ValueError("Invalid locator")
        loc._key, loc._value = k, v
        self._bubble(i)



|Operation | Running Time |
| --- | --- |
| len | O(1) |
| is empty | O(1) |
| add | O(logn) |
| min | O(1) |
| remove_min | O(logn) |
| remove | O(logn) |
| update | O(logn) |