In [1]:
import os
import random
from bitarray import bitarray
from os import remove
from time import time
from collections import defaultdict
from queue import Queue
from os.path import getsize

## Utworzenie węzła

In [2]:
class Node:
    def __init__(self, w=0, v=None):
        self.value = v
        self.parent = self
        self.children = {}
        self.weight = w        
        
    def gen_parent(self, node1, node2):
        self.children[1] = node1
        self.children[0] = node2
        self.weight = node1.weight + node2.weight
        node1.parent = self
        node2.parent = self

## Funkcja pomocnicza do zliczania liter w słowniku

In [3]:
def letter_counter(text):
    letter_dict = defaultdict(int)
    for t in text:
        letter_dict[t] += 1
        
    return letter_dict

## Statyczny kod Huffmana

In [4]:
def huffman_static(text):
    letter_counts = defaultdict(int)
    
    for letter in text:
        letter_counts[letter] += 1
        
    nodes = []
    for letter, count in letter_counts.items():
        nodes.append(Node(count, letter))
        
    leaf_nodes = sorted(nodes, key=lambda n: n.weight)
    nodes_inter = []
    while len(leaf_nodes) + len(nodes_inter) > 1:
        node_list = []
        for _ in range (2):
            if len(leaf_nodes) == 0:
                node_list.append(nodes_inter.pop(0))
            elif len(nodes_inter) == 0 or leaf_nodes[0].weight <= nodes_inter[0].weight:
                node_list.append(leaf_nodes.pop(0))
            else:
                node_list.append(nodes_inter.pop(0))
                
        new_internal_node = Node()
        new_internal_node.gen_parent(node_list[0], node_list[1])
        nodes_inter.append(new_internal_node)
        
    return nodes_inter[0]  

## Dynamiczny kod Huffmana

#### Funkcje pomocnicze

In [5]:
def get_elem(val, tree_root):
    node_queue = Queue()
    current_node = tree_root
    node_queue.put(current_node)
    
    while not node_queue.empty():
        current_node = node_queue.get()
        
        if current_node.weight == val:
            return current_node
        if current_node.value == None:
            node_queue.put(current_node.children[1])
            node_queue.put(current_node.children[0])
            
    return None

In [6]:
def increment_weight(node, tree_root):
    if tree_root == node:
        node.weight += 1
        return
    
    weight_node = get_elem(node.weight, tree_root)
    if weight_node != node and weight_node != node.parent:
        parent = node.parent
        weight_parent = weight_node.parent
        if parent.children[0] == node:
            parent.children[0] = weight_node
        else:
            parent.children[1] = weight_node
        if weight_parent.children[0] == weight_node:
            weight_parent.children[0] = node
        else:
            weight_parent.children[1] = node
        weight_node.parent = parent
        node.parent = weight_parent
        
    node.weight += 1
    increment_weight(node.parent, tree_root)

In [7]:
def huffman_dynamic(text):
    elements = {}
    elements["##"] = Node(v="##")
    tree_root = elements["##"]
    
    for t in text:
        if t in elements:
            node = elements[t]
            increment_weight(node, tree_root)
        else:
            node_new = elements["##"]
            node_new.value = None
            node = Node(v=t)
            
            del elements["##"]
            zero = Node(v="##")
            node_new.gen_parent(node, zero)
            node.weight = 1
            elements[t] = node
            elements["##"] = zero
            increment_weight(node_new, tree_root)
            
    return tree_root

## Enkodowanie i dekodowanie

In [8]:
def huffman_tree_to_library(node, library, key=""):
    if node.value is not None:
        if node.value == "##":
            return
        library[node.value] = key
    else:
        huffman_tree_to_library(node.children[0], library, key + "0")
        huffman_tree_to_library(node.children[1], library, key + "1")

In [9]:
def encode(text, encoding_type=1, huffman_tree=None):
    if huffman_tree is None:
        if encoding_type:
            huffman_tree = huffman_static(text)
        else:
            huffman_tree = huffman_dynamic(text)

    library = {}
    huffman_tree_to_library(huffman_tree, library)
    
    encoded = bitarray()
    for letter in text:
        encoded += bitarray(library[letter])
    
    library_enc = bitarray("1")
    for letter, code in library.items():
        code_len = len(code)
        code_len_bits = bin(code_len)[2:]
        padding_len = 8 - len(code_len_bits)
        code_len_bits = "0" * padding_len + code_len_bits
        library_enc.frombytes(bytes(letter, "utf-16"))
        library_enc += bitarray(code_len_bits)
        library_enc += bitarray(code)
    library_enc += bitarray("0" * 32)
    
    padding_bits = 8 - ((len(library_enc) + len(encoded)) % 8)
    if padding_bits == 8:
        padding_bits = 0
    result = bitarray("0" * padding_bits) + library_enc + encoded
    return result

In [10]:
def tree_insert(tree, letter, c):
    node = tree
    
    for b in c:
        if b:
            if 1 not in node.children:
                node.children[1] = Node()
                node.children[1].parent = node
                
            node = node.children[1]
        else:
            if 0 not in node.children:
                node.children[0] = Node()
                node.children[0].parent = node
            node = node.children[0]
    node.value = letter

In [11]:
def decode(enc):
    root_node = Node()
    
    i = 0
    while not enc[i]:
        i += 1
        
    i += 1
    val = 32
    bits_l = enc[i:i+val]
    i += val
    
    while bits_l != bitarray("0" * val):
        lett = bits_l.tobytes().decode("utf-16")
        kb_bits = enc[i:i+8]
        kb = ""
        for bit in kb_bits:
            if bit:
                kb += "1"
            else:
                kb += "0"
        k = int(kb, 2)
        i += 8
        code_bits = enc[i:i+k]
        i += k
        tree_insert(root_node, lett, code_bits)
        bits_l = enc[i:i+val]
        i += val
        
    res = ""
    node = root_node
    for b in enc[i:]:
        if b:
            node = node.children[1]
        else:
            node = node.children[0]
        if node.value is not None:
            res += node.value
            node = root_node
            
    return res

## Testy

In [12]:
def test_static(path):
    file = open(path, "r", encoding="utf8")
    text = file.read()
    file.close()
    
    file = open("tmp.txt", "w", encoding="utf8")
    file.write(text)
    file.close()
    
    size_original = getsize("tmp.txt")
    remove("tmp.txt")
    
    start = time()
    encoded = encode(text, encoding_type=1)
    end = time()
    encode_static_time = end - start
    
    f = open("temp.txt", "wb")
    f.write(encoded.tobytes())
    f.close()
    
    size_encoded = getsize("temp.txt")
    
    f = open("temp.txt", "rb")
    txt = f.read()
    f.close()
    encoded2 = bitarray()
    encoded2.frombytes(txt)
    remove("temp.txt")
    
    start = time()
    decoded = decode(encoded2)
    end = time()
    decode_time = end - start
    
    print("Statyczny kod Huffmana")
    print("-"*30)
    print(f"Rozmiar oryginalny pliku: {size_original}")
    print(f"Rozmiar skompresowanego pliku: {size_encoded}")
    print(f"Wspolczynnik kompresji: {100*(1-size_encoded/size_original)}")
    print(f"Czas kompresji: {encode_static_time}")
    print(f"Czas dekompresji: {decode_time}")

In [13]:
def test_dynamic(path):
    file = open(path, "r", encoding="utf8")
    text = file.read()
    file.close()
    
    file = open("tmp.txt", "w", encoding="utf8")
    file.write(text)
    file.close()
    
    size_original = getsize("tmp.txt")
    remove("tmp.txt")
    
    start = time()
    encoded = encode(text, encoding_type=0)
    end = time()
    encode_dynamic_time = end - start
    
    f = open("temp.txt", "wb")
    f.write(encoded.tobytes())
    f.close()
    
    size_encoded = getsize("temp.txt")
    
    print("Dynamiczny kod Huffmana")
    print("-"*30)
    print(f"Rozmiar oryginalny pliku: {size_original}")
    print(f"Rozmiar skompresowanego pliku: {size_encoded}")
    print(f"Wspolczynnik kompresji: {100*(1-size_encoded/size_original)}")
    print(f"Czas kompresji: {encode_dynamic_time}")

---

In [14]:
test_static("gutenberg1kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 734
Rozmiar skompresowanego pliku: 753
Wspolczynnik kompresji: -2.588555858310637
Czas kompresji: 0.0
Czas dekompresji: 0.0


In [15]:
test_dynamic("gutenberg1kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 734
Rozmiar skompresowanego pliku: 753
Wspolczynnik kompresji: -2.588555858310637
Czas kompresji: 0.32991957664489746


In [16]:
test_static("gutenberg10kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 9716
Rozmiar skompresowanego pliku: 5564
Wspolczynnik kompresji: 42.73363524083985
Czas kompresji: 0.004333019256591797
Czas dekompresji: 0.004210472106933594


In [17]:
test_dynamic("gutenberg10kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 9716
Rozmiar skompresowanego pliku: 5565
Wspolczynnik kompresji: 42.72334293948127
Czas kompresji: 4.218203783035278


In [18]:
test_static("gutenberg100kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 102045
Rozmiar skompresowanego pliku: 52338
Wspolczynnik kompresji: 48.71086285462296
Czas kompresji: 0.03607916831970215
Czas dekompresji: 0.05614113807678223


In [19]:
test_dynamic("gutenberg100kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 102045
Rozmiar skompresowanego pliku: 52341
Wspolczynnik kompresji: 48.70792297515801
Czas kompresji: 39.986825466156006


In [20]:
test_static("gutenberg1MB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 1023890
Rozmiar skompresowanego pliku: 534964
Wspolczynnik kompresji: 47.751809276387114
Czas kompresji: 0.3996086120605469
Czas dekompresji: 0.6996674537658691


---

In [21]:
test_static("linux1kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 963
Rozmiar skompresowanego pliku: 1037
Wspolczynnik kompresji: -7.684319833852538
Czas kompresji: 0.0
Czas dekompresji: 0.0


In [22]:
test_dynamic("linux1kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 963
Rozmiar skompresowanego pliku: 1038
Wspolczynnik kompresji: -7.78816199376946
Czas kompresji: 0.5148036479949951


In [23]:
test_static("linux10kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 10020
Rozmiar skompresowanego pliku: 6856
Wspolczynnik kompresji: 31.57684630738523
Czas kompresji: 0.005766630172729492
Czas dekompresji: 0.0022056102752685547


In [24]:
test_dynamic("linux10kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 10020
Rozmiar skompresowanego pliku: 6856
Wspolczynnik kompresji: 31.57684630738523
Czas kompresji: 5.305941343307495


In [25]:
test_static("linux100kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 101850
Rozmiar skompresowanego pliku: 65320
Wspolczynnik kompresji: 35.86647029945998
Czas kompresji: 0.04030776023864746
Czas dekompresji: 0.06339073181152344


In [26]:
test_dynamic("linux100kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 101850
Rozmiar skompresowanego pliku: 65320
Wspolczynnik kompresji: 35.86647029945998
Czas kompresji: 56.391067028045654


In [27]:
test_static("linux1MB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 1023500
Rozmiar skompresowanego pliku: 651440
Wspolczynnik kompresji: 36.35173424523693
Czas kompresji: 0.42556262016296387
Czas dekompresji: 0.7046546936035156


---

In [28]:
def generate_file(n, size):
    with open(f"random{size}.txt", "w", encoding="utf8") as f:
        for i in range(n):
            byte = random.randint(0, 255)
            f.write(chr(byte))
    f.close()

In [29]:
generate_file(68, "1kB")

In [30]:
generate_file(6800, "10kB")

In [31]:
generate_file(68000, "100kB")

In [32]:
generate_file(681000, "1MB")

---

In [33]:
test_static("random1kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 103
Rozmiar skompresowanego pliku: 417
Wspolczynnik kompresji: -304.8543689320389
Czas kompresji: 0.0
Czas dekompresji: 0.0


In [34]:
test_dynamic("random1kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 103
Rozmiar skompresowanego pliku: 418
Wspolczynnik kompresji: -305.8252427184466
Czas kompresji: 0.044270992279052734


In [35]:
test_static("random10kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 10246
Rozmiar skompresowanego pliku: 8325
Wspolczynnik kompresji: 18.748780011711887
Czas kompresji: 0.010044336318969727
Czas dekompresji: 0.001997232437133789


In [36]:
test_dynamic("random10kB.txt")

Dynamiczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 10246
Rozmiar skompresowanego pliku: 8327
Wspolczynnik kompresji: 18.729260199102093
Czas kompresji: 22.827004194259644


In [37]:
test_static("random100kB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 102538
Rozmiar skompresowanego pliku: 69465
Wspolczynnik kompresji: 32.25438374066199
Czas kompresji: 0.03859257698059082
Czas dekompresji: 0.0766897201538086


In [38]:
test_static("random1MB.txt")

Statyczny kod Huffmana
------------------------------
Rozmiar oryginalny pliku: 1026958
Rozmiar skompresowanego pliku: 681855
Wspolczynnik kompresji: 33.604392779451544
Czas kompresji: 0.3555150032043457
Czas dekompresji: 0.7702476978302002
