# Statyczny algorytm Huffmana

In [2]:
class Node:
    def __init__(self, letter, weight, child1=None, child2=None):
        self.weight = weight
        self.letter = letter
        self.left = child1
        self.right = child2
        self.bit = None
    
    # methods display and _display_aux from 
    # https://stackoverflow.com/questions/34012886/print-binary-tree-level-by-level-in-python/40885162
    def display(self):
        lines, *_ = self._display_aux()
        for line in lines:
            print(line)

    def _display_aux(self):
        
        # No child.
        if self.right is None and self.left is None:
            line = '%s' % str(self.weight) + "-" + str( self.letter)
            width = len(line)
            height = 1
            middle = width // 2
            return [line], width, height, middle

        # Only left child.
        if self.right is None:
            lines, n, p, x = self.left._display_aux()
            s = '%s' % self.weight
            u = len(s)
            first_line = (x + 1) * ' ' + (n - x - 1) * '_' + s
            second_line = x * ' ' + '/' + (n - x - 1 + u) * ' '
            shifted_lines = [line + u * ' ' for line in lines]
            return [first_line, second_line] + shifted_lines, n+u, p+2, n+u//2

        # Only right child.
        if self.left is None:
            lines, n, p, x = self.right._display_aux()
            s = '%s' % self.weight
            u = len(s)
            first_line = s + x * '_' + (n - x) * ' '
            second_line = (u + x) * ' ' + '\\' + (n - x - 1) * ' '
            shifted_lines = [u * ' ' + line for line in lines]
            return [first_line, second_line] + shifted_lines, n+u, p+2, u//2

        # Two children.
        left, n, p, x = self.left._display_aux()
        right, m, q, y = self.right._display_aux()
        s = '%s' % self.weight
        u = len(s)
        first_line = (x + 1) * ' ' + (n - x - 1) * '_' + s+y *'_'+ (m-y) * ' '
        second_line = x * ' ' + '/' + (n - x - 1 + u+y)*' '+'\\'+(m-y-1)*' '
        if p < q:
            left += [n * ' '] * (q - p)
        elif q < p:
            right += [m * ' '] * (p - q)
        zipped_lines = zip(left, right)
        lines = [first_line, second_line] + [a+u*' '+b for a, b in zipped_lines]
        return lines, n + m + u, max(p, q) + 2, n + u // 2

    
    

# w internal_nodes są zawsze elementy posortowane rosnąco
def get_min(nodes, internal_nodes):
    node1 = None
    node2 = None
    
    if len(internal_nodes) == 1 and len(nodes)==1:
        node1 = nodes[0]
        node2 = internal_nodes[0]
        internal_nodes.pop(0)
        nodes.pop(0)
        return node1, node2
    
    if not internal_nodes and len(nodes)>1:
        node1 = nodes[0]
        node2 = nodes[1]
        nodes.pop(0)
        nodes.pop(0)
        return node1, node2
    
    if not nodes and len(internal_nodes)>1:
        node1 = internal_nodes[0]
        node2 = internal_nodes[1]
        internal_nodes.pop(0)
        internal_nodes.pop(0)
        return node1, node2
    
    if len(nodes)==1:
        if nodes[0].weight < internal_nodes[1].weight:
            node1 = nodes[0]
            node2 = internal_nodes[0]
            nodes.pop(0)
            internal_nodes.pop(0)
            return node1, node2
        else:
            node1 = internal_nodes[0]
            node2 = internal_nodes[1]
            internal_nodes.pop(0)
            internal_nodes.pop(0)
            return node1, node2
         
    if len(internal_nodes)==1:
        if internal_nodes[0].weight < nodes[1].weight:
            node1 = internal_nodes[0]
            node2 = nodes[0]
            internal_nodes.pop(0)
            nodes.pop(0)
            return node1, node2
        else:
            node1 = nodes[0]
            node2 = nodes[1]
            nodes.pop(0)
            nodes.pop(0)
            return node1, node2
    i = 0
    ans = [None, None]
    while i<2:
        if nodes[0].weight < internal_nodes[0].weight:
            ans[i] = nodes[0]
            nodes.pop(0)
        else:
            ans[i] = internal_nodes[0]
            internal_nodes.pop(0)
        i+=1
    return ans[0], ans[1]


def letter_dict(text):
    letter_counts = dict()
    for l in text:
        if l in letter_counts.keys():
            letter_counts[l]+=1
        else:
            letter_counts[l]=1
    return letter_counts



def static_Huffman(text):
    letter_counts = letter_dict(text)
    nodes = []
    for a, weight in letter_counts.items():
        nodes.append(Node(a, weight, None, None))
    internal_nodes = []
    leafs = sorted(nodes, key = lambda n: n.weight)
    while (len(leafs) + len(internal_nodes) > 1):
        element1, element2 = get_min(leafs, internal_nodes)
        if element1.weight < element2.weight:
            internal_nodes.append(Node("", element1.weight+element2.weight,element1, element2))
        else:
            internal_nodes.append(Node("", element1.weight+element2.weight,element2, element1))
    
    return internal_nodes[0] #root

In [3]:
root = static_Huffman("abracadabra")
root.display()

  _11_______        
 /          \       
5-a      ___6___    
        /       \   
       _2_     _4_  
      /   \   /   \ 
     1-d 1-c 2-r 2-b


### Kompresja i dekompresja

In [5]:
def get_letter_code(root, curr, dct):
    if not root.left and not root.right:
        dct[root.letter] = curr
    
    if root.left:
        get_letter_code(root.left, curr + "0", dct)
        
    if root.right:
        get_letter_code(root.right, curr + "1", dct)       
    

In [45]:
def write_tree(root, ans):
    if not root.left and not root.right:
        ans.append(root.letter)
    else:
        ans.append("1")
        write_tree(root.left, ans)
        write_tree(root.right, ans)
    return ans

In [46]:
root = static_Huffman("abracadabra")
root.display()

ans = write_tree(root, [])
print(ans)

  _11_______        
 /          \       
5-a      ___6___    
        /       \   
       _2_     _4_  
      /   \   /   \ 
     1-d 1-c 2-r 2-b
['1', 'a', '1', '1', 'd', 'c', '1', 'r', 'b']


In [47]:
def read_tree(stream):
    if stream:
        next_bit = stream.pop(0)
        if next_bit == "1":
            root = Node("", 0)
            root.left = read_tree(stream)
            if root.left:
                root.left.bit = "0"
            root.right = read_tree(stream)
            if root.right:
                root.right.bit = "1"
        else:
            root = Node(next_bit, 0)

        return root
    else:
        return None
    

In [48]:
root = read_tree(ans)

In [49]:
root.display()

  _0_______        
 /         \       
0-a     ___0___    
       /       \   
      _0_     _0_  
     /   \   /   \ 
    0-d 0-c 0-r 0-b


In [50]:
def compression(original_file, destination_file):
    text = open(original_file, "r")
    text_str = text.read()
    root = static_Huffman(text_str)
    #root.display()
    saved_tree = write_tree(root, [])
    text.close()
    
    file = open(destination_file, "w")
    for i in saved_tree:
        file.write(i)
    file.write("#")
    
    dct = dict()
    get_letter_code(root, "", dct)
    i = 0
    for letter in text_str:
        file.write(dct[letter])
        if i >50: # dodaję \n co 50 linijek, bo nie radzi sobie z wczytaniem 
            i = 0
            file.write("\n")
        i +=1
    file.close()

In [51]:
def decompression(compressed_file, decompressed_file):
    
    tree = []
    file = open(compressed_file, "r")
    result = open(decompressed_file, "w")
    
    char = file.read(1)
    tree.append(char)
    while char!="#":
        char = file.read(1)
        tree.append(char)
    
    root = read_tree(tree)
    char = file.read(1)
    curr = root
    while char:
        while curr.left and curr.right:
            if curr.left.bit == char:
                curr = curr.left
            else:
                curr = curr.right
            char = file.read(1)
            if char == "\n":
                char = file.read(1)
        result.write(curr.letter)
        curr = root
    
    file.close()
    result.close()
    
    

In [52]:
decompression("test1.txt", "test1_de.txt")

### Czas kompresji (1kB, 10kB, 100kB, 1MB)

In [54]:
import time
start = time.time()
compression("text1.txt", "text1_co.txt")
end = time.time()
print("Statyczny Huffman - czas kompresji 1kB: ", end - start)

start = time.time()
decompression("text1_co.txt", "text1_de.txt")
end = time.time()
print("Statyczny Huffman - czas dekompresji 1kB: ", end - start)

start = time.time()
compression("text2.txt", "text2_co.txt")
end = time.time()
print("Statyczny Huffman - czas kompresji 10kB: ", end - start)

start = time.time()
decompression("text2_co.txt", "text2_de.txt")
end = time.time()
print("Statyczny Huffman - czas dekompresji 10kB: ", end - start)


start = time.time()
compression("text3.txt", "text3_co.txt")
end = time.time()
print("Statyczny Huffman - czas kompresji 100kB: ", end - start)

start = time.time()
decompression("text3_co.txt", "text3_de.txt")
end = time.time()
print("Statyczny Huffman - czas dekompresji 100kB: ", end-start)


start = time.time()
compression("text4.txt", "text4_co.txt")
end = time.time()
print("Statyczny Huffman - czas kompresji 1MB: ", end - start)

start = time.time()
decompression("text4_co.txt", "text4_de.txt")
end = time.time()
print("Statyczny Huffman - czas dekompresji 1MB: ", end-start)


Statyczny Huffman - czas kompresji 1kB:  0.010809183120727539
Statyczny Huffman - czas dekompresji 1kB:  0.0018053054809570312
Statyczny Huffman - czas kompresji 10kB:  0.005388975143432617
Statyczny Huffman - czas dekompresji 10kB:  0.01900482177734375
Statyczny Huffman - czas kompresji 100kB:  0.02809309959411621
Statyczny Huffman - czas dekompresji 100kB:  0.11828827857971191
Statyczny Huffman - czas kompresji 1MB:  0.2772092819213867
Statyczny Huffman - czas dekompresji 1MB:  1.3059625625610352


### Współczynnik kompresji

W oryginalnych plikach wszystkie litery mają rozmiar 7 bitów, w "skompresowanym" pliku 0 i 1 są zapisane jako stringi stąd ich rozmiar w bajtach jest dużo większy od orygialnego pliku.
Żeby obliczyć współczynnik kompresji mnożę rozmiar oryginalnego pliku przez 7 (zamiast zamieniać wszystkie litery na ich kod ASCII).

In [55]:
from pathlib import Path
file = Path() / 'text1.txt'
orig_size = 7*file.stat().st_size
file = Path() / 'text1_co.txt'
comp_size = file.stat().st_size
print("Statyczny Huffman - współczynnik kompresji 1kB: ", 1-comp_size/orig_size)


file = Path() / 'text2.txt'
orig_size = 7*file.stat().st_size
file = Path() / 'text2_co.txt'
comp_size = file.stat().st_size

print("Statyczny Huffman - współczynnik kompresji 10kB: ", 1-comp_size/orig_size)
      

file = Path() / 'text3.txt'
orig_size = 7*file.stat().st_size
file = Path() / 'text3_co.txt'
comp_size = file.stat().st_size

print("Statyczny Huffman - współczynnik kompresji 100kB: ", 1-comp_size/orig_size)
      

file = Path() / 'text4.txt'
orig_size = 7*file.stat().st_size
file = Path() / 'text4_co.txt'
comp_size = file.stat().st_size

print("Statyczny Huffman - współczynnik kompresji 1MB: ", 1-comp_size/orig_size)      


Statyczny Huffman - współczynnik kompresji 1kB:  0.37207207207207205
Statyczny Huffman - współczynnik kompresji 10kB:  0.3678315058767847
Statyczny Huffman - współczynnik kompresji 100kB:  0.3726181761137837
Statyczny Huffman - współczynnik kompresji 1MB:  0.37407928033679705


# Dynamiczny algorytm Huffmana 

In [93]:
from queue import Queue
class Node:
    def __init__(self, letter, weight, idx=-1, child1=None, child2=None, parent = None):
        self.weight = weight
        self.letter = letter
        self.left = child1
        self.right = child2
        self.bit = -1
        self.parent = parent
        self.index = idx
    
    # methods display and _display_aux from
    # https://stackoverflow.com/questions/34012886/print-binary-tree-level-by-level-in-python/40885162
    def display(self):
        lines, *_ = self._display_aux()
        for line in lines:
            print(line)

    def _display_aux(self):
        # No child.
        if self.right is None and self.left is None:
            line = '%s' % str(self.weight) + "-" + str( self.letter)
            width = len(line)
            height = 1
            middle = width // 2
            return [line], width, height, middle

        # Only left child.
        if self.right is None:
            lines, n, p, x = self.left._display_aux()
            s = '%s' % self.weight
            u = len(s)
            first_line = (x + 1) * ' ' + (n - x - 1) * '_' + s
            second_line = x * ' ' + '/' + (n - x - 1 + u) * ' '
            shifted_lines = [line + u * ' ' for line in lines]
            return [first_line, second_line] + shifted_lines, n+u, p+2,n+u//2

        # Only right child.
        if self.left is None:
            lines, n, p, x = self.right._display_aux()
            s = '%s' % self.weight
            u = len(s)
            first_line = s + x * '_' + (n - x) * ' '
            second_line = (u + x) * ' ' + '\\' + (n - x - 1) * ' '
            shifted_lines = [u * ' ' + line for line in lines]
            return [first_line, second_line] + shifted_lines, n+u, p+2,u//2

        # Two children.
        left, n, p, x = self.left._display_aux()
        right, m, q, y = self.right._display_aux()
        s = '%s' % self.weight
        u = len(s)
        first_line = (x + 1) * ' ' + (n - x - 1) * '_' + s+y*'_'+(m-y)*' '
        second_line = x * ' ' + '/' + (n -x-1+u+y)* ' '+'\\'+ (m-y-1)* ' '
        if p < q:
            left += [n * ' '] * (q - p)
        elif q < p:
            right += [m * ' '] * (p - q)
        zipped_lines = zip(left, right)
        lines = [first_line, second_line] + [a+u*' '+b for a, b in zipped_lines]
        return lines, n + m + u, max(p, q) + 2, n + u // 2
    
    
    
    def increment(self, root, nodes):
        
        self.weight +=1
        
        leader = find_leader(self, nodes)
        if not (leader == self or self.parent == leader or leader.parent == self):
            # SWAP
            if self.parent == leader.parent:
                if self.parent.left == self:
                    self.parent.left = leader
                    self.parent.right = self
                else:
                    self.parent.left = self
                    self.parent.right = leader
            else:
                self_parent = self.parent
                leader_parent = leader.parent
                if self == self_parent.left:
                    self_parent.left = leader
                else:
                    self_parent.right = leader
                
                if leader == leader_parent.left:
                    leader_parent.left = self
                else:
                    leader_parent.right = self
                    
                self.parent = leader_parent
                leader.parent = self_parent
            nodes = update_indexes(root)

        if self.parent:
            self = self.parent
            self.increment(root, nodes)
            
            
    def add_child(self, node):
        if not self.left:
            self.left = node
            return
        if not self.right:
            self.right = node
            return
        
    def code(self):
        ans = ""
        curr = self
        while curr.parent:
            if curr.parent.left == curr:
                ans += "0"
            else:
                ans += "1"
            curr = curr.parent
            
        return ans[::-1]
        

In [76]:
def update_indexes(root):
    nodes = []
    queue = Queue()
    queue.put(root)
    index = 0
    while not queue.empty():
        node = queue.get()
        nodes.append(node)
        node.index = index
        if node.right is not None:
            queue.put(node.right)
        if node.left is not None:
            queue.put(node.left)
        index += 1
    return nodes

In [95]:
def find_leader(node, nodes):
    index = node.index - 1
    while 0 <= index and nodes[index].weight < node.weight:
        index -= 1
    return nodes[index + 1]

In [102]:
from collections import defaultdict

def dynamic_Huffman(text, file):
    count = defaultdict(int)
    nodes = {"#": Node("#", 0, idx=0)}
    root = nodes["#"]
    
    text = open(text, "r")
    file = open(file, "w")
    
    indexed_nodes = [root]
    
    letter = text.read(1)
    next_letter = text.read(1)
    i = 0
    while letter:
        if letter in nodes.keys():
            node = nodes[letter]
            file.write(node.code())
            node.increment(root, indexed_nodes) 
            i +=5
        else: 
            updated_node = nodes["#"]
            file.write(updated_node.code())
            let = "{0:b}".format(ord(letter))
            
            # wszystkie kody ASCII muszą być 7 bitowe
            
            if len(let)==6:
                let = "0" + let
            if len(let)==5:
                let = "00" + let
            if len(let)==4:
                let = "000" + let
                
            file.write(let)
            node = Node(letter, 1, parent = updated_node)
            nodes[letter] = node
            del nodes["#"]
            zero_node = Node("#", parent = updated_node, weight = 0)
            updated_node.add_child(zero_node)
            updated_node.add_child(node)      
            nodes["#"] = zero_node
            updated_node.increment(root, indexed_nodes)
            
            i+=7
            
        indexed_nodes = update_indexes(root)
        letter = next_letter
        
        if i>100:
            file.write('\n')
            i=0
            
        
        next_letter = text.read(1)
        if not next_letter:
            break
        
    file.close()
    text.close()
    
    return root
            
    

In [103]:
def decode_dynamic_Huffman(file, result):
    file = open(file, "r")
    result = open(result, "w")
    nodes = {"#": Node("#", 0, idx=0)}
    root = nodes["#"]
    indexed_nodes = [root]
    curr = root
    bit = file.read(1)
    while bit:
        code = ""
        while curr.left and curr.right:
            code += bit
            if bit == "0":
                curr = curr.left
            else:
                curr = curr.right
            
            bit = file.read(1)
            if bit == '\n':
                bit = file.read(1)
        
        if curr.letter == "#":
            let = bit + file.read(6)
            letter = text_from_bits(let)
            result.write(letter)
            bit = file.read(1)
            if bit == '\n':
                bit = file.read(1)
        else:
            letter = curr.letter
            result.write(letter)
        
        # UPDATE
        
        if letter in nodes.keys():
            node = nodes[letter]
            node.increment(root, indexed_nodes) 
        else: #
            updated_node = nodes["#"]
            node = Node(letter, 1, parent = updated_node)
            nodes[letter] = node
            del nodes["#"]
            zero_node = Node("#", parent = updated_node, weight = 0)
            updated_node.add_child(zero_node) # najpierw dodaję po lewej
            updated_node.add_child(node)      # poźniej prawe dziecko
            nodes["#"] = zero_node
            updated_node.increment(root, indexed_nodes)
        indexed_nodes = update_indexes(root)
        curr = root
    
    file.close()
    result.close()
    

In [108]:
# https://stackoverflow.com/questions/7396849/convert-binary-to-ascii-and-vice-versa
import binascii

def int2bytes(i):
    hex_string = '%x' % i
    n = len(hex_string)
    return binascii.unhexlify(hex_string.zfill(n + (n & 1)))

def text_from_bits(bits, encoding='utf-8', errors='surrogatepass'):
    n = int(bits, 2)
    return int2bytes(n).decode(encoding, errors)


lett_a = ("{0:b}".format(ord("a")))
print(lett_a)

1100001


In [111]:
root = dynamic_Huffman("test0.txt", "text0d_co.txt")
root.display()
decode_dynamic_Huffman("text0d_co.txt", "text0d_de.txt")

                             ___________________________40_____                         
                            /                                  \                        
                ___________16_______________                 _24____                    
               /                            \               /       \                   
        _______8___              ___________8_______      11-A    _13___                
       /           \            /                   \            /      \               
    ___4___       _4___        _4_______         ___4_          6-v    _7_______        
   /       \     /     \      /         \       /     \               /         \       
  _2_     _2_   2-E   _2_    2-r     ___2_     _2_   2-U             3-a     ___4___    
 /   \   /   \       /   \          /     \   /   \                         /       \   
1-  1-N 1-k 1-O     1-f 1-Q        _1_   1-D 1-d 1-G                       _2_     _2_  
                     

In [106]:
start = time.time()
dynamic_Huffman("text1.txt", "text1d_co.txt")
end = time.time()
print("Dynamiczny Huffman - czas kompresji 1kB: ", end - start)

start = time.time()
decode_dynamic_Huffman("text1d_co.txt", "text1d_de.txt")
end = time.time()
print("Dynamiczny Huffman - czas dekompresji 1kB: ", end - start)

start = time.time()
dynamic_Huffman("text2.txt", "text2d_co.txt")
end = time.time()
print("Dynamiczny Huffman - czas kompresji 10kB: ", end - start)

start = time.time()
decode_dynamic_Huffman("text2d_co.txt", "text2d_de.txt")
end = time.time()
print("Dynamiczny Huffman - czas dekompresji 10kB: ", end - start)


start = time.time()
dynamic_Huffman("text3.txt", "text3d_co.txt")
end = time.time()
print("Dynamiczny Huffman - czas kompresji 100kB: ", end - start)

start = time.time()
decode_dynamic_Huffman("text3d_co.txt", "text3d_de.txt")
end = time.time()
print("Dynamiczny Huffman - czas dekompresji 100kB: ", end-start)


start = time.time()
#dynamic_Huffman("text4.txt", "text4d_co.txt")
end = time.time()
print("Dynamiczny Huffman - czas kompresji 1MB: ", ">120s")

start = time.time()
#decode_dynamic_Huffman("text4d_co.txt", "text4d_de.txt")
end = time.time()
print("Dynamiczny Huffman - czas dekompresji 1MB: ", ">120s")

Dynamiczny Huffman - czas kompresji 1kB:  0.3932020664215088
Dynamiczny Huffman - czas dekompresji 1kB:  0.39952945709228516
Dynamiczny Huffman - czas kompresji 10kB:  4.120493412017822
Dynamiczny Huffman - czas dekompresji 10kB:  3.8496525287628174
Dynamiczny Huffman - czas kompresji 100kB:  42.24323582649231
Dynamiczny Huffman - czas dekompresji 100kB:  37.47110724449158
Dynamiczny Huffman - czas kompresji 1MB:  >120s
Dynamiczny Huffman - czas dekompresji 1MB:  >120s


In [107]:
file = Path() / 'text1.txt'
orig_size = 7*file.stat().st_size
file = Path() / 'text1d_co.txt'
comp_size = file.stat().st_size
print("Dynamiczny Huffman - współczynnik kompresji 1kB: ", 1-comp_size/orig_size)


file = Path() / 'text2.txt'
orig_size = 7*file.stat().st_size
file = Path() / 'text2d_co.txt'
comp_size = file.stat().st_size

print("Dynamiczny Huffman - współczynnik kompresji 10kB: ", 1-comp_size/orig_size)
      

file = Path() / 'text3.txt'
orig_size = 7*file.stat().st_size
file = Path() / 'text3d_co.txt'
comp_size = file.stat().st_size

print("Dynamiczny Huffman - współczynnik kompresji 100kB: ", 1-comp_size/orig_size)
      

file = Path() / 'text4.txt'
orig_size = 7*file.stat().st_size
#file = Path() / 'text4d_co.txt'
#comp_size = file.stat().st_size

#print("Dynamiczny Huffman - współczynnik kompresji 1MB: ", 1-comp_size/orig_size)     

Dynamiczny Huffman - współczynnik kompresji 1kB:  0.335006435006435
Dynamiczny Huffman - współczynnik kompresji 10kB:  0.3589571665220478
Dynamiczny Huffman - współczynnik kompresji 100kB:  0.3680362371888726
