# Static and adaptive Huffman coding

## Importing relevant modules

In [158]:
from queue import PriorityQueue
from bitarray import bitarray
from time import perf_counter
from collections import deque
import os

## Static Huffman coding

In [159]:
class SHTNode:
    """Node of a static Huffman tree.

    A leaf carries its symbol in ``char``; an internal node has ``char`` set
    to ``None`` and references its children through ``left`` and ``right``.
    ``weight`` is the value nodes are ordered by (see ``__lt__``).
    """

    def __init__(self, weight, char = None, left = None, right = None):
        self.char = char
        self.weight = weight
        self.left = left
        self.right = right

    def get_code_mapping(self, code_map = None, code = None):
        """Collect the code of every symbol stored in this subtree.

        Going to the left child appends a 0 to the code, going to the right
        child appends a 1.

        Args:
            code_map: dict to fill in; a new dict is created when omitted.
            code: code of this node (the path walked so far); an empty
                ``bitarray`` when omitted. Only ``copy()`` and ``append()``
                are used on it.

        Returns:
            ``code_map`` with one ``symbol -> code`` entry per node of the
            subtree whose ``char`` is set.
        """
        # The defaults are created per call on purpose: a mutable default
        # would be shared by every call and leak symbols between trees.
        if code_map is None:
            code_map = {}
        if code is None:
            code = bitarray()

        if self.char is not None:
            code_map[self.char] = code

        for bit, child in ((0, self.left), (1, self.right)):
            if child is not None:
                child_code = code.copy()
                child_code.append(bit)
                child.get_code_mapping(code_map, child_code)

        return code_map

    def __lt__(self, other):
        """Order nodes by weight so they can be stored in a priority queue."""
        return self.weight < other.weight


class StaticHuffmanTree:
    """Static (two-pass) Huffman coder for a text held in memory.

    Layout of the bit stream produced by ``encode``:

    * 32 bits: big-endian length of the symbol table, in bits;
    * the symbol table: for every distinct character its UTF-8 bytes
      followed by its occurrence count as a 32-bit big-endian integer;
    * the Huffman codes of the characters of the text, in order.

    ``decode`` walks the tree kept in ``self.root``; it only uses the length
    prefix of the header to find where the payload starts.
    """

    def __init__(self, text):
        self.text = text
        self._chars = self._count_occurences(text)
        self.root = self.build_tree()
        self.code_map = self._build_code_map()
        self.encoded = None

    def _count_occurences(self, text):
        """Return a dict ``char -> number of occurrences`` for ``text``."""
        count = {}
        for c in text:
            count[c] = count.get(c, 0) + 1

        return count

    def _is_single_leaf(self):
        """Tell whether the tree consists of exactly one (leaf) node."""
        return (
            self.root is not None
            and self.root.left is None
            and self.root.right is None
        )

    def _build_code_map(self):
        """Return the ``char -> code`` dict for the current tree."""
        if self.root is None:
            return {}

        if self._is_single_leaf():
            # A lone leaf has an empty path; give it a one-bit code so every
            # occurrence still shows up in the payload and can be decoded.
            return {self.root.char: bitarray('0')}

        # Pass fresh containers explicitly so nothing is shared between trees.
        return self.root.get_code_mapping({}, bitarray())

    def build_tree(self):
        """Build the Huffman tree and return its root (``None`` for no text)."""
        if not self._chars:
            # PriorityQueue.get() would block forever on an empty queue.
            return None

        priorityQueue = PriorityQueue()
        for c, weight in self._chars.items():
            priorityQueue.put(SHTNode(weight, c))

        # Repeatedly merge the two lightest nodes until one root remains.
        while priorityQueue.qsize() > 1:
            node1 = priorityQueue.get()
            node2 = priorityQueue.get()

            merged_node = SHTNode(weight = node1.weight + node2.weight, left = node1, right = node2)
            priorityQueue.put(merged_node)

        return priorityQueue.get()

    def encode(self):
        """Encode ``self.text``; store the result in ``self.encoded`` and return it.

        Raises:
            OverflowError: if a count or the table length does not fit in
                32 bits (raised by ``int.to_bytes``).
        """
        table = bitarray()
        for c, counter in self._chars.items():
            table.frombytes(c.encode())
            table.frombytes(counter.to_bytes(4, byteorder = 'big'))

        encoded = bitarray()
        encoded.frombytes(len(table).to_bytes(4, byteorder = 'big'))
        encoded += table
        for char in self.text:
            encoded += self.code_map[char]

        self.encoded = encoded
        return encoded

    def decode(self):
        """Decode ``self.encoded`` with the tree of this instance.

        ``encode`` is run first when nothing has been encoded yet.

        Returns:
            The decoded text.
        """
        if self.encoded is None:
            self.encode()

        if self.root is None:
            return ""

        # The first 32 bits hold the table length; the payload follows the
        # table. The table itself is not needed because the tree is kept.
        table_len = int.from_bytes(self.encoded[:32].tobytes(), byteorder = 'big')
        i = 32 + table_len
        end = len(self.encoded)

        single_leaf = self._is_single_leaf()
        chars = []
        while i < end:
            node = self.root
            if single_leaf:
                # One placeholder bit per occurrence (see _build_code_map).
                i += 1
            else:
                while node.left and node.right:
                    node = node.right if self.encoded[i] else node.left
                    i += 1

            chars.append(node.char)

        return "".join(chars)

## Adaptive Huffman coding

In [160]:
class AHTNode:
    """Node of an adaptive Huffman tree.

    In addition to the symbol, weight and children, a node keeps a reference
    to its ``parent`` and a ``counter`` attribute that the owning tree
    assigns and reads.
    """

    def __init__(self, weight, char = None, left = None, right = None, parent = None, counter = None):
        self.weight = weight
        self.char = char
        self.left = left
        self.right = right
        self.parent = parent
        self.counter = counter

    def get_code_mapping(self, code_map = None, code = None):
        """Collect the code of every symbol stored in this subtree.

        Going to the left child appends a 0 to the code, going to the right
        child appends a 1.

        Args:
            code_map: dict to fill in (existing entries for the symbols of
                this subtree are overwritten); a new dict is created when
                omitted.
            code: code of this node (the path walked so far); an empty
                ``bitarray`` when omitted. Only ``copy()`` and ``append()``
                are used on it.

        Returns:
            ``code_map`` with one ``symbol -> code`` entry per node of the
            subtree whose ``char`` is set.
        """
        # The defaults are created per call on purpose: a mutable default
        # would be shared by every call and by every tree that stores it.
        if code_map is None:
            code_map = {}
        if code is None:
            code = bitarray()

        if self.char is not None:
            code_map[self.char] = code

        for bit, child in ((0, self.left), (1, self.right)):
            if child is not None:
                child_code = code.copy()
                child_code.append(bit)
                child.get_code_mapping(code_map, child_code)

        return code_map

    def __lt__(self, other):
        """Order nodes by weight."""
        return self.weight < other.weight
    
    
class AdaptiveHuffmanTree:
    """Adaptive (one-pass) Huffman coder.

    The tree starts as a single node carrying the character ``"\\0"``, which
    marks the "not yet transmitted" (NYT) leaf; ``self.nyt`` always points at
    that leaf. ``self.nodes`` lists the nodes in the order their ``counter``
    attributes were assigned and ``self.leaves`` maps each seen character to
    its leaf.

    Both ``encode`` and ``decode`` grow the tree of the instance they run on,
    and ``decode`` starts reading as if the tree were still a single node, so
    decoding is meant to run on a fresh instance that received the bit stream
    through ``encoded``.
    """

    def __init__(self, text, encoded = None):
        self.text = text
        self.root = AHTNode(weight = 0, char = "\0")
        self.nyt = self.root
        # Fresh containers are passed explicitly so that the map is owned by
        # this instance and never shared with other trees.
        self.code_map = self.root.get_code_mapping({}, bitarray())
        self.nodes = []
        self.leaves = {}
        self.encoded = encoded

    def swap(self, node1, node2):
        """Exchange the positions of two nodes in the tree.

        Nothing happens when both arguments are the same node or when one is
        the parent of the other.
        """
        if node1 is node2 or node1 is node2.parent or node2 is node1.parent:
            return

        if node1.parent is node2.parent:
            # Siblings: only the left/right slots of the common parent change.
            if node1.parent.left is node1:
                node1.parent.left, node1.parent.right = node2, node1
            else:
                node1.parent.left, node1.parent.right = node1, node2

            return

        p_node1, p_node2 = node1.parent, node2.parent
        if node1 is p_node1.left:
            p_node1.left = node2
        else:
            p_node1.right = node2

        if node2 is p_node2.left:
            p_node2.left = node1
        else:
            p_node2.right = node1

        node1.parent, node2.parent = p_node2, p_node1

    def get_leader(self, node):
        """Return the swap candidate for ``node``.

        Scans ``self.nodes`` backwards from the entry just before
        ``node.counter`` while the entries have a smaller weight than
        ``node`` and returns the entry at which the scan stopped moving.
        """
        counter = node.counter - 1
        while counter >= 0 and self.nodes[counter].weight < node.weight:
            counter -= 1

        return self.nodes[counter + 1]

    def split_nyt(self, node):
        """Turn the NYT leaf into an internal node.

        The new NYT leaf becomes its left child and ``node`` its right child.
        """
        nyt = AHTNode(0, char = "\0")
        self.nyt.char, self.nyt.left, self.nyt.right = None, nyt, node
        node.parent, nyt.parent = self.nyt, self.nyt
        self.nyt = nyt

    def update_counters(self):
        """Rebuild ``self.nodes`` and renumber every node.

        Nodes are visited breadth-first from the root, right child before
        left child, and numbered from 0 in visiting order.
        """
        self.nodes = []
        q = deque()
        q.append(self.root)

        counter = 0
        while q:
            node = q.popleft()
            self.nodes.append(node)
            node.counter = counter
            if node.right:
                q.append(node.right)
            if node.left:
                q.append(node.left)

            counter += 1

    def increment(self, node):
        """Add 1 to the weight of ``node`` and of each of its ancestors.

        After each increment the node is swapped with its leader when the two
        are distinct and neither is the parent of the other.

        Returns:
            True when at least one swap was made, False otherwise.
        """
        updated = False
        while node:
            node.weight += 1
            leader = self.get_leader(node)
            if leader is not node and node.parent is not leader and leader.parent is not node:
                self.swap(node, leader)
                self.update_counters()
                updated = True

            node = node.parent

        return updated

    def update(self, char):
        """Register one occurrence of ``char`` in the tree.

        Returns:
            True when the character was new or a swap was made (in both
            cases ``self.code_map`` is refreshed), False otherwise.
        """
        updated = False
        if char not in self.leaves:
            node = AHTNode(weight = 1, char = char)
            self.leaves[char] = node
            self.split_nyt(node)
            p = node.parent
            p.counter = len(self.nodes)
            self.nodes.append(p)
            node.counter = len(self.nodes)
            self.nodes.append(node)
            # The new leaf already has weight 1, so counting starts at its parent.
            self.increment(p)
            updated = True
        else:
            updated = self.increment(self.leaves[char])

        if updated:
            self.root.get_code_mapping(self.code_map, bitarray())

        return updated

    def get_code_map(self):
        """Return the current ``char -> code`` dict.

        While the tree is a single node, that node is given the one-bit
        code ``0``.
        """
        if self.root.left or self.root.right:
            return self.code_map

        temp = bitarray()
        temp.append(0)
        return {self.root.char: temp}

    def encode(self):
        """Encode ``self.text``; store the result in ``self.encoded`` and return it.

        For every character the tree is updated first. When the update
        reports a change, the NYT code from before the update and the UTF-8
        bytes of the character are written, followed by the character's code
        from the refreshed map; otherwise only the character's code is
        written.
        """
        encoded = bitarray()

        curr_code_map = self.get_code_map().copy()
        for c in self.text:
            if self.update(c):
                encoded += curr_code_map["\0"]
                curr_code_map = self.get_code_map().copy()

                char = bitarray()
                char.frombytes(c.encode())
                encoded += char
            encoded += curr_code_map[c]

        self.encoded = encoded
        return encoded

    def _read_char(self, pos):
        """Read one UTF-8 encoded character starting at bit ``pos``.

        Returns:
            A ``(char, next_pos)`` tuple.
        """
        lead = self.encoded[pos: pos + 8].tobytes()
        size = 1
        if lead:
            # The lead byte of a UTF-8 sequence tells how many bytes follow;
            # encode() writes c.encode(), which is longer than one byte for
            # non-ASCII characters.
            if lead[0] >= 0xF0:
                size = 4
            elif lead[0] >= 0xE0:
                size = 3
            elif lead[0] >= 0xC0:
                size = 2

        end = pos + 8 * size
        return self.encoded[pos: end].tobytes().decode(), end

    def decode(self):
        """Decode ``self.encoded`` and return the text.

        The tree of this instance is updated while decoding, mirroring the
        updates ``encode`` performs.
        """
        chars = []
        updated = False

        # Bit 0 is the one-bit placeholder code written for the NYT node
        # while the tree is still a single node; the walk below consumes no
        # bits in that state, so the bit is skipped up front.
        i = 1
        while i < len(self.encoded):
            node = self.root
            while node.left and node.right:
                if self.encoded[i]:
                    node = node.right
                else:
                    node = node.left
                i += 1

            if node.char == '\0':
                # NYT: a raw character follows, then its code in the updated tree.
                char, i = self._read_char(i)
                self.update(char)
                updated = True
            else:
                if not updated:
                    self.update(node.char)
                updated = False
                chars.append(node.char)

        return "".join(chars)

## Testing function

In [161]:
def _print_size_report(original_size, compressed_size):
    """Print both file sizes and the space saving in percent."""
    print("Uncompressed file size:", original_size, "B")
    print("Compressed file size:", compressed_size, "B")
    if original_size:
        saving = round((1 - (compressed_size / original_size)) * 100, 2)
        print("Compression:", saving, "%")
    else:
        # An empty input has no meaningful ratio (and would divide by zero).
        print("Compression: n/a (empty input)")


def test_compression(file_name):
    """Round-trip ``files/<file_name>`` through both coders and print a report.

    For the static and the adaptive coder in turn: encode the text, write the
    bits to a file next to the input, decode them again, and print the
    timings, whether the round trip reproduced the text, and the file sizes.

    Args:
        file_name: name of a text file inside the ``files`` directory.
    """
    # splitext instead of slicing so that extensions of any length work.
    stem = os.path.splitext(file_name)[0]
    input_path = os.path.join("files", file_name)
    static_path = os.path.join("files", stem + "_compressed.txt")
    adaptive_path = os.path.join("files", stem + "_compressed_adaptive.txt")
    separator = "-" * 125

    file_size = os.path.getsize(input_path)

    print(separator)
    print("\t\tStatic Huffman Coding\n")
    with open(input_path, "r") as input_file:
        text = input_file.read()

    with open(static_path, "wb") as output_file:
        static_tree = StaticHuffmanTree(text)

        start = perf_counter()
        encoded = static_tree.encode()
        encoded.tofile(output_file)
        print(f"Encode took: {perf_counter() - start:.5f}", "seconds")

        start = perf_counter()
        decoded_text = static_tree.decode()
        print(f"Decode took: {perf_counter() - start:.5f}", "seconds")
        print("Test if decoded correctly:", text == decoded_text)

    # Sizes are read after the with-block so the output file is flushed.
    _print_size_report(file_size, os.path.getsize(static_path))
    print(separator)

    print("\t\tAdaptive Huffman Coding\n")
    with open(adaptive_path, "wb") as output_file:
        encoder = AdaptiveHuffmanTree(text)

        start = perf_counter()
        encoded = encoder.encode()
        encoded.tofile(output_file)
        print(f"Encode took: {perf_counter() - start:.5f}", "seconds")

        # Decoding rebuilds the tree from scratch, so it gets a new instance.
        decoder = AdaptiveHuffmanTree(text, encoder.encoded)
        start = perf_counter()
        decoded_text = decoder.decode()
        print(f"Decode took: {perf_counter() - start:.5f}", "seconds")
        print("Test if decoded correctly:", text == decoded_text)

    _print_size_report(file_size, os.path.getsize(adaptive_path))
    print(separator)

## Test 1 - 1kB

In [162]:
# Run the static and adaptive round-trip report on files/1kB.txt.
test_compression("1kB.txt")

-----------------------------------------------------------------------------------------------------------------------------
		Static Huffman Coding

Encode took: 0.00014 seconds
Decode took: 0.00128 seconds
Test if decoded correctly: True
Uncompressed file size: 1000 B
Compressed file size: 397 B
Compression: 60.3 %
-----------------------------------------------------------------------------------------------------------------------------
		Adaptive Huffman Coding

Encode took: 0.00491 seconds
Decode took: 0.00645 seconds
Test if decoded correctly: True
Uncompressed file size: 1000 B
Compressed file size: 463 B
Compression: 53.7 %
-----------------------------------------------------------------------------------------------------------------------------


## Test 2 - 10kB

In [163]:
# Run the static and adaptive round-trip report on files/10kB.txt.
test_compression("10kB.txt")

-----------------------------------------------------------------------------------------------------------------------------
		Static Huffman Coding

Encode took: 0.00116 seconds
Decode took: 0.01190 seconds
Test if decoded correctly: True
Uncompressed file size: 10000 B
Compressed file size: 3572 B
Compression: 64.28 %
-----------------------------------------------------------------------------------------------------------------------------
		Adaptive Huffman Coding

Encode took: 0.03741 seconds
Decode took: 0.05151 seconds
Test if decoded correctly: True
Uncompressed file size: 10000 B
Compressed file size: 3720 B
Compression: 62.8 %
-----------------------------------------------------------------------------------------------------------------------------


## Test 3 - 100kB

In [164]:
# Run the static and adaptive round-trip report on files/100kB.txt.
test_compression("100kB.txt")

-----------------------------------------------------------------------------------------------------------------------------
		Static Huffman Coding

Encode took: 0.01106 seconds
Decode took: 0.11253 seconds
Test if decoded correctly: True
Uncompressed file size: 100000 B
Compressed file size: 35322 B
Compression: 64.68 %
-----------------------------------------------------------------------------------------------------------------------------
		Adaptive Huffman Coding

Encode took: 0.34509 seconds
Decode took: 0.47951 seconds
Test if decoded correctly: True
Uncompressed file size: 100000 B
Compressed file size: 35528 B
Compression: 64.47 %
-----------------------------------------------------------------------------------------------------------------------------


## Test 4 - 1MB

In [165]:
# Run the static and adaptive round-trip report on files/1MB.txt.
test_compression("1MB.txt")

-----------------------------------------------------------------------------------------------------------------------------
		Static Huffman Coding

Encode took: 0.10744 seconds
Decode took: 1.11233 seconds
Test if decoded correctly: True
Uncompressed file size: 1000000 B
Compressed file size: 352826 B
Compression: 64.72 %
-----------------------------------------------------------------------------------------------------------------------------
		Adaptive Huffman Coding

Encode took: 3.43498 seconds
Decode took: 4.70471 seconds
Test if decoded correctly: True
Uncompressed file size: 1000000 B
Compressed file size: 353829 B
Compression: 64.62 %
-----------------------------------------------------------------------------------------------------------------------------
