> # HEAP

>Imports

In [16]:
from __future__ import annotations
from collections.abc import Iterable
from typing import Callable, Final, Optional, TypeAlias, TypeVar, Generic, Protocol
import time, random, statistics
import gc, sys

>Variables

In [17]:
ITERATIONS : Final[int] = 50_000
HEAP_SIZE : Final[int] = 10_000

RESULT : TypeAlias = list[tuple[str, int, float, float, float, float, float]]

T = TypeVar('T')

>Class

In [18]:
class Heap(Generic[T]):
    """
    A binary heap implementation supporting both min-heap and max-heap behavior,
    with support for a custom key function for flexible ordering.

    Attributes:
        _arr (list[T]): Internal array representing the heap.
        _is_min_heap (bool): If True, heap behaves as a min-heap; otherwise, as a max-heap.
        _key (Callable): Function to extract comparison key from elements.
    """
    __slots__ = ("_arr", "_is_min_heap", "_key")

    def __init__(self, arr: list[T], isMinHeap: bool = False, key: Callable = lambda x: x) -> None:
        """
        Initialize a Heap from an existing list of integers.

        Args:
            arr (list[T]): Initial elements to store in the heap.
            isMinHeap (bool): Whether the heap is a min-heap (default False = max-heap).
            key (Callable): Optional key function for custom ordering (default identity).
        """
        self._arr = list(arr)
        self._is_min_heap = isMinHeap
        self._key = key
        self.build_heap()

    # Helpers
    def _left(self, index: int) -> int:
        """Return the index of the left child of a node."""
        return (index * 2) + 1

    def _right(self, index: int) -> int:
        """Return the index of the right child of a node."""
        return (index * 2) + 2

    def _parent(self, index: int) -> int:
        """Return the index of the parent of a node."""
        return (index - 1) // 2

    def _swap(self, a: int, b: int) -> None:
        """Swap two elements in the heap array."""
        val_a, val_b = self._arr[a], self._arr[b]
        self._arr[a], self._arr[b] = val_b, val_a

    @property
    def size(self) -> int:
        """Return the number of elements in the heap."""
        return len(self._arr)

    def is_empty(self) -> bool:
        """Check whether the heap is empty."""
        return len(self._arr) == 0

    def heapify_down(self, index: int, size: int) -> None:
        """Restore the heap property by moving an element downwards."""
        cumulative: int = index
        left: int = self._left(index)
        right: int = self._right(index)

        if left < size and self._compare(left, cumulative):
            cumulative = left

        if right < size and self._compare(right, cumulative):
            cumulative = right

        if cumulative != index:
            self._swap(index, cumulative)
            self.heapify_down(cumulative, size)

    def heapify_up(self, index: int) -> None:
        """Restore the heap property by moving an element upwards."""
        while index > 0:
            parent = self._parent(index)
            if self._compare(index, parent):
                self._swap(index, parent)
                index = parent
            else:
                break

    @classmethod
    def heapify_list(cls, arr: list[T], isMinHeap=False, key=None) -> list[T]:
        """
        Convert a list into a heap and return the internal array.

        Args:
            arr (list[T]): List to convert.
            isMinHeap (bool): Whether the heap should be min-heap.
            key (Callable): Optional key function.

        Returns:
            list[T]: Heapified array.
        """
        h = cls(arr, isMinHeap=isMinHeap, key=key)
        return h._arr

    def build_heap(self) -> None:
        """Convert the internal array into a valid heap."""

        start_index: int = (self.size // 2) - 1
        for i in range(start_index, -1, -1):
            self.heapify_down(i, self.size)

    def peek(self) -> Optional[T]:
        """Return the top element of the heap without removing it."""
        return None if self.is_empty() else self._arr[0]

    def peek_n(self, n: int) -> list[T]:
        """
        Return the top n elements from the heap without modifying it.

        Args:
            n (int): Number of elements to return.

        Returns:
            list[T]: The n highest-priority elements.
        """
        temp = Heap(self._arr.copy(), isMinHeap=self._is_min_heap, key=self._key)
        return [temp.pop() for _ in range(n)]

    def push(self, value: T) -> None:
        """Insert a new value into the heap."""
        self._arr.append(value)
        self.heapify_up(len(self._arr) - 1)

    def pop(self) -> T:
        """Remove and return the root element of the heap."""
        if self.is_empty():
            raise IndexError("Pop from empty heap")
        
        root = self._arr[0]
        last_index = self.size - 1
        if last_index > 0:
            self._swap(0, last_index)

        self._arr.pop()

        if not self.is_empty():
            self.heapify_down(0, self.size)
            
        return root

    def push_pop(self, value: T) -> T:
        """
        Push a value onto the heap and pop the root in a single efficient operation.

        Args:
            value (int): Value to push.

        Returns:
            int: The popped root value.
        """
        if self.is_empty():
            self.push(value)
            return value

        root = self._arr[0]
        if (self._is_min_heap and value > root) or not (self._is_min_heap and value < root):
            self._arr[0] = value
            self.heapify_down(0, self.size)
            return root
        else:
            return value

    def replace(self, oldValue: T, newValue: T) -> None:
        """
        Replace an existing value in the heap with a new value.

        Args:
            oldValue (T): Value to replace.
            newValue (T): New value to insert.

        Raises:
            ValueError: If heap is empty or value is not found.
        """
        if self.is_empty():
            raise ValueError("Heap is empty")
        try:
            index = self._arr.index(oldValue)
        except ValueError:
            raise KeyError(f"{oldValue} not found in heap")

        self._arr[index] = newValue

        parent = self._parent(index)
        if index > 0 and self._compare(index, parent):
            self.heapify_up(index)
        else:
            self.heapify_down(index, self.size)

    def clear(self) -> None:
        """Remove all elements from the heap."""
        self._arr.clear()

    def merge(self, heap: "Heap | list[T]") -> "Heap":
        """
        Merge another heap or list into this heap.

        Args:
            heap (Heap | list[T]): Heap or list to merge.

        Returns:
            Heap: Self after merging.
        """
        values = heap._arr if isinstance(heap, Heap) else heap
        self._arr.extend(values)
        self.build_heap()
        return self

    def nlargest(self, n: int) -> list[T]:
        """
        Return the n largest elements.

        Args:
            n (int): Number of elements to return.

        Returns:
            list[T]: n largest elements in descending order.
        """
        if n <= 0:
            return []
        if self.size <= n:
            return sorted(self._arr, reverse=True)

        temp_heap = Heap(self._arr[:n], isMinHeap=True, key=self._key)
        for val in self._arr[n:]:
            if self._key(val) > self._key(temp_heap.peek()):
                temp_heap.pop()
                temp_heap.push(val)

        return sorted(temp_heap._copy(), reverse=True)

    def nsmallest(self, n: int) -> list[T]:
        """
        Return the n smallest elements.

        Args:
            n (int): Number of elements to return.

        Returns:
            list[T]: n smallest elements in ascending order.
        """
        if n <= 0:
            return []
        if self.size <= n:
            return sorted(self._arr)

        temp_heap = Heap(self._arr[:n], isMinHeap=False, key=self._key)
        for val in self._arr[n:]:
            if self._key(val) < self._key(temp_heap.peek()):
                temp_heap.pop()
                temp_heap.push(val)

        return sorted(temp_heap._copy())

    def update_key(self, key: Callable = lambda x: x) -> None:
        """
        Update the key function used for ordering and rebuild the heap.

        Args:
            key (Callable): New key function.
        """
        self._key = key
        self.build_heap()

    def display_contents(self) -> None:
        """Display the heap in a tree-like, level-by-level format."""
        if self.is_empty():
            print("Empty Heap")
            return
        print("[MinHeap]:" if self._is_min_heap else "[MaxHeap]:")
        n, level, index = self.size, 0, 0
        while index < n:
            level_size = 2 ** level
            level_nodes = self._arr[index: index + level_size]
            indent = " " * (2 ** (max(0, self.size.bit_length() - level - 1)))
            spacing = " " * (2 ** (max(0, self.size.bit_length() - level)))
            print(indent + spacing.join(str(v) for v in level_nodes))
            index += level_size
            level += 1

    def print_contents(self) -> None:
        """Print the raw internal array representing the heap."""
        if self.is_empty():
            print("Empty tree")
            return
        print("[MinHeap]:" if self._is_min_heap else "[MaxHeap]:")
        print(self._arr)

    def __repr__(self) -> str:
        self.display_contents()

    def __str__(self) -> str:
        """Return a string representation of the heap."""
        heap_type = "MinHeap" if self._is_min_heap else "MaxHeap"
        return f"[{heap_type}] {self._arr}"

    def __len__(self) -> int:
        """Return the number of elements in the heap."""
        return self.size

    def __bool__(self) -> bool:
        """Return True if the heap is not empty."""
        return not self.is_empty()

    def __iter__(self):
        """Return an iterator over a copy of the heap array."""
        return iter(self._arr.copy())

    def __contains__(self, item):
        """Check if an item exists in the heap."""
        return item in self._arr

    def _compare(self, a, b) -> bool:
        """
        Compare two elements based on the heap type and key function.

        Args:
            a (int): Index of first element.
            b (int): Index of second element.

        Returns:
            bool: True if a has higher priority than b.
        """
        ka, kb = self._key(self._arr[a]), self._key(self._arr[b])
        return ka < kb if self._is_min_heap else ka > kb

    def _copy(self) -> list[T]:
        """
        Return a shallow copy of the heap elements.

        Returns:
            list[T]: Heap contents.
        """
        return self._arr.copy()

In [19]:
import time
import gc
import random
import statistics
from typing import Callable, List, Tuple

RESULT = List[Tuple[str, int, float, float, float, float, float]]  # Type alias for results

def benchmark(func: Callable, iterations: int, *args, **kwargs) -> tuple[float, float, list[float]]:
    """
    Measure execution time of a function over multiple iterations.

    Args:
        func (Callable): Function to benchmark.
        iterations (int): Number of times to run the function.
        *args, **kwargs: Arguments to pass to func.

    Returns:
        tuple: (total_time, average_time, list_of_iteration_times)
    """
    times = []
    gc_enabled = gc.isenabled()
    gc.disable()

    for _ in range(iterations):
        start = time.perf_counter()
        func(*args, **kwargs)
        end = time.perf_counter()
        times.append(end - start)

    if gc_enabled: 
        gc.enable()

    total_time = sum(times)
    avg_time = total_time / iterations
    return total_time, avg_time, times


def benchmark_heap(heap_size: int = 100_000, iterations: int = 100, is_min_heap: bool = True) -> None:
    """
    Benchmark various Heap operations and print a formatted table.

    Args:
        heap_size (int): Number of elements to initialize the heap with.
        iterations (int): Number of benchmark iterations for each operation.
        is_min_heap (bool): Whether to use a min-heap or max-heap.
    """
    # Generate random data
    data = [random.randint(0, heap_size * 10) for _ in range(heap_size)]
    heap = Heap(data, isMinHeap=is_min_heap)

    results: RESULT = []

    def record(name: str, total: float, avg: float, times: list[float]):
        results.append((
            name,
            len(times),
            total,
            avg,
            min(times),
            max(times),
            statistics.stdev(times) if len(times) > 1 else 0.0
        ))

    # Benchmark push
    total, avg, times = benchmark(lambda: heap.push(random.randint(0, heap_size * 10)), iterations)
    record("push", total, avg, times)

    # Benchmark pop + push to maintain heap size
    total, avg, times = benchmark(lambda: (heap.pop(), heap.push(random.randint(0, heap_size * 10))), iterations)
    record("pop", total, avg, times)

    # Benchmark peek
    total, avg, times = benchmark(lambda: heap.peek(), iterations)
    record("peek", total, avg, times)

    # Benchmark push_pop
    total, avg, times = benchmark(lambda: heap.push_pop(random.randint(0, heap_size * 10)), iterations)
    record("push_pop", total, avg, times)

    # Benchmark replace
    total, avg, times = benchmark(
        lambda: heap.replace(
            heap._arr[random.randint(0, heap.size - 1)],
            random.randint(0, heap_size * 10)
        ),
        iterations
    )
    record("replace", total, avg, times)

    # Benchmark merge with a small random list
    merge_data = [random.randint(0, heap_size * 10) for _ in range(heap_size // 10)]
    total, avg, times = benchmark(lambda: heap.merge(merge_data), iterations)
    record("merge", total, avg, times)

    # Benchmark nlargest
    total, avg, times = benchmark(lambda: heap.nlargest(10), iterations)
    record("nlargest", total, avg, times)

    # Benchmark nsmallest
    total, avg, times = benchmark(lambda: heap.nsmallest(10), iterations)
    record("nsmallest", total, avg, times)

    # Benchmark clear + rebuild to simulate rebuild_heap
    total, avg, times = benchmark(
        lambda: Heap([random.randint(0, heap_size * 10) for _ in range(heap_size)], isMinHeap=is_min_heap),
        iterations
    )
    record("build_heap", total, avg, times)

    print_table(results)


def print_table(results: RESULT) -> None:
    """
    Print benchmark results in a formatted table.

    Args:
        results (RESULT): List of tuples with benchmark data.
    """
    print(f"{'Operation':<15} {'Iters':>6} {'Total(s)':>10} {'Avg(s)':>10} {'Min(s)':>10} {'Max(s)':>10} {'StdDev':>10}")
    print("-" * 75)
    for op, count, total, avg, mn, mx, stdev in results:
        print(f"{op:<15} {count:>6} {total:>10.6f} {avg:>10.8f} {mn:>10.8f} {mx:>10.8f} {stdev:>10.8f}")

In [20]:
if __name__ == "__main__":
    benchmark_heap(heap_size=100, iterations=1_000, is_min_heap=True)
    



Operation        Iters   Total(s)     Avg(s)     Min(s)     Max(s)     StdDev
---------------------------------------------------------------------------
push              1000   0.001984 0.00000198 0.00000080 0.00004730 0.00000172
pop               1000   0.008311 0.00000831 0.00000460 0.00004010 0.00000236
peek              1000   0.000208 0.00000021 0.00000010 0.00000450 0.00000014
push_pop          1000   0.003177 0.00000318 0.00000060 0.00001810 0.00000287
replace           1000   0.008106 0.00000811 0.00000230 0.00002320 0.00000357
merge             1000   1.776827 0.00177683 0.00031750 0.00370270 0.00078817
nlargest          1000   2.304915 0.00230491 0.00193150 0.00312470 0.00019267
nsmallest         1000   2.098138 0.00209814 0.00169780 0.00295380 0.00019597
build_heap        1000   0.110837 0.00011084 0.00008470 0.00025500 0.00001890
