# Huffman coding

## Zadanie polega na implementacji dwóch algorytmów kompresji:
 __- statycznego algorytmu Huffmana (1 punkty),__
 
 __- dynamicznego algorytmu Huffmana (2 punkty).__
## Dla każdego z algorytmów należy wykonać następujące zadania:
 __1) Opracować format pliku przechowującego dane,__
 
 __2) Zaimplementować algorytm kompresji i dekompresji danych dla tego formatu pliku,__
 
 __3) Zmierzyć współczynnik kompresji (wyrażone w procentach: 1 - plik_skompresowany / plik_nieskompresowany) dla plików tekstowych o rozmiarach: 1kB, 10kB, 100kB, 1MB,__
 
 __4) Zmierzyć czas kompresji i dekompresji dla plików z punktu 3.__

### Imports

In [1]:
from heapq import heappop, heappush, heapify
from bitarray import bitarray
from bitarray.util import ba2int

### Statyczny algorytm Huffmana

In [2]:
class Node:
    def __init__(self, value, char=None, left=None, right=None):
        self.value = value
        self.char = char
        self.left = left
        self.right = right

    def __gt__(self, other):
        return self.value > other.value


class StaticHuffmanTree:
    def __init__(self, text):
        self.root = self.build_static_huffman_tree(text)
        self.codes = dict()
        self.create_huffman_codes(self.root, self.codes, bitarray())

    def build_static_huffman_tree(self, text):
        letters = dict()
        for char in text:
            letters[char] = letters.get(char, 0) + 1
        leafs = [Node(weight, char) for char, weight in letters.items()]
        while len(leafs) > 1:
            first, second = heappop(leafs), heappop(leafs)
            heappush(leafs, Node(first.value + second.value, left=first, right=second))
        return leafs[0]

    def create_huffman_codes(self, node, codes, code):
        if node.char is not None:
            codes[node.char] = code

        code_cpy = code.copy()
        if node.left is not None:
            code.append(0)
            self.create_huffman_codes(node.left, codes, code)

        if node.right is not None:
            code = code_cpy
            code.append(1)
            self.create_huffman_codes(node.right, codes, code)

    def encode_static_huffman_tree(self, text):
        result = bitarray()
        for char in text:  # TODO check append below
            result.extend(self.codes[char])
        return result
    
    def decode_static_huffman_tree(self, encoded_text):
        node = self.root
        decoded_text = ""
        for bit in encoded_text:
            if not bit:
                node = node.left
            else:
                node = node.right
            if not node.left and not node.right:
                decoded_text += node.char
                node = self.root
        return decoded_text

__Example test__

In [3]:
text = "Hello world. I'm Huffman encoder and decoder!"
tree = StaticHuffmanTree(text)
result = tree.encode_static_huffman_tree(text)
tree.decode_static_huffman_tree(result)

"Hello world. I'm Huffman encoder and decoder!"

### Dynamiczny algorytm Huffmana

In [4]:
class Node2:
    def __init__(self, weight=0, index=0, char=None, left=None, right=None, parent=None):
        self.weight = weight
        self.index = index
        self.char = char
        self.left = left
        self.right = right
        self.parent = parent


class AdaptiveHuffmanTree:
    def __init__(self):
        self.index = 520
        NYT = Node2(weight=0, index=self.index + 1, char='NYT')
        self.NYT = NYT
        self.root = NYT
        self.leaves = {"NYT": self.root}
        self.weights = {0: {self.root}, 1: set()}

    def add_new_node(self, char):
        node = self.NYT
        left_node = Node2(weight=0, index=self.index - 1, parent=node, char="NYT")
        node.left = left_node
        right_node = Node2(weight=1, index=self.index, parent=node, char=char)
        node.right = right_node
        node.char = None
        self.index -= 2
        self.NYT = left_node
        self.weights[0].add(left_node)
        self.weights[1].add(right_node)
        self.leaves[char] = right_node
        self.leaves["NYT"] = left_node
        self.increment_and_swap(node)

    def increment_and_swap(self, node):
        while node != self.root:
            node = node.parent
            max_index_node = max(self.weights[node.weight], key=lambda nd: nd.index)
            if node != max_index_node:
                node.index, max_index_node.index = max_index_node.index, node.index
                if node.parent == max_index_node.parent:
                    if node == node.parent.left:
                        node.parent.right = node
                        node.parent.left = max_index_node
                    else:
                        node.parent.right = max_index_node
                        node.parent.left = node
                else:
                    if node == node.parent.left:
                        node.parent.left = max_index_node
                    else:
                        node.parent.right = max_index_node
                    if max_index_node.parent.left == max_index_node:
                        max_index_node.parent.left = node
                    else:
                        max_index_node.parent.right = node
                    if node.parent != max_index_node.parent:
                        max_index_node.parent, node.parent = node.parent, max_index_node.parent
            self.weights[node.weight].remove(node)
            node.weight += 1
            if node.weight not in self.weights:
                self.weights[node.weight] = set()
            self.weights[node.weight].add(node)

    def get_code(self, char):
        node = self.leaves[char]
        code = bitarray()
        while node != self.root:
            if node == node.parent.left:
                code.append(0)
            else:
                code.append(1)
            node = node.parent
        code.reverse()
        return code

    def encode_adaptive_huffman_tree(self, text):
        coded_text = bitarray()
        for char in text:
            if char in self.leaves:
                coded_text += self.get_code(char)
                self.increment_and_swap(self.leaves[char])
            else:
                coded_char = self.get_code('NYT')
                coded_char.frombytes(char.encode("utf-8"))
                coded_text += coded_char
                self.add_new_node(char)
        end_bits = 8 - len(coded_text) % 8
        coded_text = bitarray(f"{end_bits:08b}") + coded_text + bitarray(end_bits)
        return coded_text


def decode_adaptive_huffman_tree(encoded_text):
    tree = AdaptiveHuffmanTree()
    node = tree.root
    idx = 0
    encoded_text = encoded_text[8:-ba2int(encoded_text[:8])]
    decoded_text = ""
    while idx < len(encoded_text):
        while not (node.left is None and node.right is None):
            if not encoded_text[idx]:
                node = node.left
            else:
                node = node.right
            idx += 1
        if node.char == "NYT":
            char_coded = encoded_text[idx:idx + 8]
            char_decoded = char_coded.tobytes().decode("utf-8")
            tree.add_new_node(char_decoded)
            idx += 8
        else:
            char_decoded = node.char
            tree.increment_and_swap(tree.leaves[char_decoded])
        node = tree.root
        decoded_text += char_decoded
    return decoded_text

__Example test__

In [5]:
text = "Hello world. I'm Huffman encoder and decoder!"
tree = AdaptiveHuffmanTree()
result = tree.encode_adaptive_huffman_tree(text)
print(decode_adaptive_huffman_tree(result))

Hello world. I'm Huffman encoder and decoder!


### Tests

In [6]:
from timeit import default_timer as timer
import os

__Compression test__

In [7]:
def compression_ratio(read_file, write_file):
    original_size = os.path.getsize(read_file)
    coded_size = os.path.getsize(write_file)
    return 1 - coded_size / original_size

In [8]:
def compression_test(read_file, size):
    save_file = f"output_files/compression_static_{size}.txt"
    static_tree = StaticHuffmanTree(text)
    encoded = static_tree.encode_static_huffman_tree(text)
    with open(save_file, "wb+") as f:
        encoded.tofile(f)
    static_compression = compression_ratio(read_file, save_file)
    print(f"Compression ratio for {read_file} for StaticHuffmanTree is {static_compression * 100}%.")
    
    save_file = f"output_files/compression_adaptive_{size}.txt"
    adaptive_tree = AdaptiveHuffmanTree()
    encoded = adaptive_tree.encode_adaptive_huffman_tree(text)
    with open(save_file, "wb+") as f:
        encoded.tofile(f)
    adaptive_compression = compression_ratio(read_file, save_file)
    print(f"Compression ratio for {read_file} for AdaptiveHuffmanTree is {adaptive_compression * 100}%.")

1kb file

In [9]:
compression_test("test_files/1kB.txt", "1kb")

Compression ratio for test_files/1kB.txt for StaticHuffmanTree is 97.60717846460618%.
Compression ratio for test_files/1kB.txt for AdaptiveHuffmanTree is 95.61316051844466%.


10kb file

In [10]:
compression_test("test_files/10kB.txt", "10kb")

Compression ratio for test_files/10kB.txt for StaticHuffmanTree is 99.76083707025411%.
Compression ratio for test_files/10kB.txt for AdaptiveHuffmanTree is 99.5615346287992%.


100kb file

In [11]:
compression_test("test_files/100kB.txt", "100kb")

Compression ratio for test_files/100kB.txt for StaticHuffmanTree is 99.9759896755605%.
Compression ratio for test_files/100kB.txt for AdaptiveHuffmanTree is 99.9559810718609%.


1MB file

In [12]:
compression_test("test_files/1MB.txt", "1Mb")

Compression ratio for test_files/1MB.txt for StaticHuffmanTree is 99.99760654848335%.
Compression ratio for test_files/1MB.txt for AdaptiveHuffmanTree is 99.9956120055528%.


__Time test__

In [13]:
def time_test(filename, n):
    with open(filename, "r") as f:
        text = f.read()
        
    static_tree = StaticHuffmanTree(text)    
    average_time = 0
    for _ in range(n):
        start = timer()
        result = static_tree.encode_static_huffman_tree(text)
        static_tree.decode_static_huffman_tree(result)
        end = timer()
        average_time += (end - start)
    average_time /= n
    print(f"Average time execution for {filename} for StaticHuffmanTree is {average_time} s.")
    
    average_time = 0
    for _ in range(n):
        adaptive_tree = AdaptiveHuffmanTree()
        start = timer()
        result = adaptive_tree.encode_adaptive_huffman_tree(text)
        decode_adaptive_huffman_tree(result)
        end = timer()
        average_time += (end - start)
    average_time /= n
    print(f"Average time execution for {filename} AdaptiveHuffmanTree is {average_time} s.")

1kb file

In [14]:
time_test("test_files/1kB.txt", 100)

Average time execution for test_files/1kB.txt for StaticHuffmanTree is 0.0005781292299616326 s.
Average time execution for test_files/1kB.txt AdaptiveHuffmanTree is 0.012443592740091845 s.


10kb file

In [15]:
time_test("test_files/10kB.txt", 100)

Average time execution for test_files/10kB.txt for StaticHuffmanTree is 0.005187703359988518 s.
Average time execution for test_files/10kB.txt AdaptiveHuffmanTree is 0.1282756534100372 s.


100kb file

In [16]:
time_test("test_files/100kB.txt", 50)

Average time execution for test_files/100kB.txt for StaticHuffmanTree is 0.052261491779827335 s.
Average time execution for test_files/100kB.txt AdaptiveHuffmanTree is 1.3611553820001063 s.


1MB file

In [17]:
time_test("test_files/1MB.txt", 10)

Average time execution for test_files/1MB.txt for StaticHuffmanTree is 0.4891548031000639 s.
Average time execution for test_files/1MB.txt AdaptiveHuffmanTree is 13.80819525959996 s.
