In [1]:
from typing import Optional, List
class LinkedListNode:
    def __init__(self, key=None, value=0, next=None, prev=None):
        self.key = key
        self.value = value
        self.next = next
        self.prev = prev
    
    def __repr__(self):
        return str(self.value)

class LinkedList:
    def __init__(self, iterable: List[int] = []):
        self.head = None
        self.tail = None
        self.size = 0

        if iterable:
            for value in iterable:
                self.append(LinkedListNode(value))
                self.size += 1
        
    def __bool__(self):
        return self.size != 0

    def __repr__(self):
        nodes = []
        current_node = self.head
        while current_node:
            nodes.append(str(current_node.value))
            current_node = current_node.next
        return ' <-> '.join(nodes)

    def append(self, new_node: LinkedListNode|int):
        if type(new_node) is int:
            new_node = LinkedListNode(new_node)
            
        if self.head is None:
            self.head = new_node
            self.tail = new_node
        else:
            self.tail.next = new_node
            new_node.prev = self.tail
            self.tail = new_node
        self.size += 1

    def remove(self, node: LinkedListNode):
        """
        Removes a given node from the linked list
        """

        if not node:
            return 
        
        if self.size == 1:
            self.head = None
            self.tail = None
        elif node == self.head:
            self.head = self.head.next
            self.head.prev = None
        elif node == self.tail:
            self.tail = self.tail.prev
            self.tail.next = None
        else:
            node.prev.next = node.next
            node.next.prev = node.prev
    
        self.size -= 1
        return
    

    def find_key(self, key: int) -> Optional[LinkedListNode]:
        """
        Returns a pointer to the first node with the given value
        """

        current_node = self.head
        while current_node:
            if current_node.key == key:
                return current_node
            current_node = current_node.next

        return None

In [2]:
class ClosedAddressingHashTable:

    """
    Closed Addressing Hash Table implementation from scratch.
    Closed addressing is a technique for resolving collisions in hash tables that involves
    using a fixed-size array to store the hash table entries and chaining to resolve collisions.

    Chaining:
        - Each slot in the hash table contains some data structure of items that have the same hash value.
            - Today I will use a linked list. Dynamic arrays, or BSTs can also be used.
        - When an item is inserted into the hash table, it is added to the end of the linked list along with the key. This way we can look for it properly later.
            
    Dynamic resizing:
        - When the load factor exceeds the max load factor, the hash table is resized by doubling its capacity.


    """
    def __init__(self, capacity: int = 1999, load_factor: float = 0.75):
        self.size = 0
        self.capacity = capacity
        self.table = [LinkedList() for _ in range(capacity)]
        self.current_load_factor = 0
        self.max_load_factor = load_factor
   
    def __contains__(self, key) -> bool:
        table_idx = hash(key) % self.capacity
        ll = self.table[table_idx]
        result = ll.find_key(key)
        return result is not None
    
    def __len__(self):
        return self.size
    
    def __getitem__(self, key):
        return self.get(key)
    
    def __setitem__(self, key, value):
        self.set(key, value)
        
    def __delitem__(self, key):
        self.remove(key)

    def __repr__(self):
        result = ''
        for ll in self.table:
            if ll:
                result += f"{ll}\n"
        return result
    
    """Helper methods"""

    def _check_load_factor(self):
        if self.current_load_factor >= self.max_load_factor:
            self._resize(self.capacity * 2)
        self.current_load_factor = self.size/self.capacity
        return self.current_load_factor
        
    
    def _resize(self, new_capacity: int):
        number_of_linkedlists_to_add = new_capacity - self.capacity
        self.capacity = new_capacity
        self.table += [LinkedList() for _ in range(number_of_linkedlists_to_add)]

    """Functionality"""

    def set(self, key, value):
        # Check if the key is in the hashtable
        table_idx = hash(key) % self.capacity
        ll = self.table[table_idx]
        node = ll.find_key(key)
        # If the key is not found, append new node
        if node is None:
            self._check_load_factor()
            node = LinkedListNode(key, value)
            ll.append(node)
            self.size += 1
            
        # If it is, update the value
        else:
            node.value = value

    def remove(self, key):
        table_idx = hash(key) % self.capacity
        ll = self.table[table_idx]
        node = ll.find_key(key)
        if node is None:
            return False
        else:
            ll.remove(node)
            self.size -= 1
            return True
        
    def get(self, key):
        table_idx = hash(key) % self.capacity
        ll = self.table[table_idx]
        node = ll.find_key(key)

        if node is None:
            raise KeyError(f"{key} not found")
        else:
            return node.value

d = ClosedAddressingHashTable(4)
d[1] = 1
d[2] = 2
d[33] = 33
d[2] = 2.5
d[3] = 3
d[4] = 4
print(d)
print(1 in d)
print(32 in d)
print(d.size, d.current_load_factor, d.capacity)
        

4
1 <-> 33
2.5
3

True
False
5 0.5 8


In [3]:
class OpenAddressingHashTable:
    """
    Open Addressing Hash Table implementation from scratch.
    Open addressing also known as probing is a technique for resolving collisions in hash tables.
    There are two main types of open addressing: linear probing and quadratic probing.
    Probing is used to resolve collisions by finding the next available slot in the hash table, and slotting in the element there.

    Linear probing:
        h(key) = (hash(key) + i) % capacity
        i = 0, 1, 2, ...

    Quadratic probing:
        h(key) = (hash(key) + i**2) % capacity
        i = 0, 1, 2, ...
    """

    def __init__(self, capacity: int = 1999, probe_type: str = 'linear', load_factor: float = 0.75):
        self.size = 0
        self.capacity = capacity 
        self.table = [(None, None)] * capacity
        self.current_load_factor = 0
        self.max_load_factor = load_factor

        probe_types = {
            'linear': self._linear_probe,
            'quadratic': self._quadratic_probe,
        }
        
        if probe_type not in probe_types:
            raise ValueError(f"Invalid probe type: {probe_type}. Valid probe types are: {list(probe_types.keys())}")
        else:
            self.probe_function = probe_types[probe_type]

   
    def __contains__(self, key) -> bool:
        result = self.get(key)
        return result is not None
    
    def __len__(self):
        return self.size
    
    def __getitem__(self, key):
        return self.get(key)
    
    def __setitem__(self, key, value):
        self.set(key, value)

    def __delitem__(self, key):
        self.remove(key)

    def __repr__(self):
        result = ''
        for i, item in enumerate(self.table):
            if item[1] is not None:
                result += f"{i}: {item}\n"
        return result

    """Probing methods"""

    def _linear_probe(self, idx, i):
        return (idx + i) % self.capacity
        
    def _quadratic_probe(self, idx, i):
        return (idx + i**2) % self.capacity
    
    """Helper methods"""

    def _check_load_factor(self):
        if self.current_load_factor >= self.max_load_factor:
            self._resize(self.capacity * 2)
        self.current_load_factor = self.size/self.capacity
        return self.current_load_factor 
    
    def _resize(self, new_capacity: int):
        number_of_spaces_to_add = new_capacity - self.capacity
        self.capacity = new_capacity
        self.table += [(None, None)] * number_of_spaces_to_add

    """Functionality"""

    def set(self, key, value):
        self._check_load_factor()
        table_idx = hash(key) % self.capacity
        k, v = self.table[table_idx]
        n_collisions = 0
        while k is not None and k != key and k != -1:
            table_idx = self.probe_function(table_idx, n_collisions)
            k, v = self.table[table_idx]
            n_collisions += 1
        if k is None or k == -1:
            self.size += 1
        self.table[table_idx] = (key, value)

    def remove(self, key) -> bool:
        table_idx = hash(key) % self.capacity
        k, v = self.table[table_idx]
        n_collisions = 0
        while k is not None:
            if k == key:
                self.table[table_idx] = (-1, None)
                self.size -= 1
                return True
            table_idx = self.probe_function(table_idx, n_collisions)
            k, v = self.table[table_idx]
            n_collisions += 1
        
        # If not found
        return False
        
    def get(self, key) -> Optional[int]:
        table_idx = hash(key) % self.capacity
        n_collisions = 0
        k, v = self.table[table_idx]
        while k is not None and n_collisions < self.capacity:
            if k == key:
                return v
            
            table_idx = self.probe_function(table_idx, n_collisions)
            k, v = self.table[table_idx]
            n_collisions += 1
        return None


d = OpenAddressingHashTable(32, 'quadratic')
d[1] = 1
d[2] = 2
d[33] = 33
d[2] = 2.5
d[3] = 3
d[4] = 4
d[9] = 9
print(d)
print(1 in d)
print(32 in d)
print(d.size, d.current_load_factor, d.capacity)
        


1: (1, 1)
2: (2, 2.5)
3: (3, 3)
4: (4, 4)
6: (33, 33)
9: (9, 9)

True
False
6 0.15625 32


## Tests

### Methods
    - We'll test these hashtables on numerical data (ints) and text data (strings).
    - We'll measure the memory footprint and CPU time across a series of insertions, retrievals, and deletions.

We'll try running the different hashtables across sizes of 1e2, 1e3, 1e4, and 1e5
and track the memory usage and runtime of each for different data types. Then we can aggregate and plot the data to investigate the results!

In [4]:
%pip install tqdm

Note: you may need to restart the kernel to use updated packages.


In [5]:
import tracemalloc
import time
import random
import string
from collections import defaultdict
from tqdm import tqdm
import pandas as pd

In [6]:
class Test:
    def __init__(self):
        self.n_trials = 100
        self.separate_chaining = ClosedAddressingHashTable()
        self.open_addressing_linear = OpenAddressingHashTable()
        self.open_addressing_quadratic = OpenAddressingHashTable(probe_type='quadratic')
        self.results_strings = []
        self.results_numbers = []
        

    def profile_hashtable(self, table, items: List[List]):
        # Generate the data
        tracemalloc.start()
        start = time.time()
        d = {}
        for key, value in items:
            table[key] = value
            d[key] = value
        for key, value in items:
            assert table[key] == d[key], f"Error at key {key}: Value {table[key]} is not equal to {d[key]}"
        for key, value in items:
            if key in table:
                del table[key]
                del d[key]

        end = time.time()
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        return peak, end - start
    
    def test1(self, dataset_sizes: List[int] = [100, 1_000, 10_000, 100_000]):

        # Test on integers
        for size in dataset_sizes:
            print(f'Testing dataset of size {size}')
            items = [[random.randint(0, 1000), random.randint(0, 1000)] for _ in range(size)]
            for _ in tqdm(range(self.n_trials)):
                peak, runtime = self.profile_hashtable(self.separate_chaining, items)
                self.results_numbers.append(('Separate Chaining', size, peak, runtime))
                peak, runtime = self.profile_hashtable(self.open_addressing_linear, items)
                self.results_numbers.append(('Open Addressing Linear', size, peak, runtime))
                peak, runtime = self.profile_hashtable(self.open_addressing_quadratic, items)
                self.results_numbers.append(('Open Addressing Quadratic', size, peak, runtime))

        # Test on strings
        for size in dataset_sizes:
            print(f'Testing dataset of size {size}')
            items = [[random.choice(string.ascii_letters), random.choice(string.ascii_letters)] for _ in range(size)]
            for _ in tqdm(range(self.n_trials)):
                peak, runtime = self.profile_hashtable(self.separate_chaining, items)
                self.results_strings.append(('Separate Chaining', size, peak, runtime))
                peak, runtime = self.profile_hashtable(self.open_addressing_linear, items)
                self.results_strings.append(('Open Addressing Linear', size, peak, runtime))
                peak, runtime = self.profile_hashtable(self.open_addressing_quadratic, items)
                self.results_strings.append(('Open Addressing Quadratic', size, peak, runtime))

        return
    
    


In [7]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
test = Test()
test.test1()

Testing dataset of size 100


100%|██████████| 100/100 [00:00<00:00, 230.78it/s]


Testing dataset of size 1000


100%|██████████| 100/100 [00:03<00:00, 25.17it/s]


Testing dataset of size 10000


 41%|████      | 41/100 [00:42<01:01,  1.04s/it]

NoneType