# Tutorial 2
## Shannon codes

> Arrange the messages of length N in order of decreasing probability and suppose their probabilities are
$p_1 \ge p_2 \ge  ... p_n$. Let $P_s = \sum_1^{s-1} p_i$; that is $P_s$ is the cumulative probability up to, but not including, $p_s$.
We first encode into a binary system. The binary code for message s is obtained by expanding $P_s$ as a binary number. The expansion is carried out to $m_s$ places, where $m_s$ is the integer satisfying:
$$\log_2 \frac{1}{p_s} \le m_s \le 1 + \log_2 \frac{1}{p_s}$$
Thus the messages of high probability are represented by short codes and those of low probability by long codes.

(See the proof to Theorem 9 in the [paper](https://archive.org/details/ost-engineering-shannon1948))


In [1]:
from typing import Any, List, Dict
from collections import Counter, OrderedDict
from math import log2, ceil
from numpy import cumsum

In [2]:
alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"

In [3]:
def infinite_shannon() -> str:

    text = ''
    with open('shannon1948.txt', 'r') as f:
        text = f.read()
    text = text.lower()
    text = [ t for t in text if t in alphabet ]
    return text

text = infinite_shannon()

### 1. Cumulative probability

In [4]:
def get_freqs(text: str) -> Dict[chr, int]:
    counts = dict(Counter(text))
    total  = sum([ c for _,c in counts.items() ])
    freqs  = { x: c/total for x,c in counts.items() }
    
    return freqs
    
def sort_dict(d):
    return OrderedDict(sorted(d.items(), 
                              key = lambda kv: kv[1],
                              reverse = True))
                       
def cum_prob(freqs: Dict[chr, int]) -> Dict[chr, float]:
    p = sort_dict(freqs)
    f = [0] + list(cumsum(list(p.values())[:-1]))
    
    return OrderedDict(zip(p.keys(), f))


### 2. Binary encoding

In [5]:
def __bin_tests(get_binary):
    decs = [ 1/2, 3/4, 1/4, 1/8, 5/8, 7/8, 15/16 ]
    bins = [ '1000', '1100', '0100', '0010', '1010', '1110', '1111' ]
    return all([ get_binary(d, 4) == b for d,b in zip(decs, bins) ])

In [6]:
def get_binary(f: float, L: int) -> str:
    bs = []
    prob = f
    for i in range(L):
        prob = prob * 2
        bs.append(int(prob))
        prob = prob - int(prob)
    return ''.join(str(b) for b in bs)

def get_shannon_codeword(p: float, f: float) -> str:
    l = ceil(log2(1.0/p))
    return get_binary(f, l)

def shannon_code_dict(freqs: Dict[chr, int]) -> Dict[chr, str]:
    f = cum_prob(freqs)
    
    c = { k: get_shannon_codeword(freqs[k], f[k])
          for k in freqs.keys() }
    
    return c

Now we can finally encode the message:

In [7]:
def shannon_encode(text: str) -> str:
    freqs = get_freqs(text)
    codes = shannon_code_dict(freqs)
    
    return [ codes[x] for x in text ]

In [8]:
def encode(text: str, codes: Dict[chr, str]) -> str:
    
    return [ codes[x] for x in text ]

### Step 3: Binary decoding


In [9]:
def decode(coded: str, codes: Dict[chr, str]) -> str:
    inv_code = {v: k for k, v in codes.items()}
    return [ inv_code[x] for x in coded ]

### Examples 

You may want to test your implementation first. Below is an example from Shannon's paper (Section 10 _Discussion and Examples_, p18):

In [10]:
def dictify(keys, values):
    return dict(zip(keys, values))

In [46]:
def example(your_code_fn):
    alph = [ 'a', 'b', 'c', 'd' ]
    prob = [ 1/2, 1/4, 1/8, 1/8 ]
    code = [ '0', '10', '110', '111' ] 
    
    ps = dict(zip(alph, prob))
    cs = your_code_fn(ps)
    
    return all([ c == cs[k] for c, k in zip(code, alph) ])

In [47]:
example(shannon_code_dict)

True

### Testing your code
A few sanity checks... you want to check all codewords are different from each other and none is the prefix of another.

In [57]:
def __unique(codes):
    
    return sorted(codes) == sorted(set(codes))

def __prefix_free(codes):
    
    for c in codes:
        if any([d.startswith(c) for d in codes
                                if c != d]):
            return False
        
    return True

def __entropy(freqs, codes):
    
    H = sum( -p * log2(p) for p in freqs.values() )
    print(f"Entropy of the distribution is: {H}")
    L = sum( freqs[k]*len(codes[k]) for k in freqs.keys())
    print(f"Mean code length: {L}")
    
    return H < L

Now try it out on Shannon's paper!

In [58]:
ps = get_freqs(text)
fs = cum_prob(ps)
cs = shannon_code_dict(ps)

In [59]:
probs = get_freqs(text)
codes = shannon_code_dict(probs)
coded = encode(text, codes)

text2 = decode(coded, codes)

In [60]:
all([__prefix_free(codes),
     __unique(codes),
     __entropy(ps, codes)])

Entropy of the distribution is: 4.26702362984071
Mean code length: 4.755448369685245


True

In [61]:
text == text2

True

## Huffman code

### Step 1: build tree


In [62]:
class Node():
    def __init__(self, l: 'Node' = None, r: 'Node' = None,
                       p: 'Node '= None, lab: str = ''):
        self.l = l
        self.r = r
        self.p = p
        self.lab = lab

    def children(self):
        return self.l, self.r
    
    def parent(self):
        return self.p
    
    def __str__(self):
        if self.lab:
            return self.lab
        else:
            return f"(L: {str(self.l)}, R: {str(self.r)})"

In [63]:
def build_tree(freqs: Dict[chr, float]) -> 'Node':
    nodes = [(Node(lab = c), p) for c, p in freqs.items()]
    nodes = sorted(nodes, key=lambda x: x[1], reverse=True)
    while len(nodes) > 1:
        (n1, p1) = nodes[-1]
        (n2, p2) = nodes[-2]
        nodes = nodes[:-2]
        node = Node(l = n1, r = n2)
        nodes.append((node, p1 + p2)) 
        nodes = sorted(nodes, key=lambda x: x[1], reverse=True)

    return nodes[0][0]

### Step 2: find code


In [64]:
def huffman_code_dict(tree: 'Node', codeword: str = '') -> Dict[chr, str]:
    if tree.lab:
        return {tree.lab: codeword}
    (l, r) = tree.children()
    codes = dict()
    codes.update(huffman_code_dict(l, codeword + '0'))
    codes.update(huffman_code_dict(r, codeword + '1'))
    return codes

### Examples 

You may want to test your implementation first. Here is an example:

In [73]:
def example1():
    alph = [ 'a', 'b', 'c', 'd', 'e' ]
    prob = [ 1/4, 1/4, 1/5, 3/20, 3/20 ]
    code = [ '00', '10', '11', '010', '011' ] 
    
    return dictify(alph, prob), dictify(alph, code)

And another, taken directly from Huffman's paper:

In [96]:
def example2():
    alph = [ str(i) for i in range(1, 14) ]
    prob = [ 0.2, 0.18, 0.1, 0.1, 0.1, 0.06, 0.06, 0.04, 0.04, 0.04, 0.04, 0.03, 0.01 ]
    code = [ '10', '000', '011', '110', '111', '0101', '00100', 
            '00101', '01000', '01001', '00110', '001110', '001111']
    
    return dictify(alph, prob), dictify(alph, code)

In [102]:
freq, example_code = example2()
root = build_tree(freq)
encoding = huffman_code_dict(root)

# The code is not unique, but the lengths are
all(len(encoding[k]) == len(example_code[k]) for k in encoding)

True

Encoding Shannon's paper

In [103]:
freqs = get_freqs(text)
root = build_tree(freqs)
codes = huffman_code_dict(root)

In [104]:
coded = encode(text, codes)
text2 = decode(coded, codes)

In [105]:
(__prefix_free(codes), __unique(codes), __entropy(freqs, codes))

Entropy of the distribution is: 4.26702362984071
Mean code length: 4.29428705200576


(True, True, True)

In [106]:
text == text2

True