# Opracowanie formatu pliku:
plik jest podzielony na trzy części:
1. Informacja o drzewie kompresji
2. Ilość nadmiarowych bitów
3. zakodowane dane

## Drzewo kompresji
Drzewo jest przechowywane jako ciąg znaków zakodowanych w UTF-8. Znaki te są zapisem liści oraz operacji sklejania drzew (operację oznaczam znakiem "__$__") w odwrotnej notacji polskiej. Korzystam z własności, że każde drzewo jest sklejone z dwóch poddrzew (wynika to z tego, jak budujemy drzewo).<br>
Informacja na temat drzewa kompresji kończy się znakiem "__#__"<br>
Jeśli w drzewie pojawi się jakiś znak specjalny ["$", "#", "\"], to będzie on poprzedzony znakiem ucieczki "__\\__".

## Ilość nadmiarowych bitów
Z racji tego, że plik jest dopełniany bitami tak, aby sumaryczna liczba bitów była podzielna przez 8, występuje problem zbyt dużej ilości danych. Dlatego 4 kolejne bity po informacji o drzewie informują, ile bitów zostało dopisanych na końcu pliku. Takich bitów będzie maksymalnie 7, dlatego też wykorzystuję tylko 4 bity do przechowania tej informacji.

## Zakodowane dane
Właściwe dane zakodowane algorytmem Huffmana, na których końcu dopisane są nadmiarowe bity.


# Implementacja statycznego algorytmu Huffmana

In [1]:
from queue import PriorityQueue
import bitstring
from collections import deque

class CodingTreeNode:
    """Node of the static coding tree.

    A leaf is created with ``val`` set to a symbol; an inner node is created
    with ``left`` and ``right`` subtrees and ``val`` left as ``None``.

    Every node carries ``representation``: the subtree written in reverse
    Polish notation, where ``$`` joins the two preceding subtrees, e.g.::

                      .
        ab$   -->    / \
                    a   b

    Leaf symbols that collide with the format markers (``\\``, ``$``, ``#``)
    are stored with a leading backslash, both in ``val`` and in
    ``representation``.
    """

    def __init__(self, val=None, left=None, right=None):
        self.left = left
        self.right = right

        # "$" marks the join operation and "#" marks the end of the tree in
        # the compressed file, so those symbols (and the escape character
        # itself) are kept in escaped form.
        for marker in ("\\", "$", "#"):
            if val == marker:
                val = "\\" + marker
                break

        self.val = val
        self.parent = None
        if val is not None:
            self.representation = val
        else:
            self.representation = (
                left.representation + right.representation + "$"
            )

    def __str__(self) -> str:
        return self.representation

    def __getitems(self, headCode):
        """Return ``[(symbol, code), ...]`` for the leaves below this node.

        ``headCode`` is the bit string accumulated on the way down; going
        left appends "0" and going right appends "1". Symbols are returned
        unescaped.
        """
        if self.left is not None:
            zero_side = self.left.__getitems(headCode + "0")
            one_side = self.right.__getitems(headCode + "1")
            return zero_side + one_side

        symbol = self.val
        # Undo the escaping applied in the constructor.
        for marker in ("\\", "$", "#"):
            if symbol == "\\" + marker:
                symbol = marker
                break
        return [(symbol, headCode)]

    def getCodeDict(self):
        """Map every symbol of the tree to its code as ``bitstring.Bits``."""
        return {
            symbol: bitstring.Bits(bin=bits)
            for symbol, bits in self.__getitems("")
        }

    def __lt__(self, other):
        # Only used as a tie-breaker when nodes sit inside priority-queue
        # tuples with equal counts; any node counts as "smaller".
        return True

In [2]:
def _merge(a, b):
    """Join two ``(count, tree)`` queue entries into one.

    The result's count is the sum of both counts and its tree is a new inner
    node with ``a``'s tree on the left and ``b``'s tree on the right.
    """
    return (a[0] + b[0], CodingTreeNode(left=a[1], right=b[1]))

def static_huffman(data: str, compered_file_name: str) -> None:
    """Compress ``data`` with a static Huffman code and write it to a file.

    File layout: the coding tree in reverse Polish notation (UTF-8), the
    terminator ``#``, a 4-bit count of padding bits, the encoded symbols and
    finally the zero padding that fills the last byte.

    Args:
        data: Text to compress. May be empty; the file then holds an empty
            tree and only the 4-bit header.
        compered_file_name: Path of the file to create or overwrite.
    """
    # calculating incidence of letters
    alphabet = {}
    for letter in data:
        alphabet[letter] = alphabet.get(letter, 0) + 1

    # building coding tree: repeatedly join the two least frequent subtrees
    q = PriorityQueue()
    for key, value in alphabet.items():
        q.put((value, CodingTreeNode(val=key)))
    tree = None
    # Checking emptiness first keeps q.get() from blocking forever when
    # ``data`` is empty and nothing was queued.
    while not q.empty():
        a = q.get()
        if q.empty():
            tree = a[1]
            break
        q.put(_merge(a, q.get()))

    if tree is None:
        tree_text = ""
        codeDisct = {}
    else:
        tree_text = tree.representation
        codeDisct = tree.getCodeDict()
        if len(codeDisct) == 1:
            # A tree made of a single leaf yields a zero-length code, which
            # would drop every symbol from the output. Spend one bit per
            # symbol so the number of occurrences survives.
            codeDisct = {
                symbol: bitstring.Bits(bin="0") for symbol in codeDisct
            }

    # coding data
    coded_data = bitstring.BitArray()
    for letter in data:
        coded_data.append(codeDisct[letter])
    # The 4 header bits count towards byte alignment as well.
    overdata_size = (8 - (len(coded_data) + 4) % 8) % 8
    coded_data.insert(bitstring.Bits(uint=overdata_size, length=4), 0)

    # saving data to file
    # Everything is written in binary mode: text mode would translate "\n"
    # inside the tree on some platforms, while the decoder reads raw bytes.
    with open(compered_file_name, "wb") as f:
        f.write((tree_text + "#").encode("utf-8"))
        coded_data.tofile(f)


# Implementacja dynamicznego algorytmu Huffmana

In [142]:
class DynamicCodingTree:
    """Coding tree that is grown and reordered one symbol at a time.

    The tree starts as a single NYT ("not yet transferred") leaf. Feeding a
    new symbol splits the NYT leaf into an inner node with the NYT leaf on the
    left and the new symbol on the right. Feeding a known symbol first swaps
    its leaf with the leader of its weight block and then increments the
    weights on the path to the root.

    Attributes:
        NYT: the placeholder leaf (its ``value`` is ``None``).
        root: current root node, or ``None`` after ``cut_NYT`` on an empty
            tree.
        blocks: ``DynamicCodingTreeBlocks`` grouping nodes by weight.
        leafs: maps each symbol to its leaf; ``None`` maps to the NYT leaf.
    """

    def __init__(self) -> None:
        self.NYT = DynamicCodingTreeNodeLeaf(val=None)
        self.root = self.NYT
        self.blocks = DynamicCodingTreeBlocks()
        self.blocks.node_created(self.NYT)

        self.leafs = {None: self.root}

    def __expend_NYT(self, letter):
        """Replace the NYT leaf by an inner node holding NYT and ``letter``."""
        new_leaf = DynamicCodingTreeNodeLeaf(val=letter)
        self.leafs[letter] = new_leaf
        subtree = DynamicCodingTreeNodeInterial(left=self.NYT, right=new_leaf)
        new_leaf.parent = subtree

        old_parent = self.NYT.parent
        subtree.parent = old_parent
        if old_parent is None:
            self.root = subtree
        elif old_parent.left is self.NYT:
            old_parent.left = subtree
        else:
            old_parent.right = subtree
        self.NYT.parent = subtree

        self.blocks.node_created(subtree)
        self.blocks.node_created(new_leaf)

    def __attach(self, parent, old_child, new_child):
        """Put ``new_child`` where ``old_child`` hangs below ``parent``."""
        if parent is None:
            self.root = new_child
        elif parent.left is old_child:
            parent.left = new_child
        else:
            parent.right = new_child
        new_child.parent = parent

    def __swap_nodes_in_same_block(self, p1, p2):
        """Exchange the tree positions and block positions of two nodes.

        Both nodes must belong to the same block (same kind, same weight).
        """
        if p1 is p2:
            return
        parent1, parent2 = p1.parent, p2.parent
        if parent1 is parent2:
            # Siblings: replacing the children one after the other would make
            # the second lookup see the already modified parent, so the two
            # child slots are exchanged directly.
            if parent1 is not None:
                parent1.left, parent1.right = parent1.right, parent1.left
        else:
            self.__attach(parent1, p1, p2)
            self.__attach(parent2, p2, p1)
        self.blocks.swap_nodes(p1, p2)

    def __Slide_and_increment(self, p):
        """Increase the weight of ``p`` by one and return its parent.

        The node is moved to the end of the block of its new weight; no
        sliding past other nodes is performed.
        """
        weight = p.weight
        self.blocks.node_increased(p, weight, weight + 1)
        p.weight = weight + 1
        return p.parent

    def input_leter(self, letter: str):
        """Update the tree with one more occurrence of ``letter``."""
        p = self.leafs.get(letter, self.NYT)

        leaf_to_increment = None
        if p is self.NYT:
            self.__expend_NYT(letter)
            p = self.NYT.parent
            leaf_to_increment = p.right
        else:
            self.__swap_nodes_in_same_block(
                p, self.blocks.get_leader_leaf(p.weight)
            )
            if p.parent.left is self.NYT:
                # The leaf sharing a parent with NYT is incremented after the
                # path from that parent up to the root.
                leaf_to_increment = p
                p = p.parent

        while p is not None:
            p = self.__Slide_and_increment(p)

        if leaf_to_increment is not None:
            self.__Slide_and_increment(leaf_to_increment)

    def cut_NYT(self):
        """Remove the NYT leaf so that only real symbols remain in the tree.

        The sibling of the NYT leaf takes the place of their common parent.
        If the tree holds nothing but the NYT leaf, ``root`` becomes ``None``.
        Calling the method again leaves the tree unchanged.
        """
        holder = self.NYT.parent
        if holder is None:
            self.root = None
            return

        sibling = holder.right if holder.left is self.NYT else holder.left
        grandparent = holder.parent
        sibling.parent = grandparent
        if grandparent is None:
            if self.root is holder:
                self.root = sibling
        elif grandparent.left is holder:
            grandparent.left = sibling
        elif grandparent.right is holder:
            grandparent.right = sibling

    def get_string_code(self):
        """Return the tree in reverse Polish notation.

        ``$`` joins the two preceding subtrees. The symbols ``\\``, ``$`` and
        ``#`` are written with a leading backslash, and a leaf without a value
        is written as ``[NYT]``. An empty tree gives an empty string.
        """
        if self.root is None:
            return ""

        escaped = {"\\": "\\\\", "$": "\\$", "#": "\\#", None: "[NYT]"}
        pieces = []
        # Explicit stack instead of recursion: new leaves are always attached
        # at the NYT position, so the depth can reach the alphabet size.
        pending = [(self.root, False)]
        while pending:
            node, children_done = pending.pop()
            if isinstance(node, DynamicCodingTreeNodeLeaf):
                pieces.append(escaped.get(node.value, node.value))
            elif children_done:
                pieces.append("$")
            else:
                pending.append((node, True))
                pending.append((node.right, False))
                pending.append((node.left, False))
        return "".join(pieces)

    def __get_items_code(self, node, headCode):
        """Return ``[(symbol, code), ...]`` for the leaves below ``node``.

        Going left appends "0" to ``headCode`` and going right appends "1".
        Leaf values are stored unescaped, so they are returned as they are.
        """
        items = []
        pending = [(node, headCode)]
        while pending:
            current, code = pending.pop()
            if isinstance(current, DynamicCodingTreeNodeLeaf):
                items.append((current.value, code))
            else:
                # Right goes on the stack first so the left side is visited
                # first and the leaves come out in left-to-right order.
                pending.append((current.right, code + "1"))
                pending.append((current.left, code + "0"))
        return items

    def getCodeDict(self):
        """Map every symbol to its code as ``bitstring.Bits``.

        Returns an empty dict for an empty tree. A tree consisting of a single
        leaf maps its symbol to a zero-length code.
        """
        if self.root is None:
            return {}
        return {
            symbol: bitstring.Bits(bin=bits)
            for symbol, bits in self.__get_items_code(self.root, "")
        }


class DynamicCodingTreeBlocks:
    """Groups nodes of equal kind (leaf / internal) and weight into blocks.

    Each block is a doubly linked list threaded through the nodes'
    ``previous_in_block`` / ``next_in_block`` attributes. For every weight the
    first node of the list is the "leader" and the last one the "slave"; an
    emptied block keeps its key with the value ``None``.
    """

    def __init__(self) -> None:
        # First node of each block, keyed by weight.
        self.leaders_leaf = dict()
        self.leaders_internal = dict()

        # Last node of each block, keyed by weight.
        self.slaves_leaf = dict()
        self.slaves_internal = dict()

    def __ends_for(self, node):
        """Return the (leaders, slaves) dicts matching the kind of ``node``."""
        if isinstance(node, DynamicCodingTreeNodeInterial):
            return self.leaders_internal, self.slaves_internal
        return self.leaders_leaf, self.slaves_leaf

    def __add_at_the_end(self, node, weight):
        """Append ``node`` to the block of ``weight``."""
        leaders, slaves = self.__ends_for(node)
        tail = slaves.get(weight)
        node.previous_in_block = tail
        node.next_in_block = None
        if tail is None:
            leaders[weight] = node
        else:
            tail.next_in_block = node
        slaves[weight] = node

    def __remove_from_block(self, node, weight):
        """Unlink ``node`` from the block of ``weight``.

        Both neighbours are re-linked to each other, so no node keeps a
        pointer to the removed one.
        """
        leaders, slaves = self.__ends_for(node)
        before, after = node.previous_in_block, node.next_in_block
        if before is None:
            leaders[weight] = after
        else:
            before.next_in_block = after
        if after is None:
            slaves[weight] = before
        else:
            after.previous_in_block = before
        node.next_in_block = None
        node.previous_in_block = None

    def node_created(self, node):
        """Register a freshly created node; new nodes have weight 0."""
        self.__add_at_the_end(node, 0)

    def node_increased(self, node, old_weight, new_weight):
        """Move ``node`` from the block of ``old_weight`` to ``new_weight``."""
        self.__remove_from_block(node, old_weight)
        self.__add_at_the_end(node, new_weight)

    def swap_nodes(self, first, second):
        """Exchange the positions of two nodes of the same block.

        Both nodes must be of the same kind and currently have the same
        weight.
        """
        if first is second:
            return
        leaders, slaves = self.__ends_for(first)
        weight = first.weight
        if second.next_in_block is first:
            # Normalise so that, when adjacent, ``first`` precedes ``second``.
            first, second = second, first

        before_first, after_first = first.previous_in_block, first.next_in_block
        before_second, after_second = (
            second.previous_in_block,
            second.next_in_block,
        )
        if after_first is second:
            first.previous_in_block, first.next_in_block = second, after_second
            second.previous_in_block, second.next_in_block = before_first, first
        else:
            first.previous_in_block, first.next_in_block = (
                before_second,
                after_second,
            )
            second.previous_in_block, second.next_in_block = (
                before_first,
                after_first,
            )

        # Point the surrounding nodes (or the block ends) at the new occupants.
        for node in (first, second):
            if node.previous_in_block is None:
                leaders[weight] = node
            else:
                node.previous_in_block.next_in_block = node
            if node.next_in_block is None:
                slaves[weight] = node
            else:
                node.next_in_block.previous_in_block = node

    def node_change_order(self, *nodes):
        """Refresh the leader / slave entries from the nodes' own links.

        A node without a predecessor is recorded as leader of its weight and
        a node without a successor as slave.
        """
        for node in nodes:
            leaders, slaves = self.__ends_for(node)
            if node.previous_in_block is None:  # is leader
                leaders[node.weight] = node
            if node.next_in_block is None:  # is slave
                slaves[node.weight] = node

    def get_leader_leaf(self, weight):
        """Return the first leaf of the block of ``weight``.

        Returns ``None`` if the block has been emptied; raises ``KeyError`` if
        no leaf ever had this weight.
        """
        return self.leaders_leaf[weight]


class DynamicCodingTreeNodeInterial:
    """Inner node of the dynamic coding tree (always has two children)."""

    def __init__(self, left=None, right=None, parent=None) -> None:
        self.left = left
        self.right = right
        self.parent = parent
        self.next_in_block = None
        self.previous_in_block = None
        self.weight = 0


class DynamicCodingTreeNodeLeaf:
    """Leaf of the dynamic coding tree holding one symbol."""

    def __init__(self, val, parent=None) -> None:
        self.parent = parent
        self.previous_in_block = None
        self.next_in_block = None
        # val equal None means the Not Yet Transferred node
        self.value = val
        self.weight = 0

In [151]:
def dynamic_huffman(data: str, compered_file_name: str) -> None:
    """Compress ``data`` with the dynamically built tree and write a file.

    The tree is first built from the whole input, then the NYT leaf is cut
    off and every symbol is encoded with the resulting codes. The file layout
    is the same as for the static variant: the tree in reverse Polish
    notation (UTF-8), the terminator ``#``, a 4-bit count of padding bits, the
    encoded symbols and the zero padding that fills the last byte.

    Args:
        data: Text to compress. May be empty; the file then holds an empty
            tree and only the 4-bit header.
        compered_file_name: Path of the file to create or overwrite.
    """
    # building coding tree:
    tree = DynamicCodingTree()
    for letter in data:
        tree.input_leter(letter)
    tree.cut_NYT()

    # coding data
    if tree.root is None:
        # Nothing was fed in, so there are no codes to ask the tree for.
        codeDisct = {}
    else:
        codeDisct = tree.getCodeDict()
        if len(codeDisct) == 1:
            # A tree made of a single leaf yields a zero-length code, which
            # would drop every symbol from the output. Spend one bit per
            # symbol so the number of occurrences survives.
            codeDisct = {
                symbol: bitstring.Bits(bin="0") for symbol in codeDisct
            }
    coded_data = bitstring.BitArray()
    for letter in data:
        coded_data.append(codeDisct[letter])
    # The 4 header bits count towards byte alignment as well.
    overdata_size = (8 - (len(coded_data) + 4) % 8) % 8
    coded_data.insert(bitstring.Bits(uint=overdata_size, length=4), 0)

    # saving data to file
    # Everything is written in binary mode: text mode would translate "\n"
    # inside the tree on some platforms, while the decoder reads raw bytes.
    with open(compered_file_name, "wb") as f:
        f.write((tree.get_string_code() + "#").encode("utf-8"))
        coded_data.tofile(f)


# dekodowanie

In [153]:
class BinTree:
    """Inner node of a decoded coding tree; children are nodes or symbols."""

    def __init__(self, left, right) -> None:
        self.left, self.right = left, right

def rebuild_tree(tree_string):
    """Rebuild a coding tree from its reverse Polish notation.

    Every character is a leaf symbol, except ``$``, which joins the two most
    recently built subtrees (left first, right second), and ``\\``, which
    makes the following character a literal leaf symbol.

    Args:
        tree_string: The serialized tree, without the terminating ``#``.

    Returns:
        ``None`` for an empty string, the symbol itself for a one-leaf tree,
        otherwise the root ``BinTree``.

    Raises:
        ValueError: If the string is not one complete expression (dangling
            escape, ``$`` without two operands, or leftover subtrees).
    """
    if not tree_string:
        return None

    # Plain stack evaluation instead of recursion, so the depth of the tree
    # is not limited by the interpreter's recursion limit.
    subtrees = []
    i = 0
    n = len(tree_string)
    while i < n:
        letter = tree_string[i]
        if letter == "\\":
            if i + 1 >= n:
                raise ValueError("tree description ends with a dangling escape")
            subtrees.append(tree_string[i + 1])
            i += 2
            continue
        if letter == "$":
            if len(subtrees) < 2:
                raise ValueError("join operation without two subtrees")
            right = subtrees.pop()
            left = subtrees.pop()
            subtrees.append(BinTree(left, right))
        else:
            subtrees.append(letter)
        i += 1

    if len(subtrees) != 1:
        raise ValueError("tree description does not form a single tree")
    return subtrees[0]


def decode_huffman(file: str) -> str:
    """Decode a file written by ``static_huffman`` or ``dynamic_huffman``.

    Args:
        file: Path of the compressed file.

    Returns:
        The decoded text. An empty tree decodes to an empty string. When the
        tree consists of a single symbol, every payload bit stands for one
        occurrence of that symbol.

    Raises:
        ValueError: If the tree terminator ``#`` or the 4-bit header is
            missing, or the header is inconsistent with the file size.
    """
    with open(file, "rb") as f:
        raw = f.read()

    # reading tree
    # The markers are ASCII and ASCII bytes never occur inside a multi-byte
    # UTF-8 sequence, so the terminator can be searched on raw bytes and the
    # tree decoded in one go afterwards.
    escape, terminator = ord("\\"), ord("#")
    position = 0
    size = len(raw)
    while position < size and raw[position] != terminator:
        # An escape character hides the byte that follows it.
        position += 2 if raw[position] == escape else 1
    if position >= size:
        raise ValueError("tree terminator '#' not found")
    tree_str = raw[:position].decode("utf-8")

    # reading data
    data = bitstring.BitArray(bytes=raw[position + 1:])
    if len(data) < 4:
        raise ValueError("missing padding header")
    overdata_size = data[:4].uint
    payload_end = len(data) - overdata_size
    if overdata_size > 7 or payload_end < 4:
        raise ValueError("invalid padding header")

    # decoding data
    if not tree_str:
        return ""
    root = rebuild_tree(tree_str)
    if not isinstance(root, BinTree):
        # One-leaf tree: there is nothing to walk, each bit is one symbol.
        return root * (payload_end - 4)

    decoded = []
    wanderer = root
    # The end is computed from the length: a negative slice bound would
    # select nothing when there is no padding at all.
    for bit in data[4:payload_end]:
        wanderer = wanderer.right if bit else wanderer.left
        if not isinstance(wanderer, BinTree):
            decoded.append(wanderer)
            wanderer = root
    return "".join(decoded)

    

# Analiza algorytmów kompresji

In [154]:
# Round trip through the static coder: compress the sample file, decode it
# again and compare with the original text.
with open("test_file2", "r") as f:
    text = f.read()

static_huffman(text, "some_file")
text2 = decode_huffman("some_file")

if text != text2:
    # Show both versions so the difference can be inspected.
    print(text)
    print(text2)
else:
    print("hurra")

hurra


In [155]:
# Round trip through the dynamic coder: compress the sample file, decode it
# again and compare with the original text.
with open("test_file2", "r") as f:
    text = f.read()

dynamic_huffman(text, "some_file")
text2 = decode_huffman("some_file")

if text != text2:
    # Show both versions so the difference can be inspected.
    print(text)
    print(text2)
else:
    print("hurra")

hurra
