## Laboratorium 2


### Huffman's static algorithm

In [3]:
import time
import heapq
from bitarray import bitarray
from bitarray.util import ba2int, int2ba

Algorytm budujący drzewo kodowe do kompresji danych – statyczny algorytm Huffmana

In [4]:
# Bit-stream granularities used by the readers/writers below.
BIT = 1
BITS_IN_BYTE = 8

# Two-bit tags written in front of every serialized character.
# The tag is the UTF-8 byte length of the character minus one
# (ONE -> 1 byte, TWO -> 2 bytes, THREE -> 3 bytes, FOUR -> 4 bytes).
ONE, TWO, THREE, FOUR = (bitarray(tag) for tag in ("00", "01", "10", "11"))

class Node:
    """Single vertex shared by the static and the adaptive Huffman trees.

    Attributes:
        value: frequency / weight counter the node is ordered by.
        char: the symbol stored in a leaf; ``None`` for internal nodes.
        huff: code information attached to the node (starts as ``''``).
        parent, left, right: tree links, ``None`` when absent.
        index, weight: extra bookkeeping slots, stored as given.
    """

    def __init__(self, value, char = None, parent = None, left = None, right = None, index = 0, weight = 0):
        # Payload carried by the node.
        self.value = value
        self.char = char
        self.huff = ''
        self.index = index
        self.weight = weight

        # Links to the neighbouring vertices.
        self.parent = parent
        self.left = left
        self.right = right

    def __lt__(self, nxt):
        """Order nodes by ``value`` so they can be compared inside a heap."""
        return self.value < nxt.value

    def __str__(self) -> str:
        """Return ``"<value> <huff>"``."""
        return f"{self.value!s} {self.huff!s}"


class StaticHuffmanTree():
    """Static (two-pass) Huffman coder for text files.

    Layout of a compressed file (all fields are bit-packed):

    1. One header byte holding the number of zero padding bits (0-7)
       appended at the very end of the file.
    2. The code tree in pre-order.  A leaf is written as ``1``, a two-bit
       tag (UTF-8 byte length minus one) and the UTF-8 bytes of the
       character.  Every child of an internal node is preceded by a ``0``
       marker bit; the root itself has no marker.
    3. The code words of all characters of the text, in order.
    4. Zero padding up to the next byte boundary.

    Degenerate inputs: an empty text produces a header-only file; a text
    built from one distinct character gives that character the one-bit
    code ``0`` so that the number of occurrences survives the round trip.
    """

    def __init__(self, filename):
        """Remember ``filename``; it is the text file to compress or the
        compressed file to decompress, depending on which method is called."""
        self.root = None
        self.filename = filename
        self.text = ""
        self.char_freq = dict()
        self.prefix_code = dict()
        # The first byte is a placeholder for the header; __lastBits
        # overwrites it once the amount of padding is known.
        self.bits_to_save = bitarray("10000000")
        # Number of bits already consumed from the input while decoding.
        self.bit_counter = 0

    def createHuffmanTree(self):
        """Read ``self.filename`` and build the tree and ``self.prefix_code``.

        For an empty file ``self.root`` stays ``None`` and no codes exist.
        """
        self.__readAllFile()
        self.__loadChars()
        self.prefix_code = dict()
        self.root = None

        # Heap entries are (frequency, node); ties fall back to Node.__lt__.
        Q = []
        for char, freq in self.char_freq.items():
            heapq.heappush(Q, (freq, Node(freq, char)))

        if not Q:
            # Empty input: there is nothing to build a tree from.
            return

        # Repeatedly merge the two least frequent subtrees.
        while len(Q) > 1:
            left = heapq.heappop(Q)
            left[1].huff = 0b0
            right = heapq.heappop(Q)
            right[1].huff = 0b1
            value = left[0] + right[0]
            z = Node(value, left=left[1], right=right[1])
            heapq.heappush(Q, (value, z))

        self.root = heapq.heappop(Q)[1]

        self.__createHuffmanCode(self.root, bitarray())

    def compressData(self, file_to_write):
        """Serialize the tree and the encoded text into ``file_to_write``.

        ``createHuffmanTree`` has to be called first.
        """
        # Start from a fresh buffer so that a repeated call does not append
        # a second copy of the data behind the first one.
        self.bits_to_save = bitarray("10000000")
        if self.root is not None:
            self.__saveHuffmanCode(self.root)
            for i in self.text:
                self.bits_to_save.extend(self.prefix_code[i])
        self.__lastBits(len(self.bits_to_save))
        with open(file_to_write, "wb") as f:
            self.bits_to_save.tofile(f)

    def __createHuffmanCode(self, node : Node, huff_code : bitarray):
        """Fill ``self.prefix_code`` for the subtree rooted at ``node``.

        ``huff_code`` is the path from the root to ``node`` (0 = left,
        1 = right).
        """
        if node.char is not None:
            if len(huff_code) == 0:
                # The root itself is a leaf (one distinct symbol).  An empty
                # code word would encode the text in zero bits and lose the
                # symbol count, so use a single 0 bit instead.
                huff_code = bitarray("0")
            self.prefix_code[node.char] = huff_code
            return

        if node.left is not None:
            code = huff_code.copy()
            code.append(0)
            self.__createHuffmanCode(node.left, code)

        if node.right is not None:
            code = huff_code.copy()
            code.append(1)
            self.__createHuffmanCode(node.right, code)

    def __saveHuffmanCode(self, node: Node):
        """Append the pre-order serialization of ``node`` to the output."""
        if node.char is not None:
            self.bits_to_save.append(1)
            ba = bitarray()
            ba.frombytes(node.char.encode("utf-8"))
            utf_len = len(ba) // BITS_IN_BYTE
            # A character encodes to 1-4 UTF-8 bytes; pick the matching tag.
            self.bits_to_save.extend((ONE, TWO, THREE, FOUR)[utf_len - 1])
            self.bits_to_save.extend(ba)
            return

        if node.left is not None:
            self.bits_to_save.append(0)
            self.__saveHuffmanCode(node.left)

        if node.right is not None:
            self.bits_to_save.append(0)
            self.__saveHuffmanCode(node.right)

    def __lastBits(self, num : int):
        """Pad the output to whole bytes and store the padding size in the
        header byte.  ``num`` is unused and kept only for compatibility."""
        padding = self.bits_to_save.fill()
        self.bits_to_save[:BITS_IN_BYTE] = int2ba(padding, length=BITS_IN_BYTE)

    def __readAllFile(self):
        """Load the whole input file into ``self.text``."""
        with open(self.filename, "r") as f:
            self.text = f.read()

    # decompressing

    def decompressData(self, file_to_write):
        """Decode the compressed file ``self.filename`` into ``file_to_write``."""
        data = bitarray()
        with open(self.filename, "rb") as f:
            data.fromfile(f)
        self.__restoreHuffman(data, file_to_write)

    def __restoreHuffman(self, data: bitarray, file_to_write):
        """Parse header and tree from ``data``, then decode the payload.

        Raises:
            ValueError: if the header is missing or invalid, or the data
                ends in the middle of the serialized tree.
        """
        if len(data) < BITS_IN_BYTE:
            raise ValueError("compressed data is missing the header byte")

        self.bit_counter = 0
        last_bits = ba2int(self.__getNBytes(data, 1))
        if last_bits >= BITS_IN_BYTE:
            raise ValueError("invalid padding size in the header byte")
        # Index one past the last meaningful bit.
        n = len(data) - last_bits

        self.prefix_code = dict()
        if self.bit_counter >= n:
            # Header only: the original text was empty.
            self.root = None
            with open(file_to_write, "w"):
                pass
            return

        self.root = Node(0)
        self.__recHuffTree(data, self.root)
        self.__createHuffmanCode(self.root, bitarray())
        self.__ba2Txt(data, n, file_to_write)

    def __recHuffTree(self, data: bitarray, vert: Node):
        """Rebuild the subtree of ``vert`` from the serialized tree."""
        if self.__getBit(data) == 1:
            # Leaf: two-bit length tag followed by the UTF-8 bytes.
            len_bytes = ba2int(self.__getNBit(data, 2)) + 1
            byte = self.__getNBytes(data, len_bytes)
            vert.char = byte.tobytes().decode("utf-8")
            return

        # Internal node: the 0 just read was the marker of the left child.
        vert.left = Node(0)
        self.__recHuffTree(data, vert.left)

        # The right child is introduced by its own 0 marker.
        if self.__getBit(data) != 0:
            raise ValueError("corrupted tree description in compressed data")
        vert.right = Node(0)
        self.__recHuffTree(data, vert.right)

    def __ba2Txt(self, data: bitarray, length: int, file_to_write):
        """Decode the payload bits ``[self.bit_counter, length)`` and write
        the resulting text to ``file_to_write``."""
        root = self.root
        # With one distinct symbol the root is a leaf and every payload bit
        # stands for one occurrence of that symbol.
        single_symbol = root.char is not None
        vert = root
        with open(file_to_write, "w") as f:
            while self.bit_counter < length:
                bit = self.__getBit(data)
                if not single_symbol:
                    vert = vert.left if bit == 0 else vert.right
                if vert.char is not None:
                    f.write(vert.char)
                    vert = root

    # The readers below advance ``self.bit_counter`` instead of shifting the
    # whole buffer after every read; shifting is linear in the buffer size
    # and made decoding quadratic.

    def __getNBytes(self, tab : bitarray, n: int):
        """Return the next ``n`` bytes of ``tab`` as a bitarray."""
        return self.__getNBit(tab, n * BITS_IN_BYTE)

    def __getBit(self, tab: bitarray):
        """Return the next bit of ``tab`` (0 or 1)."""
        try:
            x = tab[self.bit_counter]
        except IndexError:
            raise ValueError("unexpected end of compressed data") from None
        self.bit_counter += BIT
        return x

    def __getNBit(self, tab: bitarray, n : int):
        """Return the next ``n`` bits of ``tab`` as a bitarray."""
        end = self.bit_counter + n * BIT
        if end > len(tab):
            raise ValueError("unexpected end of compressed data")
        x = tab[self.bit_counter:end]
        self.bit_counter = end
        return x

    def __loadChars(self):
        """Count how often every character occurs in ``self.text``."""
        # Rebuild from scratch so repeated calls do not accumulate counts.
        self.char_freq = dict()
        for char in self.text:
            self.char_freq[char] = self.char_freq.get(char, 0) + 1

    def __printResult(self, tmp):
        """Print the subtree rooted at ``tmp`` in pre-order."""
        print(tmp)

        if tmp.left is not None:
            self.__printResult(tmp.left)
        if tmp.right is not None:
            self.__printResult(tmp.right)

    def __str__(self):
        """Print the tree as a side effect and return an empty string."""
        if self.root is not None:
            self.__printResult(self.root)
        return ""




In [476]:
def compressFile(file_to_read, file_to_compress):
    """Huffman-compress ``file_to_read`` into ``file_to_compress``.

    Prints the length, in bits, of the buffer that was written out.
    """
    tree = StaticHuffmanTree(file_to_read)
    tree.createHuffmanTree()
    tree.compressData(file_to_compress)
    written_bits = len(tree.bits_to_save)
    print(written_bits)


In [477]:
def decompressFile(file_compressed, file_to_write):
    """Decode the Huffman archive ``file_compressed`` into ``file_to_write``."""
    StaticHuffmanTree(file_compressed).decompressData(file_to_write)



In [478]:
# Compress the 1 MB sample; the number printed is the output size in bits.
compressFile("input_files/1MB.txt", "bin_files/1MB.bin")

4322416


In [492]:
# Measure how long decompressing the 1 MB archive takes (seconds).
start = time.perf_counter()
decompressFile("bin_files/1MB.bin", "output_files/1MB.txt")
all_time1MB = time.perf_counter() - start

In [484]:
# Compress the 1 kB sample; the number printed is the output size in bits.
compressFile("input_files/1kB.txt", "bin_files/1kB.bin")

6744


In [486]:
# Measure how long decompressing the 1 kB archive takes (seconds).
start_1kB = time.perf_counter()
decompressFile("bin_files/1kB.bin", "output_files/1kB.txt")
all_time1kB = time.perf_counter() - start_1kB

In [487]:
# Show the decompression time measured for the 1 kB archive.
print(all_time1kB)


0.011736362997908145


### Adaptive Huffman - FGK algorithm

In [109]:
class AdaptiveHuffmanTree:
    """Work-in-progress adaptive (FGK-style) Huffman encoder.

    The tree starts as a single NYT ("not yet transferred") node and grows
    while the text is read.  For every character the encoder currently
    writes, in this order: the code stored on the character's node after
    the tree has been updated, a two-bit UTF-8 length tag and the UTF-8
    bytes of the character.  The output starts with one header byte that
    holds the number of padding bits at the end.

    NOTE(review): the raw character bytes are written for every
    occurrence, not only the first one, and this class has no decoder.
    The update step is also not the complete FGK procedure (see
    ``__swapNodes`` and ``__checkSiblings``).
    """

    def __init__(self, filename) -> None:
        """Remember ``filename``, the text file that will be compressed."""
        self.root = None
        self.filename = filename
        self.nyt = Node(0)  # not yet transferred - special node
        self.text = ""
        self.char_dict = dict()  # character -> its leaf node
        self.prefix_code = dict()
        # The first byte is a placeholder for the header; __lastBits
        # overwrites it once the amount of padding is known.
        self.bits_to_save = bitarray("10000000")
        self.bit_counter = 0
        # Not used by any method of this class yet.
        self.depth = {0 : {self.root}, 1: set()}

    # Compressing
    def compressData(self, file_to_write):
        """Encode ``self.filename`` and write the result to ``file_to_write``."""
        self.createHuffmanTree()
        self.__lastBits(len(self.bits_to_save))
        with open(file_to_write, "wb") as f:
            self.bits_to_save.tofile(f)

    def createHuffmanTree(self):
        """Read the input file and encode it character by character,
        updating the tree after every character."""
        self.__readAllFile()

        self.root = self.nyt

        for char in self.text:
            ba = bitarray()
            ba.frombytes(char.encode("utf-8"))
            utf_len = len(ba) // BITS_IN_BYTE

            node = self.__insertChar(char)
            self.__updateParentStepByStep(node)

            # Code of the node as stored after the update above.
            self.bits_to_save.extend(node.huff)
            # A character encodes to 1-4 UTF-8 bytes; pick the matching tag.
            self.bits_to_save.extend((ONE, TWO, THREE, FOUR)[utf_len - 1])
            self.bits_to_save.extend(ba)

    def __updateParentChain(self, node: Node):
        """Add one to ``value`` of every ancestor of ``node``."""
        parent_node = node.parent

        # TODO to check later if working
        while parent_node is not None:
            parent_node.value += 1
            parent_node = parent_node.parent

    def __updateParentStepByStep(self, node: Node):
        """Walk from ``node`` towards the root, letting ``__checkSiblings``
        reorder nodes on the way, then update the ancestor counters."""
        node_cp = node

        # __checkSiblings returns None when the walk has to stop.  That
        # value must end the loop; passing it back in raised AttributeError
        # on the very first inserted character.
        while node_cp is not None and node_cp is not self.root:
            node_cp = self.__checkSiblings(node_cp)

        self.__updateParentChain(node)

    def __swapNodes(self, node1: Node, node2: Node):
        """Exchange the positions of ``node1`` and ``node2`` in the tree and
        recompute the codes stored on these two nodes.

        NOTE(review): codes stored on descendants of the swapped nodes are
        not recomputed here.
        """
        parent_node1 = node1.parent
        parent_node2 = node2.parent

        if parent_node1.right is node1:
            if parent_node2.right is node2:
                parent_node1.right, parent_node2.right = node2, node1
            else:
                parent_node1.right, parent_node2.left = node2, node1
        else:
            if parent_node2.right is node2:
                parent_node1.left, parent_node2.right = node2, node1
            else:
                parent_node1.left, parent_node2.left = node2, node1
        node1.parent = parent_node2
        node2.parent = parent_node1
        self.__bottomUpHuffCode(node1)
        self.__bottomUpHuffCode(node2)

    # inserting node to Huffman tree
    def __insertChar(self, char: str):
        """Return the leaf of ``char``.

        A character seen for the first time splits the current NYT node
        into a new NYT node (left) and a new leaf with value 1 (right).
        For a known character the value of its leaf is increased by one.
        """
        if char not in self.char_dict:
            new_node = Node(1, char, parent=self.nyt)

            new_null_node = Node(0, parent=self.nyt)

            self.nyt.right = new_node
            self.nyt.left = new_null_node

            self.nyt = new_null_node

            self.char_dict[char] = new_node
            self.__bottomUpHuffCode(new_node)
            return new_node

        else:
            self.char_dict[char].value += 1
            return self.char_dict[char]

    # returns the node to continue the upward walk from, or None to stop
    def __checkSiblings(self, node: Node):
        """Compare ``node`` with its sibling or with its parent's right
        sibling and swap when ``node`` has the greater value.

        Returns the node from which the caller continues, or ``None`` when
        the walk is finished.

        NOTE(review): the last branch adds one to the parent's value and
        ``__updateParentChain`` later adds one to every ancestor again;
        confirm whether that double increment is intended.
        """
        parent_node = node.parent

        if parent_node.left is node:
            if parent_node.right.value < node.value:
                self.__swapNodes(node, parent_node.right)
                return node
            else:
                return None

        # From here on ``node`` is the right child of its parent.
        if parent_node.parent is None:
            return None
        pp_node = parent_node.parent

        # checking if the node is the rightmost node at its own depth
        if parent_node.right is node and pp_node.right is parent_node:
            return parent_node

        if pp_node.right.value < node.value:
            self.__swapNodes(node, pp_node.right)
            return node
        else:
            parent_node.value += 1
            return parent_node

    # for every char -> save huffcode
    def __bottomUpHuffCode(self, node: Node):
        """Store on ``node.huff`` the path from the root to ``node``
        (0 = left child, 1 = right child)."""
        ba = bitarray()
        node_to_save = node

        # Collect the bits leaf-to-root, then reverse them.
        while node.parent is not None:
            parent_node = node.parent
            if parent_node.right is node:
                ba.append(1)
            else:
                ba.append(0)
            node = parent_node
        ba.reverse()
        node_to_save.huff = ba

    def __readAllFile(self):
        """Load the whole input file into ``self.text``."""
        with open(self.filename, "r") as f:
            self.text = f.read()

    def __lastBits(self, num: int):
        """Pad the output to whole bytes and store the padding size in the
        header byte.  ``num`` is unused and kept only for compatibility."""
        padding = self.bits_to_save.fill()
        self.bits_to_save[:BITS_IN_BYTE] = int2ba(padding, length=BITS_IN_BYTE)

    def __printResult(self, tmp: Node):
        """Print value, code and character of every node below ``tmp``
        in pre-order."""
        print(tmp.value, tmp.huff, tmp.char)

        if tmp.left is not None:
            self.__printResult(tmp.left)
        if tmp.right is not None:
            self.__printResult(tmp.right)

# TODO : depth and weight of nodes -> implement

In [110]:
# Encode example.txt with the adaptive coder and write the bits to example_zip.bin.
adaptive_huff = AdaptiveHuffmanTree("example.txt")
adaptive_huff.compressData("example_zip.bin")

In [49]:
# Read the encoded file back as raw bits and print them for inspection.
data = bitarray()
with open("example_zip.bin", "rb") as f:
    data.fromfile(f)

print(data)

bitarray('000000011000110001001000110111101000110111101000110101101000110101101000110010101000110010101000111000000100011001010010001110010000100000010100')
