### Hybrid Join

In [4]:
import random
import time
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Any, Optional
from collections import defaultdict


class SkipNode:
    """A node in the skip list.

    Attributes:
        key: ordering key (``None`` only for the head sentinel, whose key is
            never compared).
        value: stored payload; once the same key is inserted again this
            becomes a list holding every payload for the key, in insertion
            order.
        forward: next-node pointers, one per level ``0..level``.
        has_duplicates: True once ``value`` has been turned into a list of
            payloads by a duplicate insert. Kept separately so a payload that
            happens to be a list is never mistaken for the duplicate list.
    """

    def __init__(self, key: Any, value: Any, level: int):
        self.key = key
        self.value = value
        self.forward = [None] * (level + 1)  # one pointer per level 0..level
        self.has_duplicates = False


class SkipList:
    """Ordered skip list with search, insert and delete operations.

    Keys must be mutually comparable with ``<`` and ``==``. Each distinct key
    occupies exactly one node; re-inserting a key collects the payloads into a
    list on that node (see ``insert``).
    """

    def __init__(self, max_level: int = 16, p: float = 0.5):
        self.max_level = max_level  # highest level index any node may reach
        self.p = p  # probability of promoting a node one more level
        self.level = 0  # highest level index currently in use

        # Head sentinel: only its forward pointers are used; its key is never
        # compared against real keys.
        self.head = SkipNode(None, None, max_level)

    def random_level(self) -> int:
        """Draw a node level: each extra level is kept with probability ``p``."""
        level = 0
        while random.random() < self.p and level < self.max_level:
            level += 1
        return level

    def _find_predecessors(self, key: Any) -> Tuple[List[SkipNode], Optional[SkipNode]]:
        """Return ``(update, candidate)`` for ``key``.

        ``update[i]`` is the last node at level ``i`` whose key is ``< key``
        (the head for levels above ``self.level``); ``candidate`` is the first
        level-0 node with key ``>= key``, or ``None``.
        """
        update = [self.head] * (self.max_level + 1)
        current = self.head
        for i in range(self.level, -1, -1):
            nxt = current.forward[i]
            while nxt is not None and nxt.key < key:
                current = nxt
                nxt = current.forward[i]
            update[i] = current
        return update, current.forward[0]

    def search(self, key: Any) -> Optional[Any]:
        """Return the value stored under ``key``, or ``None`` if it is absent.

        For a key inserted more than once the result is the list of all its
        payloads in insertion order. A stored payload of ``None`` is
        indistinguishable from a miss.
        """
        current = self.head
        # Descend from the top level, moving right while the next key is smaller.
        for i in range(self.level, -1, -1):
            nxt = current.forward[i]
            while nxt is not None and nxt.key < key:
                current = nxt
                nxt = current.forward[i]

        candidate = current.forward[0]
        if candidate is not None and candidate.key == key:
            return candidate.value
        return None

    def insert(self, key: Any, value: Any):
        """Insert ``value`` under ``key``.

        If ``key`` is already present no node is added: the first duplicate
        turns the stored payload into ``[old, new]`` and later duplicates are
        appended to that list. A caller-supplied list payload is never mutated.
        """
        update, node = self._find_predecessors(key)

        if node is not None and node.key == key:
            if node.has_duplicates:
                node.value.append(value)
            else:
                node.value = [node.value, value]
                node.has_duplicates = True
            return

        new_level = self.random_level()
        if new_level > self.level:
            # update[] already points at the head for levels above the old top.
            self.level = new_level

        new_node = SkipNode(key, value, new_level)
        for i in range(new_level + 1):
            new_node.forward[i] = update[i].forward[i]
            update[i].forward[i] = new_node

    def delete(self, key: Any) -> bool:
        """Remove ``key`` (and all of its payloads).

        Returns:
            True if the key was present and removed, False otherwise.
        """
        update, node = self._find_predecessors(key)
        if node is None or node.key != key:
            return False

        # Keys are unique, so update[i].forward[i] is `node` on every level it spans.
        for i in range(len(node.forward)):
            update[i].forward[i] = node.forward[i]

        # Drop now-empty top levels so searches do not start above the data.
        while self.level > 0 and self.head.forward[self.level] is None:
            self.level -= 1
        return True


class HybridSkipHashIndex:
    """
    Hash-partitioned index whose buckets are skip lists.

    Each key is routed to one of ``2 ** bucket_bits`` buckets by Python's
    ``hash``; each bucket is a ``SkipList`` kept in key order. A lookup costs
    one hash plus a skip-list search over only that bucket's keys.

    Note: key order holds only *within* a bucket. Hashing scatters keys across
    buckets, so the index as a whole offers no ordered iteration or range scans.
    """

    def __init__(self, bucket_bits: int = 8, max_level: int = 16, p: float = 0.5):
        self.bucket_bits = bucket_bits  # log2 of the bucket count
        self.num_buckets = 1 << bucket_bits  # 2 ** bucket_bits
        self.buckets = [SkipList(max_level, p) for _ in range(self.num_buckets)]

    def _hash_key(self, key: Any) -> int:
        """Map ``key`` to a bucket index in ``[0, num_buckets)``."""
        # Mask to the low 31 bits of hash(); the result is non-negative.
        hash_value = hash(key) & 0x7FFFFFFF
        return hash_value % self.num_buckets

    def insert(self, key: Any, value: Any):
        """Insert ``value`` under ``key`` in the key's bucket.

        Duplicate keys are collected into a list by ``SkipList.insert``.
        """
        bucket_idx = self._hash_key(key)
        self.buckets[bucket_idx].insert(key, value)

    def search(self, key: Any) -> Optional[Any]:
        """Return what the key's bucket stores for ``key``, or ``None``."""
        bucket_idx = self._hash_key(key)
        return self.buckets[bucket_idx].search(key)

    def build_index(self, data: List[Dict], key_field: str):
        """Insert every record of ``data``, keyed by ``record[key_field]``."""
        for record in data:
            key = record[key_field]
            self.insert(key, record)


def hybrid_skip_hash_join(left_data: List[Dict], right_data: List[Dict], 
                         left_key: str, right_key: str) -> List[Dict]:
    """
    Perform an inner join by probing a hybrid hash/skip-list index of the right table.

    The right table is loaded into a ``HybridSkipHashIndex``; every left record
    is then looked up by ``left_record[left_key]``, and one merged dict
    ``{**left_record, **right_record}`` is emitted per matching right record
    (right-side fields win on name clashes). Build and probe times are printed.

    Returns:
        The joined records, in left-table order.
    """
    start_build = time.perf_counter()

    # floor(log2(n / 100)) in exact integer arithmetic, clamped to 8..12 bits
    # (256..4096 buckets). Flooring n at 100 keeps an empty right table from
    # reaching log2(0).
    hundreds = max(len(right_data), 100) // 100
    bucket_bits = min(12, max(8, hundreds.bit_length() - 1))
    index = HybridSkipHashIndex(bucket_bits=bucket_bits)
    index.build_index(right_data, right_key)

    build_time = time.perf_counter() - start_build
    print(f"Hybrid index build time: {build_time:.6f} seconds")

    start_join = time.perf_counter()
    result = []

    for left_record in left_data:
        matches = index.search(left_record[left_key])
        if matches is None:
            continue
        # The index stores a single record, or a list once the key repeats.
        if not isinstance(matches, list):
            matches = [matches]
        for right_record in matches:
            result.append({**left_record, **right_record})

    join_time = time.perf_counter() - start_join
    print(f"Hybrid join time: {join_time:.6f} seconds")
    print(f"Total hybrid join time: {build_time + join_time:.6f} seconds")

    return result


def multi_threaded_hash_join(left_data: List[Dict], right_data: List[Dict], 
                            left_key: str, right_key: str, num_threads: int = 4) -> List[Dict]:
    """
    Perform a hash join, probing the left table from a thread pool.

    Included for comparison with the hybrid approach. The right table is
    indexed as ``key -> [records]``; the left table is split into at most
    ``num_threads`` contiguous chunks that are probed by worker threads. Chunk
    results are concatenated in chunk order, so the output order matches a
    sequential left-to-right probe. Build and probe times are printed.

    Note: the probe is pure Python and CPU-bound, so under CPython's GIL the
    threads mostly take turns rather than running in parallel.

    Returns:
        One ``{**left_record, **right_record}`` dict per matching pair.

    Raises:
        ValueError: if ``num_threads`` is less than 1.
        KeyError: if a record lacks its join key (re-raised from the worker).
    """
    from concurrent.futures import ThreadPoolExecutor

    if num_threads < 1:
        raise ValueError(f"num_threads must be >= 1, got {num_threads}")

    start_build = time.perf_counter()
    hash_table = {}
    for record in right_data:
        hash_table.setdefault(record[right_key], []).append(record)

    build_time = time.perf_counter() - start_build
    print(f"MT Hash table build time: {build_time:.6f} seconds")

    # Ceil-divide, but never 0: an empty left table would otherwise give
    # range(..., step=0), which raises.
    chunk_size = max(1, -(-len(left_data) // num_threads))
    chunks = [left_data[i:i + chunk_size] for i in range(0, len(left_data), chunk_size)]

    def process_chunk(chunk):
        """Join one chunk of the left table; return its joined records."""
        local_result = []
        for left_record in chunk:
            for right_record in hash_table.get(left_record[left_key], ()):
                local_result.append({**left_record, **right_record})
        return local_result

    start_join = time.perf_counter()
    result = []

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Consuming map() yields results in submission order and re-raises any
        # exception a worker hit, instead of silently dropping that chunk.
        for chunk_result in executor.map(process_chunk, chunks):
            result.extend(chunk_result)

    join_time = time.perf_counter() - start_join
    print(f"MT Hash join time: {join_time:.6f} seconds")
    print(f"Total MT hash join time: {build_time + join_time:.6f} seconds")

    return result


def bloom_filter_hash_join(left_data: List[Dict], right_data: List[Dict], 
                          left_key: str, right_key: str,
                          num_bits: int = 1 << 20, num_hashes: int = 7) -> List[Dict]:
    """
    Perform a hash join that pre-screens left keys with a Bloom filter.

    Included as another optimization strategy for comparison. Every right key
    is added to a Bloom filter of ``num_bits`` bits using ``num_hashes`` MD5-based
    hash functions. A left record is looked up in the hash table only if all
    its bits are set. The join result is identical to a plain hash join; the
    filter only skips dict lookups. In pure Python, hashing each probe key
    ``num_hashes`` times is usually dearer than the lookup it avoids.

    The bit array is a stdlib ``bytearray``, so no third-party package is needed.

    Args:
        num_bits: filter size in bits (default 2**20, about one million).
        num_hashes: number of hash functions / bits set per key.

    Returns:
        One ``{**left_record, **right_record}`` dict per matching pair, in
        left-table order. Timing and filter statistics are printed.

    Raises:
        ValueError: if ``num_bits`` is less than 1.
    """
    import hashlib

    if num_bits < 1:
        raise ValueError(f"num_bits must be >= 1, got {num_bits}")

    def bloom_positions(key):
        """Lazily yield the ``num_hashes`` bit positions for ``key``."""
        for seed in range(num_hashes):
            key_str = str(key) + str(seed)
            yield int(hashlib.md5(key_str.encode()).hexdigest(), 16) % num_bits

    start_build = time.perf_counter()

    # Single pass over the right table: set filter bits and build the hash table.
    bits = bytearray((num_bits + 7) // 8)
    hash_table = {}
    for record in right_data:
        key = record[right_key]
        for pos in bloom_positions(key):
            bits[pos >> 3] |= 1 << (pos & 7)
        hash_table.setdefault(key, []).append(record)

    build_time = time.perf_counter() - start_build
    print(f"Bloom filter + hash table build time: {build_time:.6f} seconds")

    start_join = time.perf_counter()
    result = []
    bloom_hits = 0
    actual_matches = 0

    for left_record in left_data:
        left_val = left_record[left_key]

        # all() stops at the first clear bit, so misses skip the remaining hashes.
        if not all((bits[pos >> 3] >> (pos & 7)) & 1 for pos in bloom_positions(left_val)):
            continue

        bloom_hits += 1
        matches = hash_table.get(left_val)
        if matches is None:
            continue  # Bloom false positive
        actual_matches += 1
        for right_record in matches:
            result.append({**left_record, **right_record})

    join_time = time.perf_counter() - start_join
    print(f"Bloom filter join time: {join_time:.6f} seconds")
    print(f"Total Bloom filter join time: {build_time + join_time:.6f} seconds")
    print(f"Bloom filter hits: {bloom_hits}, Actual matches: {actual_matches}, False positives: {bloom_hits - actual_matches}")

    return result


def generate_test_data(size_left: int, size_right: int, 
                      key_range: int, seed: int = 42) -> Tuple[List[Dict], List[Dict]]:
    """
    Build a pair of synthetic tables that share a ``join_key`` column.

    Reseeds both Python's global ``random`` and NumPy's global RNG with
    ``seed``, then draws every ``join_key`` uniformly from ``1..key_range``
    with ``random.randint``: all left rows first, then all right rows.

    Returns:
        ``(left_data, right_data)``: left rows carry ``id``, ``join_key``,
        ``value_left``; right rows carry ``id``, ``join_key``, ``value_right``.
    """
    random.seed(seed)
    np.random.seed(seed)

    left_rows = [
        {'id': row, 'join_key': random.randint(1, key_range), 'value_left': f"left_value_{row}"}
        for row in range(size_left)
    ]
    right_rows = [
        {'id': row, 'join_key': random.randint(1, key_range), 'value_right': f"right_value_{row}"}
        for row in range(size_right)
    ]
    return left_rows, right_rows


def _run_join_suite(left_data: List[Dict], right_data: List[Dict]) -> None:
    """Run every join implementation on one dataset and print the result sizes.

    All joins use ``'join_key'`` on both sides. If the Bloom-filter join raises
    ImportError (a missing optional dependency), it is skipped with a message.
    """
    hybrid_result = hybrid_skip_hash_join(left_data, right_data, 'join_key', 'join_key')
    mt_hash_result = multi_threaded_hash_join(left_data, right_data, 'join_key', 'join_key')
    try:
        bloom_result = bloom_filter_hash_join(left_data, right_data, 'join_key', 'join_key')
        print(f"Bloom filter join result size: {len(bloom_result)}")
    except ImportError:
        print("Bloom filter join requires the 'bitarray' package. Skipping this test.")

    print(f"Hybrid join result size: {len(hybrid_result)}")
    print(f"Multi-threaded hash join result size: {len(mt_hash_result)}")


def _make_skewed_tables(num_rows: int) -> Tuple[List[Dict], List[Dict]]:
    """Build left/right tables whose keys are ~90% in 1..10, else in 11..5000.

    Draws from the global ``random`` state without reseeding, alternating one
    left row and one right row per index.
    """
    left_skewed = []
    right_skewed = []

    for i in range(num_rows):
        skewed_key = random.randint(1, 10) if random.random() < 0.9 else random.randint(11, 5000)
        left_skewed.append({
            'id': i,
            'join_key': skewed_key,
            'value_left': f"left_value_{i}"
        })

        skewed_key = random.randint(1, 10) if random.random() < 0.9 else random.randint(11, 5000)
        right_skewed.append({
            'id': i,
            'join_key': skewed_key,
            'value_right': f"right_value_{i}"
        })

    return left_skewed, right_skewed


def test_advanced_joins():
    """
    Benchmark the advanced join algorithms on medium, large and skewed datasets.

    Prints timings from each join and the result sizes for comparison. The
    skewed dataset continues from the RNG state left by the preceding
    ``generate_test_data`` call, so the whole run is deterministic.
    """
    print("=== Medium Dataset Test ===")
    _run_join_suite(*generate_test_data(10000, 10000, 5000))

    print("\n=== Large Dataset Test ===")
    _run_join_suite(*generate_test_data(100000, 100000, 50000))

    # Very skewed key distribution: 90% of keys fall in a tiny range.
    print("\n=== Highly Skewed Data Distribution Test ===")
    _run_join_suite(*_make_skewed_tables(10000))


if __name__ == "__main__":
    # Run the join benchmarks only when executed directly, not when imported.
    test_advanced_joins()

=== Medium Dataset Test ===
Hybrid index build time: 0.051599 seconds
Hybrid join time: 0.016645 seconds
Total hybrid join time: 0.068244 seconds
MT Hash table build time: 0.001468 seconds
MT Hash join time: 0.006358 seconds
Total MT hash join time: 0.007826 seconds
Bloom filter + hash table build time: 0.185776 seconds
Bloom filter join time: 0.082973 seconds
Total Bloom filter join time: 0.268749 seconds
Bloom filter hits: 8712, Actual matches: 8712, False positives: 0
Bloom filter join result size: 20346
Hybrid join result size: 20346
Multi-threaded hash join result size: 20346

=== Large Dataset Test ===
Hybrid index build time: 0.307271 seconds
Hybrid join time: 0.332649 seconds
Total hybrid join time: 0.639920 seconds
MT Hash table build time: 0.017525 seconds
MT Hash join time: 0.118315 seconds
Total MT hash join time: 0.135840 seconds
Bloom filter + hash table build time: 0.852079 seconds
Bloom filter join time: 1.007549 seconds
Total Bloom filter join time: 1.859628 seconds
Bl