# Set up the environment

## Import packages

In [1]:
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Optional, List, Tuple
from itertools import count
import heapq
import numpy as np

## Helping functions

In [2]:
def print_tree(node, prefix="", is_last=True):
    """Print tree structure as text"""
    if node is None:
        return
    
    # Print current node
    connector = "└── " if is_last else "├── "
    if node.symbol:
        print(f"{prefix}{connector}{node.symbol} ({node.weight})")
    else:
        print(f"{prefix}{connector}* ({node.weight})")
    
    # Update prefix for children
    new_prefix = prefix + ("    " if is_last else "│   ")
    
    # Print children
    if node.left or node.right:
        if node.left:
            print_tree(node.left, new_prefix, not bool(node.right))
        if node.right:
            print_tree(node.right, new_prefix, True)

# Introduction

## Motivation
This notebook is inpired by information theory [lectures by David McKay](https://www.youtube.com/playlist?list=PLN3p8NUNcClDu1hc2m5cVp8FOEmuF3vRy). I want to implement [Huffman coding algorithm](https://en.wikipedia.org/wiki/Huffman_coding) for data compression and use it to compress [human genome](https://en.wikipedia.org/wiki/Human_genome) sequence. Why doing it? Well, I'm greately facinated by [Arithmetic coding](https://en.wikipedia.org/wiki/Arithmetic_coding) algorithm for data compression and Huffman's algorithm would be great as a beseline to compare to

## Brief overview of related concepts

### Symbol coding problem

In symbol coding problem we have a set of symbols $S = \{s_1, s_2, \ldots, s_n\}$ with probabilities $P = \{p_1, p_2, \ldots, p_n\}$ and we want to encode them using binary strings (codewords) $C = \{c_1, c_2, \ldots, c_n\}$ such that the expected codeword length is minimized. The expected codeword length $L$ is given by:

$$
L = \sum_{i=1}^{n} p_i \cdot l(c_i), \tag{1}
$$

where $l(c_i)$ is the length of codeword $c_i$. Let's look at an example to illustrate this.

#### Example 1
Let's say we have a set of symbols $S = \{A, B, C, D\}$ with probabilities $P = \{0.5, 0.25, 0.125, 0.125\}$. We want to find the optimal codewords $C$ that minimize the expected codeword length.

a) One possible solution is to assign the following codewords:
- A: 1000
- B: 0100
- C: 0010
- D: 0001

The expected codeword length $L$ is calculated as follows:
$$
L = \sum_{i=1}^{n} p_i \cdot l(c_i) = 0.5 \cdot 4 + 0.25 \cdot 4 + 0.125 \cdot 4 + 0.125 \cdot 4 = 4
$$

b) Another possible solution is to assign the following codewords:
- A: 1
- B: 01
- C: 001
- D: 0001

The expected codeword length $L$ is calculated as follows:
$$
L = \sum_{i=1}^{n} p_i \cdot l(c_i) = 0.5 \cdot 1 + 0.25 \cdot 2 + 0.125 \cdot 3 + 0.125 \cdot 4 = 1.875
$$

c) Third possible solution is to assign the following codewords:
- A: 1
- B: 00
- C: 010
- D: 10

The expected codeword length $L$ is calculated as follows:
$$
L = \sum_{i=1}^{n} p_i \cdot l(c_i) = 0.5 \cdot 1 + 0.25 \cdot 2 + 0.125 \cdot 3 + 0.125 \cdot 4 = 1.625
$$


Another thing that is important to mention is that we want our coding to have several useful properties:
- **Uniquely decodable**: for any string $x$ and $y$ such that $x \neq y$ codewords $C(x)$ and $C(y)$ must be different $C(x) \neq C(y)$. In plain english this means that we can always decode a string of codewords back to the original symbols without ambiguity. In this light, solution (c) in example 1 is not uniquely decodable because the both string `DC` and `ABD` are encoded as $C(DC)=C(ABD) = 10010$.
- **Minimal expected codeword length**: we want to minimize the expected codeword length $L$.

**Note:** [ASCII](https://en.wikipedia.org/wiki/ASCII) code is another interesting example of symbol coding.

### Source coding theorem

A question arises: what is the minimum expected codeword length $L$ that we can achieve? The answer is given by [Shannon's source coding theorem](https://en.wikipedia.org/wiki/Shannon%27s_source_coding_theorem) which states that the minimum expected codeword length $L$ is bounded by the entropy $H$ of the source:

$$
L \geq H, \tag{2}
$$

where the entropy $H$ is defined as:

$$
H = -\sum_{i=1}^{n} p_i \log_2 p_i, \tag{3}
$$

This means that no lossless compression scheme can achieve an expected codeword length less than the entropy of the source.

By comparing equations (1) and (2) we can see that equality $L = H$ holds when codeword length is equal to $l(c_i) = -\log_2 p_i$ for all symbols $s_i$. However, this is not always possible because codeword lengths must be integers. Therefore, we can only achieve $L$ that is close to $H$.

### Huffman Coding

[Huffman coding](https://en.wikipedia.org/wiki/Huffman_coding) is a popular algorithm used for lossless data compression. The basic idea is to assign variable-length codes to input characters, with shorter codes assigned to more frequent characters. This results in a prefix-free binary code, meaning no code is a prefix of any other, which allows for efficient encoding and decoding.

The algorithm works by building a binary tree where each leaf node represents a symbol and its probability/frequency. The tree is constructed by repeatedly merging the two nodes with the lowest probabilities until only one node remains, which becomes the root of the tree. The code for each symbol is then determined by traversing the tree from the root to the leaf, assigning a '0' for a left branch and a '1' for a right branch.

#### Example 2

Consider the "alphabet" from previous example, where symbol probabilities are

```text
A: 0.5
B: 0.25
C: 0.125
D: 0.125
```

Then the binary tree for this alphabet would look like this:

```text
        *
       / \
      /   \
     *     A(0.5)
    / \
   /   \
B(0.25) *
       / \
      /   \
  C(0.125) D(0.125)
```

The algorithm assigns codes by traversing from root to leaf: left=0, right=1. So the resulting code for each symbol would be:
"
```text
A: 1
B: 01
C: 001
D: 000
```

An average code length $L$ can be calculated as follows:

$$
L = \sum_{i=1}^4 p_i \cdot l_i = \frac{1}{2} \cdot 1 + \frac{1}{4} \cdot 2 + \frac{1}{8} \cdot 3 + \frac{1}{8} \cdot 3  = 1.75
$$

For comparison, entropy $H$ can be calculated as follows:

$$
H = - \sum_{i=1}^4 p_i \cdot \log_2p_i = \frac{1}{2} \log_22 + \frac{1}{4} \log_24 + \frac{1}{8} \log_28 + \frac{1}{8} \log_28 = 1.75
$$

As we can see, the average code length $L$ is equal to the entropy $H$ for this particular example. This is a special case and may not hold for all probability distributions, but it illustrates the relationship between Huffman coding and entropy.


### Heap

A heap is a specialized tree-based data structure that satisfies the heap property. In a min-heap, for example, the key at a parent node is always less than or equal to the keys of its children, and the smallest key is at the root. This property makes heaps useful for implementing priority queues, where the highest (or lowest) priority element can be efficiently accessed.

#### Operation cost

The time complexity for the main operations on a heap is as follows:

- Insertion: O(log n)
- Deletion (removing the root): O(log n)
- Accessing the minimum element: O(1)

#### Example 3

Let's use a min-heap to illustrate the heap property. Consider the following sequence of numbers: 5, 3, 8, 1, 4. We can insert these numbers into a min-heap as follows:
```text
1. Insert 5:       5
2. Insert 3:       3
                  /
                 5
3. Insert 8:       3
                  / \
                 5   8
4. Insert 1:       1
                  / \
                 3   8
                /
               5
5. Insert 4:       1
                  / \
                 3   4
                / \
               5   8
6. Remove 1:       3
                  / \
                 5   4
                /
               8
```
In this min-heap, the smallest element (1) is at the root, and the heap property is maintained at each level.

Let's use `heapq` in Python to demonstrate heap operations.

In [3]:
import heapq

# Create a min-heap
heap = []

# Insert elements into the heap
heapq.heappush(heap, 5)
heapq.heappush(heap, 3)
heapq.heappush(heap, 8)
heapq.heappush(heap, 1)
heapq.heappush(heap, 4)

# Display the heap
print("Heap:", heap)

# Remove the smallest element
smallest = heapq.heappop(heap)
print("Removed smallest element:", smallest)
print("Heap after removal:", heap)

Heap: [1, 3, 8, 5, 4]
Removed smallest element: 1
Heap after removal: [3, 4, 8, 5]


# Let's play around

## Huffman coding algorithm

### Description 

Huffman coding algorithm follows these steps:

1. Build a priority queue (min-heap) of nodes, where each node represents a symbol and its frequency.
2. While there is more than one node in the queue:
   a. Remove the two nodes of lowest frequency from the queue.
   b. Create a new internal node with these two nodes as children and a frequency equal to the sum of their frequencies.
   c. Insert the new node back into the queue.
3. The remaining node is the root of the Huffman tree.
4. Traverse the tree to generate the Huffman codes for each symbol.


### Implement a binary-tree class

Binary tree is an essential data structure for this algorithm. It is a hierarchical structure where each node has at most two children. In the context of Huffman coding, each leaf node represents a symbol and its probability/frequency, while internal nodes represent the combined frequency of their child nodes.

In [4]:
@dataclass
class Node:
    weight: float                   # weight of the node (frequency or probability)
    symbol: Optional[str] = None    # symbol for leaf nodes
    left: Optional["Node"] = None   # left child
    right: Optional["Node"] = None  # right child

Let's build a huffman tree from example above


```text
        *
       / \
      /   \
     *     A(0.5)
    / \
   /   \
B(0.25) *
       / \
      /   \
  C(0.125) D(0.125)
```

In [5]:
# Create leaf nodes
node_A = Node(weight=0.5, symbol="A")
node_B = Node(weight=0.25, symbol="B") 
node_C = Node(weight=0.125, symbol="C")
node_D = Node(weight=0.125, symbol="D")

# Create internal nodes (bottom-up)
# Right internal node: combines C and D
right_internal = Node(weight=0.25, left=node_C, right=node_D)

# Left internal node: combines B and the right internal node  
left_internal = Node(weight=0.5, left=node_B, right=right_internal)

# Root node: combines left internal node and A
root = Node(weight=1.0, left=left_internal, right=node_A)

# Visualise
print("Tree structure:")
print_tree(root)

Tree structure:
└── * (1.0)
    ├── * (0.5)
    │   ├── B (0.25)
    │   └── * (0.25)
    │       ├── C (0.125)
    │       └── D (0.125)
    └── A (0.5)


### Build a Huffman Tree

To efficiently manage the nodes, we'll use a priority queue (min-heap) to store and retrieve the nodes based on their frequencies. This allows us to quickly access the least frequent nodes when building the tree. The algorithm works by repeatedly merging the two least frequent nodes until only one node remains, which becomes the root of the Huffman tree. Schematic representation of the process:

#### Example 4

Let's use the following symbols and their frequencies:

- A: 0.5
- B: 0.25
- C: 0.125
- D: 0.125

1. Create all leaf nodes for our future tree
    ```python
    Node(A, 0.5)
    Node(B, 0.25)
    Node(C, 0.125)
    Node(D, 0.125)
    ```
2. Create a priority queue (min-heap) and insert all leaf nodes:
    ```python
    min_heap = [Node(A, 0.5), Node(B, 0.25), Node(C, 0.125), Node(D, 0.125)]
    ```

3. Extract the two least frequent nodes:
    ```python
    left = extract_min(min_heap) # Node(C, 0.125)
    right = extract_min(min_heap) # Node(D, 0.125)
    ```

4. Create a new internal node with these two nodes as children and their combined frequency:
    ```python
    new_node = Node(None, left.frequency + right.frequency, left, right)
    ```

5. Insert the new node back into the priority queue:
    ```python
    insert(min_heap, new_node) # [Node(B, 0.25), Node(A, 0.5), Node(None, 0.25, Node(C, 0.125), Node(D, 0.125))]
    ```

6. Repeat steps 3-5 until only one node remains in the priority queue. This node will be the root of the Huffman tree.

In [6]:
def build_huffman_tree(weights: Dict[str, float]) -> Optional[Node]:
    """
    Build a Huffman tree from the given symbol weights
    """
    # Check that weights are > 0
    items = [(float(w), str(s)) for s, w in weights.items() if float(w) > 0.0]
    if not items:
        return None

    # Sort items by weight to make code deterministic (important for decoding)
    items.sort(key=lambda x: (x[0], x[1]))

    # Build the tree using a min-heap
    uid = count()                                   # unique integer 0,1,2,...
    heap = [(w, next(uid), Node(weight=w, symbol=s))   # (weight, tie_id, tree)
            for (w, s) in items]
    heapq.heapify(heap)

    # Handle single-symbol edge case
    if len(heap) == 1:
        w, _, leaf = heapq.heappop(heap)
        return Node(weight=w, left=leaf)

    # Build the tree
    while len(heap) > 1:
        w1, _, n1 = heapq.heappop(heap)             # smallest
        w2, _, n2 = heapq.heappop(heap)             # 2nd smallest
        parent = Node(weight=w1 + w2, left=n1, right=n2)
        heapq.heappush(heap, (parent.weight, next(uid), parent))

    return heap[0][2]

Now we can apply the algorithm to build the Huffman tree and generate the codes for each symbol

In [7]:
probabilities = {
    'A': 0.5,
    'B': 0.25,
    'C': 0.125,
    'D': 0.125
}

root = build_huffman_tree(probabilities)
print("Huffman Tree structure:")
print_tree(root)

Huffman Tree structure:
└── * (1.0)
    ├── A (0.5)
    └── * (0.5)
        ├── B (0.25)
        └── * (0.25)
            ├── C (0.125)
            └── D (0.125)


### Derive codes

To derive codes from Huffman tree we will use a recursive traversal of the tree. Starting from the root, we will traverse to the left child by appending '0' to the current code and to the right child by appending '1'. When we reach a leaf node, we will record the symbol and its corresponding code. This is essentially a depth-first search (DFS) of the tree.

Schematic representation of the process:

1. Start at the root with an empty code:
   ```python
   current_code = ""
   ```

2. Traverse the tree:
   - If the current node is a leaf, record the symbol and its code:
     ```python
     if node is leaf:
         codes[node.symbol] = current_code
     ```

   - If the current node is not a leaf, traverse left and right:
     ```python
     traverse(node.left, current_code + "0")
     traverse(node.right, current_code + "1")
     ```

The final codes dictionary will contain the Huffman codes for all symbols.

In [8]:
def codes_from_tree(root: Optional[Node]) -> Dict[str, str]:
    """
    Derive binary codes from the Huffman tree
    """
    # Return empty dictionary if root is None
    if root is None:
        return {}

    # Create a dictionary to hold the codes
    codes: Dict[str, str] = {}

    # Depth-first search (DFS) to traverse the tree
    def dfs(n: Node, path: str):
        if n.symbol is not None:                 # leaf node found
            codes[n.symbol] = path or "0"        # assign code
            return
        if n.left:  dfs(n.left,  path + "0") # go left, add "0"
        if n.right: dfs(n.right, path + "1") # go right, add "1"

    # Start DFS traversal from the root
    dfs(root, "")
    return codes

Now we can apply this algorithm to generate the Huffman codes for each symbol from the constructed Huffman tree

In [9]:
codes = codes_from_tree(root)
print("Huffman Codes:", codes)

Huffman Codes: {'A': '0', 'B': '10', 'C': '110', 'D': '111'}


As you can see the code is very similar to the one we got in [Exercise 2](#example-2)

### Compare entropy and expected length

Although we have already compared entropy and expected length in Exercise 2. Let's create functions to compute both values for future examples.

In [10]:
def entropy(probabilities: Dict[str, float]) -> float:
    """
    Calculate the entropy of a set of symbol probabilities
    """
    p = np.array(list(probabilities.values()))
    return -np.sum(p * np.log2(p, where=(p > 0)))

def expected_length(coding: Dict[str, str], probabilities: Dict[str, float]) -> float:
    """
    Calculate the expected length of a set of symbol codes
    """
    keys = coding.keys()
    p = np.array([probabilities.get(symbol, 0) for symbol in keys])
    length = np.array([len(coding[symbol]) for symbol in keys])
    return np.sum(length * p)

Now let's print the summary for our example

In [11]:
print("Symbol Probabilities:", probabilities)
print("Huffman Codes:", codes)
print("Huffman tree from weights:")
print_tree(root)
print("Entropy:", entropy(probabilities))
print("Expected Length:", expected_length(codes, probabilities))

Symbol Probabilities: {'A': 0.5, 'B': 0.25, 'C': 0.125, 'D': 0.125}
Huffman Codes: {'A': '0', 'B': '10', 'C': '110', 'D': '111'}
Huffman tree from weights:
└── * (1.0)
    ├── A (0.5)
    └── * (0.5)
        ├── B (0.25)
        └── * (0.25)
            ├── C (0.125)
            └── D (0.125)
Entropy: 1.75
Expected Length: 1.75


### Convert a string of charachters to bytes

Let's create a function to convert a string to bytes and compare the size of our byte representation to ASCII and UTF-8 encodings.


In [12]:
def bits2bytes(bits: str):
    """
    Convert a string of bits to bytes
    """
    # Pad the bit string to make its length a multiple of 8
    paddlen = (8 - len(bits) % 8) % 8
    padded_bits = bits + '0' * paddlen

    # Convert each group of 8 bits to a byte
    byte_array = bytearray()
    for i in range(0, len(padded_bits), 8):
        byte = padded_bits[i:i + 8]
        byte_array.append(int(byte, 2))
    return bytes(byte_array)

Now let's generate a string of characters from our distribution and generate a bit and byte string representations:

```text
A: 0.5
B: 0.25
C: 0.125
D: 0.125
```

In [13]:
# fix random seed
np.random.seed(4)

# generate string
n = 1000
char_list = list(probabilities.keys())
prob_list = list(probabilities.values())
string = "".join(np.random.choice(char_list, p=prob_list, size=n))
print(f"Generated string: {string[:50]}...")

# calculate frequency
freq = {char: string.count(char) for char in char_list}
print("Character counts:", freq)
print("Character frequencies:", {char: freq[char] / n for char in char_list})

Generated string: DBDBBADAAACACDABAAADADCCAABABABDBAABBBABABCAABBABB...
Character counts: {'A': 481, 'B': 264, 'C': 120, 'D': 135}
Character frequencies: {'A': 0.481, 'B': 0.264, 'C': 0.12, 'D': 0.135}


In [14]:
# encode the string using different encoding schemes
huffman_bits = "".join([codes.get(char) for char in string])
huffman_bytes = bits2bytes(huffman_bits)
utf8_bytes = string.encode('utf-8')
utf16_bytes = string.encode('utf-16')
ascii_bytes = string.encode('ascii')

# print sizes
print("Huffman encoding size (bits):", len(huffman_bits))
print("Huffman encoding size (bytes):", len(huffman_bytes))
print("UTF-8 encoding size (bytes):", len(utf8_bytes))
print("UTF-16 encoding size (bytes):", len(utf16_bytes))
print("ASCII encoding size (bytes):", len(ascii_bytes))

Huffman encoding size (bits): 1774
Huffman encoding size (bytes): 222
UTF-8 encoding size (bytes): 1000
UTF-16 encoding size (bytes): 2002
ASCII encoding size (bytes): 1000
