# Dominik Adamczyk
## Laboratorium 2 - rozwiązania 

#### 1. Implementacja statycznego algorytmu Huffmana

In [48]:
from collections import Counter
from heapq import heapify, heappop, heappush
from bitarray import bitarray, frozenbitarray

class Static_node:
    """Node of a static Huffman tree.

    ``symbol`` holds the payload of a leaf; the encoder in this file creates
    internal nodes with ``symbol=None`` and links the two merged subtrees
    through ``left`` and ``right``. ``weight`` is the value the nodes are
    ordered by.
    """

    def __init__(self, symbol, weight, left=None, right=None):
        self.symbol, self.weight = symbol, weight
        self.left, self.right = left, right

    def __lt__(self, other):
        """Compare by weight only, which is all ``heapq`` needs to order nodes."""
        lighter = self.weight < other.weight
        return lighter
    
def static_huffman_traverse(codes, node, code=None):
    """Collect the code word of every leaf in the subtree rooted at ``node``.

    The tree is walked depth-first; going to the left child appends a ``0``
    bit to the current prefix and going to the right child appends a ``1``.

    Args:
        codes: dict that is filled in place with ``symbol -> frozenbitarray``.
        node: subtree root; a node whose ``symbol`` is not ``None`` is a leaf.
        code: prefix accumulated so far. Defaults to an empty prefix.
    """
    # A fresh prefix per top-level call instead of a default object that is
    # created once at definition time and shared by every call.
    if code is None:
        code = bitarray()

    if node.symbol is not None:
        codes[node.symbol] = frozenbitarray(code)
        return

    # ``code + ...`` builds a new bitarray, so sibling branches never share state.
    static_huffman_traverse(codes, node.left, code + bitarray('0'))
    static_huffman_traverse(codes, node.right, code + bitarray('1'))

def static_huffman_encoding(data):
    """Encode ``data`` with a static Huffman code built from its own symbol counts.

    Args:
        data: iterable of hashable symbols (the callers in this file pass a ``str``).

    Returns:
        ``(encoded_data, codes)`` where ``encoded_data`` is a ``bitarray`` and
        ``codes`` maps each code word (``frozenbitarray``) back to its symbol.
        Empty input yields an empty bitarray and an empty mapping.
    """
    freq = Counter(data)
    if not freq:
        # Nothing to build a tree from; the original indexed an empty heap here.
        return bitarray(), {}

    heap = [Static_node(symbol, weight) for symbol, weight in freq.items()]
    heapify(heap)

    # Repeatedly merge the two lightest subtrees until a single root is left.
    while len(heap) > 1:
        node1 = heappop(heap)
        node2 = heappop(heap)
        heappush(heap, Static_node(None, node1.weight + node2.weight, node1, node2))

    root = heap[0]
    codes = {}
    if root.symbol is not None:
        # Only one distinct symbol: the root is a leaf and would receive an
        # empty code word, producing an empty (undecodable) bit stream.
        # Give it an explicit one-bit code instead.
        codes[root.symbol] = frozenbitarray('0')
    else:
        static_huffman_traverse(codes, root)

    encoded_data = bitarray()
    for symbol in data:
        encoded_data += codes[symbol]

    # Callers need the inverse mapping (code word -> symbol) for decoding.
    return encoded_data, {code: symbol for symbol, code in codes.items()}

def static_huffman_decoding(encoded_data, codes):
    """Decode a bit stream produced with a prefix-free code table.

    Args:
        encoded_data: object exposing ``to01()`` that returns the bits as a
            string of ``'0'``/``'1'`` characters (e.g. a ``bitarray``).
        codes: mapping ``code word -> symbol``. Keys may be ``'0'``/``'1'``
            strings or objects exposing ``to01()`` (such as the
            ``frozenbitarray`` keys returned by ``static_huffman_encoding``).

    Returns:
        The decoded text. Trailing bits that do not complete a code word are
        ignored.
    """
    # Normalise the keys to plain bit strings so that both table flavours
    # used in this file can be looked up with the accumulated prefix.
    lookup = {
        (code if isinstance(code, str) else code.to01()): symbol
        for code, symbol in codes.items()
    }

    decoded = []  # joined once at the end instead of growing a string
    prefix = ''
    for bit in encoded_data.to01():
        prefix += bit
        if prefix in lookup:
            decoded.append(lookup[prefix])
            prefix = ''
    return ''.join(decoded)


#### 2. Implementacja dynamicznego algorytmu Huffmana

In [49]:
# naprawianie
from bitarray import bitarray
class AdaptiveNode:
    """Node of the adaptive Huffman tree.

    Besides the usual ``symbol``/``weight`` pair and the ``left``/``right``
    children, every node keeps a link to its ``parent`` and an ``order``
    number that the tree uses to rank nodes of equal weight.
    """

    def __init__(self, symbol=None, weight=None, parent=None, left=None, right=None, order=None):
        # Payload.
        self.symbol, self.weight = symbol, weight
        # Links to the neighbouring nodes.
        self.left, self.right, self.parent = left, right, parent
        # Position number maintained by the owning tree.
        self.order = order
    

class AdaptiveHuffmanTree:
    """Adaptive (dynamic) Huffman coder that rebuilds its tree symbol by symbol.

    The tree starts as a single escape leaf whose symbol is the sentinel
    string ``'Zero'``. Every symbol seen for the first time is announced by
    emitting the current code of that escape leaf; the symbol itself is not
    written to the bit stream but recorded in a side codebook. Encoder and
    decoder apply the same tree updates, so they stay in sync.

    Attributes:
        order: initialised to 1 and not used by any method in this class.
        zero: the current escape leaf.
        root: the tree root.
        leaves: ``symbol -> leaf`` (the escape leaf is stored under ``'Zero'``).
        weights: ``weight -> set of nodes`` currently having that weight.
    """

    def __init__(self):
        self.order = 1
        self.zero = AdaptiveNode(weight=0, order=1, symbol='Zero')
        self.root = self.zero
        self.leaves = {'Zero': self.zero}
        self.weights = {key: set() for key in range(2)}
        self.weights[0].add(self.root)

    def create_node(self, symbol):
        """Split the escape leaf into a new escape leaf and a leaf for ``symbol``.

        The old escape leaf becomes an internal node: its left child is the
        new escape leaf (weight 0) and its right child is the new symbol leaf
        (weight 1). The weights on the path above it are then updated.
        """
        zero = self.zero
        zero.left = AdaptiveNode(weight=0, order=2*zero.order, parent=zero, symbol='Zero')
        zero.right = AdaptiveNode(weight=1, order=2*zero.order+1, parent=zero, symbol=symbol)
        zero.symbol = None  # the old escape leaf is an internal node from now on
        self.zero = zero.left
        self.weights[0].add(zero.left)
        self.weights[1].add(zero.right)
        self.leaves[symbol] = zero.right
        self.leaves['Zero'] = zero.left
        self.update_tree(zero)

    def reconstruct_order(self, node):
        """Renumber the descendants of ``node``: children of ``k`` get ``2k`` and ``2k+1``."""
        if node.left is not None:
            node.left.order = 2 * node.order
            self.reconstruct_order(node.left)
        if node.right is not None:
            node.right.order = 2 * node.order + 1
            self.reconstruct_order(node.right)

    def swap_nodes(self, n1, n2):
        """Exchange the positions of two subtrees and renumber both of them.

        Neither node may be the root (both parents are dereferenced).
        """
        if n1 != n2:
            if n1.parent == n2.parent:
                # Siblings: just flip the two child slots of the shared parent.
                if n1 == n1.parent.left:
                    n1.parent.right = n1
                    n1.parent.left = n2
                else:
                    n1.parent.left = n1
                    n1.parent.right = n2
            else:
                # Re-point each parent's child slot first, then swap the
                # parent links themselves.
                if n1 == n1.parent.left:
                    n1.parent.left = n2
                else:
                    n1.parent.right = n2
                if n2 == n2.parent.left:
                    n2.parent.left = n1
                else:
                    n2.parent.right = n1

                n1.parent, n2.parent = n2.parent, n1.parent
            n1.order, n2.order = n2.order, n1.order
            self.reconstruct_order(n1)
            self.reconstruct_order(n2)

    def update_tree(self, node):
        """Increment the weight of ``node`` and of its ancestors below the root.

        Before each increment the node is swapped with the node of equal
        weight that has the smallest order number, unless that node is the
        root or lies on the path from the current node towards the root
        (which includes the current node itself). The loop stops at the root,
        so the root's own weight is never changed.
        """
        while node != self.root:
            same_val_node = min(self.weights[node.weight], key=lambda x: x.order)
            if same_val_node != self.root and not self.is_ancestor(node, same_val_node):
                self.swap_nodes(node, same_val_node)
            # Both nodes have the same weight, so this is the bucket of ``node``.
            self.weights[same_val_node.weight].remove(node)
            node.weight += 1
            if node.weight not in self.weights:
                self.weights[node.weight] = set()
            self.weights[node.weight].add(node)
            node = node.parent

    def is_ancestor(self, child, ancestor):
        """Return True if ``ancestor`` is ``child`` or lies between ``child`` and the root.

        The root itself is never tested, so this returns False when
        ``ancestor`` is the root.
        """
        while child != self.root:
            if child == ancestor:
                return True
            child = child.parent
        return False

    def get_code(self, symbol):
        """Return the current code of ``symbol`` (or of ``'Zero'``) as a ``bitarray``.

        Raises:
            KeyError: if ``symbol`` has no leaf yet.
        """
        node = self.leaves[symbol]
        code = ''
        # Collected leaf-to-root, hence the reversal below.
        while node.parent is not None:
            if node == node.parent.right:
                code += '1'
            else:
                code += '0'
            node = node.parent
        return bitarray(code[::-1])

    def get_symbol(self, code):
        """Follow ``code`` from the root and return the node that is reached.

        ``code`` may be a ``'0'``/``'1'`` string or any iterable of 0/1
        integers (iterating a ``bitarray`` yields integers, which the
        original string-only comparison always treated as "go left").
        """
        node = self.root
        for el in code:
            if el in ('1', 1):
                node = node.right
            else:
                node = node.left
        return node

    def encode(self, text):
        """Encode ``text`` and update the tree along the way.

        Returns:
            ``(encoded, codebook)``: ``encoded`` is the ``bitarray`` with all
            code words; ``codebook`` lists, in order of first appearance,
            ``(symbol, escape_code)`` where ``escape_code`` is the escape
            leaf's code at the moment the symbol was introduced.
        """
        encoded = bitarray()
        codebook = []
        for symbol in text:
            if symbol in self.leaves:
                encoded += self.get_code(symbol)
                self.update_tree(self.leaves[symbol])
            else:
                escape = self.get_code('Zero')
                codebook.append((symbol, escape))
                encoded += escape
                self.create_node(symbol)
        return encoded, codebook

    def decode(self, codebook, bits):
        """Decode ``bits`` using the ``codebook`` produced by :meth:`encode`.

        Args:
            codebook: list of ``(symbol, escape_code)`` pairs from ``encode``.
            bits: ``bitarray`` with the encoded stream (padding already removed).

        Returns:
            The decoded text.

        Raises:
            ValueError: if an escape code is read that has no unused
                codebook entry.
            IndexError: if the stream ends in the middle of a code word.
        """
        codes = [code.to01() for _, code in codebook]
        bits = bits.to01()
        decoded = []  # joined once at the end instead of growing a string
        idx = 0
        # On a fresh tree the first symbol is announced with an *empty*
        # escape code, so it must be decoded even when no bits remain;
        # otherwise a one-symbol text would decode to ''.
        first_pending = bool(codebook) and self.root is self.zero
        while first_pending or idx < len(bits):
            first_pending = False
            node = self.root
            # Walk down until a leaf is reached, consuming one bit per edge.
            while not (node.left is None and node.right is None):
                if bits[idx] == '0':
                    node = node.left
                else:
                    node = node.right
                idx += 1
            if node.symbol == 'Zero':
                # First unused entry whose recorded escape code matches the
                # current one; used entries are blanked so they cannot match.
                entry = codes.index(self.get_code('Zero').to01())
                symbol = codebook[entry][0]
                codes[entry] = None
                self.create_node(symbol)
            else:
                symbol = node.symbol
                self.update_tree(node)
            decoded.append(symbol)
        return ''.join(decoded)


bitarray('11111111010100000010110110001000000000110000100000110111000100010100001') [('l', bitarray()), ('a', bitarray('0')), (' ', bitarray('00')), ('m', bitarray('000')), ('k', bitarray('0110')), ('s', bitarray('00100')), ('d', bitarray('000000')), ('j', bitarray('01100')), ('h', bitarray('001000')), ('o', bitarray('000100')), ('t', bitarray('010100'))]


'lllllllllala ma ksdjhalsota'

#### 3. Opracowanie typu oraz algorytmu kompresji i dekompresji

In [50]:
import struct

def _write_huffman_file(path, entries, payload):
    """Write the code table and the encoded bits to ``path``.

    Layout: entry count (``struct`` format ``i``), number of padding bits in
    the last payload byte (``B``), then per entry: code length in bits
    (``B``), the code's bytes, letter length in bytes (``B``) and the letter
    as UTF-8; finally the payload bytes.

    Args:
        path: destination file.
        entries: sequence of ``(code, letter)`` pairs; ``code`` is a bitarray.
        payload: bitarray with the encoded text.
    """
    # Bits needed to fill the last byte (0 when already byte-aligned).
    bits_to_omit = -len(payload) % 8
    with open(path, "wb") as f:
        f.write(struct.pack("i", len(entries)))
        f.write(struct.pack("B", bits_to_omit))
        for code, letter in entries:
            # NOTE(review): 'B' limits code and letter lengths to 255;
            # longer values make struct.pack raise struct.error.
            f.write(struct.pack('B', len(code)))
            f.write(code.tobytes())
            letter_bytes = letter.encode("utf-8")
            letter_len = len(letter_bytes)
            f.write(struct.pack(f'B{letter_len}s', letter_len, letter_bytes))
        payload.tofile(f)


def encode_file(filename, static=True):
    """Compress ``text_files/<filename>.txt`` into ``compressed_files/<filename>.bin``.

    Args:
        filename: base name without directory or extension.
        static: use the static Huffman coder when true, the adaptive one
            otherwise. Both variants share the same container layout; only
            the meaning of the stored table differs (static: the code of each
            symbol; adaptive: the escape code in effect when the symbol first
            appeared, in order of appearance).
    """
    # NOTE(review): text mode with the platform default encoding; universal
    # newlines turn '\r' and '\r\n' into '\n' on read.
    with open("text_files/" + filename + ".txt", "r") as f:
        text = f.read()

    if static:
        payload, decode_tree = static_huffman_encoding(text)
        entries = list(decode_tree.items())
    else:
        payload, codebook = AdaptiveHuffmanTree().encode(text)
        entries = [(code, letter) for letter, code in codebook]

    _write_huffman_file("compressed_files/" + filename + ".bin", entries, payload)
            
def _read_huffman_file(path):
    """Parse a file written by ``encode_file``.

    Returns:
        ``(entries, payload)``: ``entries`` is the list of ``(code, letter)``
        pairs in file order (``code`` is a bitarray trimmed to its stored bit
        length) and ``payload`` is the encoded text as a bitarray with the
        padding bits removed.
    """
    with open(path, "rb") as f:
        num_entries = struct.unpack('i', f.read(4))[0]
        bits_to_omit = struct.unpack('B', f.read(1))[0]

        entries = []
        for _ in range(num_entries):
            len_bits = struct.unpack('B', f.read(1))[0]
            code = bitarray()
            # Codes are stored in whole bytes; drop the bits past len_bits.
            code.fromfile(f, (len_bits + 7) // 8)
            code = code[:len_bits]
            len_letter = struct.unpack('B', f.read(1))[0]
            letter = f.read(len_letter).decode("utf-8")
            entries.append((code, letter))

        # Everything after the table is the encoded text.
        payload = bitarray()
        payload.fromfile(f)

    if bits_to_omit != 0:
        payload = payload[:-bits_to_omit]
    return entries, payload


def decode_file(filename, static=True):
    """Decompress ``compressed_files/<filename>.bin`` into ``decompressed_files/<filename>.txt``.

    Args:
        filename: base name without directory or extension.
        static: must match the mode that was used by ``encode_file``.
    """
    entries, payload = _read_huffman_file("compressed_files/" + filename + ".bin")

    if static:
        # The static decoder looks codes up as '0'/'1' strings.
        decode_tree = {code.to01(): letter for code, letter in entries}
        text = static_huffman_decoding(payload, decode_tree)
    else:
        # The adaptive decoder expects (letter, code) pairs in file order.
        codebook = [(letter, code) for code, letter in entries]
        text = AdaptiveHuffmanTree().decode(codebook, payload)

    with open("decompressed_files/" + filename + ".txt", "w+") as f:
        f.write(text)
    


#### Funkcje pomocnicze - mierzenie czasu, porównywanie plików, generowanie plików

In [51]:
import random, time
import os

def file_compare(filename):
    """Print whether the original and the decompressed text file are equal.

    Compares ``text_files/<filename>.txt`` with
    ``decompressed_files/<filename>.txt``. Both files are read in text mode,
    so the comparison is made on decoded text after newline translation, not
    on raw bytes.
    """
    original_path = "text_files/" + filename + ".txt"
    restored_path = "decompressed_files/" + filename + ".txt"
    with open(original_path, "r") as f1, open(restored_path, "r") as f2:
        identical = f1.read() == f2.read()

    if identical:
        print(f"Initial, and decompressed file '{filename}.txt' are the same")
    else:
        # The original message was garbled ("are not samethe ").
        print(f"Initial, and decompressed file '{filename}.txt' are not the same")
    print('\n')

def compare_size(filename):
    """Return the space saved by compression as a percentage of the original size.

    Sizes are taken from ``text_files/<filename>.txt`` and
    ``compressed_files/<filename>.bin``. The result is negative when the
    compressed file is larger than the original.
    """
    source_path = f"text_files/{filename}.txt"
    packed_path = f"compressed_files/{filename}.bin"
    original_size = os.path.getsize(source_path)
    comp_size = os.path.getsize(packed_path)
    return (1 - (comp_size / original_size)) * 100

def test_compression(filename, static=True):
    """Run one compress/decompress round trip and print timing and size statistics.

    Args:
        filename: base name of the input in ``text_files``. In adaptive mode
            the suffix ``'Adaptive'`` is appended, so the input is read from
            ``text_files/<filename>Adaptive.txt``.
        static: benchmark the static coder when true, the adaptive one otherwise.
    """
    if not static:
        filename += 'Adaptive'
    print(("[Static]" if static else "[Adaptive]") + f"\n[Filename] {filename}.txt")

    # perf_counter() is the clock intended for measuring short durations;
    # time.time() is wall-clock time and may jump or have coarse resolution.
    start = time.perf_counter()
    encode_file(filename, static)
    print(f'[Encoding time] {time.perf_counter() - start}s')

    start = time.perf_counter()
    decode_file(filename, static)
    print(f'[Decoding time] {time.perf_counter() - start}s')

    print(f'[Compression ratio] {compare_size(filename)}%')
    file_compare(filename)

def generate_uniform_unicode_text(filename, num_chars, alphabet_size=150):
    """Write a text of uniformly random characters to ``text_files/<filename>.txt``.

    Args:
        filename: base name without directory or extension.
        num_chars: number of characters to generate.
        alphabet_size: characters are drawn from the code points
            ``0 .. alphabet_size - 1``. Defaults to 150, the value that used
            to be hard-coded.

    Raises:
        IndexError: if ``alphabet_size`` is 0 (``random.choices`` rejects an
            empty population).
    """
    alphabet = [chr(code_point) for code_point in range(alphabet_size)]
    text = ''.join(random.choices(alphabet, k=num_chars))
    # NOTE(review): written in text mode with the platform default encoding.
    with open("text_files/" + filename + ".txt", 'w') as f:
        f.write(text)

# test_compression('uniform100kb', False)

In [52]:
# Base names of the benchmark inputs, one per nominal size.
uniform_tests = ['uniform1kb', 'uniform10kb', 'uniform100kb', 'uniform1mb']
linux_tests = ['halbtc8821a2antc1kb', 'halbtc8821a2antc10kb',
               'halbtc8821a2antc100kb', 'halbtc8821a2antc1mb']
book_tests = ['theBrothersKaramazov1kb', 'theBrothersKaramazov10kb',
              'theBrothersKaramazov100kb', 'theBrothersKaramazov1mb']

# Only the uniform inputs are generated here; every step is ten times larger
# than the previous one (910 characters for the smallest file).
for name, num_chars in zip(uniform_tests, (910, 9_100, 91_000, 910_000)):
    generate_uniform_unicode_text(name, num_chars)

In [53]:
# Benchmark every uniform input with the static coder first, then the adaptive one.
for name in uniform_tests:
    for use_static in (True, False):
        test_compression(name, use_static)

[Static]
[Filename] uniform1kb.txt
[Encoding time] 0.008056879043579102s
[Decoding time] 0.008947372436523438s
[Compression ratio] -38.43930635838151%
Initial, and decompressed file 'uniform1kb.txt' are the same


[Adaptive]
[Filename] uniform1kbAdaptive.txt
[Encoding time] 0.02482128143310547s
[Decoding time] 0.025200605392456055s
[Compression ratio] -44.688995215311%
Initial, and decompressed file 'uniform1kbAdaptive.txt' are the same


[Static]
[Filename] uniform10kb.txt
[Encoding time] 0.008251667022705078s
[Decoding time] 0.016162872314453125s
[Compression ratio] 15.45142202710441%
Initial, and decompressed file 'uniform10kb.txt' are the same


[Adaptive]
[Filename] uniform10kbAdaptive.txt
[Encoding time] 0.1950685977935791s
[Decoding time] 0.18739795684814453s
[Compression ratio] 14.446243214931908%
Initial, and decompressed file 'uniform10kbAdaptive.txt' are the same


[Static]
[Filename] uniform100kb.txt
[Encoding time] 0.024578571319580078s
[Decoding time] 0.10674166679382324s

In [54]:
# Benchmark every book excerpt with the static coder first, then the adaptive one.
for name in book_tests:
    for use_static in (True, False):
        test_compression(name, use_static)

[Static]
[Filename] theBrothersKaramazov1kb.txt
[Encoding time] 0.00209808349609375s
[Decoding time] 0.006036043167114258s
[Compression ratio] 13.829787234042556%
Initial, and decompressed file 'theBrothersKaramazov1kb.txt' are the same


[Adaptive]
[Filename] theBrothersKaramazov1kbAdaptive.txt
[Encoding time] 0.014789581298828125s
[Decoding time] 0.016117095947265625s
[Compression ratio] 11.605415860735013%
Initial, and decompressed file 'theBrothersKaramazov1kbAdaptive.txt' are the same


[Static]
[Filename] theBrothersKaramazov10kb.txt
[Encoding time] 0.00304412841796875s
[Decoding time] 0.010019540786743164s
[Compression ratio] 38.916303821964206%
Initial, and decompressed file 'theBrothersKaramazov10kb.txt' are the same


[Adaptive]
[Filename] theBrothersKaramazov10kbAdaptive.txt
[Encoding time] 0.12248349189758301s
[Decoding time] 0.09436964988708496s
[Compression ratio] 38.69375907111756%
Initial, and decompressed file 'theBrothersKaramazov10kbAdaptive.txt' are the same


[Stat

In [55]:
# Benchmark every source-code excerpt with the static coder first, then the adaptive one.
for name in linux_tests:
    for use_static in (True, False):
        test_compression(name, use_static)

[Static]
[Filename] halbtc8821a2antc1kb.txt
[Encoding time] 0.0019998550415039062s
[Decoding time] 0.00661158561706543s
[Compression ratio] 26.356589147286826%
Initial, and decompressed file 'halbtc8821a2antc1kb.txt' are the same


[Adaptive]
[Filename] halbtc8821a2antc1kbAdaptive.txt
[Encoding time] 0.010405540466308594s
[Decoding time] 0.01337885856628418s
[Compression ratio] 26.25968992248062%
Initial, and decompressed file 'halbtc8821a2antc1kbAdaptive.txt' are the same


[Static]
[Filename] halbtc8821a2antc10kb.txt
[Encoding time] 0.0022382736206054688s
[Decoding time] 0.010680437088012695s
[Compression ratio] 30.28031925248199%
Initial, and decompressed file 'halbtc8821a2antc10kb.txt' are the same


[Adaptive]
[Filename] halbtc8821a2antc10kbAdaptive.txt
[Encoding time] 0.11802363395690918s
[Decoding time] 0.11363053321838379s
[Compression ratio] 29.7839205762118%
Initial, and decompressed file 'halbtc8821a2antc10kbAdaptive.txt' are the same


[Static]
[Filename] halbtc8821a2antc10