In [49]:
# Original Credit: https://www.programiz.com/dsa/b-tree
# However, the original code did not work properly: during deletion, the last child can be
# merged into its left sibling, which leaves the current child index out of bounds (see the
# index adjustment right after the `fill` call in `BTree.delete`).
# Modified by: Fabian Schuller

# B-tree insertion and deletion in Python

# Create a node
class BTreeNode:
    """One node of the B-tree: an ordered key list plus its child links."""

    def __init__(self, leaf=False):
        # True when the node has no children (bottom level of the tree).
        self.leaf = leaf
        # Both containers start empty; the tree code fills them in.
        self.keys, self.child = [], []

# Tree
class BTree:
    """B-tree of minimum degree ``t`` whose entries are ordered by ``entry[0]``.

    Entries are indexable records (the examples in this file use
    ``(key, value)`` tuples); only element 0 takes part in comparisons.
    A node is treated as full when it holds ``2 * t - 1`` entries. The
    splitting logic is meant for ``t >= 2``: with ``t == 1`` a split would
    leave both halves without keys. ``insert`` does not reject keys that are
    already present.
    """

    def __init__(self, t):
        """Create an empty tree whose root is a single leaf node."""
        self.root = BTreeNode(True)
        self.t = t

    def insert(self, k):
        """Insert entry ``k`` (ordered by ``k[0]``), growing the tree if needed."""
        root = self.root
        if len(root.keys) == (2 * self.t) - 1:
            # A full root cannot take another key: put a new root above it
            # and split the old one before descending.
            new_root = BTreeNode()
            self.root = new_root
            new_root.child.append(root)
            self.split_child(new_root, 0)
            self.insert_non_full(new_root, k)
        else:
            self.insert_non_full(root, k)

    def insert_non_full(self, x, k):
        """Insert ``k`` into the subtree rooted at ``x``; ``x`` must not be full."""
        # Scan from the right for the slot just after the last key <= k.
        i = len(x.keys)
        while i > 0 and k[0] < x.keys[i - 1][0]:
            i -= 1

        if x.leaf:
            # The slot is located before the list is touched, so a failing
            # comparison above leaves the node unchanged.
            x.keys.insert(i, k)
            return

        if len(x.child[i].keys) == (2 * self.t) - 1:
            self.split_child(x, i)
            # The split moved a separator into x.keys[i]; pick the half of the
            # former child that k belongs to.
            if k[0] > x.keys[i][0]:
                i += 1
        self.insert_non_full(x.child[i], k)

    def split_child(self, x, i):
        """Split the full child ``x.child[i]`` around its median key.

        The median moves up into ``x.keys[i]``; the upper half of the child
        moves into a new sibling placed at ``x.child[i + 1]``.
        """
        t = self.t
        y = x.child[i]
        z = BTreeNode(y.leaf)
        x.child.insert(i + 1, z)
        x.keys.insert(i, y.keys[t - 1])
        z.keys = y.keys[t: (2 * t) - 1]
        y.keys = y.keys[0: t - 1]
        if not y.leaf:
            z.child = y.child[t: 2 * t]
            y.child = y.child[0: t]

    def delete(self, x, k):
        """Delete the entry whose key equals ``k[0]`` from the subtree at ``x``.

        Callers pass ``self.root`` as ``x``. Only ``k[0]`` is read, so a
        1-tuple such as ``(key,)`` is sufficient. A message is printed when
        the key is not found.
        """
        t = self.t
        i = 0
        while i < len(x.keys) and k[0] > x.keys[i][0]:
            i += 1
        found_here = i < len(x.keys) and x.keys[i][0] == k[0]

        if x.leaf:
            # Case 1: the node is a leaf.
            if found_here:
                x.keys.pop(i)
            else:
                print(f"Key {k[0]} not found in the tree.")
            return

        # Case 2: the key sits in this internal node.
        if found_here:
            return self.delete_internal_node(x, k, i)

        # Case 3: the key, if present, lives in the subtree of child i.
        if i >= len(x.child):
            # Defensive: only reachable if the node has fewer children than
            # keys + 1.
            print(f"Key {k[0]} not found in the tree.")
            return

        if len(x.child[i].keys) < t:
            self.fill(x, i)

        # If the last child was merged into its left sibling, the child list
        # shrank by one and i now points one past the end.
        if len(x.child) == i:
            i -= 1
        self.delete(x.child[i], k)

    def delete_internal_node(self, x, k, i):
        """Delete ``x.keys[i]`` (whose key equals ``k[0]``) from internal node ``x``."""
        t = self.t
        if len(x.child[i].keys) >= t:
            # Case 2a: the left child can spare a key, so use the predecessor.
            pred_key = self.get_predecessor(x, i)
            x.keys[i] = pred_key
            self.delete(x.child[i], pred_key)
        elif len(x.child[i + 1].keys) >= t:
            # Case 2b: the right child can spare a key, so use the successor.
            succ_key = self.get_successor(x, i)
            x.keys[i] = succ_key
            self.delete(x.child[i + 1], succ_key)
        else:
            # Case 2c: neither child can spare a key; merge them around the
            # key and delete it from the merged node.
            self.merge(x, i)
            self.delete(x.child[i], k)

    def get_predecessor(self, x, i):
        """Return the last entry in the subtree of ``x.child[i]``."""
        cur = x.child[i]
        while not cur.leaf:
            cur = cur.child[-1]
        return cur.keys[-1]

    def get_successor(self, x, i):
        """Return the first entry in the subtree of ``x.child[i + 1]``."""
        cur = x.child[i + 1]
        while not cur.leaf:
            cur = cur.child[0]
        return cur.keys[0]

    def merge(self, x, i):
        """Merge ``x.child[i + 1]`` and the separator ``x.keys[i]`` into ``x.child[i]``."""
        child = x.child[i]
        sibling = x.child[i + 1]
        # Pull the separator down, then append everything from the sibling.
        child.keys.append(x.keys[i])
        child.keys.extend(sibling.keys)
        if not child.leaf:
            child.child.extend(sibling.child)
        # Drop the separator and the now-absorbed sibling from x.
        x.keys.pop(i)
        x.child.pop(i + 1)
        # Only an emptied *root* reduces the height of the tree; any other
        # node must never replace the root.
        if x is self.root and len(x.keys) == 0:
            self.root = child

    def fill(self, x, i):
        """Make sure ``x.child[i]`` has at least ``t`` keys before descending.

        Borrows from a sibling that has ``t`` or more keys when possible and
        merges with a sibling otherwise. A merge of the last child shortens
        ``x.child`` so that index ``i`` no longer exists afterwards.
        """
        t = self.t
        last = len(x.child) - 1
        if i != 0 and len(x.child[i - 1].keys) >= t:
            self.borrow_from_prev(x, i)
        elif i != last and len(x.child[i + 1].keys) >= t:
            self.borrow_from_next(x, i)
        elif i != last:
            self.merge(x, i)
        else:
            self.merge(x, i - 1)

    def borrow_from_prev(self, x, i):
        """Rotate one key from ``x.child[i - 1]`` through ``x`` into ``x.child[i]``."""
        child = x.child[i]
        sibling = x.child[i - 1]
        # The separator moves down to the front of child; the sibling's last
        # key becomes the new separator.
        child.keys.insert(0, x.keys[i - 1])
        x.keys[i - 1] = sibling.keys.pop()
        # For internal nodes the sibling's last child moves along.
        if not child.leaf:
            child.child.insert(0, sibling.child.pop())

    def borrow_from_next(self, x, i):
        """Rotate one key from ``x.child[i + 1]`` through ``x`` into ``x.child[i]``."""
        child = x.child[i]
        sibling = x.child[i + 1]
        # The separator moves down to the end of child; the sibling's first
        # key becomes the new separator.
        child.keys.append(x.keys[i])
        x.keys[i] = sibling.keys.pop(0)
        # For internal nodes the sibling's first child moves along.
        if not child.leaf:
            child.child.append(sibling.child.pop(0))

    def print_tree(self, x, l=0):
        """Print the subtree rooted at ``x`` in pre-order, one node per line.

        Each line shows the level ``l``, the number of keys and the keys.
        """
        print("Level ", l, " ", len(x.keys), end=":")
        for i in x.keys:
            print(i, end=" ")
        print()
        l += 1
        if len(x.child) > 0:
            for i in x.child:
                self.print_tree(i, l)



In [None]:

# Demo: fill a degree-3 tree, delete every third key below 90, then add two
# more batches (some of those keys are already present in the tree).
B = BTree(3)
for i in range(100):
    B.insert((i, 2 * i))
B.print_tree(B.root)

for i in range(0, 90, 3):
    print(f"Delete {i}")
    B.delete(B.root, (i,))

for factor, count in ((2, 1000), (3, 300)):
    for i in range(count):
        B.insert((i * factor, 2 * i))

B.print_tree(B.root)
print("\n")
B.print_tree(B.root)

In [93]:
class BTreeCursor:
    """Bidirectional cursor over the entries of a B-tree in ascending key order.

    ``stack`` holds ``[node, index]`` pairs for the path from the root to the
    current entry. In the top pair, ``index`` is the position of the current
    entry in ``node.keys``. In every pair below the top, ``index`` is the
    position in ``node.child`` of the child the path descends into. An empty
    stack means the cursor is invalid (empty tree, failed lookup, or moved
    past either end).

    NOTE(review): the cursor keeps direct node references and nothing here
    observes tree mutations, so it presumably has to be re-positioned after
    the tree is modified - confirm with callers.
    """

    def __init__(self, btree):
        """Attach to ``btree`` and position on its smallest entry, if any."""
        self.tree = btree
        self.stack = []

        if self._tree_has_keys():
            self._push_leftmost(self.tree.root)

    def _tree_has_keys(self):
        """Return True when the tree has a root that holds at least one key."""
        root = self.tree.root
        return bool(root) and len(root.keys) > 0

    def _push_rightmost(self, node):
        """Extend the path down the rightmost edge of ``node`` to its last key."""
        curr = node
        while not curr.leaf:
            last_child = len(curr.keys)
            self.stack.append([curr, last_child])
            curr = curr.child[last_child]
        self.stack.append([curr, len(curr.keys) - 1])

    def _push_leftmost(self, node):
        """Extend the path down the leftmost edge of ``node`` to its first key."""
        curr = node
        while True:
            # Index 0 is "child 0" for internal nodes and "key 0" for the leaf.
            self.stack.append([curr, 0])
            if curr.leaf:
                break
            curr = curr.child[0]

    def _unwind_backward(self):
        """Pop path entries whose index dropped below zero.

        Each pop steps the parent back by one, which turns its "child i"
        index into "key i - 1", the entry just before that child's subtree.
        """
        while self.stack:
            if self.stack[-1][1] >= 0:
                return
            self.stack.pop()
            if self.stack:
                self.stack[-1][1] -= 1

    def is_valid(self):
        """Return True while the cursor points at an entry."""
        return len(self.stack) > 0

    def current(self):
        """Return the entry under the cursor, or None if the cursor is invalid."""
        if not self.is_valid():
            return None

        node, idx = self.stack[-1]
        return node.keys[idx]

    def move_to_start(self):
        """Position on the smallest entry; invalid if the tree is empty."""
        self.stack = []
        if self._tree_has_keys():
            self._push_leftmost(self.tree.root)

    def move_to_end(self):
        """Position on the largest entry; invalid if the tree is empty."""
        self.stack = []
        if self._tree_has_keys():
            self._push_rightmost(self.tree.root)

    def advance(self):
        """Move to the next larger entry; invalid after the last one."""
        if not self.stack:
            return
        top = self.stack[-1]
        node = top[0]
        top[1] += 1

        if not node.leaf:
            # The successor of an internal key is the leftmost entry of the
            # child to its right, which is child[new index].
            self._push_leftmost(node.child[top[1]])
            return

        # Leaf exhausted: climb until an ancestor still has a key to the right.
        while self.stack:
            node, idx = self.stack[-1]
            if idx < len(node.keys):
                return
            self.stack.pop()

    def decrease(self):
        """Move to the next smaller entry; invalid before the first one."""
        if not self.stack:
            return

        node, idx = self.stack[-1]
        if not node.leaf:
            # The predecessor of an internal key is the rightmost entry of
            # the child to its left.
            self._push_rightmost(node.child[idx])
            return

        self.stack[-1][1] -= 1
        self._unwind_backward()

    def go_to(self, k):
        """
        Moves the cursor to the key k if it exists (exact match).
        If k does not exist, the cursor becomes invalid.
        """
        self.stack = []
        if not self._tree_has_keys():
            return

        curr = self.tree.root
        while True:
            i = 0
            while i < len(curr.keys) and k > curr.keys[i][0]:
                i += 1

            if i < len(curr.keys) and curr.keys[i][0] == k:
                self.stack.append([curr, i])
                return

            if curr.leaf:
                self.stack = []  # Invalidate
                return

            self.stack.append([curr, i])
            curr = curr.child[i]

    def go_to_less_than_equal(self, k):
        """
        Moves the cursor to the key k if it exists.
        If k does not exist, moves the cursor to the largest key strictly less than k.
        If k is smaller than the smallest key, the cursor becomes invalid.
        """
        self.stack = []
        if not self._tree_has_keys():
            return

        curr = self.tree.root
        while True:
            i = 0
            while i < len(curr.keys) and k > curr.keys[i][0]:
                i += 1

            self.stack.append([curr, i])

            if i < len(curr.keys) and curr.keys[i][0] == k:
                return  # exact match

            if curr.leaf:
                break

            curr = curr.child[i]

        # k is absent and the top entry is the leaf slot where k would go.
        # Its predecessor is one step back. When that slot is index 0 the
        # predecessor lives in an ancestor, so unwind instead of giving up.
        self.stack[-1][1] -= 1
        self._unwind_backward()

    def __iter__(self):
        """Return the cursor itself; iteration starts at the current position."""
        return self

    def __next__(self):
        """Return the current entry and advance; stop once the cursor is invalid."""
        if not self.is_valid():
            raise StopIteration
        val = self.current()
        self.advance()
        return val


In [None]:
tree = BTree(3)

# Hand-build a two-level tree: one separator key above two leaves.
left_child = BTreeNode(leaf=True)
left_child.keys = [1, 5]

right_child = BTreeNode(leaf=True)
right_child.keys = [15, 20]

root = tree.root
root.leaf, root.keys = False, [10]
root.child = [left_child, right_child]

cursor = BTreeCursor(tree)

# Iterating the cursor yields the current value and advances each time.
print("Traversing BTree:")
for value in cursor:
    print(f"Value: {value}")

Traversing BTree:
Value: 1
Value: 5
Value: 10
Value: 15
Value: 20


In [None]:
import unittest
import random
class TestBTreeCursorStress(unittest.TestCase):
    """Checks BTreeCursor traversal and seeking against the contents of a BTree."""

    def setUp(self, t = 3):
        # Every test starts from a fresh, empty tree of minimum degree t.
        self.t_param = t
        self.tree = BTree(self.t_param)

    def _insert_pairs(self, keys):
        # Insert (k, k) for each key, in the order given.
        for key in keys:
            self.tree.insert((key, key))

    def collect_cursor_values(self, tree):
        # A new cursor starts at the smallest entry; iterating it walks
        # forward until it becomes invalid.
        return list(BTreeCursor(tree))

    def test_01_basic_insert(self):
        print("\n[Test] Basic Insert")
        keys = [10, 20, 5, 6, 12, 30, 7, 17]
        self._insert_pairs(keys)

        expected = [(k, k) for k in sorted(keys)]
        cursor_vals = self.collect_cursor_values(self.tree)
        self.assertEqual(
            cursor_vals, expected, "Cursor did not traverse in sorted order"
        )

    def test_02_stress_insert(self):
        print("\n[Test] Stress Insert (1000 items)")
        count = 1000
        random_keys = [random.randint(0, 10000) for _ in range(count)]
        # Drop repeated draws so every key is inserted once.
        unique_keys = list(set(random_keys))
        self._insert_pairs(unique_keys)

        expected = [(k, k) for k in sorted(unique_keys)]
        cursor_vals = self.collect_cursor_values(self.tree)

        self.assertEqual(len(cursor_vals), len(expected), "Cursor missed some keys")
        self.assertEqual(cursor_vals, expected, "Stress insert: Cursor order incorrect")

    def test_03_stress_delete(self):
        print("\n[Test] Stress Delete (Insert 500, Delete 250)")
        count = 500
        keys = list(range(count))
        random.shuffle(keys)
        self._insert_pairs(keys)

        # The first half of the shuffled keys is removed, the rest must stay.
        doomed, kept = keys[:250], keys[250:]
        remaining = [(k, k) for k in sorted(kept)]
        for key in doomed:
            self.tree.delete(self.tree.root, (key, ))

        cursor_vals = self.collect_cursor_values(self.tree)
        self.assertEqual(
            cursor_vals, remaining, "Cursor data incorrect after mass deletion"
        )

    def test_04_cursor_validity_during_empty(self):
        print("\n[Test] Empty Tree")
        fresh = BTreeCursor(self.tree)
        self.assertFalse(fresh.is_valid())
        self.assertIsNone(fresh.current())

        # Insert a single entry and remove it again: the tree is empty once more.
        self.tree.insert((10, 10))
        self.tree.delete(self.tree.root, (10,))

        after = BTreeCursor(self.tree)
        self.assertFalse(
            after.is_valid(), "Cursor should be invalid after deleting last node"
        )

    def test_05_bidirectional_traversal(self):
        print("\n[Test] Bidirectional Traversal")
        keys = [10, 20, 5, 15, 25, 30]
        self._insert_pairs(keys)
        expected = [(k, k) for k in sorted(keys)]

        cursor = BTreeCursor(self.tree)
        cursor.move_to_end()
        self.assertEqual(cursor.current(), expected[-1])

        # Walk backwards from the largest entry until the cursor runs off.
        reverse_collected = []
        while cursor.is_valid():
            reverse_collected.append(cursor.current())
            cursor.decrease()

        cursor_vals = self.collect_cursor_values(self.tree)
        self.assertEqual(cursor_vals, expected, "Regular Traversal: Cursor incorrect")
        self.assertEqual(
            reverse_collected, list(reversed(expected)), "Reverse traversal failed"
        )

    def test_06_zigzag_movement(self):
        print("\n[Test] Zigzag Movement")
        self._insert_pairs([1, 2, 3, 4, 5])

        cursor = BTreeCursor(self.tree)
        cursor.move_to_start()
        self.assertEqual(cursor.current(), (1, 1))

        # Two steps forward, two steps back: 1 -> 2 -> 3 -> 2 -> 1.
        moves = (
            (cursor.advance, 2),
            (cursor.advance, 3),
            (cursor.decrease, 2),
            (cursor.decrease, 1),
        )
        for move, key in moves:
            move()
            self.assertEqual(cursor.current(), (key, key))

        # One more step back leaves the tree.
        cursor.decrease()
        self.assertFalse(cursor.is_valid())

    def test_07_large_random_zigzag(self):
        print("\n[Test] Large Random Zigzag")
        N = 200
        keys = list(range(N))
        random.shuffle(keys)
        self._insert_pairs(keys)

        expected = [(k, k) for k in sorted(keys)]
        cursor = BTreeCursor(self.tree)
        curr_idx = 0
        M = 1000

        for _ in range(M):
            direction = random.choice([-1, 1])
            steps = random.randint(1, 5)

            if direction == 1:
                # Never step past the last entry.
                room = (N - 1) - curr_idx
                real_steps = min(steps, room) if room >= 0 else 0
                for _ in range(real_steps):
                    cursor.advance()
                curr_idx += real_steps
            else:
                # Never step before the first entry.
                real_steps = min(steps, curr_idx)
                for _ in range(real_steps):
                    cursor.decrease()
                curr_idx -= real_steps

            if 0 <= curr_idx < N:
                self.assertTrue(cursor.is_valid(), f"Cursor became invalid unexpectedly at idx {curr_idx}")
                self.assertEqual(
                    cursor.current(),
                    expected[curr_idx],
                    f"Mismatch after zig-zag. Expected {expected[curr_idx]}, got {cursor.current()} at idx {curr_idx}",
                )

    def test_08_boundary_zigzag(self):
        """Test zigzagging specifically near start and end."""
        print("\n[Test] Boundary Zigzag")
        self._insert_pairs([0, 1, 2, 3, 4])

        cursor = BTreeCursor(self.tree)
        first, last = (0, 0), (4, 4)

        # Forward and back from the start lands on the first entry again.
        cursor.advance()
        cursor.decrease()
        self.assertEqual(cursor.current(), first)

        # Stepping before the first entry invalidates the cursor.
        cursor.decrease()
        self.assertFalse(cursor.is_valid())

        cursor.move_to_start()
        self.assertEqual(cursor.current(), first)

        # Same exercise at the far end.
        cursor.move_to_end()
        self.assertEqual(cursor.current(), last)
        cursor.decrease()
        cursor.advance()
        self.assertEqual(cursor.current(), last)

        cursor.advance()
        self.assertFalse(cursor.is_valid())

    def test_09_go_to_less_than_equal_found(self):
        print("\n[Test] go_to_less_than_equal (Found Key)")
        # Six keys: with the default t=3 the sixth insert splits the root.
        self._insert_pairs([10, 20, 5, 15, 25, 30])

        cursor = BTreeCursor(self.tree)
        for key in (5, 20, 30):
            cursor.go_to_less_than_equal(key)
            self.assertTrue(cursor.is_valid())
            self.assertEqual(cursor.current(), (key, key))

    def test_10_go_to_less_than_equal_not_found_predecessor(self):
        print("\n[Test] go_to_less_than_equal (Not Found - Predecessor)")
        # Three keys stay together in the root leaf.
        self._insert_pairs([10, 20, 30])

        cursor = BTreeCursor(self.tree)

        # 15 is absent: land on 10, and the next entry is 20.
        cursor.go_to_less_than_equal(15)
        self.assertTrue(cursor.is_valid())
        self.assertEqual(cursor.current(), (10, 10))
        cursor.advance()
        self.assertEqual(cursor.current(), (20, 20))

        # 35 is beyond the largest key: land on 30, then run off the end.
        cursor.go_to_less_than_equal(35)
        self.assertTrue(cursor.is_valid())
        self.assertEqual(cursor.current(), (30, 30))
        cursor.advance()
        self.assertFalse(cursor.is_valid())

    def test_11_go_to_less_than_equal_not_found_invalid(self):
        print("\n[Test] go_to_less_than_equal (Not Found - Invalid)")
        self._insert_pairs([10, 20, 30])

        cursor = BTreeCursor(self.tree)

        # Nothing is <= 5, so the cursor becomes invalid.
        cursor.go_to_less_than_equal(5)
        self.assertFalse(cursor.is_valid())

        # An invalid cursor can still be re-positioned.
        cursor.move_to_start()
        self.assertTrue(cursor.is_valid())
        self.assertEqual(cursor.current(), (10, 10))

    def test_12_go_to_found(self):
        print("\n[Test] go_to (Exact Match)")
        self._insert_pairs([10, 20, 5, 15, 25, 30])

        cursor = BTreeCursor(self.tree)
        for key in (5, 20, 30):
            cursor.go_to(key)
            self.assertTrue(cursor.is_valid())
            self.assertEqual(cursor.current(), (key, key))

    def test_13_go_to_not_found_invalid(self):
        print("\n[Test] go_to & go_to_less_than_equal (Invalid Cases)")
        self._insert_pairs([10, 20, 30])

        cursor = BTreeCursor(self.tree)

        # Three seeks that must each leave the cursor invalid.
        cursor.go_to(15)
        self.assertFalse(cursor.is_valid())

        cursor.go_to_less_than_equal(5)
        self.assertFalse(cursor.is_valid())

        cursor.go_to(5)
        self.assertFalse(cursor.is_valid())

        # Two seeks that must succeed afterwards.
        cursor.go_to_less_than_equal(10)
        self.assertTrue(cursor.is_valid())
        self.assertEqual(cursor.current(), (10, 10))

        cursor.go_to(20)
        self.assertTrue(cursor.is_valid())
        self.assertEqual(cursor.current(), (20, 20))

    def test_14_stress_go_to_and_traversal(self):
        print("\n[Test] Stress go_to and Traversal")
        N = 1500
        M = 500

        keys_to_insert = list(range(1000, 1000 + N))
        random.shuffle(keys_to_insert)
        self._insert_pairs(keys_to_insert)

        expected = [(k, k) for k in sorted(keys_to_insert)]
        cursor = BTreeCursor(self.tree)

        for _ in range(M):
            target_key = random.choice(expected)[0]
            target_idx = expected.index((target_key, target_key))

            cursor.go_to(target_key)
            self.assertTrue(cursor.is_valid(), f"go_to failed to find key {target_key}")
            self.assertEqual(cursor.current()[0], target_key, f"go_to positioned incorrectly for key {target_key}")

            # Step back a few entries, but never before the first one.
            decrease_steps = random.randint(1, 5)
            back_steps = min(decrease_steps, target_idx)
            for _ in range(back_steps):
                cursor.decrease()

            idx_after_back = target_idx - back_steps
            if idx_after_back >= 0:
                self.assertTrue(cursor.is_valid())
                self.assertEqual(
                    cursor.current(),
                    expected[idx_after_back],
                    f"Decrease failed after go_to. Expected {expected[idx_after_back]}, got {cursor.current()}",
                )

            # Then step forward up to ten entries, capped at the last one.
            advance_steps = 10
            forward_steps = min(advance_steps, N - 1 - idx_after_back)
            for _ in range(forward_steps):
                cursor.advance()

            idx_after_forward = idx_after_back + forward_steps
            if idx_after_forward < N:
                self.assertTrue(cursor.is_valid())
                self.assertEqual(
                    cursor.current(),
                    expected[idx_after_forward],
                    f"Advance failed after decrease. Expected {expected[idx_after_forward]}, got {cursor.current()}",
                )
            else:
                cursor.advance()
                self.assertFalse(cursor.is_valid(), "Cursor should be invalid after advancing past the last element")



In [108]:
# Drive the test methods by hand on one instance, calling setUp before each
# so every test sees a fresh tree.
test = TestBTreeCursorStress()
for case_name in (
    "test_01_basic_insert",
    "test_02_stress_insert",
    "test_03_stress_delete",
    "test_04_cursor_validity_during_empty",
    "test_05_bidirectional_traversal",
    "test_06_zigzag_movement",
    "test_07_large_random_zigzag",
    "test_08_boundary_zigzag",
    "test_09_go_to_less_than_equal_found",
    "test_10_go_to_less_than_equal_not_found_predecessor",
    "test_11_go_to_less_than_equal_not_found_invalid",
    "test_12_go_to_found",
    "test_13_go_to_not_found_invalid",
):
    test.setUp()
    getattr(test, case_name)()

# The last stress test is repeated for minimum degrees 3 through 7.
for i in range(3, 8):
    test.setUp(t=i)
    test.test_14_stress_go_to_and_traversal()


[Test] Basic Insert

[Test] Stress Insert (1000 items)

[Test] Stress Delete (Insert 500, Delete 250)

[Test] Empty Tree

[Test] Bidirectional Traversal

[Test] Zigzag Movement

[Test] Large Random Zigzag

[Test] Boundary Zigzag

[Test] go_to_less_than_equal (Found Key)

[Test] go_to_less_than_equal (Not Found - Predecessor)

[Test] go_to_less_than_equal (Not Found - Invalid)

[Test] go_to (Exact Match)

[Test] go_to & go_to_less_than_equal (Invalid Cases)

[Test] Stress go_to and Traversal

[Test] Stress go_to and Traversal

[Test] Stress go_to and Traversal

[Test] Stress go_to and Traversal

[Test] Stress go_to and Traversal
