第9章 优先级队列

代码段9-1 PriorityQueueBase类包含一个嵌套类_Item，它将键和值组成单独的对象。为了方便，我们给出了is_empty的具体实现，它是在一个假定的__len__的实现的基础上实现的

In [9]:
class PriorityQueueBase:
    """Shared scaffolding for priority queue implementations.

    Provides the nested ``_Item`` record and a concrete ``is_empty`` that
    relies on ``len(self)``; ``__len__`` itself is not defined here.
    """

    class _Item:
        """Key/value record stored in a queue; ordering looks at the key only."""
        __slots__ = ('_key', '_value')

        def __init__(self, k, v):
            self._key, self._value = k, v

        def __lt__(self, other):
            # Only the keys take part in the comparison; values are ignored.
            mine, theirs = self._key, other._key
            return mine < theirs

    def is_empty(self):
        """Return True when ``len(self)`` is zero."""
        size = len(self)
        return size == 0

管理双向链表的基本类

In [10]:
class _DoublyLinkedBase:
    """A base class providing a doubly linked list representation.

    The list is bracketed by two sentinel nodes, ``_header`` and ``_trailer``,
    which never store user elements; real nodes always live between them.
    """

    class _Node:
        """Lightweight, nonpublic class for storing a doubly linked node."""
        __slots__ = '_element', '_prev', '_next'    # streamline memory

        def __init__(self, element, prev, next):    # initialize node's fields
            self._element = element                 # user's element
            self._prev = prev                       # previous node reference
            self._next = next                       # next node reference

    def __init__(self):
        """Create an empty list consisting only of the two sentinels."""
        self._header = self._Node(None, None, None)
        self._trailer = self._Node(None, None, None)
        self._header._next = self._trailer   # trailer is after header
        self._trailer._prev = self._header   # header is before trailer
        self._size = 0                       # number of elements

    def __len__(self):
        """Return the number of elements in the list."""
        return self._size

    def is_empty(self):
        """Return True if list is empty."""
        return self._size == 0

    def _insert_between(self, e, predecessor, successor):
        """Add element e between two existing nodes and return new node.

        predecessor and successor are expected to be adjacent nodes of this
        list; both of their links are rewired to point at the new node.
        """
        newest = self._Node(e, predecessor, successor)    # linked to neighbors
        predecessor._next = newest
        successor._prev = newest
        self._size += 1
        return newest

    def _delete_node(self, node):
        """Delete nonsentinel node from the list and return its element.

        The removed node's fields are all reset to None, which marks it as
        deprecated (its ``_next`` is None afterwards).
        """
        predecessor = node._prev
        successor = node._next
        predecessor._next = successor                     # bypass node forward
        successor._prev = predecessor                     # bypass node backward
        self._size -= 1
        element = node._element                           # record deleted element
        node._prev = node._next = node._element = None    # deprecate node
        return element                                    # return deleted element

基于双向链表的PositionalList类

In [11]:
class PositionalList(_DoublyLinkedBase):
    """A sequential container of elements allowing positional access.

    Elements are addressed through ``Position`` objects, which wrap a node of
    the underlying doubly linked list together with the owning container.
    """

    # ---------- nested Position class ----------
    class Position:
        """An abstraction representing the location of a single element."""

        def __init__(self, container, node):
            """Constructor should not be invoked by user."""
            self._container = container
            self._node = node

        def element(self):
            """Return the element stored at this Position."""
            return self._node._element

        def __eq__(self, other):
            """Return True if other is a Position representing the same location."""
            return type(other) is type(self) and other._node is self._node

        def __ne__(self, other):
            """Return True if other does not represent the same location."""
            return not (self == other)    # opposite of __eq__

    # ---------- utility methods ----------
    def _validate(self, p):
        """Return position's node, or raise appropriate error if invalid.

        Raises TypeError if p is not a Position, and ValueError if p belongs
        to another container or refers to a node whose ``_next`` is None.
        """
        if not isinstance(p, self.Position):
            raise TypeError('p must be proper Position type')
        if p._container is not self:
            raise ValueError('p does not belong to this container')
        if p._node._next is None:    # convention for deprecated nodes
            raise ValueError('p is no longer valid')
        return p._node

    def _make_position(self, node):
        """Return Position instance for given node (or None if sentinel)."""
        if node is self._header or node is self._trailer:
            return None    # boundary violation
        return self.Position(self, node)     # legitimate position

    # Backward-compatible alias for the original, misspelled helper name.
    _make_postion = _make_position

    # ---------- accessors ----------
    def first(self):
        """Return the first Position in the list (or None if list is empty)."""
        return self._make_position(self._header._next)

    def last(self):
        """Return the last Position in the list (or None if list is empty)."""
        return self._make_position(self._trailer._prev)

    def before(self, p):
        """Return the Position just before Position p (or None if p is first)."""
        node = self._validate(p)
        return self._make_position(node._prev)

    def after(self, p):
        """Return the Position just after Position p (or None if p is last)."""
        node = self._validate(p)
        return self._make_position(node._next)

    def __iter__(self):
        """Generate a forward iteration of the elements of the list."""
        cursor = self.first()
        while cursor is not None:
            yield cursor.element()
            cursor = self.after(cursor)

    # ---------- mutators ----------
    # override inherited version to return Position, rather than Node
    def _insert_between(self, e, predecessor, successor):
        """Add element between existing nodes and return new Position."""
        node = super()._insert_between(e, predecessor, successor)
        return self._make_position(node)

    def add_first(self, e):
        """Insert element e at the front of the list and return new Position."""
        return self._insert_between(e, self._header, self._header._next)

    def add_last(self, e):
        """Insert element e at the back of the list and return new Position."""
        return self._insert_between(e, self._trailer._prev, self._trailer)

    def add_before(self, p, e):
        """Insert element e into list before Position p and return new Position."""
        original = self._validate(p)
        return self._insert_between(e, original._prev, original)

    def add_after(self, p, e):
        """Insert element e into list after Position p and return new Position."""
        original = self._validate(p)
        return self._insert_between(e, original, original._next)

    def delete(self, p):
        """Remove and return the element at Position p."""
        original = self._validate(p)
        return self._delete_node(original)    # inherited method returns element

    def replace(self, p, e):
        """Replace the element at Position p with e.

        Return the element formerly at Position p.
        """
        original = self._validate(p)
        old_value = original._element    # temporarily store old element
        original._element = e            # replace with new element
        return old_value                 # return the old element value

In [12]:
class UnsortedPriorityQueue(PriorityQueueBase): # base class defines _Item
    """A min-oriented priority queue implemented with an unsorted list.

    Items are appended to the end of a PositionalList in arrival order, so
    locating the minimum requires scanning every stored item.

    NOTE(review): the ``Empty`` exception raised here is not defined in the
    visible part of this file -- confirm it is defined or imported elsewhere.
    """

    def __init__(self):
        """Create a new empty Priority Queue."""
        self._data = PositionalList()

    def _find_min(self):    # nonpublic utility
        """Return Position of item with minimum key.

        Raise Empty if the queue holds no items. Among items with equal
        minimal keys the earliest one in the list is returned, because only a
        strictly smaller item replaces the current candidate.
        """
        if self.is_empty():    # is_empty inherited from base class
            raise Empty('Priority queue is empty')
        small = self._data.first()
        walk = self._data.after(small)
        while walk is not None:
            if walk.element() < small.element():
                small = walk              # remember the new smallest item
            walk = self._data.after(walk)
        return small

    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)

    def add(self, key, value):
        """Add a key-value pair."""
        self._data.add_last(self._Item(key, value))

    def min(self):
        """Return but do not remove (k, v) tuple with minimum key.

        Raise Empty exception if empty.
        """
        p = self._find_min()
        item = p.element()
        return (item._key, item._value)

    def remove_min(self):
        """Remove and return (k, v) tuple with minimum key.

        Raise Empty exception if empty.
        """
        p = self._find_min()
        item = self._data.delete(p)
        return (item._key, item._value)

代码段9-3 使用排序列表实现的优先级队列。父类PriorityQueueBase在代码段9-1中给出，PositionalList类在7.4节给出

In [13]:
class SortedPriorityQueue(PriorityQueueBase):    # base class defines _Item
    """A min-oriented priority queue implemented with a sorted list.

    ``add`` keeps the underlying PositionalList ordered by key from smallest
    to largest, so the minimum is always the first element.

    NOTE(review): the ``Empty`` exception raised here is not defined in the
    visible part of this file -- confirm it is defined or imported elsewhere.
    """

    def __init__(self):
        """Create a new empty Priority Queue."""
        self._data = PositionalList()

    def __len__(self):
        """Return the number of items in the priority queue."""
        return len(self._data)

    def add(self, key, value):
        """Add a key-value pair.

        The new item is placed after the last existing item that is not
        greater than it, so items with equal keys stay in insertion order.
        """
        newest = self._Item(key, value)    # make new item instance
        walk = self._data.last()    # walk backward looking for smaller key
        while walk is not None and newest < walk.element():
            walk = self._data.before(walk)
        if walk is None:
            # Every existing key is larger (or the list is empty), so the
            # new item must go to the front to keep the list sorted.
            self._data.add_first(newest)
        else:
            self._data.add_after(walk, newest)    # newest goes after walk

    def min(self):
        """Return but do not remove (k, v) tuple with minimum key.

        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        p = self._data.first()
        item = p.element()
        return (item._key, item._value)    # plain attributes, not callables

    def remove_min(self):
        """Remove and return (k, v) tuple with minimum key.

        Raise Empty exception if empty.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        item = self._data.delete(self._data.first())
        return (item._key, item._value)

代码段9-4 用基于数组的堆实现优先级队列（后接代码段9-5），是代码段9-1中PriorityQueueBase类的扩展

In [15]:
class HeapPriorityQueue(PriorityQueueBase):    # base class defines _Item
    """A min-oriented priority queue implemented with binary heap.

    The heap is stored in the list ``self._data``; the children of index j
    live at indices 2*j + 1 and 2*j + 2, and its parent at (j - 1) // 2.
    """
    # ---------- nonpublic behaviors ----------
    def _parent(self, j):
        """Return the index of the parent of index j."""
        return (j-1) // 2

    def _left(self, j):
        """Return the index of the left child of index j."""
        return 2*j + 1

    def _right(self, j):
        """Return the index of the right child of index j."""
        return 2*j + 2

    def _has_left(self, j):
        """Return True if index j has a left child inside the list."""
        return self._left(j) < len(self._data)    # index beyond end of list?

    def _has_right(self, j):
        """Return True if index j has a right child inside the list."""
        return self._right(j) < len(self._data)   # index beyond end of list?

    def _swap(self, i, j):
        """Swap the elements at indices i and j of array."""
        self._data[i], self._data[j] = self._data[j], self._data[i]

    def _upheap(self, j):
        """Move the item at index j up while it is smaller than its parent."""
        parent = self._parent(j)
        if j > 0 and self._data[j] < self._data[parent]:
            self._swap(j, parent)
            self._upheap(parent)    # recur at position of parent

    def _downheap(self, j):
        """Move the item at index j down while a child is smaller than it."""
        if self._has_left(j):
            left = self._left(j)
            small_child = left
            if self._has_right(j):    # although right may be smaller
                right = self._right(j)
                if self._data[right] < self._data[left]:
                    small_child = right
            if self._data[small_child] < self._data[j]:
                self._swap(j, small_child)
                self._downheap(small_child)    # recur at position of small child

代码段9-5 用基于数组的堆实现优先级队列（接代码段9-4）

In [18]:
    # ---------- public behaviors ----------
    def __init__(self):
        """Set up the queue with an empty backing list."""
        self._data = list()
    
    def __len__(self):
        """Report how many items the backing list currently holds."""
        storage = self._data
        return len(storage)
    
    def add(self, key, value):
        """Insert the pair (key, value) into the priority queue."""
        entry = self._Item(key, value)
        self._data.append(entry)
        tail_index = len(self._data) - 1
        self._upheap(tail_index)    # bubble the new entry up from the end
    
    def min(self):
        """Peek at the (k, v) tuple with the minimum key without removing it.

        Raise Empty exception if the queue has no items.
        """
        if not self.is_empty():
            root = self._data[0]    # item at index 0 of the backing list
            return (root._key, root._value)
        raise Empty('Priority queue is empty.')
    
    def remove_min(self):
        """Take out the item with the minimum key and return it as (k, v).

        Raise Empty exception if the queue has no items.
        """
        if self.is_empty():
            raise Empty('Priority queue is empty.')
        last_index = len(self._data) - 1
        self._swap(0, last_index)       # move the root item to the end
        removed = self._data.pop()      # detach it from the list
        self._downheap(0)               # repair ordering from the new root
        key, value = removed._key, removed._value
        return (key, value)