In [1]:
from collections import Counter
from bitarray import bitarray
from time import time
import os
from prettytable import PrettyTable
import matplotlib.pyplot as plt
from collections import defaultdict

In [2]:
class Node():
    nodes = []

    def __init__(self, letter, weight=1, parent=None):
        self.letter = letter
        self.weight = weight
        self.parent = parent
        self.children = [None, None]
        Node.nodes[0:0] = [self]
        self.index = -len(Node.nodes)
        self.label = None
        self.id = len(Node.nodes)
        
    def add_child(self, index, node):
        self.children[index] = node
        node.label = index
        
    def increment(self):
        self.weight += 1
        if self.parent:
            for i in range(len(Node.nodes) + self.index + 1, len(Node.nodes)):
                if(self.weight <= Node.nodes[i].weight):
                    other_node = Node.nodes[i-1]
                    if(self != other_node and self.parent != other_node):
                        #print(f"swapping\n{self}\nand\n{other_node}")
                        Node.nodes[self.index] = other_node
                        Node.nodes[other_node.index] = self
                        self.index, other_node.index = other_node.index, self.index
                        self.parent.children[self.label] = other_node
                        other_node.parent.children[other_node.label] = self
                        self.parent, other_node.parent = other_node.parent, self.parent
                        self.label, other_node.label = other_node.label, self.label
                    break
            self.parent.increment()
    def code(self):
        if(self.parent):
            return self.parent.code() + str(self.label)
        else:
            return ''
    def __str__(self, depth=0):
        result = f"#{self.weight}({self.index},{self.label}) "
        #result = ""
        if(self.children[0] is None):
            result += self.letter + "\n"
        else:
            result += "\n"
            result += " " * (depth + 1) + "0 -> " + self.children[0].__str__(depth+1)
            result += " " * (depth + 1) + "1 -> " + self.children[1].__str__(depth+1)
        return result

In [3]:
class HuffmanTree:
    def __init__(self, text=None):
        self.text = text
        self.root = None
        self.letters_counts = None
        self.dictionary = dict()
        self.file_name = None
            
    
    def take_two_min_nodes(self, leafs, internal_nodes):
        all_nodes = leafs + internal_nodes
        element1 = min(all_nodes, key=lambda x:x.weight)
        all_nodes.remove(element1)
        if element1 in leafs:
            leafs.remove(element1)
        else:
            internal_nodes.remove(element1)

        element2 = min(all_nodes, key=lambda x:x.weight)
        all_nodes.remove(element2)
        if element2 in leafs:
            leafs.remove(element2)
        else:
            internal_nodes.remove(element2)

        return element1, element2
    
    
    def create_dictionary(self, node, code=''):
        if node is None:
            return
        if node.letter != "" and node.letter != "#":
            self.dictionary[node.letter] = code
            return
        self.create_dictionary(node.children[0], code+'0')
        self.create_dictionary(node.children[1], code+'1')
        return
    
    
    def create_header(self):
        header = bitarray()
        for key, value in self.letter_counts.items():
            letter = bitarray()
            letter.frombytes(key.encode('utf-8'))
            count = bitarray()
            count.frombytes(value.to_bytes(4, 'big'))
            header += letter+count
        header_length = bitarray()
        header_length.frombytes(len(self.letter_counts).to_bytes(4, 'big'))
        header = header_length + header
        return header
                
            
    def counts_from_header(self, bits):
        letters_no = int.from_bytes(bits[:32].tobytes(), 'big')
        self.dictionary = dict()
        i = 1
        bits = bits[32:]
        for _ in range(letters_no):
            letter = bits[:8].tobytes().decode('utf-8')
            count = int.from_bytes(bits[8:40], 'big')

            self.dictionary[letter] = count
            i+=2
            bits = bits[40:]
        return bits


    def decompress(self, input_name):
        bits = bitarray()
        with open(input_name, 'rb') as input_file:
            bits.fromfile(input_file)
        
        time_start = time()

        bits = self.counts_from_header(bits)
        self.build_tree()
        output = ''
        node = self.root
        for bit in bits:
            if bit == 0:
                node = node.children[0]
            else:
                node = node.children[1]
            if node.letter != '':
                output += node.letter
                node = self.root
                
        time_stop = time()
        with open('decompressed_'+input_name[:-3]+'txt', 'w') as output_file:
            output_file.write(output)
        return time_stop - time_start


In [4]:
class StaticHuffmanTree(HuffmanTree):
    def build_tree(self):
        self.letter_counts = Counter(self.text)
        nodes = [Node(a, weight) for a, weight in self.letter_counts.items()]
        internal_nodes = []
        leafs = sorted(nodes, key=lambda x:x.weight)
        while len(leafs) + len(internal_nodes) > 1:
            element1, element2 = self.take_two_min_nodes(internal_nodes, leafs)
            new_node = Node('', element1.weight + element2.weight)
            new_node.children[0], new_node.children[1] = element1, element2
            internal_nodes.append(new_node)

        self.root = internal_nodes[0]
        
    def compress(self, input_file_name):
        with open(input_file_name, 'r') as file:
            self.text = file.read()
            self.file_name = input_file_name
        
        time_start = time()
        self.build_tree()
        self.dictionary = dict()
        self.create_dictionary(self.root)
        result = bitarray()
        for letter in self.text:
            result += bitarray(self.dictionary[letter])
        result = self.create_header() + result
        time_stop = time()
        
        output_file_name = 'compressed_'+self.file_name[:-3]+'bin'
        with open(output_file_name, 'wb') as output_file:
            result.tofile(output_file)
            
        compressed_size = os.path.getsize(output_file_name)
        original_size = os.path.getsize(self.file_name)
        
        return time_stop - time_start, compressed_size/original_size*100
        

class AdaptiveHuffmanTree(HuffmanTree):
    def __init__(self, text=None):
        super().__init__(text)
        self.result = bitarray()
    
    
    def build_tree(self):
        Node.nodes = []
        count = defaultdict(int)
        nodes = {"#": Node("#", weight=0)}
        root = nodes["#"]
        for letter in list(self.text):
            if letter in nodes:
                node = nodes[letter]
                self.result+=node.code()
                node.increment()
            else:
                updated_node = nodes["#"]
                self.result+=updated_node.code()

                node = Node(letter, parent=updated_node)
                nodes[letter] = node
                del nodes["#"]
                zero_node = Node("#", parent=updated_node, weight=0)
                updated_node.add_child(0, zero_node)
                updated_node.add_child(1, node)
                nodes["#"] = zero_node
                updated_node.increment()
        self.root = root
        self.letter_counts = dict()
    
    def compress(self, input_file_name):
        with open(input_file_name, 'r') as file:
            self.text = file.read()
            self.file_name = input_file_name
        
        self.result = bitarray()
        time_start = time()
        self.build_tree()
        time_stop = time()
        
        output_file_name = 'compressed_'+self.file_name[:-3]+'bin'
        with open(output_file_name, 'wb') as output_file:
            self.result.tofile(output_file)
            
        compressed_size = os.path.getsize(output_file_name)
        original_size = os.path.getsize(self.file_name)
        
        return time_stop - time_start, compressed_size/original_size*100

In [5]:
def tests(Trees, dir_name):
    init_dir = os.getcwd()
    os.chdir(dir_name)
    files = os.listdir()
    results = {Tree:{file:dict() for file in files} for Tree in Trees}
    for Tree in Trees:
        tree = Tree()
        for file in files:
            file_size = os.path.getsize(file)
            results[Tree][file]['file_size'] = file_size

            results[Tree][file]['compression_time'], results[Tree][file]['compression_ratio'] = tree.compress(file)
            compressed_file = 'compressed_'+file[:-3]+'bin'
            
            if Tree is not AdaptiveHuffmanTree:
                results[Tree][file]['decompression_time'] = tree.decompress(compressed_file)
            
            os.remove(compressed_file)
            if Tree is not AdaptiveHuffmanTree:
                os.remove('decompressed_'+compressed_file[:-3]+'txt')
    os.chdir(init_dir)

    return results

In [6]:
test_results = tests([StaticHuffmanTree, AdaptiveHuffmanTree], 'testFiles')

In [7]:
def create_table(dict_result, title):
    d = dict_result
    table = PrettyTable(['']+list(d.keys()))
    table.title = title
    table.add_row(['Size [Bytes]']+[v['file_size'] for v in d.values()])
    table.add_row(['Compression time [s]']+[v['compression_time'] for v in d.values()])
    table.add_row(['Compression ratio [%]']+[v['compression_ratio'] for v in d.values()])
    table.add_row(['Decompression time [s]']+[v.get('decompression_time', '-------') for v in d.values()])
        
    return table

In [8]:
print(create_table(test_results[StaticHuffmanTree], 'StaticHuffmanTree'))

+--------------------------------------------------------------------------------------------------------------------+
|                                                 StaticHuffmanTree                                                  |
+------------------------+---------------------+----------------------+----------------------+-----------------------+
|                        |      large.txt      |      medium.txt      |      small.txt       |        tiny.txt       |
+------------------------+---------------------+----------------------+----------------------+-----------------------+
|      Size [Bytes]      |       1000000       |        100340        |        10034         |          1004         |
|  Compression time [s]  | 0.42800045013427734 | 0.042289018630981445 | 0.005693912506103516 | 0.0008757114410400391 |
| Compression ratio [%]  |        54.994       |  53.59876420171417   |  55.50129559497707   |   71.41434262948208   |
| Decompression time [s] |  0.7803876399993896 |

In [9]:
print(create_table(test_results[AdaptiveHuffmanTree], 'AdaptiveHuffmanTree'))

+--------------------------------------------------------------------------------------------------------------+
|                                             AdaptiveHuffmanTree                                              |
+------------------------+-------------------+--------------------+---------------------+----------------------+
|                        |     large.txt     |     medium.txt     |      small.txt      |       tiny.txt       |
+------------------------+-------------------+--------------------+---------------------+----------------------+
|      Size [Bytes]      |      1000000      |       100340       |        10034        |         1004         |
|  Compression time [s]  | 5.922583818435669 | 0.5719335079193115 | 0.05938243865966797 | 0.006661653518676758 |
| Compression ratio [%]  |      54.9993      | 53.40043850906917  |  53.56786924456847  |  53.48605577689243   |
| Decompression time [s] |      -------      |      -------       |       -------       |       