In [1]:
# Topic 7: Heaps & Priority Queues 
# Task 1: Implementing a Min-Heap and Max-Heap 

In [2]:
class Heap:
    """Array-backed binary heap that acts as a min-heap or a max-heap.

    The element at ``data[0]`` is the root: the smallest item for a
    ``"min"`` heap, the largest for a ``"max"`` heap. The children of the
    element at index ``i`` live at ``2 * i + 1`` and ``2 * i + 2``.
    """

    def __init__(self, heap_type="min"):
        """Create an empty heap.

        Args:
            heap_type: ``"min"`` or ``"max"``.

        Raises:
            ValueError: if ``heap_type`` is anything else.
        """
        if heap_type not in ("min", "max"):
            raise ValueError("heap_type must be either 'min' or 'max'")
        self.heap_type = heap_type
        self.data = []

    def _compare(self, parent, child):
        """Return True when ``parent`` and ``child`` are in the wrong order."""
        return parent > child if self.heap_type == "min" else parent < child

    def _heapify_up(self, index):
        """Move the element at ``index`` towards the root until it is in order."""
        while index > 0:
            above = (index - 1) // 2
            if not self._compare(self.data[above], self.data[index]):
                break
            self.data[above], self.data[index] = self.data[index], self.data[above]
            index = above

    def _heapify_down(self, index):
        """Move the element at ``index`` towards the leaves until it is in order."""
        while True:
            left = 2 * index + 1
            right = 2 * index + 2
            size = len(self.data)

            # Pick whichever of (node, left child, right child) belongs on top.
            winner = index
            if left < size and self._compare(self.data[winner], self.data[left]):
                winner = left
            if right < size and self._compare(self.data[winner], self.data[right]):
                winner = right

            if winner == index:
                return
            self.data[index], self.data[winner] = self.data[winner], self.data[index]
            index = winner

    def insert(self, value):
        """Add ``value`` to the heap."""
        self.data.append(value)
        self._heapify_up(len(self.data) - 1)

    def extract_root(self):
        """Remove and return the root element.

        Raises:
            IndexError: if the heap is empty.
        """
        if not self.data:
            raise IndexError("extract_root from an empty heap")
        tail = self.data.pop()
        if not self.data:
            # The heap held a single element, so the tail was the root.
            return tail
        root = self.data[0]
        self.data[0] = tail
        self._heapify_down(0)
        return root

    def peek(self):
        """Return the root element without removing it.

        Raises:
            IndexError: if the heap is empty.
        """
        if not self.data:
            raise IndexError("peek from an empty heap")
        return self.data[0]

    def heapify(self, array):
        """Replace the heap's contents with a heap-ordered copy of ``array``.

        ``array`` itself is left untouched; any previous contents are dropped.
        """
        self.data = array[:]
        # Leaves are already valid heaps, so start at the last parent and
        # walk back to the root.
        for i in reversed(range(len(self.data) // 2)):
            self._heapify_down(i)

In [3]:
# Task 2: Implementing a Priority Queue Using a Heap 

In [4]:
class PriorityQueue:
    """Priority queue backed by a min-``Heap``.

    A lower ``priority`` number is served first. Entries are stored as
    ``(priority, sequence, value)`` tuples, where ``sequence`` is a counter
    that grows with every ``enqueue``. Because the counter is unique, two
    entries with the same priority are ordered by insertion (FIFO) and the
    values themselves are never compared, so values do not need to support
    ``<`` / ``>`` (dicts and arbitrary objects are fine).
    """

    def __init__(self):
        self.heap = Heap(heap_type="min")
        # Tie-breaker for equal priorities; incremented on every enqueue.
        self._sequence = 0

    def enqueue(self, value, priority):
        """Add ``value`` to the queue with the given ``priority``."""
        self.heap.insert((priority, self._sequence, value))
        self._sequence += 1

    def dequeue(self):
        """Remove and return the value with the lowest priority number.

        Raises:
            IndexError: if the queue is empty.
        """
        if not self.heap.data:
            raise IndexError("dequeue from an empty priority queue")
        # The value is the last field of the stored entry.
        return self.heap.extract_root()[-1]

    def peek(self):
        """Return the value with the lowest priority number without removing it.

        Raises:
            IndexError: if the queue is empty.
        """
        if not self.heap.data:
            raise IndexError("peek from an empty priority queue")
        return self.heap.peek()[-1]

# Demo: a lower priority number is served first
pq = PriorityQueue()
pq.enqueue(value="Task A", priority=3)
pq.enqueue(value="Task B", priority=1)
pq.enqueue(value="Task C", priority=2)

# Arguments are evaluated left to right, so the dequeue runs before the peek.
print(pq.dequeue(), pq.peek(), sep="\n")  # Output: "Task B", then "Task C"
pq.enqueue(value="Task D", priority=0)
print(pq.dequeue())  # Output: "Task D" (now the lowest priority number)

Task B
Task C
Task D


In [5]:
# Task 3: Finding the K Smallest and K Largest Elements Using a Heap 

In [6]:
import time
import random

def find_k_smallest(arr, k):
    """Return the ``k`` smallest elements of ``arr`` in ascending order.

    A min-heap is built from a copy of ``arr`` (the input is not modified)
    and its root is extracted ``k`` times.

    Args:
        arr: list of mutually comparable items.
        k: how many elements to return. Values larger than ``len(arr)``
            return every element; zero or negative values return ``[]``.

    Returns:
        A list of at most ``k`` elements, smallest first.
    """
    min_heap = Heap(heap_type="min")
    min_heap.heapify(arr)
    # Clamp k so asking for more items than exist does not surface an
    # IndexError from the emptied heap.
    count = min(k, len(arr))
    return [min_heap.extract_root() for _ in range(count)]

def find_k_largest(arr, k):
    """Return the ``k`` largest elements of ``arr`` in descending order.

    A max-heap is built from a copy of ``arr`` (the input is not modified)
    and its root is extracted ``k`` times.

    Args:
        arr: list of mutually comparable items.
        k: how many elements to return. Values larger than ``len(arr)``
            return every element; zero or negative values return ``[]``.

    Returns:
        A list of at most ``k`` elements, largest first.
    """
    max_heap = Heap(heap_type="max")
    max_heap.heapify(arr)
    # Clamp k so asking for more items than exist does not surface an
    # IndexError from the emptied heap.
    count = min(k, len(arr))
    return [max_heap.extract_root() for _ in range(count)]

# Quick check on a small list
arr = [10, 4, 3, 20, 15, 7]
print(f"K Smallest Elements: {find_k_smallest(arr, 3)}")  # Output: [3, 4, 7]
print(f"K Largest Elements: {find_k_largest(arr, 2)}")  # Output: [20, 15]

# Comparison with sorting

# Generate a large random list from a dedicated, seeded generator so every
# run benchmarks the same data without touching the global random state.
rng = random.Random(42)
large_arr = rng.sample(range(1, 1000000), 100000)

k = 100

# Measure time for heap-based approach.
# time.perf_counter() is used instead of time.time(): it is monotonic and
# meant for measuring short durations, whereas the wall clock can be
# adjusted while the benchmark runs.
start_time = time.perf_counter()
heap_k_smallest = find_k_smallest(large_arr, k)
heap_k_largest = find_k_largest(large_arr, k)
heap_time = time.perf_counter() - start_time

# Measure time for sorting-based approach
start_time = time.perf_counter()
sorted_k_smallest = sorted(large_arr)[:k]
sorted_k_largest = sorted(large_arr, reverse=True)[:k]
sort_time = time.perf_counter() - start_time

# Both approaches must agree, otherwise comparing their timings is meaningless.
assert heap_k_smallest == sorted_k_smallest, "heap and sort disagree on the k smallest"
assert heap_k_largest == sorted_k_largest, "heap and sort disagree on the k largest"

print(f"Heap-based approach time: {heap_time:.6f} seconds")
print(f"Sorting-based approach time: {sort_time:.6f} seconds")

K Smallest Elements: [3, 4, 7]
K Largest Elements: [20, 15]
Heap-based approach time: 0.348802 seconds
Sorting-based approach time: 0.100807 seconds
