<a href="https://colab.research.google.com/github/Yahia-kilany/Algorithm-Pitch-Huffman-vs-LZW/blob/main/Algorithmic_pitch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import time
import random
import string
import matplotlib.pyplot as plt
import pandas as pd
import heapq
from collections import defaultdict
import math

In [None]:
class HuffmanNode:
    def __init__(self, char, freq):
        self.char = char
        self.freq = freq
        self.left = None
        self.right = None
    def __lt__(self, other):
        return self.freq < other.freq

def build_frequency_table(text):
    freq = defaultdict(int)
    for c in text:
        freq[c] += 1
    return freq

def build_huffman_tree(freq):
    heap = []
    for char, count in freq.items():
        heapq.heappush(heap, HuffmanNode(char, count))
    if len(heap) == 1:
        node = heapq.heappop(heap)
        root = HuffmanNode(None, node.freq)
        root.left = node
        return root
    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        merged = HuffmanNode(None, left.freq + right.freq)
        merged.left = left
        merged.right = right
        heapq.heappush(heap, merged)
    return heapq.heappop(heap)

def generate_codes(root):
    code_map = {}
    def traverse(node, current):
        if not node:
            return
        if node.char is not None:
            code_map[node.char] = current
        traverse(node.left, current + "0")
        traverse(node.right, current + "1")
    traverse(root, "")
    return code_map

def huffman_encode(text):
    freq = build_frequency_table(text)
    tree = build_huffman_tree(freq)
    codes = generate_codes(tree)
    encoded = "".join(codes[c] for c in text)
    return encoded, tree

def huffman_decode(encoded, root):
    result = []
    node = root
    for bit in encoded:
        node = node.left if bit == "0" else node.right
        if node.char:
            result.append(node.char)
            node = root
    return "".join(result)

def test_huffman():
    test_cases = {
        "small": "hello",
        "repetitive": "aaaaaaabaaaaaaabaaaaaaab",
        "random": "the quick brown fox jumps over the lazy dog",
        "numbers": "123123123123123123",
        "special_chars": "!@#!@#!@#!@#",
    }

    for name, text in test_cases.items():
        print(f"Testing: {name} (len={len(text)})")
        compressed, tree = huffman_encode(text)  # unpack tuple
        decompressed = huffman_decode(compressed, tree)  # pass encoded and tree

        print("Original:", text)
        print("Compressed:", compressed)
        print("Decompressed:", decompressed)
        print("Test Passed:", decompressed == text)
        print("-" * 50)

# Call the test function
test_huffman()



Testing: small (len=5)
Original: hello
Compressed: 0001111110
Decompressed: hello
Test Passed: True
--------------------------------------------------
Testing: repetitive (len=24)
Original: aaaaaaabaaaaaaabaaaaaaab
Compressed: 111111101111111011111110
Decompressed: aaaaaaabaaaaaaabaaaaaaab
Test Passed: True
--------------------------------------------------
Testing: random (len=43)
Original: the quick brown fox jumps over the lazy dog
Compressed: 111101110010100010010100010011011001100110001101110110101101011011100010111101010110000101101100001110011111111110001011001010101101100111101110010100011111011101111101011000100110000010110100
Decompressed: the quick brown fox jumps over the lazy dog
Test Passed: True
--------------------------------------------------
Testing: numbers (len=18)
Original: 123123123123123123
Compressed: 101101011010110101101011010110
Decompressed: 123123123123123123
Test Passed: True
--------------------------------------------------
Testing: special_chars (len=1

In [None]:
def lzw_compress(text, max_dict_size=4096):
    dictionary = {chr(i): i for i in range(256)}
    next_code = 256
    s = ""
    output = []

    for c in text:
        if s + c in dictionary:
            s = s + c
        else:
            output.append(dictionary[s])
            # Only add to dictionary if we haven't reached the limit
            if next_code < max_dict_size:
                dictionary[s + c] = next_code
                next_code += 1
            s = c

    if s:
        output.append(dictionary[s])
    return output

def lzw_decompress(encoded, max_dict_size=4096):
    dictionary = {i: chr(i) for i in range(256)}
    next_code = 256
    result = []
    prev = encoded[0]
    result.append(dictionary[prev])

    for code in encoded[1:]:
        if code in dictionary:
            entry = dictionary[code]
        else:
            entry = dictionary[prev] + dictionary[prev][0]
        result.append(entry)

        # Only add to dictionary if we haven't reached the limit
        if next_code < max_dict_size:
            dictionary[next_code] = dictionary[prev] + entry[0]
            next_code += 1
        prev = code

    return "".join(result)
def test_lzw():
    test_cases = {
        "small": "hello",
        "repetitive": "aaaaaaabaaaaaaabaaaaaaab",
        "random": "the quick brown fox jumps over the lazy dog",
        "numbers": "123123123123123123",
        "special_chars": "!@#!@#!@#!@#",
    }

    for name, text in test_cases.items():
        print(f"Testing: {name} (len={len(text)})")
        compressed = lzw_compress(text)
        decompressed = lzw_decompress(compressed)

        print("Original:", text)
        print("Compressed:", compressed)
        print("Decompressed:", decompressed)
        print("Test Passed:", decompressed == text)
        print("-" * 50)

# Call the test function
test_lzw()


Testing: small (len=5)
Original: hello
Compressed: [104, 101, 108, 108, 111]
Decompressed: hello
Test Passed: True
--------------------------------------------------
Testing: repetitive (len=24)
Original: aaaaaaabaaaaaaabaaaaaaab
Compressed: [97, 256, 257, 97, 98, 258, 257, 260, 261, 259]
Decompressed: aaaaaaabaaaaaaabaaaaaaab
Test Passed: True
--------------------------------------------------
Testing: random (len=43)
Original: the quick brown fox jumps over the lazy dog
Compressed: [116, 104, 101, 32, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 32, 106, 117, 109, 112, 115, 32, 111, 118, 101, 114, 32, 256, 258, 108, 97, 122, 121, 32, 100, 111, 103]
Decompressed: the quick brown fox jumps over the lazy dog
Test Passed: True
--------------------------------------------------
Testing: numbers (len=18)
Original: 123123123123123123
Compressed: [49, 50, 51, 256, 258, 257, 259, 262, 257]
Decompressed: 123123123123123123
Test Passed: True
---------------------------

In [None]:
def calculate_huffman_tree_size(codes):
    """
    Calculate the size needed to store the Huffman tree/codebook.
    We store it as: (character, code_length) pairs.
    - Each character: 8 bits
    - Code length: ceil(log2(max_code_length + 1)) bits per entry
    - Plus we need to store the number of entries
    """
    if not codes:
        return 0

    num_chars = len(codes)
    max_code_len = max(len(code) for code in codes.values())

    # Bits to store number of unique characters (up to 256 for ASCII)
    header_bits = 8

    # Bits per character entry: 8 bits for char + bits for code length
    bits_for_length = math.ceil(math.log2(max_code_len + 1))
    bits_per_entry = 8 + bits_for_length

    total_bits = header_bits + (num_chars * bits_per_entry)

    return total_bits

def calculate_lzw_bit_size(compressed, max_dict_size=4096):
    """
    Calculate actual bit size needed to store LZW output.
    With a fixed maximum dictionary size, we can use fixed-width codes.
    """
    if not compressed:
        return 0
    # Use fixed width based on max dictionary size for consistent encoding
    bits_per_code = math.ceil(math.log2(max_dict_size))
    return len(compressed) * bits_per_code

def generate_tests(num_variants=5):
    """Generate test cases with multiple random variants and increasing sizes"""
    tests = {}

    # Define target sizes in characters (all test cases will have same size)
    target_sizes = [1000, 5000, 10000, 20000, 35000, 50000, 75000, 100000, 150000, 200000][:num_variants]

    for idx, target_size in enumerate(target_sizes):
        suffix = f"_size{idx+1}"
        size_kb = round(target_size / 1024, 1)

        # 1. Structured data (HTML-like) - LZW dominates
        tags = ["div", "span", "p", "section", "article"]
        classes = ["container", "content", "wrapper", "box"]
        tag = random.choice(tags)
        cls = random.choice(classes)

        # Calculate repetitions needed to reach target size
        pattern = f"<{tag} class='{cls}'><p>Content here</p></{tag}>\n"
        reps = target_size // len(pattern)
        tests[f"structured_data{suffix}"] = pattern * reps

        # 2. Skewed frequency distribution - Huffman's theoretical strength
        tests[f"skewed_freq{suffix}"] = (
            "a" * (target_size // 2) +
            "b" * (target_size // 4) +
            "c" * (target_size // 8) +
            "".join(random.choice("defghijklmnop") for _ in range(target_size // 8))
        )

        # 3. Uniform random - worst case for both
        tests[f"uniform_random{suffix}"] = "".join(
            random.choice(string.ascii_letters + string.digits)
            for _ in range(target_size)
        )

    return tests

def benchmark_algorithms(num_variants=5, lzw_max_dict=4096, output_dir="compression_results"):
    """
    Run benchmarks and save results to CSV files.

    Args:
        num_variants: Number of size variants to generate (default: 5)
        lzw_max_dict: Maximum LZW dictionary size (default: 4096)
        output_dir: Directory to save CSV files (default: "compression_results")
    """
    import os

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    tests = generate_tests(num_variants)
    results = []

    for name, text in tests.items():
        print(f"Testing: {name} (len={len(text)})")

        # Original size in bits (8 bits per character)
        original_bits = len(text) * 8

        # Huffman
        start = time.time()
        encoded, tree = huffman_encode(text)
        codes = generate_codes(tree)
        huffman_time = time.time() - start
        huffman_data_bits = len(encoded)
        huffman_tree_bits = calculate_huffman_tree_size(codes)
        huffman_total_bits = huffman_data_bits + huffman_tree_bits

        # LZW with limited dictionary
        start = time.time()
        compressed = lzw_compress(text, max_dict_size=lzw_max_dict)
        lzw_time = time.time() - start
        lzw_total_bits = calculate_lzw_bit_size(compressed, max_dict_size=lzw_max_dict)

        # Calculate compression ratios (using total size including overhead)
        huffman_ratio = (1 - huffman_total_bits / original_bits) * 100
        lzw_ratio = (1 - lzw_total_bits / original_bits) * 100

        results.append({
            "test": name,
            "original_bits": original_bits,
            "original_KB": round(original_bits / 8192, 2),
            "huffman_data": huffman_data_bits,
            "huffman_tree": huffman_tree_bits,
            "huffman_total": huffman_total_bits,
            "huffman_KB": round(huffman_total_bits / 8192, 2),
            "lzw_total": lzw_total_bits,
            "lzw_KB": round(lzw_total_bits / 8192, 2),
            "huffman_ratio_%": round(huffman_ratio, 2),
            "lzw_ratio_%": round(lzw_ratio, 2),
            "huffman_time_ms": round(huffman_time * 1000, 3),
            "lzw_time_ms": round(lzw_time * 1000, 3)
        })

    df = pd.DataFrame(results)

    # Save overall results
    overall_csv = os.path.join(output_dir, "all_results.csv")
    df.to_csv(overall_csv, index=False)
    print(f"\n✓ Saved overall results to: {overall_csv}")

    # Save separate CSV for each test case type
    test_types = ["structured_data", "skewed_freq", "uniform_random"]
    for test_type in test_types:
        test_df = df[df['test'].str.contains(test_type)]
        if not test_df.empty:
            test_csv = os.path.join(output_dir, f"{test_type}.csv")
            test_df.to_csv(test_csv, index=False)
            print(f"✓ Saved {test_type} results to: {test_csv}")

    return df

# LZW dictionary limited to 4096 entries (12 bits per code)
# Results saved to CSV files in 'compression_results' folder
df = benchmark_algorithms(num_variants=10, lzw_max_dict=4096, output_dir="compression_results")

print("\n" + "="*80)
print("COMPRESSION COMPARISON SUMMARY")
print("="*80)
print(df.to_string(index=False))
print("\n✓ All results saved to CSV files in 'compression_results' folder")

Testing: structured_data_size1 (len=990)
Testing: skewed_freq_size1 (len=1000)
Testing: uniform_random_size1 (len=1000)
Testing: structured_data_size2 (len=4950)
Testing: skewed_freq_size2 (len=5000)
Testing: uniform_random_size2 (len=5000)
Testing: structured_data_size3 (len=9996)
Testing: skewed_freq_size3 (len=10000)
Testing: uniform_random_size3 (len=10000)
Testing: structured_data_size4 (len=19992)
Testing: skewed_freq_size4 (len=20000)
Testing: uniform_random_size4 (len=20000)
Testing: structured_data_size5 (len=34968)
Testing: skewed_freq_size5 (len=35000)
Testing: uniform_random_size5 (len=35000)
Testing: structured_data_size6 (len=49995)
Testing: skewed_freq_size6 (len=50000)
Testing: uniform_random_size6 (len=50000)
Testing: structured_data_size7 (len=74970)
Testing: skewed_freq_size7 (len=75000)
Testing: uniform_random_size7 (len=75000)
Testing: structured_data_size8 (len=99975)
Testing: skewed_freq_size8 (len=100000)
Testing: uniform_random_size8 (len=100000)
Testing: struc