## Use of Huffman coding to compress and decompress DNA sequences

#### Imports

In [7]:
import heapq
import os
import numpy as np
import pandas as pd

#### Dataset

In [8]:
data = pd.read_csv('human.txt', sep=" ")
print(data.shape)
data.head()

(4380, 1)


Unnamed: 0,sequence\tclass
0,ATGCCCCAACTAAATACTACCGTATGGCCCACCATAATTACCCCCA...
1,ATGAACGAAAATCTGTTCGCTTCATTCATTGCCCCCACAATCCTAG...
2,ATGTGTGGCATTTGGGCGCTGTTTGGCAGTGATGATTGCCTTTCTG...
3,ATGTGTGGCATTTGGGCGCTGTTTGGCAGTGATGATTGCCTTTCTG...
4,ATGCAACAGCATTTTGAATTTGAATACCAGACCAAAGTGGATGGTG...


#### Two classes

In [9]:
class BinaryTree:
    def __init__(self, value, freq):
        self.value = value
        self.freq = freq
        self.left = None
        self.right = None
        
    def __lt__(self, other):
        return self.freq < other.freq
    
    def __eq__(self, other):
        return self.freq == other.freq

class Huffmancode:
    
    def __init__(self, path):       
        self.path = path
        self.__heap = []
        self.__code = {}      
        self.__reversecode = {}
        
    def __frequency_from_text(self, text):
        freq_dict = {}
        for char in text:
            if char not in freq_dict:
                freq_dict[char] = 0
            freq_dict[char] += 1
        return freq_dict
    
    def __build_heap(self, frequency_dict):
        for key in frequency_dict:
            frequency = frequency_dict[key]
            binary_tree_node = BinaryTree(key, frequency)
            heapq.heappush(self.__heap, binary_tree_node)
    
    def __build_binary_tree(self):
        while len(self.__heap) > 1:
            binary_tree_node_1 = heapq.heappop(self.__heap)
            binary_tree_node_2 = heapq.heappop(self.__heap)
            sum_of_freq = binary_tree_node_1.freq + binary_tree_node_2.freq
            newnode = BinaryTree(None, sum_of_freq)
            newnode.left = binary_tree_node_1
            newnode.right = binary_tree_node_2
            heapq.heappush(self.__heap, newnode)
        return
    
    def __build_tree_code_helper(self, root, curr_bits):
        if root is None:
            return
        if root.value is not None:
            self.__code[root.value] = curr_bits
            self.__reversecode[curr_bits] = root.value
            return 
        self.__build_tree_code_helper(root.left, curr_bits + '0')
        self.__build_tree_code_helper(root.right, curr_bits + '1')
    
    def __build_tree_code(self):
        root = heapq.heappop(self.__heap)
        self.__build_tree_code_helper(root, '')
    
    def __build_encoded_text(self, text):
        encoded_text = ''
        for char in text:
            encoded_text += self.__code[char]            
        return encoded_text  
    
    def __build_padded_text(self, encoded_text):
        padding_value = 8 - (len(encoded_text) % 8)
        for i in range(padding_value):
            encoded_text += '0'
        padded_info = "{0:08b}".format(padding_value)
        padded_text = padded_info + encoded_text
        return padded_text
    
    def __build_byte_array(self, padded_text):
        array = []
        for i in range(0, len(padded_text), 8):
            byte = padded_text[i: i + 8]
            array.append(int(byte, 2))
        return array        
    
    def compress(self):
        print("COMPRESSION FOR YOUR FILE STARTS...")
        
        # Access the file and extract text from the file.
        #text = 'ATGCCCCAACTAAATACTACCGTATGGCCCACCATAATTACCCCCATACTCC'        
        filename, file_extension = os.path.splitext(self.path)
        output_path = filename + '.bin'
        with open(self.path, 'r+') as file, open(output_path, 'wb') as output:
            text = file.read()
            text = text.rstrip()            
            frequency_dict = self.__frequency_from_text(text)
            print(frequency_dict)
            # Calculate frequency of each text and store it in frequency dictionary
            build_heap = self.__build_heap(frequency_dict)
            
            # Minimum heap for two minimum frequencies
            # Construct binary tree from heap
            self.__build_binary_tree()
            
            # Construct code from binary tree and store it in dictionary
            self.__build_tree_code()
            
            # Construct encoded text
            encoded_text = self.__build_encoded_text(text)
            
            # Padding of encoded text
            padded_text = self.__build_padded_text(encoded_text)
            
            # Return binary file as output
            bytes_array = self.__build_byte_array(padded_text)
            
            final_bytes = bytes(bytes_array)
            # bytes() method returns an immutable bytes object initialized with the given size and data
            output.write(final_bytes)
            
        print('compressed successfully')
        return output_path

    def __remove_padding(self, text):
        # padded_info in the form of binary having base of 10 while, we convert it into base 2 format
        padded_info = text[:8]
        padding_value = int(padded_info, 2)
        text = text[8:]
        padding_removed_text = text[: -1 * padding_value]
        return padding_removed_text
    
    def __decompress_text(self, text):
        decoded_text = ''
        current_bits = ''
        for bit in text:
            current_bits += bit
            if current_bits in self.__reversecode:
                character = self.__reversecode[current_bits]
                decoded_text += character
                current_bits = ''
        return decoded_text                
        
    def decompress(self, input_path):
        file_name, file_extension = os.path.splitext(input_path)
        output_path = file_name + ' decompressed' + '.txt'
        with open(input_path, 'rb') as file, open(output_path, 'w') as output:
            bit_string = ''
            byte = file.read(1)
            while byte:
                byte = ord(byte)
                # ord() function returns an integer representing the Unicode character
                bits = bin(byte)[2:].rjust(8, '0')
                #The Python String rjust() method, as the name suggests, will justify the string to a certain length 
                #to its right. Any character (letters, digits, or symbols) is added at the beginning of the string 
                #(performing Padding), based on the number of places the string shifts or the maximum length decided.
                bit_string += bits
                byte = file.read(1)
            no_padding_text = self.__remove_padding(bit_string)
            decompressed_text = self.__decompress_text(no_padding_text)
            output.write(decompressed_text)
            print('decompressed successfully')
        return        
        

#### Compress

In [11]:
path = input("ENTER THE PATH OF YOUR FILE WHICH YOU NEED TO COMPRESS...")
h = Huffmancode(path)
h.compress()

ENTER THE PATH OF YOUR FILE WHICH YOU NEED TO COMPRESS...human.txt
COMPRESSION FOR YOUR FILE STARTS...
compressed successfully


'human.bin'

#### Decompress

In [10]:
path = input("ENTER THE PATH OF YOUR FILE WHICH YOU NEED TO COMPRESS...")
h = Huffmancode(path)
output_path = h.compress()
h.decompress(output_path)

ENTER THE PATH OF YOUR FILE WHICH YOU NEED TO COMPRESS...human.txt
COMPRESSION FOR YOUR FILE STARTS...
{'s': 3, 'e': 3, 'q': 1, 'u': 1, 'n': 1, 'c': 2, '\t': 4381, 'l': 1, 'a': 1, '\n': 4380, 'A': 1402159, 'T': 1222500, 'G': 1453104, 'C': 1456269, '4': 711, '3': 672, '5': 240, '2': 349, '6': 1343, 'N': 530, '0': 531, '1': 534}
compressed successfully
decompressed successfully


#### Summary

In [6]:
original_size = os.path.getsize('human.txt')
compressed_size = os.path.getsize('human.bin')
decompressed_size = os.path.getsize('human decompressed.txt')

compression_ratio = np.round(original_size / compressed_size, 2)
decompression_ratio = np.round(original_size / decompressed_size, 2)

print('Compression ratio: ', {compression_ratio})
print('Decompression ratio: ', {decompression_ratio})

Compression ratio:  {3.59}
Decompression ratio:  {1.0}
