# Labolatorium 3 - Kompresja tekstu

In [1]:
from collections import defaultdict
from string import printable
from random import choice
from bitarray import bitarray
from os.path import getsize
from os import remove
from time import time
from queue import Queue

## Dane tekstowe

In [2]:
f = open ("krzyzacy.txt", 'r')
guttenberg = f.read ()
f.close ()

In [3]:
gutt_1kB = guttenberg[:1024]
gutt_10kB = guttenberg[:10240]
gutt_100kB = guttenberg[:102400]
gutt_1MB = guttenberg[:1024*1024]

In [4]:
f = open ("git.txt", 'r')
github = f.read ()
f.close ()
github *= 4

In [5]:
git_1kB = github[:1024]
git_10kB = github[:10240]
git_100kB = github[:102400]
git_1MB = github[:1024*1024]

In [6]:
def make_text (n):
    result = ""
    for _ in range (n):
        letter = choice(printable)
        result += letter
    return result

In [7]:
rand_1kB = make_text(1024)
rand_10kB = make_text(10240)
rand_100kB = make_text(102400)
rand_1MB = make_text(1024*1024)

## Struktura węzła

In [8]:
class Node:
    def __init__ (self, weight = 0, val = None):
        self.value = val
        self.weight = weight        
        self.parent = self
        self.children = {}
        
    def make_parent (self, n1, n2):
        self.children[1] = n1
        self.children[0] = n2
        self.weight = n1.weight + n2.weight
        n1.parent = self
        n2.parent = self

## Statystyczny Huffman

In [9]:
def count_letter (text):
    count = defaultdict(int)
    for letter in text:
        count[letter] += 1
    return count

def huffman (text):
    letter_counts = count_letter (text)
    nodes = []
    for letter, weight in letter_counts.items():
        nodes.append(Node(weight, letter))
    internal_nodes = []
    leafs = sorted(nodes, key=lambda n: n.weight)
    while len(leafs) + len(internal_nodes) > 1:
        elements = []
        for _ in range (2):
            if len(leafs) == 0:
                elements.append(internal_nodes[0])
                del (internal_nodes[0])
            elif len (internal_nodes) == 0 or leafs[0].weight <= internal_nodes[0].weight:
                elements.append(leafs[0])
                del (leafs[0])
            else:
                elements.append(internal_nodes[0])
                del (internal_nodes[0])
        new = Node()
        new.make_parent(elements[0], elements[1])
        internal_nodes.append(new)
    return internal_nodes[0]  

## Dynamiczny Huffman

In [10]:
def find_node (root, w):
    node = root
    qu = Queue ()
    qu.put(node)
    while not qu.empty():
        node = qu.get()
        if node.weight == w:
            return node
        if node.value == None:
            qu.put(node.children[1])
            qu.put(node.children[0])
    return None

def increment (node, root):
    if node == root:
        node.weight += 1
        return
    w = node.weight
    w_node = find_node (root, w)
    if w_node != node and w_node != node.parent:
        parent = node.parent
        w_parent = w_node.parent
        if parent.children[0] == node:
            parent.children[0] = w_node
        else:
            parent.children[1] = w_node
        if w_parent.children[0] == w_node:
            w_parent.children[0] = node
        else:
            w_parent.children[1] = node
        w_node.parent = parent
        node.parent = w_parent
    node.weight += 1
    increment (node.parent, root)
        
def dynamic_huffman (text):
    nodes = {}
    nodes["##"] = Node (val = "##")
    root = nodes["##"]
    for letter in text:
        if letter in nodes:
            node = nodes[letter]
            increment(node, root)
        else:
            updated_node = nodes["##"]
            updated_node.value = None
            node = Node(val = letter)
            del nodes["##"]
            zero_node = Node(val = "##")
            updated_node.make_parent (node, zero_node)
            node.weight = 1
            nodes[letter] = node
            nodes["##"] = zero_node
            increment(updated_node, root)
    return root    

## Funkcje kodujące i dekodujące tekst

Struktura pliku wygląda następująco: najpierw pojawiają się zera które ewentualnie byłyby wstawione na koniec pliku aby liczba bitów była podzielna przez 8, następnie jedynka i po tym słownik postaci: 32 bity (UTF-16) na literkę/znak, 8 bitów na ilość bitów w kodzie huffmana, kod huffmana itd., koniec słownika rozpoznajemy po tym że kod literki to "0"*32 bo jest to znak NULL. Po tym kodzie następuje zakodowany tekst.

In [11]:
def library_from_huffman (node, lib, key = ""):
    if node.value != None:
        #print (node.value, "->", key)
        if node.value == "##": return
        lib[node.value] = key
    else:
        library_from_huffman (node.children[0], lib, key + "0")
        library_from_huffman (node.children[1], lib, key + "1")

In [12]:
def encode (text, typ = 1, tree = None):
    if tree == None:
        if typ:
            tree = huffman(text)
        else:
            tree = dynamic_huffman (text)
    library = {}
    library_from_huffman(tree, library)
    #kodowanie tekstu
    encode_text = bitarray()
    for letter in text:
        encode_text += bitarray(library[letter])
    #kodowanie słownika
    encode_library = bitarray("1")
    for letter, code in library.items():
        k = len (code)
        kb = bin(k)[2:]
        t = 8 - len (kb)
        kb = "0"*t + kb
        encode_library.frombytes(bytes(letter, "utf-16"))
        encode_library += bitarray(kb)
        encode_library += bitarray(code)
    encode_library += bitarray("0"*32)
    
    k = 8 - ((len(encode_library) + len(encode_text)) % 8)
    if k == 8: k = 0
    result = bitarray("0"*k) + encode_library + encode_text
    return result

In [13]:
def add_to_tree (tree, letter, code): #budowanie drzewa do dekodowania tekstu
    node = tree
    for bit in code:
        if bit:
            if 1 not in node.children:
                node.children[1] = Node ()
                node.children[1].parent = node
            node = node.children[1]
        else:
            if 0 not in node.children:
                node.children[0] = Node ()
                node.children[0].parent = node
            node = node.children[0]
    node.value = letter

def decode (code):
    tree = Node()
    #odczyt początkowych zer które ignorujemy
    i = 0
    while not code[i]: i+=1
    i+=1
    #odczyt alfabetu dopóki znajdziemy letter = '0'*8
    #w trakcie odczytu budowa drzewa
    q = 32
    letter = code[i:i+q]
    i+=q
    while letter != bitarray("0"*q):
        letter = letter.tobytes().decode("utf-16")
        kb = code[i:i+8]
        kbb = ""
        for bit in kb:
            if bit:
                kbb += "1"
            else:
                kbb += "0"
        k = int (kbb, 2)
        i+=8
        cd = code[i:i+k]
        i+=k
        add_to_tree (tree, letter, cd)
        letter = code[i:i+q]
        i+=q
    #dekodowanie wykorzystując drzewo
    result = ""
    node = tree
    for bit in code[i:]:
        if bit:
            node = node.children[1]
        else:
            node = node.children[0]
        if node.value != None:
            result += node.value
            node = tree
    return result            

## Zapis i odczyt z pliku

In [14]:
def write_to_file (text, filepath = "tmp.txt"):
    f = open (filepath, 'wb')
    f.write(text.tobytes())
    f.close()
    
def read_from_file (filepath = "tmp.txt"):
    f = open (filepath, 'rb')
    text = f.read()
    f.close()
    result = bitarray ()
    result.frombytes(text)
    return result    

## Pomiar czasu

In [15]:
def measure_time (function, *data):
    start_time = time()
    res = function (*data)
    end_time = time()
    elapsed_time = end_time - start_time
    return elapsed_time, res

## Test jednostkowy

In [16]:
def test_compression (text):
    f = open ("temporary.txt", 'w')
    f.write(text)
    f.close()
    size = getsize("temporary.txt")
    remove ("temporary.txt")
    time_encode, code = measure_time (encode, text, 1)
    time_stat = time_encode
    write_to_file (code)
    new_size = getsize ("tmp.txt")
    new_code = read_from_file ()
    remove ("tmp.txt")
    time_decode, result = measure_time (decode, new_code)
    print ("Test kompresji tekstu dla pliku o rozmiarze", size, "Bajtów i statystycznego drzewa Huffmana")
    print ("Rozmiar pliku po kompresji wyniósł", new_size, "Bajtów")
    print ("Współczynnik kompresji wynosi", (1-new_size/size)*100)
    print ("Czas kompresji wyniósł", time_encode, "natomiast czas dekompresji", time_decode)
    if result == text:
        print ("Odczyt powiódł się, tekst otrzymany jest równy wyjściowemu")
    else:
        print ("Utrata tekstu")
    print ()
    time_encode, code = measure_time (encode, text, 0)
    time_dyn = time_encode
    write_to_file (code)
    new_size = getsize ("tmp.txt")
    new_code = read_from_file ()
    remove ("tmp.txt")
    time_decode, result = measure_time (decode, new_code)
    print ("Test kompresji tekstu dla pliku o rozmiarze", size, "Bajtów i dynamicznego drzewa Huffmana")
    print ("Rozmiar pliku po kompresji wyniósł", new_size, "Bajtów")
    print ("Współczynnik kompresji wynosi", (1-new_size/size)*100)
    print ("Czas kompresji wyniósł", time_encode, "natomiast czas dekompresji", time_decode)
    if result == text:
        print ("Odczyt powiódł się, tekst otrzymany jest równy wyjściowemu")
    else:
        print ("Utrata tekstu")
    print ()
    print ("Czas kodowania dla algorytmu statystycznego wynosi", time_stat, "natomiast dla dynamicznego", time_dyn)
    print ("Algorytm statystyczny jest", time_dyn/time_stat, "razy szybszy od dynamicznego")
    print ()
    print ("*****************************************************************")
    print ()

## Testy

In [17]:
test_compression("abracadabra")
string = "ab" *1024 *1024
test_compression(string)

Test kompresji tekstu dla pliku o rozmiarze 11 Bajtów i statystycznego drzewa Huffmana
Rozmiar pliku po kompresji wyniósł 34 Bajtów
Współczynnik kompresji wynosi -209.0909090909091
Czas kompresji wyniósł 7.867813110351562e-05 natomiast czas dekompresji 6.532669067382812e-05
Odczyt powiódł się, tekst otrzymany jest równy wyjściowemu

Test kompresji tekstu dla pliku o rozmiarze 11 Bajtów i dynamicznego drzewa Huffmana
Rozmiar pliku po kompresji wyniósł 34 Bajtów
Współczynnik kompresji wynosi -209.0909090909091
Czas kompresji wyniósł 0.0009222030639648438 natomiast czas dekompresji 5.841255187988281e-05
Odczyt powiódł się, tekst otrzymany jest równy wyjściowemu

Czas kodowania dla algorytmu statystycznego wynosi 7.867813110351562e-05 natomiast dla dynamicznego 0.0009222030639648438
Algorytm statystyczny jest 11.72121212121212 razy szybszy od dynamicznego

*****************************************************************

Test kompresji tekstu dla pliku o rozmiarze 2097152 Bajtów i statyst

In [None]:
test_compression(gutt_1kB)
test_compression(gutt_10kB)
test_compression(gutt_100kB)
test_compression(gutt_1MB)

Test kompresji tekstu dla pliku o rozmiarze 1024 Bajtów i statystycznego drzewa Huffmana
Rozmiar pliku po kompresji wyniósł 1073 Bajtów
Współczynnik kompresji wynosi -4.78515625
Czas kompresji wyniósł 0.0009694099426269531 natomiast czas dekompresji 0.001154184341430664
Odczyt powiódł się, tekst otrzymany jest równy wyjściowemu

Test kompresji tekstu dla pliku o rozmiarze 1024 Bajtów i dynamicznego drzewa Huffmana
Rozmiar pliku po kompresji wyniósł 1074 Bajtów
Współczynnik kompresji wynosi -4.8828125
Czas kompresji wyniósł 0.5927977561950684 natomiast czas dekompresji 0.0010905265808105469
Odczyt powiódł się, tekst otrzymany jest równy wyjściowemu

Czas kodowania dla algorytmu statystycznego wynosi 0.0009694099426269531 natomiast dla dynamicznego 0.5927977561950684
Algorytm statystyczny jest 611.5036891293655 razy szybszy od dynamicznego

*****************************************************************

Test kompresji tekstu dla pliku o rozmiarze 10240 Bajtów i statystycznego drzewa H

In [None]:
test_compression(git_1kB)
test_compression(git_10kB)
test_compression(git_100kB)
test_compression(git_1MB)

In [None]:
test_compression(rand_1kB)
test_compression(rand_10kB)
test_compression(rand_100kB)
test_compression(rand_1MB)

Wnioski:

1) Dla krótkich plików tekst skompesowany zajmuje więcej miejsca niż nieskompresowany, wynika to z potrzeby zapisu słownika.

2) Najlepsze efekty kompresji uzyskujemy dla tekstu pochodzącego z portalu guttenberg, ponieważ jest to tekst języka naturalnego, więc rozkład znaków nie jest jednostajny i używane są tylko litery i nieliczne symbole.

3) Dla tekstu z rozkładu jednostajnego kompresja jest niewielka, jednakże występuje z faktu nie używania wszystkich możliwych znaków.

4) Dla tekstu składającego się tylko z liter 'a' i 'b' kompresja sięga 87%

5) Statystyczny Huffman jest wielokrotnie szybszy od dynamicznego, wynika to użycia czasochłonnego BFSa.

6) Współczynnik kompresji jest podobny dla obydwu algorytmów