# 优先队列基类

In [1]:
from exceptions import Empty

In [2]:
class PriorityQueueBase():
    """Abstract base class for a priority queue."""

    # ------------------------------ nested _Item class ------------------------------
    class _Item():
        """Lightweight composite to store priority queue items."""
        __slots__ = '_key', '_value'

        def __init__(self, k, v):
            self._key = k
            self._value = v

        def __lt__(self, other):
            return self._key < other._key

        def __repr__(self):
            return f'({self._key}, {self._value})'

    # ------------------------------ public behaviors ------------------------------
    def is_empty(self):
        """Return True if the priority queue is empty."""
        return len(self) == 0

    def __len__(self):
        """Return the number of items in the priority queue."""
        raise NotImplementedError('must be implemented by subclass')

    def add(self, key, value):
        """Add a key-value pair."""
        raise NotImplementedError('must be implemented by subclass')

    def min(self):
        """Return but do not remove (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        raise NotImplementedError('must be implemented by subclass')

    def remove_min(self):
        """Remove and return (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        raise NotImplementedError('must be implemented by subclass')


# 基于无序列表的优先队列

In [3]:
from positional_list import PositionalList

In [4]:
class UnsortedPriorityQueue(PriorityQueueBase): 
    """A min-oriented priority queue implemented with an unsorted list."""

    # ----------------------------- nonpublic behavior -----------------------------
    def _find_min(self):
        """Return Position of item with minimum key."""
        if self.is_empty():
            raise Empty('Priority queue is empty')
        small = self._data.first()
        walk = self._data.after(small)
        while walk is not None:
            if walk.element() < small.element():
                small = walk
            walk = self._data.after(walk)
        return small

    # ------------------------------ public behaviors ------------------------------
    def __init__(self):
        """Create a new empty Priority Queue."""
        self._data = PositionalList()

    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)

    def add(self, key, value):
        """Add a key-value pair."""
        self._data.add_last(self._Item(key, value))

    def min(self):
        """Return but do not remove (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        p = self._find_min()
        item = p.element()
        return (item._key, item._value)

    def remove_min(self):
        """Remove and return (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        p = self._find_min()
        item = self._data.delete(p)
        return (item._key, item._value)
    
    def __repr__(self):
        return f'{self._data}'
    
    def __str__(self):
        return self.__repr__()

In [5]:
P = UnsortedPriorityQueue()
P.add(5, 'A')
P.add(9, 'C')
P.add(3, 'B')
P.add(7, 'D')
print(P)
print(P.min())
print(P.remove_min())
print(P.remove_min())
print(P)
print(P.remove_min())
print(P.remove_min())
print(len(P))

[(5, A), (9, C), (3, B), (7, D)]
(3, 'B')
(3, 'B')
(5, 'A')
[(9, C), (7, D)]
(7, 'D')
(9, 'C')
0


# 基于有序列表的优先队列

In [6]:
class SortedPriorityQueue(PriorityQueueBase):  # base class defines _Item
    """A min-oriented priority queue implemented with a sorted list."""

    # ------------------------------ public behaviors ------------------------------
    def __init__(self):
        """Create a new empty Priority Queue."""
        self._data = PositionalList()

    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)

    def add(self, key, value):
        """Add a key-value pair."""
        new = self._Item(key, value)  # make new item instance
        walk = self._data.last()  # walk backward looking for smaller key
        while walk is not None and new < walk.element():
            walk = self._data.before(walk)
        if walk is None:
            self._data.add_first(new)  # new key is smallest
        else:
            self._data.add_after(walk, new)  # new goes after walk

    def min(self):
        """Return but do not remove (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        p = self._data.first()
        item = p.element()
        return (item._key, item._value)

    def remove_min(self):
        """Remove and return (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        item = self._data.delete(self._data.first())
        return (item._key, item._value)

    def __repr__(self):
        return f'{self._data}'
    
    def __str__(self):
        return self.__repr__()

In [7]:
P = SortedPriorityQueue()
P.add(5, 'A')
P.add(9, 'C')
P.add(3, 'B')
P.add(7, 'D')
print(P)
print(P.min())
print(P.remove_min())
print(P.remove_min())
print(P)
print(P.remove_min())
print(P.remove_min())
print(len(P))

[(3, B), (5, A), (7, D), (9, C)]
(3, 'B')
(3, 'B')
(5, 'A')
[(7, D), (9, C)]
(7, 'D')
(9, 'C')
0


# 基于堆的优先列表

In [8]:
class HeapPriorityQueue(PriorityQueueBase):  # base class defines _Item
    """A min-oriented priority queue implemented with a binary heap."""

    # -------------- public behaviors ----------
    def __init__(self):
        """Create a new empty Priority Queue."""
        self._data = []

    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)

    # -------------- nonpublic behaviors ---------
    def _parent(self, j):
        return (j-1) // 2

    def _left(self, j):
        return 2*j + 1

    def _right(self, j):
        return 2*j + 2

    def _has_left(self, j):
        return self._left(j) < len(self._data)  # index beyond end of list?

    def _has_right(self, j):
        return self._right(j) < len(self._data)  # index beyond end of list?

    def _swap(self, i, j):
        """Swap the elements at indices i and j of array."""
        self._data[i], self._data[j] = self._data[j], self._data[i]

    def _upheap(self, j):
        parent = self._parent(j)
        if j > 0 and self._data[j] < self._data[parent]:
            self._swap(j, parent)
            self._upheap(parent)  # recur at position of parent

    def _downheap(self, j):
        if self._has_left(j):
            left = self._left(j)
            small_child = left  # although right may be smaller
            if self._has_right(j):
                right = self._right(j)
                if self._data[right] < self._data[left]:
                    small_child = right
            if self._data[small_child] < self._data[j]:
                self._swap(j, small_child)
                # recur at position of small child
                self._downheap(small_child)

    # ----------- public behaviors ----------------
    def add(self, key, value):
        """Add a key-value pair to the priority queue."""
        self._data.append(self._Item(key, value))
        # upheap newly added position
        self._upheap(len(self._data) - 1)

    def min(self):
        """Return but do not remove (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        item = self._data[0]
        return (item._key, item._value)

    def remove_min(self):
        """Remove and return (k,v) with minimum key.
        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        # put minimum item at the end
        self._swap(0, len(self._data) - 1)
        item = self._data.pop()   # and remove it from the list
        self._downheap(0)         # then fix new root
        return (item._key, item._value)

In [9]:
P = HeapPriorityQueue()
P.add(4, 'C'), P.add(5, 'A'), P.add(6, 'Z')
P.add(15, 'K'), P.add(9, 'F'), P.add(7, 'Q')
P.add(20, 'B'), P.add(16, 'X'), P.add(25, 'J')
P.add(14, 'E'), P.add(12, 'H'), P.add(11, 'S')
P.add(13, 'W')
print(P._data)

P.add(2, 'T')
print(P._data)

P.remove_min()
print(P._data)

[(4, C), (5, A), (6, Z), (15, K), (9, F), (7, Q), (20, B), (16, X), (25, J), (14, E), (12, H), (11, S), (13, W)]
[(2, T), (5, A), (4, C), (15, K), (9, F), (7, Q), (6, Z), (16, X), (25, J), (14, E), (12, H), (11, S), (13, W), (20, B)]
[(4, C), (5, A), (6, Z), (15, K), (9, F), (7, Q), (20, B), (16, X), (25, J), (14, E), (12, H), (11, S), (13, W)]


# 选择排序 

In [10]:
def selection_sort(C):
    """implementation of selection sort"""
    n = len(C)
    P = UnsortedPriorityQueue()
    for j in range(n):
        elem = C.delete(C.first())
        P.add(elem, elem)
    for j in range(n):
        (k, v) = P.remove_min()
        C.add_last(v)

# 插入排序 

In [11]:
def insertion_sort(C):
    """implementation of selection sort"""
    n = len(C)
    P = SortedPriorityQueue()
    for j in range(n):
        elem = C.delete(C.first())
        P.add(elem, elem)
    for j in range(n):
        (k, v) = P.remove_min()
        C.add_last(v)

# 堆排序

In [12]:
def heap_sort(C):
    """implementation of selection sort"""
    n = len(C)
    P = HeapPriorityQueue()
    for j in range(n):
        elem = C.delete(C.first())
        P.add(elem, elem)
    for j in range(n):
        (k, v) = P.remove_min()
        C.add_last(v)

In [25]:
import random
import time
n = 10000
L = random.sample(range(1,n+1), n)
C = PositionalList()
for e in L:
    C.add_last(e)

In [26]:
start = time.time()
selection_sort(C)
print(f'ellapsed time of selection sort is {time.time() - start}')

ellapsed time of selection sort is 62.3137264251709


In [28]:
C = PositionalList()
for e in L:
    C.add_last(e)

In [29]:
start = time.time()
insertion_sort(C)
print(f'ellapsed time of selection sort is {time.time() - start}')

ellapsed time of selection sort is 27.88514757156372


In [30]:
C = PositionalList()
for e in L:
    C.add_last(e)

In [31]:
start = time.time()
heap_sort(C)
print(f'ellapsed time of selection sort is {time.time() - start}')

ellapsed time of selection sort is 0.2636992931365967
