In [1]:
from queue import PriorityQueue
from printTree import display_tree
from bitarray import bitarray
import os

# Statyczny algorytm Huffmana

In [2]:
class Huffman_tree:
    """Static Huffman code built from the symbol frequencies of ``text``.

    Attributes set by ``__init__``:
        freq: dict mapping every symbol of ``text`` to its occurrence count.
        root: root ``Node`` of the code tree, or ``None`` when ``text`` is empty.
        code: dict mapping every symbol to its code word, a string of "0"/"1".
    """

    class Node:
        """Tree node; leaves carry a symbol, internal nodes have ``letter=None``."""

        def __init__(self, letter, freq, left = None, right = None):
            self.letter = letter
            self.freq = freq
            self.left = left
            self.right = right

            # Path from the root ("0" = left, "1" = right); filled in by get_code().
            self.code = ''

        def __lt__(self, other):
            # Needed so that queue entries with equal priority can still be compared.
            return self.freq < other.freq

    def __init__(self, text):
        """Count the symbols of ``text``, build the tree and derive the code table.

        An empty ``text`` yields ``root is None`` and an empty ``code`` table
        instead of blocking forever on an empty queue. A text with a single
        distinct symbol gets the one-bit code "0" (a zero-length code word
        could not be decoded).
        """
        self.freq = {}
        for l in text:
            self.freq[l] = self.freq.get(l, 0) + 1

        self.code = {}
        self.root = None
        if not self.freq:
            return

        q = PriorityQueue()
        for l, f in self.freq.items():
            q.put((f, self.Node(l, f)))

        # Repeatedly merge the two lightest subtrees until one tree is left.
        while q.qsize() > 1:
            _, left = q.get()
            _, right = q.get()

            new_node = self.Node(None, left.freq + right.freq, left, right)

            q.put((new_node.freq, new_node))

        _, self.root = q.get()

        if self.root.left is None and self.root.right is None:
            # Only one distinct symbol: hang the lone leaf under an internal
            # root so that it receives the code "0" and a tree walk can reach it.
            self.root = self.Node(None, self.root.freq, left=self.root)

        self.get_code(self.root, "", "")


    def get_code(self, node, prev_code, symbol):
        """Assign ``prev_code + symbol`` to ``node`` and recurse into its children.

        Leaves (nodes whose ``letter`` is not ``None``) are also recorded in
        ``self.code``.
        """
        node.code = prev_code + symbol

        if node.letter is not None:
            self.code[node.letter] = node.code

        if node.left is not None:
            self.get_code(node.left, node.code, "0")

        if node.right is not None:
            self.get_code(node.right, node.code, "1")

    def print(self):
        """Draw the tree with ``display_tree``; leaves are labelled "letter:code"."""
        if self.root is None:
            # Empty input: there is no tree to draw.
            return

        def _to_print(root):
            if root.letter is not None:
                return " " + str(root.letter) + ":" + root.code
            return "_"
        # NOTE(review): for a single-symbol text the root has only a left child;
        # confirm that display_tree copes with a missing right child.
        display_tree(self.root, whatToPrint=_to_print)

In [3]:
# Sample input used by the static-Huffman demo cells below.
text = "aardvark"

# Build the frequency table, the code tree and the symbol -> code mapping.
huff = Huffman_tree(text)

In [4]:
# Draw the code tree; every leaf is labelled "letter:code".
huff.print()


      ___________________      
     /                   \     
   _____           _________   
  /     \         /         \  
 k:00  r:01     ______     a:11
               /      \        
             d:100  v:101      



In [5]:
# Show the code table: each symbol mapped to its string of "0"/"1".
print(huff.code)

{'k': '00', 'r': '01', 'd': '100', 'v': '101', 'a': '11'}


In [6]:
def encode(text, code):
    """Encode ``text`` with a static code table.

    Args:
        text: iterable of symbols; every symbol must be a key of ``code``.
        code: mapping from symbol to its code word, a string of "0"/"1".

    Returns:
        A ``bitarray`` holding the code words of all symbols, concatenated
        in input order.

    Raises:
        KeyError: if a symbol of ``text`` is missing from ``code``.
    """
    bits = bitarray()

    for symbol in text:
        bits.extend(bitarray(code[symbol]))

    return bits

In [7]:
def decode(code, tree):
    """Decode a bit sequence by walking the Huffman tree.

    Args:
        code: iterable of bits; a falsy bit means "go left", a truthy bit
            means "go right".
        tree: object whose ``root`` attribute is the root node of the tree.
            Nodes expose ``left``, ``right`` and ``letter``.

    Returns:
        The decoded string. Bits left over after the last complete code
        word are silently dropped.
    """
    letters = []
    current = tree.root

    for bit in code:
        current = current.right if bit else current.left

        # A node without children is a leaf: emit its symbol, restart at the root.
        if current.left is None and current.right is None:
            letters.append(current.letter)
            current = tree.root

    return "".join(letters)

In [8]:
# Encode the sample text with the static code table and show the resulting bits.
encoded = encode(text, huff.code)
print(encoded)

bitarray('111101100101110100')


In [9]:
# Round trip: decoding the bits with the same tree must give back the input text.
decoded = decode(encoded, huff)
print(decoded)
print(text == decoded)
huff.print()

aardvark
True

      ___________________      
     /                   \     
   _____           _________   
  /     \         /         \  
 k:00  r:01     ______     a:11
               /      \        
             d:100  v:101      



# Dynamiczny algorytm Huffmana

In [10]:
class Adaptive_Huffman_tree:
    """One-pass (adaptive) Huffman coder whose tree grows while symbols are seen.

    The tree starts as a single NYT ("not yet transmitted") leaf. A symbol seen
    for the first time is sent as the path to the NYT leaf followed by its raw
    UTF-8 bytes; a known symbol is sent as the path to its leaf. After every
    symbol ``update`` adjusts the weights, so an encoder and a decoder that
    both start from a fresh instance stay in step.

    Attributes:
        root: current root node of the tree.
        letters: dict mapping each known symbol (and ``NYT``) to its leaf.
    """
    NYT = "NYT"

    class Node:
        """Tree node with a parent link; internal nodes have ``letter=None``."""

        def __init__(self, letter, freq = 0, left = None, right = None, parent = None):
            self.letter = letter
            self.freq = freq
            self.left = left
            self.right = right
            self.parent = parent

        def __lt__(self, other):
            return self.freq < other.freq

    def __init__(self):
        self.root = self.Node(self.NYT)
        self.letters = {self.NYT : self.root}

    @staticmethod
    def _utf8_length(lead_byte):
        """Return the byte length of the UTF-8 sequence that starts with ``lead_byte``.

        Bytes that are not a valid multi-byte lead (including continuation
        bytes) are treated as one-byte sequences.
        """
        if lead_byte >> 5 == 0b110:
            return 2
        if lead_byte >> 4 == 0b1110:
            return 3
        if lead_byte >> 3 == 0b11110:
            return 4
        return 1

    def encode(self, text):
        """Encode ``text`` and return the bits as a ``bitarray``.

        The stream ends with the path to the NYT leaf and no symbol after it,
        which serves as a pseudo end-of-file marker. Encoding updates the
        tree, so use a fresh instance for every independent message.
        """
        res = bitarray()
        for letter in text:
            if letter not in self.letters:
                # First occurrence: NYT escape followed by the raw UTF-8 bytes.
                raw = bitarray()
                raw.frombytes(letter.encode("utf-8"))
                res += bitarray(self.get_path()) + raw
            else:
                res += bitarray(self.get_path(self.letters[letter]))

            self.update(letter)
        res += bitarray(self.get_path()) #pseudo_eof
        return res

    def decode(self, text):
        """Decode a bit stream produced by ``encode`` on a fresh instance.

        Args:
            text: bit container supporting ``len``, indexing, and slicing
                that returns an object with ``tobytes()`` (e.g. ``bitarray``).

        Returns:
            The decoded string. Decoding stops at the pseudo end-of-file
            marker, so zero padding added when the bits were stored in whole
            bytes is ignored.

        Symbols sent after an NYT escape may span several bytes: the UTF-8
        lead byte tells how many bytes belong to the character.
        """
        out = []
        total = len(text)
        pos = 0
        while pos < total:
            # Walk from the root down to a leaf, one bit per edge.
            node = self.root
            while node.left is not None or node.right is not None:
                if pos >= total:
                    # The stream ended in the middle of a path.
                    return "".join(out)
                node = node.right if text[pos] else node.left
                pos += 1

            if node.letter == self.NYT:
                remaining = total - pos
                if remaining <= 8: #pseudo_eof
                    break
                lead_byte = text[pos:pos + 8].tobytes()[0]
                width = 8 * self._utf8_length(lead_byte)
                if remaining < width:
                    # Truncated multi-byte character: nothing more to decode.
                    break
                letter = text[pos:pos + width].tobytes().decode("utf-8", errors="ignore")
                pos += width
            else:
                letter = node.letter

            out.append(letter)
            self.update(letter)
        return "".join(out)

    def update(self, letter):
        """Record one occurrence of ``letter``, adding a leaf for it if needed."""
        if letter not in self.letters:
            nyt_node = self.letters[self.NYT]

            node = self.Node(letter)

            # The old NYT position becomes an internal node with the NYT leaf
            # on the left and the new symbol on the right.
            internal_node = self.Node(None, left = nyt_node, right = node, parent = nyt_node.parent)
            nyt_node.parent = internal_node
            node.parent = internal_node

            self.letters[letter] = node

            if nyt_node is self.root:
                self.root = internal_node
            else:
                # The NYT leaf keeps weight 0, so fix() never moves it to the
                # right: it was the left child of its former parent.
                internal_node.parent.left = internal_node
        else:
            node = self.letters[letter]

        # Increase the weight of the leaf and of all its ancestors.
        while node is not None:
            node.freq += 1
            self.fix(node)
            node = node.parent

    def fix(self, node):
        """Swap ``node`` with its sibling if the left one is heavier than the right."""
        if node.parent is None:
            return
        node = node.parent

        if node.right < node.left:
            node.left, node.right = node.right, node.left

    def get_path(self, node = None):
        """Return the root-to-``node`` path as a string of "0" (left) / "1" (right).

        Without an argument the path to the NYT leaf is returned.
        """
        if node is None:
            node = self.letters[self.NYT]
        res = ""
        # Collect the edges bottom-up, then reverse.
        while node.parent:
            if node.parent.left is node:
                res += "0"
            else:
                res += "1"
            node = node.parent
        return res[::-1]


    def print(self):
        """Draw the tree with ``display_tree``; every node shows its weight."""
        def _to_print(root):
            if root.letter is not None:
                return " " + str(root.letter) + ":" + str(root.freq)
            return "_" + str(root.freq) + "_"
        display_tree(self.root, whatToPrint=_to_print)

In [11]:
# Encode the sample word with a fresh adaptive tree, then show the final tree
# (leaves are labelled "letter:weight") and the produced bits.
tree_encode = Adaptive_Huffman_tree()
encoded = tree_encode.encode("aardvark")
tree_encode.print()
print(encoded)


   __8______                             
  /         \                            
 a:3      __5______                      
         /         \                     
        r:2      __3_______________      
                /                  \     
               d:1           ______2___  
                            /          \ 
                         ___1___      v:1
                        /       \        
                      NYT:0    k:1       

bitarray('01100001100111001000011001000000111011001011000110101111100')


In [12]:
# Decode with a separate, fresh tree: decode() rebuilds the tree symbol by
# symbol, so the final tree printed here should match the encoder's.
tree_decode = Adaptive_Huffman_tree()
decoded = tree_decode.decode(encoded)
tree_decode.print()
print(decoded)


   __8______                             
  /         \                            
 a:3      __5______                      
         /         \                     
        r:2      __3_______________      
                /                  \     
               d:1           ______2___  
                            /          \ 
                         ___1___      v:1
                        /       \        
                      NYT:0    k:1       

aardvark


In [13]:
def save_bits_to_file(bits, file):
    """Write ``bits`` to the binary file named ``file + "_encoded"``.

    Args:
        bits: object with a ``tofile(fileobj)`` method (a ``bitarray`` here).
        file: path of the original file; the suffix ``_encoded`` is appended.
    """
    target = file + "_encoded"
    with open(target, 'wb') as out:
        bits.tofile(out)
        
def read_bits_from_file(file):
    """Read the binary file named ``file + "_encoded"`` into a new ``bitarray``.

    Every byte of the file contributes eight bits.
    NOTE(review): if the file was written from a bitarray whose length is not
    a multiple of 8, the result presumably carries trailing padding bits —
    confirm against the bitarray documentation.
    """
    bits = bitarray()
    with open(file + "_encoded", 'rb') as src:
        bits.fromfile(src)
    return bits

In [14]:
def test_classic(file):
    with open(file) as f:
        print("test classic", file)
        text = f.read()
        huff = Huffman_tree(text)
        encoded = encode(text, huff.code)
        print("encoding time:")
        %timeit encoded = encoded = encode(text, huff.code)
        
        save_bits_to_file(encoded, file)

        print("przed kompresja", os.path.getsize(file))
        print("po kompresjii", os.path.getsize(file + "_encoded"))
        print("współczynnik kompresji", 1 - os.path.getsize(file + "_encoded") / os.path.getsize(file))
        
        decoded = decode(encoded, huff)
        print("decoding time:")
        %timeit decoded = decode(encoded, huff)
        
        print("equal = ", decoded == text)
        print()

In [15]:
def test_adaptive(file):
    with open(file) as f:
        print("test adaptive", file)
        text = f.read()
        tree_encode = Adaptive_Huffman_tree()
        encoded = tree_encode.encode(text)
        print("encoding time:")
        %timeit encoded = tree_encode.encode(text)
        
        save_bits_to_file(encoded, file)

        print("przed kompresja", os.path.getsize(file))
        print("po kompresjii", os.path.getsize(file + "_encoded"))
        print("współczynnik kompresji", 1 - os.path.getsize(file + "_encoded") / os.path.getsize(file))

        encoded = read_bits_from_file(file)

        tree_decode = Adaptive_Huffman_tree()
        
        decoded = tree_decode.decode(encoded)
        print("decoding time:")
        %timeit decoded = tree_decode.decode(encoded)
        
        print("equal = ", decoded == text)
        print()

In [16]:
# Input files for the benchmarks below (relative paths).
# NOTE(review): the names suggest approximate sizes — confirm against the actual files.
files = ["1kB.txt", "10kB.txt", "100kB.txt", "1MB.txt"]

In [17]:
# Benchmark the static coder on every sample file.
for file in files:
    test_classic(file)
    print()

test classic 1kB.txt
encoding time:
239 µs ± 5.79 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
przed kompresja 1354
po kompresjii 426
współczynnik kompresji 0.6853766617429837
decoding time:
290 µs ± 1.15 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
equal =  True


test classic 10kB.txt
encoding time:
2.45 ms ± 57 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
przed kompresja 13512
po kompresjii 4298
współczynnik kompresji 0.6819123741859088
decoding time:
3.25 ms ± 191 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
equal =  True


test classic 100kB.txt
encoding time:
24.8 ms ± 1.22 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
przed kompresja 135091
po kompresjii 42691
współczynnik kompresji 0.6839833889748392
decoding time:
31 ms ± 1.21 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
equal =  True


test classic 1MB.txt
encoding time:
243 ms ± 6.33 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
przed kompresja 13

In [18]:
# Benchmark the adaptive coder on every sample file.
for file in files:
    test_adaptive(file)
    print()

test adaptive 1kB.txt
encoding time:
4.21 ms ± 169 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
przed kompresja 1354
po kompresjii 847
współczynnik kompresji 0.3744460856720827
decoding time:
4.69 ms ± 203 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
equal =  True


test adaptive 10kB.txt
encoding time:
38.4 ms ± 636 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
przed kompresja 13512
po kompresjii 8336
współczynnik kompresji 0.3830669034931913
decoding time:
41.9 ms ± 1.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
equal =  True


test adaptive 100kB.txt
encoding time:
386 ms ± 12.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
przed kompresja 135091
po kompresjii 82157
współczynnik kompresji 0.3918395748051313
decoding time:
415 ms ± 11.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
equal =  True


test adaptive 1MB.txt
encoding time:
4.47 s ± 136 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
przed kompresja 135088