Agnieszka Dutka
# Laboratory 3 - Huffman algorithm

Contents:  
[Classic Huffman](#classic)  
[Adaptive Huffman](#adaptive)  
[Evaluation](#evaluation)  

In [116]:
from bitarray import bitarray
from collections import defaultdict
files_dir = "files/"

<a id='classic'></a>
## Classic Huffman algorithm

### Node structure

In [117]:
class Node:
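    def __init__(self, letter, weight, children=None):
        self.letter = letter
        self.weight = weight
        # a fresh list per node: a mutable default argument would be shared by every leaf
        self.children = children if children is not None else []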
        
    def to_dict(self, dic, prefix=""):
        if self.letter is not None:
            dic[self.letter] = bitarray(prefix)
        for idx, child in enumerate(self.children):
            child.to_dict(dic, prefix+str(idx))
            
    def __setitem__(self, index, value):
        self.children[index] = value

    def __getitem__(self, index):
        return self.children[index]
        
    def __repr__(self):
        return f"{self.letter}"
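
The recursive walk in `to_dict` can be illustrated on a tiny hand-built tree. This is only a sketch: the nested-tuple representation and the name `assign_codes` are assumptions made for illustration and are not part of the lab code, and codes are kept as plain strings instead of `bitarray` objects.

```python
def assign_codes(tree, prefix=""):
    """Return {symbol: code} for a tree given as nested tuples.

    Hypothetical representation: a leaf is a one-character string,
    an internal node is a tuple of its children (left, right).
    """
    if isinstance(tree, str):
        # a leaf: the path walked so far is the code of this symbol
        return {tree: prefix}
    codes = {}
    for bit, child in enumerate(tree):
        # going to child number `bit` appends that bit to the prefix
        codes.update(assign_codes(child, prefix + str(bit)))
    return codes


codes = assign_codes((("a", "b"), "c"))
print(codes)  # {'a': '00', 'b': '01', 'c': '1'}
```

No code is a prefix of another because symbols sit only in the leaves.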


### Huffman structure

In [118]:
class Huffman:
    """Huffman encoding built from the text of a file.

    The encoding is stored both as a dictionary (letter -> code, for quick encoding)
    and as a tree (for quick decoding). The class provides methods to create the encoding
    and to compress and decompress files with it.
    """
    def __init__(self, file):
        with open(files_dir+file, "r", encoding='utf-8') as f:
            text = f.read()
        letter_freq = Huffman.get_letter_freq(text)  # letter -> number of occurrences
        self.root = Huffman.huffman(letter_freq)  # creating tree
        self.dic = {}
        self.root.to_dict(self.dic)  # creating dictionary with letter codes
        self.file = file  # only the name is stored; compress() reads the file again
    
    @staticmethod
    def get_letter_freq(text):  # counts occurrences of every letter
        letter_freq = {}
        for c in text:
            letter_freq[c] = letter_freq.get(c, 0) + 1
        return letter_freq
    
    @staticmethod
    def huffman(letter_counts: dict):
        """Build the tree with two queues: leaves sorted by weight and internal nodes in creation order."""
        leafs = sorted((Node(a, weight) for a, weight in letter_counts.items()),
                       key=lambda n: n.weight)
        if len(leafs) == 1:
            # a single distinct letter still needs a one-bit code
            return Node(None, leafs[0].weight, [leafs[0]])
        internal_nodes = []
        while len(leafs) + len(internal_nodes) > 1:
            # the two lightest nodes are always among the first two of each queue
            head = leafs[:2] + internal_nodes[:2]
            element_1, element_2 = sorted(head, key=lambda n: n.weight)[:2]
            internal_nodes.append(Node(None, element_1.weight + element_2.weight, [element_1, element_2]))
            # remove each merged node from the front of the queue it came from
            for element in (element_1, element_2):
                if leafs and element is leafs[0]:
                    leafs = leafs[1:]
                else:
                    internal_nodes = internal_nodes[1:]
        return internal_nodes[0]
    
    def compress(self, to_file=None):
        with open(files_dir+self.file, "r", encoding='utf-8') as f:
            text = f.read()
        compressed = bitarray()
        for c in text:
            compressed += self.dic[c]
        if to_file is not None:
            # the first byte stores, as an ASCII digit, how many padding bits fill the last byte
            extra_int = (8-len(compressed)%8)%8
            with open(to_file, "wb") as compressed_f:
                compressed_f.write(str(extra_int).encode('utf-8'))
                compressed_f.write(compressed.tobytes())
        return compressed
    
    def decompress(self, file, to_file=None):
        array = bitarray()
        with open(file, "rb") as f:
            array.frombytes(f.read())
        extra = int(array[:8].tobytes().decode())  # number of padding bits in the last byte
        # explicit end index: array[8:-extra] would be empty when extra == 0
        array = array[8:len(array)-extra]
        decompressed = []
        i = 0
        while i < len(array):
            node = self.root
            while node.letter is None:  # walk down the tree until a leaf is reached
                node = node[array[i]]
                i += 1
            decompressed.append(node.letter)
        decompressed = "".join(decompressed)
        if to_file is not None:
            with open(to_file, "w", encoding='utf-8') as f:
                f.write(decompressed)
        return decompressed
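
The one-byte padding header used by `compress` and `decompress` can be tried out in isolation. The sketch below is a pure-Python stand-in that works on strings of `'0'`/`'1'` instead of `bitarray` objects; the names `pack` and `unpack` are hypothetical and exist only for this illustration.

```python
def pack(bits: str) -> bytes:
    """Pack a string of '0'/'1' characters into bytes.

    The first byte is an ASCII digit giving the number of zero bits
    appended to fill the last byte.
    """
    pad = (8 - len(bits) % 8) % 8
    padded = bits + "0" * pad
    body = bytes(int(padded[i:i + 8], 2) for i in range(0, len(padded), 8))
    return str(pad).encode("ascii") + body


def unpack(data: bytes) -> str:
    """Inverse of pack: drop the header byte and the padding bits."""
    pad = int(data[:1].decode("ascii"))
    bits = "".join(f"{byte:08b}" for byte in data[1:])
    # explicit end index, because bits[:-0] would be an empty string
    return bits[:len(bits) - pad]


print(pack("10110"))          # b'3\xb0'
print(unpack(pack("10110")))  # 10110
```

The case worth checking by hand is a bit string whose length is already a multiple of 8, where the header digit is `0` and nothing must be cut off.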


## Example

Compress the file to "compressed", then decompress it to "decompressed.txt".
The content of the decompressed file is then printed to check that the text was decoded correctly.

In [130]:
to_file = "compressed"
huff = Huffman("1KB.txt")
huff.compress(to_file)
huff.decompress(to_file, "decompressed.txt")
with open("decompressed.txt", "r", encoding='utf-8') as f:
    text = f.read()
    print(text)

User pages are administration pages in the User and User talk namespaces that are useful for organizing and aiding the work users do on Wikipedia, as well as facilitating interaction and sharing between users. User pages are mainly for interpersonal discussion, notices, testing and drafts (see: Sandboxes), and, if desired, limited autobiographical and personal content.

User pages are available to Wikipedia users personally for purposes compatible with the Wikipedia project and acceptable to the community; Wikipedia is not a blog, webspace provider, or social networking site. Wikipedia policies concerning the content of pages can and generally do apply to user pages, and users must observe these policies. Users believed to be in violation of these policies should first be advised on their talk page using {{subst:uw-userpage}} when immediate action is not otherwise necessary. 
Your in this context means associated with you, not belonging to you. 
something else yet to be seen.
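
Reading the printed text is a quick visual check; comparing the two files programmatically is stricter. A minimal sketch, assuming both files are UTF-8 text (the helper name `same_text` is hypothetical, and the demonstration uses throwaway files so that it runs without the lab data):

```python
import tempfile
from pathlib import Path


def same_text(path_a, path_b, encoding="utf-8"):
    """Return True when both files decode to exactly the same text."""
    text_a = Path(path_a).read_text(encoding=encoding)
    text_b = Path(path_b).read_text(encoding=encoding)
    return text_a == text_b


# demonstration on temporary files
with tempfile.TemporaryDirectory() as tmp:
    original = Path(tmp) / "original.txt"
    restored = Path(tmp) / "restored.txt"
    original.write_text("User pages\n", encoding="utf-8")
    restored.write_text("User pages\n", encoding="utf-8")
    round_trip_ok = same_text(original, restored)
    restored.write_text("User page\n", encoding="utf-8")
    mismatch_detected = not same_text(original, restored)

print(round_trip_ok, mismatch_detected)  # True True
```

In this notebook the call would be `same_text(files_dir + "1KB.txt", "decompressed.txt")`.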


<a id='adaptive'></a>
## Adaptive Huffman algorithm

### Node structure

In [185]:
class AdNode:
    def __init__(self, letter="##", parent=None):
        self.weight = 0
        self.letter = letter
        self.parent = parent
        self.children = [None, None]

    def get_code(self, curr_code):
        if self.parent:
            code = int(not self.parent[0] == self)
            return self.parent.get_code(curr_code + str(code))
        else:
            return curr_code

    def add_children(self, left, right):  # both have weight 0 at the beginning
        self[0] = left
        self[1] = right

    @staticmethod
    def swap(node1, node2):
        # index of node1 among its parent's children (0 = left, 1 = right)
        node1_idx = int(node1.parent[0] is not node1)

        if node2.parent[0] is node2:
            node2.parent[0] = node1
        else:
            node2.parent[1] = node1

        node1.parent, node2.parent = node2.parent, node1.parent
        node2.parent[node1_idx] = node2

    def increment(self):
        self.weight += 1
        if self.uncle and self.uncle.weight < self.weight:
            AdNode.swap(self, self.uncle)

        if self.parent:
            if self.parent[0] == self and self.parent[1].weight < self.weight:
                AdNode.swap(self, self.parent[1])
            self.parent.increment()

    @property
    def code(self):
        code = self.get_code(bitarray())
        code.reverse()
        return code

    @property
    def uncle(self):
        if self.parent and self.parent.parent:
            if self.parent == self.parent.parent[0]:
                return self.parent.parent[1]
            else:
                return self.parent.parent[0]
        return None
    
    def __getitem__(self, item):
        return self.children[item]

    def __setitem__(self, key, value):
        self.children[key] = value
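
`AdNode.code` collects one bit per level while climbing from the node to the root and then reverses the result, because the code must be read from the root downwards. A stripped-down sketch of that idea, using a hypothetical `TinyNode` class and plain strings instead of `bitarray`:

```python
class TinyNode:
    """Hypothetical minimal node: only parent and children links."""
    def __init__(self, parent=None):
        self.parent = parent
        self.children = [None, None]


def code_of(node):
    """Return the root-to-node path as a string of '0'/'1'."""
    bits = []
    while node.parent is not None:
        # 0 if the node is the left child of its parent, 1 otherwise
        bits.append("0" if node.parent.children[0] is node else "1")
        node = node.parent
    # bits were collected bottom-up, the code is read top-down
    return "".join(reversed(bits))


root = TinyNode()
left, right = TinyNode(root), TinyNode(root)
root.children = [left, right]
deep_left, deep_right = TinyNode(right), TinyNode(right)
right.children = [deep_left, deep_right]

print(code_of(left), code_of(deep_left), code_of(deep_right))  # 0 10 11
```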


### Adaptive Huffman structure

In [186]:
class AdaptiveHuffman:
    def __init__(self, file):
        self.file = file  # only the name is stored; compress() reads the file

    def compress(self, to_file=None):
        with open(files_dir + self.file, "r", encoding='utf-8') as f:
            text = f.read()

        nodes = {"##": AdNode()}
        compressed = bitarray()

        for letter in text:
            if letter in nodes:
                node = nodes[letter]
                compressed += node.code
                node.increment()
            else:
                updated_node = nodes["##"]
                compressed += updated_node.code
                
                letter_b = bitarray()
                letter_b.frombytes(bytes(letter, "utf-8"))
                if (len(letter_b) > 8):
                    print("letter occupies", len(letter_b) / 8, "bytes!")
                    return -1
                compressed += letter_b

                node = AdNode(letter=letter, parent=updated_node)
                nyt_node = AdNode(parent=updated_node)
                updated_node.add_children(nyt_node, node)
                del nodes["##"]
                nodes["##"] = nyt_node
                nodes[letter] = node
                
                node.weight += 1
                updated_node.increment()

        if to_file is not None:
            # the first byte stores, as an ASCII digit, how many padding bits fill the last byte
            extra_int = (8 - len(compressed) % 8) % 8
            with open(to_file, "wb") as compressed_f:
                compressed_f.write(str(extra_int).encode('utf-8'))
                compressed_f.write(compressed.tobytes())

        return compressed

    def decompress(self, file, to_file=None):
        # get compressed text
        compressed = bitarray()
        with open(file, "rb") as f:
            compressed.frombytes(f.read())

        # delete redundant padding bits that fill the last byte
        extra = int(compressed[:8].tobytes().decode())
        compressed = compressed[8:len(compressed) - extra]

        nodes = {"##": AdNode()}
        root = nodes["##"]
        text = ""
        idx = 0

        while idx < len(compressed):
            curr_node = root

            while curr_node.weight > 0 and curr_node.letter == "##":
                curr_node = curr_node[compressed[idx]]
                idx += 1

            if curr_node.letter != "##":
                letter = curr_node.letter

                node = nodes[letter]
                node.increment()

            else:
                # a new letter: it is stored raw in the next 8 bits
                letter = compressed[idx:(idx + 8)].tobytes().decode("utf-8")
                idx += 8
                
                # fork current nyt_node
                updated_node = nodes["##"]
                node = AdNode(letter=letter, parent=updated_node)
                nyt_node = AdNode(parent=updated_node)
                updated_node.add_children(nyt_node, node)
                
                # update dictionary
                del nodes["##"]
                nodes["##"] = nyt_node
                nodes[letter] = node

                node.weight += 1
                updated_node.increment()
            text += letter
            
        if to_file is not None:
            with open(to_file, "w", encoding='utf-8') as f:
                f.write(text)
        return text
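
`compress` above stops and returns -1 as soon as it meets a letter that needs more than one UTF-8 byte, because each new letter is written as exactly 8 raw bits. A small pre-check can list such letters before compression starts. This is a sketch only; the helper name `multibyte_chars` is hypothetical.

```python
def multibyte_chars(text):
    """Return the set of characters whose UTF-8 encoding is longer than one byte."""
    return {ch for ch in text if len(ch.encode("utf-8")) > 1}


print(multibyte_chars("plain ASCII text"))     # set()
print(sorted(multibyte_chars("naïve café")))   # ['é', 'ï']
```

An empty result means every letter of the text fits the one-byte format used here.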


## Example

Compress the file to "compressed", then decompress it to "decompressed.txt".
The content of the decompressed file is then printed to check that the text was decoded correctly.

In [189]:
to_file = "compressed"
huff = AdaptiveHuffman("1KB.txt")
huff.compress(to_file)
huff.decompress(to_file, "decompressed.txt")
with open("decompressed.txt", "r", encoding='utf-8') as f:
    text = f.read()
    print(text)

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed non dapibus dolor, non sodales felis. Curabitur vel tellus magna. Phasellus mattis sem non libero auctor ultrices. Aenean pellentesque semper ultricies. Vivamus est justo, congue vel pharetra vitae, sagittis a augue. Vivamus vitae arcu odio. Duis commodo, turpis sed vehicula tempor, lorem arcu blandit lectus, in gravida mi risus nec orci. Nullam et suscipit massa.

Pellentesque posuere porta felis, at accumsan orci fermentum non. Donec metus leo, fermentum efficitur consectetur a, ultricies a eros. Maecenas vulputate sem eget erat bibendum, vitae molestie eros ultricies. Fusce ut egestas mauris. Suspendisse posuere commodo justo, imperdiet consectetur nulla ornare nec. Cras porta leo vel tortor tempus, nec feugiat mi ultricies. Sed pretium tempor pretium. Donec magna sem, convallis nec efficitur eu, blandit in ante.

Curabitur at dignissim erat, eget condimentum ipsum. Vivamus at enim congue, finibus sapien quis, aliquet posu

<a id='evaluation'></a>
## Evaluation
Both implementations are timed and their compression ratio is measured on files of size 1 kB, 10 kB, 100 kB and 1 MB.

In [156]:
from time import perf_counter
import os
def test_compression(file, Class):
    """Test the provided coding class (must implement compress and decompress methods)."""
    compressed = "compressed"
    tstart = perf_counter()
    huff = Class(file)
    tend1 = perf_counter()
    huff.compress(compressed)
    tend2 = perf_counter()
    huff.decompress(compressed)
    tend3 = perf_counter()

    # all times are in seconds
    print(f"Time evaluation for {file}:")
    print(f"structure creation: {tend1-tstart}")
    print(f"compression: {tend2-tend1}")
    print(f"decompression: {tend3-tend2}")

    original_size = os.stat(files_dir+file).st_size
    compressed_size = os.stat(compressed).st_size
    # the reported ratio is the space saving: the fraction of the original size that was removed
    ratio = 1-compressed_size/original_size
    print(f"original size: {original_size/1000}kB")
    print(f"compression ratio: {round(ratio*100, 2)}%\n")
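
The ratio printed above is `1 - compressed_size/original_size`. As a rough sanity check it can be compared with the Shannon entropy of the letter frequencies, which is a lower bound on the average code length of any letter-by-letter prefix code; a Huffman code stays within one bit of it. The sketch below assumes that every letter occupies 8 bits in the original file and ignores the one-byte header; the function names are hypothetical.

```python
from collections import Counter
from math import log2


def entropy_bits_per_symbol(text):
    """Shannon entropy of the letter distribution, in bits per letter."""
    counts = Counter(text)
    total = len(text)
    return -sum(n / total * log2(n / total) for n in counts.values())


def best_case_savings(text, bits_per_input_symbol=8):
    """Upper bound on 1 - compressed/original for a letter-by-letter code.

    Assumption: each input letter is stored in `bits_per_input_symbol` bits.
    """
    return 1 - entropy_bits_per_symbol(text) / bits_per_input_symbol


print(entropy_bits_per_symbol("aabb"))  # 1.0
print(best_case_savings("abcd"))        # 0.75
```

Calling `best_case_savings` on the text of a test file gives a ceiling against which the measured ratios can be read.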
    

### Classic Huffman

In [157]:
test_compression("1KB.txt", Huffman)
test_compression("10KB.txt", Huffman)
test_compression("100KB.txt", Huffman)
test_compression("1MB.txt", Huffman)

Time evaluation for 1KB.txt:
structure creation: 0.0012965000005351612
compression: 0.0006821999995736405
decompression: 0.00228889999925741
original size: 1.005kB
compression ratio: 46.57%

Time evaluation for 10KB.txt:
structure creation: 0.003115200001047924
compression: 0.0032797000003483845
decompression: 0.020794299998669885
original size: 10.1kB
compression ratio: 46.43%

Time evaluation for 100KB.txt:
structure creation: 0.019422600000325474
compression: 0.014856799998597126
decompression: 0.1564118000005692
original size: 100.657kB
compression ratio: 46.64%

Time evaluation for 1MB.txt:
structure creation: 0.19242510000003676
compression: 0.15088209999885294
decompression: 1.7945044000007329
original size: 1038.587kB
compression ratio: 47.29%



### Adaptive Huffman

In [177]:
test_compression("1KB.txt", AdaptiveHuffman)
test_compression("10KB.txt", AdaptiveHuffman)
test_compression("100KB.txt", AdaptiveHuffman)
test_compression("1MB.txt", AdaptiveHuffman)

Time evaluation for 1KB.txt:
structure creation: 1.4000015653437003e-06
compression: 0.018384399998467416
decompression: 0.012035200001264457
original size: 1.005kB
compression ratio: 40.3%

Time evaluation for 10KB.txt:
structure creation: 1.0999992809956893e-06
compression: 0.11195910000060394
decompression: 0.08279489999949874
original size: 10.1kB
compression ratio: 43.69%

Time evaluation for 100KB.txt:
structure creation: 1.2000000424450263e-06
compression: 1.1390696999987995
decompression: 0.9539350000013656
original size: 100.657kB
compression ratio: 44.31%

Time evaluation for 1MB.txt:
structure creation: 1.3999997463542968e-06
compression: 11.683703900000182
decompression: 9.329767499999434
original size: 1038.587kB
compression ratio: 44.8%

