In [1]:
import heapq
from collections import Counter, defaultdict
from typing import List

Mapping the elements of a heap into an array is trivial: if a node is stored at index k, then its left child is stored at index 2k + 1 and its right child at index 2k + 2 for 0 based indexing and for 1 based indexing the left child will be at 2k and right child will be at 2k + 1.

Same is true for max heap but it is larger nodes not smaller ones


**703. Kth Largest Element in a Stream**


In [31]:
class KthLargest:  # 95% time, 45% memory
    def __init__(self, k: int, nums: List[int]):
        # minHeap with K largest integers
        self.minHeap, self.k = nums, k
        heapq.heapify(self.minHeap)  # turn into min heap in O(n)
        # then remove values until len(self.minHeap) is k if there are more than k
        while len(self.minHeap) > k:  # O(n-k) ~ O(n)
            heapq.heappop(self.minHeap)

    def add(self, val: int) -> int:
        heapq.heappush(self.minHeap, val)  # O(logn)
        if len(self.minHeap) > self.k:
            heapq.heappop(self.minHeap)  # O(logn)
        return self.minHeap[
            0
        ]  # because it is a heap it is always the min is always zeroth index


# Your KthLargest object will be instantiated and called as such:
# obj = KthLargest(k, nums)
# param_1 = obj.add(val)

**1046. Last Stone Weight**


In [32]:
class Solution:  # 96% time, 60% memory
    def lastStoneWeight(self, stones: List[int]) -> int:
        # want a maxHeap so multiply everything by negative 1
        stones = [-s for s in stones]
        heapq.heapify(stones)  # maxHeap
        while len(stones) > 1:
            heavy1, heavy2 = heapq.heappop(stones), heapq.heappop(stones)
            if heavy1 == heavy2:
                continue
            heapq.heappush(stones, -abs(heavy1 - heavy2))

        return 0 if not stones else abs(stones[0])


lastStoneWeight = Solution()
lastStoneWeight.lastStoneWeight([2, 7, 4, 1, 8, 1])

1

**973. K Closest Points to Origin**


In [33]:
class Solution:  # 93% time, 70% memory
    def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:
        # total time complexity: O(2n + klogn) = O(klogn) <= O(nlogn) from just sorting
        # don't need to square root it, because everything gets scaled the same way
        distances = [(x**2 + y**2, x, y) for x, y in points]  # O(n)
        res = []
        heapq.heapify(distances)  # O(n) to make a minHeap
        for _ in range(k):  # O(k) * O(logn) = O(klogn)
            res.append(heapq.heappop(distances)[1:])
        return res


kClosest = Solution()
kClosest.kClosest([[1, 3], [-2, 2]], 1)

[(-2, 2)]

**215. Kth Largest Element in an Array**


In [34]:
class Solution:  # 70% time, 68% memory
    def findKthLargest(self, nums: List[int], k: int) -> int:
        nums = [-n for n in nums]  # maxHeap needs negative values
        heapq.heapify(nums)
        for _ in range(k - 1):
            heapq.heappop(nums)
        return abs(heapq.heappop(nums))

    def findKthLargestClever(self, nums: List[int], k: int) -> int:
        # gets a minHeap of length k
        # goes through the entire array and replaces the smallest value in the minHeap with an encountered larger value
        # then the smallest value in the minHeap at the end is kth largest of the entire array.
        min_heap = nums[:k]
        heapq.heapify(min_heap)

        for num in nums[k:]:
            if num > min_heap[0]:
                heapq.heappop(min_heap)
                heapq.heappush(min_heap, num)

        return min_heap[0]

    # doesnt pass the final test case
    # average time is O(n + n/2 + n/4 + n/8) = O(2n) = O(n)
    # but worst case time is O(n^2) so this fails the last test case
    def findKthLargestQuickSelect(self, nums: List[int], k: int) -> int:
        # want to get the len(nums) - k index of a sorted array
        k = len(nums) - k

        def quickSelect(left, right):
            pivot, ptr = nums[right], left
            for i in range(left, right):
                if nums[i] <= pivot:
                    nums[i], nums[ptr] = nums[ptr], nums[i]
                    # reorganizes al smaller values to be on the left of the array
                    ptr += 1
                nums[right], nums[ptr] = (
                    nums[ptr],
                    nums[right],
                )  # placing the pivot after all the values smaller than it
            if ptr < k:
                return quickSelect(ptr + 1, right)
            elif ptr > k:
                return quickSelect(left, ptr - 1)
            else:
                return nums[ptr]

        return quickSelect(0, len(nums) - 1)

**Trying to implement quicksort**


In [1]:
def quickSort(arr: list[int]) -> None:
    "Mutates inplace"
    # at each step you want to put the current pivot value in the correct index for the sorted array
    # all values less than it need to be before, all values greater than it need to be after

    def doSort(left, right):
        if left >= right:
            return
        pivot, ptr = arr[right], left
        for i in range(left, right):  # not right + 1 because that is just the pivot
            if arr[i] <= pivot:
                arr[ptr], arr[i] = arr[i], arr[ptr]
                ptr += 1  # this guarantees that an old value that is less than pivot doesn't get moved away
        arr[ptr], arr[right] = (
            pivot,
            arr[ptr],
        )  # swap pivot into its correct index for the sorted array (after all values less than it)

        # now sort left side of pivot
        doSort(left, ptr - 1)
        # now sort right side of pivot
        doSort(ptr + 1, right)

    doSort(0, len(arr) - 1)


lemon = [1, 0, 6, 3, -10, 5]
quickSort(lemon)
lemon

[-10, 0, 1, 3, 5, 6]

**621. Task Scheduler**


In [None]:
class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:  # 43% time, 67% memory
        if n == 0:
            return len(tasks)

        occurrences = Counter(tasks)
        # make it a max heap
        occurrences = [-n for n in occurrences.values()]

        heapq.heapify(occurrences)  # O(26)

        usedTime = 0
        idle_stack = []  # keeping track of idle tasks [task count, time to reintroduce]
        while occurrences or idle_stack:
            usedTime += 1
            if occurrences:
                occurrence = heapq.heappop(occurrences)
                if occurrence != -1:
                    # remember that occurrence is a negative number so this is decrementing it (in terms of magnitude)
                    # if occurrence is -1 then that task has finished being processed entirely
                    idle_stack.append([occurrence + 1, usedTime + n])
            if idle_stack and idle_stack[0][1] == usedTime:
                # if the idle stack has a task that can be readded to the heap again because the cooldown time elapsed
                # do so:
                heapq.heappush(occurrences, idle_stack.pop(0)[0])
        return usedTime


leastInterval = Solution()
leastInterval.leastInterval(["A", "A", "A", "B", "B", "B"], 2)

8

**355. Design twitter**


In [None]:
class Twitter:
    # 57% time, 60% memory
    def __init__(self):
        self.numPosts = 0
        self.tweetMap = defaultdict(list)  # userID -> list of [order posted, tweetID]
        self.followMap = defaultdict(set)  # userID -> set of following IDs

    def postTweet(self, userId: int, tweetId: int) -> None:
        self.tweetMap[userId].append([self.numPosts, tweetId])
        self.numPosts -= 1  # decrementing so we can use a maxHeap

    def getNewsFeed(self, userId: int) -> List[int]:
        result = []
        minHeap = []

        # one time thing: has to be a follower of themself
        if userId not in self.followMap[userId]:
            self.followMap[userId].add(userId)

        for followeeId in self.followMap[userId]:
            if followeeId in self.tweetMap:
                index = len(self.tweetMap[followeeId]) - 1
                # index of the last tweet in that tweet array
                count, tweetId = self.tweetMap[followeeId][index]
                minHeap.append([count, tweetId, followeeId, index - 1])
                # index - 1 is the index of the next tweet in the tweet array for that followeeId

        heapq.heapify(minHeap)

        while minHeap and len(result) < 10:
            count, tweetId, followeeId, index = heapq.heappop(minHeap)
            result.append(tweetId)
            if index >= 0:  # equal because index 0 has a tweet as well
                count, tweetId = self.tweetMap[followeeId][index]
                heapq.heappush(minHeap, [count, tweetId, followeeId, index - 1])
        return result

    def follow(self, followerId: int, followeeId: int) -> None:
        self.followMap[followerId].add(followeeId)

    def unfollow(self, followerId: int, followeeId: int) -> None:
        try:
            self.followMap[followerId].remove(followeeId)
        except KeyError:
            pass


# Your Twitter object will be instantiated and called as such:
# obj = Twitter()
# obj.postTweet(userId,tweetId)
# param_2 = obj.getNewsFeed(userId)
# obj.follow(followerId,followeeId)
# obj.unfollow(followerId,followeeId)

**295. Find Median from Data Stream**


In [None]:
class MedianFinder:  # 77% time, 8% memory
    """
    You should aim for a solution with
    - O(logn) time for addNum(),
    - O(1) time for findMedian(), and
    - O(n) space

    where n is the current number of elements.

    The use l_left and l_right really messes up my memory usage and probaby time too since it's way more operations
    """

    def __init__(self):
        # store the values <= median in a max heap
        self.left_heap = []
        self.l_left = 0
        # store the values >= median in a min heap
        self.right_heap = []
        self.l_right = 0
        # the median can then be computed from the max of and min of right

    def addNum(self, num: int) -> None:
        if not self.right_heap:
            # first number being inserted
            # negtive insertion because we want a max heap
            heapq.heappush(self.right_heap, num)
            self.l_right += 1
        else:
            if num > self.right_heap[0]:
                # number is larger than the minimum of the right side heap, so add to the right side
                heapq.heappush(self.right_heap, num)
                self.l_right += 1
            else:
                # number is smaller than the minimum of the right side heap, so add to the left side
                heapq.heappush(self.left_heap, -num)
                self.l_left += 1

            # ensure that the heaps are never more than 1 different in length

        if self.l_right - 1 <= self.l_left <= self.l_right + 1:
            return
        elif self.l_left < self.l_right - 1:
            heapq.heappush(self.left_heap, -heapq.heappop(self.right_heap))
            self.l_right -= 1
            self.l_left += 1
            return
        else:
            heapq.heappush(self.right_heap, -heapq.heappop(self.left_heap))
            self.l_left -= 1
            self.l_right += 1
            return

    def findMedian(self) -> float:
        if (self.l_left + self.l_right) % 2:
            # odd length:
            if self.l_left > self.l_right:
                return -self.left_heap[0]
            return self.right_heap[0]
        return (-self.left_heap[0] + self.right_heap[0]) / 2


class MedianFinderNeetCode:  # 98.8 % time, 65 memory
    """
    Same logic but cleaner code
    """

    def __init__(self):
        # two heaps, large, small, minheap, maxheap
        # heaps should be equal size
        self.small, self.large = [], []

    def addNum(self, num: int) -> None:
        if self.large and num > self.large[0]:
            heapq.heappush(self.large, num)
        else:
            heapq.heappush(self.small, -1 * num)

        if len(self.small) > len(self.large) + 1:
            val = -1 * heapq.heappop(self.small)
            heapq.heappush(self.large, val)
        if len(self.large) > len(self.small) + 1:
            val = heapq.heappop(self.large)
            heapq.heappush(self.small, -1 * val)

    def findMedian(self) -> float:
        if len(self.small) > len(self.large):
            return -1 * self.small[0]
        elif len(self.large) > len(self.small):
            return self.large[0]
        return (-1 * self.small[0] + self.large[0]) / 2.0


# Your MedianFinder object will be instantiated and called as such:
obj = MedianFinder()
for n in [1, 2, 3, 4, 1, 1, 1, 1]:
    obj.addNum(n)
    print(obj.findMedian())

1
1.5
2
2.5
2
1.5
1
1.0


**3066. Minimum Operations to Exceed Threshold Value II**


In [None]:
class Solution:
    # 98% time, 72% memory
    def minOperationsSmarter(self, nums: List[int], k: int) -> int:
        # add everything to a heap then do your processing
        heapq.heapify(nums)
        count = 0
        while nums[0] < k:
            # there is a heapq.heapreplace method which does: Pop and return the current smallest value, and add the new item.
            count += 1
            # since the smallest value needs to get multiplied by 2 we can just put the heappop there, and then we know that nums[0] is the next smallest
            heapq.heapreplace(nums, heapq.heappop(nums) * 2 + nums[0])
        return count

    # 67% time, 38% memory
    def minOperations(self, nums: List[int], k: int) -> int:
        # add everything to a heap then do your processing
        heapq.heapify(nums)
        count = 0
        while nums[0] < k:
            count += 1
            x, y = heapq.heappop(nums), heapq.heappop(nums)
            if x > y:
                heapq.heappush(nums, y * 2 + x)
            else:
                heapq.heappush(nums, x * 2 + y)
        return count


minOperations = Solution()
print(minOperations.minOperations([2, 11, 10, 1, 3], 10))
print(minOperations.minOperations([1, 1, 2, 4, 9], 20))

2
4
