In [None]:
import numpy as np
from queue import PriorityQueue, Empty
from collections import defaultdict

In [None]:
class BitList(object):
    """Append-only sequence of bits packed eight to a byte.

    Bits are supplied as strings of ``"0"``/``"1"`` characters.  ``seal``
    closes the list and appends a one-byte trailer holding how many bits of
    the last data byte are meaningful (1-8), so the packed layout is
    ``<data bytes ...> <trailer byte>``.
    """

    def __init__(self):
        self.acc = bytearray()  # packed bytes; after seal() the last one is the trailer
        self.tmp = ""           # pending bits (fewer than 8) not yet packed
        self.sealed = False

    def extend(self, bits):
        """Append ``bits`` (a string of ``"0"``/``"1"``) to the list.

        Raises:
            AssertionError: if the list has already been sealed.
        """
        assert not self.sealed
        pending = self.tmp + bits
        # Pack every complete byte in a single pass; only the tail (< 8 bits)
        # is carried over, so long inputs are not re-sliced once per byte.
        whole = len(pending) - len(pending) % 8
        for start in range(0, whole, 8):
            self.acc.append(int(pending[start:start + 8], base=2))
        self.tmp = pending[whole:]

    def seal(self):
        """Flush the pending bits (zero-padded to a byte) and write the trailer.

        Raises:
            AssertionError: if the list has already been sealed.
        """
        assert not self.sealed
        if self.tmp:
            end_byte_len = len(self.tmp)
            self.acc.append(int(self.tmp.ljust(8, "0"), base=2))
        else:
            # Either the last data byte is completely used, or there is no
            # data at all; generator() tells the two apart by length.
            end_byte_len = 8
        self.acc.append(end_byte_len)
        self.sealed = True

    def generator(self):
        """Yield the stored bits one at a time as ``"0"``/``"1"`` characters.

        Padding bits added by ``seal`` are not yielded.

        Raises:
            AssertionError: if the list is not sealed or the trailer is not
                in the range 1-8.
        """
        assert self.sealed
        endbyte_len = self.acc[-1]
        assert 0 < endbyte_len <= 8
        if len(self.acc) < 2:
            # Only the trailer is present: an empty bit list.  Previously this
            # fell through to acc[-2] and raised IndexError.
            return
        for c in self.acc[:-2]:
            yield from format(c, "08b")
        # Last data byte: only its first `endbyte_len` bits are real.
        yield from format(self.acc[-2], "08b")[:endbyte_len]

    @staticmethod
    def frombytes(b):
        """Wrap already-sealed packed bytes ``b`` (data plus trailer) for reading.

        ``b`` is stored as-is, without copying.
        """
        res = BitList()
        res.acc = b
        res.sealed = True
        return res

In [None]:
class Node(object):
    """Internal node of a binary Huffman tree.

    ``left`` and ``right`` are the two children (``Node`` or ``Leaf``
    instances).  Nodes are totally ordered so that they can be used as
    tie-breakers inside priority-queue tuples: leaves compare by value, every
    leaf sorts before every internal node, and internal nodes compare by
    ``left`` and then ``right``.
    """

    def __init__(self, left, right):
        self.left = left
        self.right = right

    def __lt__(self, other):
        if not isinstance(other, Node):
            # Let Python raise TypeError instead of failing on other.left.
            return NotImplemented
        self_is_leaf = isinstance(self, Leaf)
        other_is_leaf = isinstance(other, Leaf)
        if self_is_leaf and other_is_leaf:
            return self.value < other.value
        if self_is_leaf or other_is_leaf:
            # Exactly one side is a leaf; leaves sort first.
            return self_is_leaf
        if self.left < other.left:
            return True
        if self.left == other.left:
            return self.right < other.right
        return False

    def __le__(self, other):
        if not isinstance(other, Node):
            return NotImplemented
        self_is_leaf = isinstance(self, Leaf)
        other_is_leaf = isinstance(other, Leaf)
        if self_is_leaf and other_is_leaf:
            return self.value <= other.value
        if self_is_leaf or other_is_leaf:
            return self_is_leaf
        if self.left < other.left:
            return True
        if self.left == other.left:
            return self.right <= other.right
        return False

    def __eq__(self, other):
        if not isinstance(other, Node):
            # Comparing with None or any foreign object is simply "not equal";
            # previously this raised AttributeError on other.left.
            return NotImplemented
        self_is_leaf = isinstance(self, Leaf)
        other_is_leaf = isinstance(other, Leaf)
        if self_is_leaf and other_is_leaf:
            return self.value == other.value
        if self_is_leaf or other_is_leaf:
            return False
        return self.left == other.left and self.right == other.right

    def __hash__(self):
        # Order-sensitive, like __eq__: Node(a, b) and Node(b, a) are unequal
        # and should not be forced to share a hash.
        return hash((self.left, self.right))

    def bin(self):
        """Serialize the tree rooted at this node.

        Returns:
            ``(data, structure)`` where ``data`` is a bytearray with every
            leaf value concatenated in pre-order, and ``structure`` is the
            sealed ``BitList`` byte buffer holding one bit per tree node in
            pre-order (``0`` = internal node, ``1`` = leaf).
        """
        data, bit_list = self._bin(bytearray(), BitList())
        bit_list.seal()
        return data, bit_list.acc

    def _bin(self, data, bit_list):
        """Pre-order helper for ``bin``: emit this node, then both subtrees."""
        bit_list.extend("0")
        data, bit_list = self.left._bin(data, bit_list)
        data, bit_list = self.right._bin(data, bit_list)
        return data, bit_list

    @staticmethod
    def _attach(parent, child):
        """Put ``child`` in the first free slot of ``parent``.

        Returns True when the right slot was filled, i.e. ``parent`` is now
        complete.  Raises AssertionError if both slots were already taken.
        """
        if parent.left is None:
            parent.left = child
            return False
        assert parent.right is None
        parent.right = child
        return True

    @staticmethod
    def reconstruct(step, data, structure):
        """Rebuild a tree from the output of ``bin``.

        Args:
            step: number of bytes of ``data`` consumed per leaf value.
            data: concatenated leaf values, in pre-order.
            structure: sealed ``BitList`` bytes describing the tree shape.

        Returns:
            The root node, or None if ``structure`` holds no bits.

        Raises:
            AssertionError: if the structure bits are malformed, the tree is
                left incomplete, or ``data`` is not consumed exactly.
        """
        root = None
        stack = []  # internal nodes that still miss at least one child
        offset = 0
        for bit in BitList.frombytes(structure).generator():
            if bit == "0":
                child = Node(None, None)
            else:
                assert bit == "1"
                child = Leaf(bytes(data[offset:offset + step]))
                offset += step

            if stack:
                completed_parent = Node._attach(stack[-1], child)
            else:
                root = child
                completed_parent = False

            if bit == "0":
                # A new internal node is always open until its children arrive.
                stack.append(child)
            elif completed_parent:
                # A leaf closed its parent; unwind every ancestor that is now
                # complete as well.
                while stack and stack[-1].right is not None:
                    stack.pop()
        assert not stack
        assert offset == len(data)
        return root

class Leaf(Node):
    """Terminal tree node that carries a single token in ``value``.

    ``Node.__init__`` is not invoked, so a leaf has no ``left``/``right``
    attributes of its own.
    """

    def __init__(self, i):
        self.value = i

    def _bin(self, data, bit_list):
        """Serialize this leaf: its value goes to ``data``, a ``1`` bit to ``bit_list``.

        Both accumulators are mutated in place and handed back as a pair.
        """
        data.extend(self.value)
        bit_list.extend("1")
        return (data, bit_list)

    def __hash__(self):
        # A leaf hashes exactly like the token it wraps.
        return hash(self.value)

In [None]:
def pad(s, n):
    """Right-pad the byte string ``s`` with NUL bytes up to length ``n``.

    ``s`` is returned unchanged when it is already ``n`` bytes or longer.
    """
    missing = n - len(s)
    if missing > 0:
        return s + b"\x00" * missing
    return s

def tokenize(s, n):
    """Yield consecutive ``n``-byte slices of ``s``.

    If ``len(s)`` is not a multiple of ``n``, the final short slice is passed
    through ``pad`` so that it is also ``n`` bytes long.
    """
    whole_tokens = len(s) // n
    for index in range(whole_tokens):
        begin = n * index
        yield s[begin:begin + n]
    if len(s) % n != 0:
        yield pad(s[n * whole_tokens:], n)

In [None]:
def encode(tokenized_text, key):
    """Translate tokens into their bit codes and pack the result.

    Args:
        tokenized_text: iterable of tokens; each must be a key of ``key``.
        key: mapping from token to its code, a string of ``"0"``/``"1"``.

    Returns:
        The sealed ``BitList`` byte buffer (packed bits plus trailer byte).
    """
    packer = BitList()
    append_code = packer.extend
    for token in tokenized_text:
        append_code(key[token])
    packer.seal()
    return packer.acc

def decode(cipher_text, node):
    """Decode packed bits by walking the tree rooted at ``node``.

    Each ``0`` bit moves to the left child and each ``1`` bit to the right
    child; on reaching a ``Leaf`` its value is emitted and the walk restarts
    at the root.

    Args:
        cipher_text: sealed ``BitList`` bytes, as produced by ``encode``.
        node: root of the tree.

    Returns:
        The concatenated leaf values as ``bytes``.
    """
    output = bytearray()
    cursor = node
    bit_stream = BitList.frombytes(cipher_text).generator()
    for bit in bit_stream:
        if bit != "0":
            assert bit == "1"
            cursor = cursor.right
        else:
            cursor = cursor.left
        if not isinstance(cursor, Leaf):
            continue
        # A full code word has been consumed.
        output.extend(cursor.value)
        cursor = node
    return bytes(output)

In [None]:
def huffman(tokens):
    """Build a Huffman tree from an iterable of hashable tokens.

    Tokens are counted, every distinct token becomes a ``Leaf`` weighted by
    its count, and the two lightest subtrees are merged repeatedly until a
    single tree remains.  Ties in weight are broken by the ordering of the
    nodes themselves.

    Args:
        tokens: iterable of tokens (consumed once).

    Returns:
        The root of the tree; a bare ``Leaf`` when only one distinct token
        occurs.

    Raises:
        ValueError: if ``tokens`` yields nothing.
    """
    dt = defaultdict(int)
    for k in tokens:
        dt[k] += 1
    if not dt:
        # Without this guard the first blocking get() on the empty queue
        # below would wait forever.
        raise ValueError("cannot build a Huffman tree from an empty token sequence")

    q = PriorityQueue()
    for k, v in dt.items():
        q.put((v, Leaf(k)))

    # Merge until one entry is left; that entry is the root.
    while q.qsize() > 1:
        ai, a = q.get_nowait()
        bi, b = q.get_nowait()
        q.put((ai + bi, Node(a, b)))
    _, root = q.get_nowait()
    return root

def get_key(root_node):
    """Derive the code table of a Huffman tree.

    The tree is walked depth-first, left before right; going left contributes
    a ``"0"`` and going right a ``"1"`` to the code.

    Returns:
        dict mapping each leaf value to its code string.  When ``root_node``
        is itself a ``Leaf`` its code is the empty string.
    """
    codes = {}

    def walk(node, prefix):
        # `prefix` is the path from the root down to `node`.
        if isinstance(node, Leaf):
            codes[node.value] = prefix
            return
        walk(node.left, prefix + "0")
        walk(node.right, prefix + "1")

    walk(root_node, "")
    return codes

In [None]:
import pandas as pd
from plotnine import *

In [None]:
def stat(src, i):
    """Estimate the size in bits of Huffman-coding ``src`` with ``i``-byte tokens.

    Args:
        src: the input bytes.
        i: token size in bytes.

    Returns:
        dict with the keys

        * ``"i"`` - the token size,
        * ``"entropy"`` - sum over distinct tokens of
          ``count * -log2(count / total)``,
        * ``"document_size"`` - sum over distinct tokens of
          ``count * code length``,
        * ``"tree_data_size"`` - ``distinct tokens * i * 8``,
        * ``"tree_rep_size"`` - ``2 * distinct tokens - 1``,
        * ``"total_size"`` - the sum of the previous three sizes.

    Raises:
        ValueError: if ``src`` is empty.
    """
    dt = defaultdict(int)
    for token in tokenize(src, i):
        dt[token] += 1
    if not dt:
        raise ValueError("src is empty; there is nothing to measure")

    key = get_key(huffman(tokenize(src, i)))

    # One entry per distinct token, both arrays in the same (dict) order.
    freq = np.array(list(dt.values()))
    code_len = np.array([len(key[token]) for token in dt])

    theo_prob = freq / freq.sum()
    entropy = float(np.sum(freq * -np.log2(theo_prob)))
    # -log2(2 ** -code_len) is just code_len, so no logarithm is needed here.
    document_size = float(np.sum(freq * code_len))
    # Every token produced by tokenize() is exactly i bytes long.
    tree_data_size = len(dt) * i * 8
    tree_rep_size = 2 * len(dt) - 1
    return {
        "i": i,
        "entropy": entropy,
        "document_size": document_size,
        "tree_data_size": tree_data_size,
        "tree_rep_size": tree_rep_size,
        "total_size": document_size + tree_data_size + tree_rep_size
    }

In [None]:
# Read the first sample document as raw bytes; `src` is the input to stat() and the plot below.
with open("data/input1.txt", "rb") as f:
    src = f.read()

In [None]:
# One row of size statistics per token size, for token sizes 1 through 14 bytes.
df = pd.DataFrame([stat(src, i) for i in range(1, 15)])
# Long format (i, variable, size) so that each statistic can get its own colour in the plot.
df_melt = df.melt(id_vars="i", value_name="size")

In [None]:
# Scatter every statistic against the token size, with horizontal reference
# lines for the uncompressed size and for a hard-coded "Zip" size.
( ggplot(data=df_melt)
 +geom_point(aes(x="i", y="size", color="variable"))
 +geom_hline(yintercept=len(src)*8, color="black") # uncompressed size of input1 in bits
 +annotate("label", 15, len(src)*8, label="Orig", size=8)
 +geom_hline(yintercept=1200*8, color="orange") # zip; NOTE(review): 1200 is presumably the zipped size of input1.txt in bytes - confirm
 +annotate("label", 15, 1200*8, label="Zip", size=8)
 +scale_x_continuous(breaks=range(15), minor_breaks=())
 +ggtitle("Bits Required to Represent input1.txt using Huffman Coding")
 +xlab("Token Size (bytes)")
 +ylab("Representation Size (bits)")
)

In [None]:
# Read the second sample document as raw bytes.
with open("data/input2.txt", "rb") as f:
    src2 = f.read()

In [None]:
# Same statistics as for input1: one row per token size 1 through 14 bytes.
df2 = pd.DataFrame([stat(src2, i) for i in range(1, 15)])
# Long format (i, variable, size) for plotting.
df2_melt = df2.melt(id_vars="i", value_name="size")

In [None]:
# Same plot as for input1, drawn from the input2 statistics.
( ggplot(data=df2_melt)
 +geom_point(aes(x="i", y="size", color="variable"))
 +geom_hline(yintercept=len(src2)*8, color="black") # uncompressed size of input2 in bits
 +annotate("label", 15, len(src2)*8, label="Orig", size=8)
 +geom_hline(yintercept=17600*8, color="orange") # zip; NOTE(review): 17600 is presumably the zipped size of input2.txt in bytes - confirm
 +annotate("label", 15, 17600*8, label="Zip", size=8)
 +scale_x_continuous(breaks=range(15), minor_breaks=())
 +ggtitle("Bits Required to Represent input2.txt using Huffman Coding")
 +xlab("Token Size (bytes)")
 +ylab("Representation Size (bits)")
)

In [None]:
# Read the third sample document as raw bytes; `src3` is also the input of the
# encode/decode round trip further down.
with open("data/input3.txt", "rb") as f:
    src3 = f.read()

In [None]:
# Same statistics as for input1: one row per token size 1 through 14 bytes.
df3 = pd.DataFrame([stat(src3, i) for i in range(1, 15)])
# Long format (i, variable, size) for plotting.
df3_melt = df3.melt(id_vars="i", value_name="size")

In [None]:
# Same plot as for input1, drawn from the input3 statistics.
( ggplot(data=df3_melt)
 +geom_point(aes(x="i", y="size", color="variable"))
 +geom_hline(yintercept=len(src3)*8, color="black") # uncompressed size of input3 in bits
 +annotate("label", 15, len(src3)*8, label="Orig", size=8)
 +geom_hline(yintercept=56200*8, color="orange") # zip; NOTE(review): 56200 is presumably the zipped size of input3.txt in bytes - confirm
 +annotate("label", 15, 56200*8, label="Zip", size=8)
 +scale_x_continuous(breaks=range(15), minor_breaks=())
 +ggtitle("Bits Required to Represent input3.txt using Huffman Coding")
 +xlab("Token Size (bytes)")
 +ylab("Representation Size (bits)")
)

In [None]:
# Build the tree for input3 with 2-byte tokens, serialize it, rebuild it from
# the serialized form, and check that the rebuilt tree equals the original.
root_node = huffman(tokenize(src3, 2))
data, structure = root_node.bin()
root_reconstruct = Node.reconstruct(2, data, structure)
assert root_node == root_reconstruct

In [None]:
# Token -> bit-string code table derived from the tree.
key = get_key(root_node)

In [None]:
# Encode input3, split into 2-byte tokens, into a packed byte buffer.
cip = encode(tokenize(src3, 2), key)

In [None]:
# Decode with the reconstructed tree rather than the original one.
# NOTE(review): `decoded` is never compared with `src3` in this notebook; since
# tokenize() NUL-pads the last token, it can be longer than `src3` when
# len(src3) is odd.
decoded = decode(cip, root_reconstruct)

In [None]:
# Write the encoded bytes to disk.
# NOTE(review): only `cip` is written; the tree (`data`, `structure`) that
# decode() needs is not stored in this file - confirm that is intended.
with open("output/output3.txt", "wb") as f:
    f.write(cip)

In [None]:
# Write the decoded bytes to disk.
with open("output/decode3.txt", "wb") as f:
    f.write(decoded)

In [None]:
# Sizes in bits of the encoded text, the serialized leaf values, and the
# serialized tree structure (both packed buffers include their trailer byte).
len(cip)*8, len(data)*8, len(structure)*8

In [None]:
# Second row of df3, i.e. the statistics for token size i=2 (rows follow
# range(1, 15)); presumably shown for comparison with the measured sizes.
df3.iloc[1, :]