<a href="https://colab.research.google.com/github/estebanhernandezr/DNA-compression/blob/master/GZIP_DEFLATE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [73]:
pip install bitarray



In [74]:
from bitarray import *
from heapq import heappush, heappop

In [75]:
class heapNode(object):
    def __init__(self):
        self.left = None
        self.right = None
        self.counts = 0
        self.twelveBitInteger = ""
    def __lt__(self, other):
        return self.counts < other.counts

In [76]:
n = 4000
Ls = 15
pad_symb = 37

def data_from_file(filename: str):
    with open(filename, 'rb') as input_file:
        filedata = input_file.read()
    return filedata

def file_from_bin(filename: str, newBuffer):
    with open(filename, 'wb') as outFile:
        outFile.write(newBuffer)
        return None

def bin_from_file(filename: str):
    filedata = bitarray(endian='big')
    with open(filename, 'rb') as input_file:
        filedata.fromfile(input_file)
    return filedata


def iniPad(symb, padsize, cad):
    pad = bytearray(chr(symb), 'utf-8')
    for i in range(padsize-1):
        pad.append(pad[0])
    return pad + cad

def reproducible_extension(search, lookahead):
    pos = -1
    size = 0
    char = ''
    for prefixsize in range(1, min(n-Ls, len(lookahead))):
        prefix = lookahead[:prefixsize]
        p = search.rfind(prefix, 0, (n-Ls)+prefixsize-1)
        if p >= 0:
            pos = p
            size = prefixsize
            char = lookahead[size]
        else:
            break
    return pos, size, char

def get_distances(cad):
    pcad = iniPad(cad[0], n-Ls, cad)
    distances = []
    j: int = 0
    while j < len(pcad)-(n-Ls):
        #print(pcad[j:j+n], "|", pcad[j:j+n-Ls], " --> ", pcad[j+n-Ls:j+n])
        pos, size, char = reproducible_extension(pcad[j:j+n], pcad[j+n-Ls:j+n])
        #print(pos, size)
        if pos >= 0:
          distances.append(pos)
        j += max(1,size)#+1)
    return distances

def bl_count_from_distances(distances):
    counts = {}
    for distance in distances:
        if distance in counts:
            counts[distance]+=1
        else:
            counts[distance] = 1
    return counts

def heap_from_dictionary(counts):
    heap = []
    for key in counts:
        node = heapNode()
        node.twelveBitInteger = key
        node.counts = counts[key]
        heappush(heap, node)
    #for node in heap:
    #    print(node.twelveBitInteger, ":", node.counts)
    return heap

def create_huffman_tree(counts):
    heap = heap_from_dictionary(counts)
    root = None
    while len(heap) > 1:
        smallestNode = heappop(heap)
        secondSmallestNode = heappop(heap)

        newRoot = heapNode()
        newRoot.counts = smallestNode.counts + secondSmallestNode.counts
        newRoot.twelveBitInteger = ''
        newRoot.left = smallestNode
        newRoot.right = secondSmallestNode
        root = newRoot
        heappush(heap, newRoot)
    return root

codeDictionary = {}
def dictionary_from_tree(root, path, codes):
    if root.left is None and root.right is None:
        #print(root.twelveBitInteger, ": ", root.counts)
        codes["{0:0{width}b}".format(root.twelveBitInteger, width=len("{0:b}".format(n)))] = path
        return
    else:
        if root.left is not None:
            dictionary_from_tree(root.left, path+"0", codes)
        if root.right is not None:
            dictionary_from_tree(root.right, path+"1", codes)

treeBuffer = bitarray(endian='big')
def codify_huffman_tree(root, buffer):
    if root.left is None and root.right is None:
        buffer.append(False) # If current node is leaf then False (internal)
        bin_distance = "{0:0{width}b}".format(root.twelveBitInteger, width=len("{0:b}".format(n)))
        for bit in bin_distance:
            if bit == '1':
                buffer.append(True)
            else:
                buffer.append(False)
    else:
        buffer.append(True) # If current node is internal then True
        codify_huffman_tree(root.left, buffer)
        codify_huffman_tree(root.right, buffer)

def decodify_huffman_tree(buffer):
    nextBit = buffer.pop(0)
    if nextBit == False: # if leaf node
        root = heapNode()
        binaryString = ''
        for i in range(len("{0:b}".format(n))):# read next twelve bytes to get full code
            bit = buffer.pop(0)
            if bit == True:
                binaryString+="1"
            else:
                binaryString+="0"
        root.twelveBitInteger = int(binaryString, 2)
    else:
        root = heapNode()
        root.left = decodify_huffman_tree(buffer)
        root.right = decodify_huffman_tree(buffer)
    return root

outputBuffer = bitarray(endian='big')
def codify_cad(cad, root, buffer):
    pad_symb = cad[0]
    pcad = iniPad(cad[0], n-Ls, cad)
    i: int = 0
    while i < len(pcad)-(n-Ls):
        pos, size, char = reproducible_extension(pcad[i:i+n], pcad[i+n-Ls:i+n])
        if pos >= 0 and size > 1:
            outputBuffer.append(True)
            if '{0:012b}'.format(pos) in codeDictionary:
                huffman_code = codeDictionary["{0:0{width}b}".format(pos, width=len("{0:b}".format(n)))]
                #print("huffman_code:", huffman_code, " size:", '{0:04b}'.format(size))
                bin_code = huffman_code + "{0:0{width}b}".format(size, width=len("{0:b}".format(Ls)))
                #print("bin_code:= ", bin_code)
                #print(type(bin_code))
                for bit in bin_code:
                    if bit == '1':
                        outputBuffer.append(True)
                    else:
                        outputBuffer.append(False)
                i += size#+1
        else:
            outputBuffer.append(False)
            outputBuffer.frombytes(bytes([pcad[i+n-Ls]])) # Literal symbol
            i += 1

def compress(filename: str):
    filedata = data_from_file(filename) #CHECKED
    distances = get_distances(filedata) #CHECKED
    #print(distances)
    counts = bl_count_from_distances(distances) #CHECKED
    #print(counts)
    huffman_tree = create_huffman_tree(counts) #CHECKED

    dictionary_from_tree(huffman_tree, '', codeDictionary) #CHECKED
    #print(codeDictionary)
    codify_huffman_tree(huffman_tree, treeBuffer) #CHECKED
    #print(treeBuffer)
    codify_cad(filedata, huffman_tree, outputBuffer) #CHECKED
    #print(outputBuffer)

    newBuffer = treeBuffer + outputBuffer
    newBuffer.fill()
    #print(newBuffer)

    file_from_bin('compressed_file', newBuffer)
    print("File compressed succesfully")
    return filedata

def decompress(filename, outputFile):
    filedata = bin_from_file(filename)
    codeDictionary = {}
    root = decodify_huffman_tree(filedata)
    dictionary_from_tree(root, '', codeDictionary)
    outputBuffer = iniPad(pad_symb, n-Ls, bytes())
    k: int = 0
    while len(filedata) >= 9:
        huffman_pair = filedata.pop(0)
        if not huffman_pair:
            byte = filedata[0:8].tobytes()
            outputBuffer += byte
            del filedata[0:8]
            k += 1
        else:
            curbitsubstring = ''
            stop = False
            while stop == False: # compare each bit subsequence and see if it is in the huffman table
                bit = filedata.pop(0)
                if bit == True:
                    curbitsubstring+="1"
                else:
                    curbitsubstring+="0"
                for key in codeDictionary:
                    if codeDictionary[key] == str(curbitsubstring):
                        twelvebitnumber = key
                        stop = True

            bestLengthBinary = ''
            for i in range(0, len("{0:b}".format(Ls))):
                bit = filedata.pop(0)
                if bit == True:
                    bestLengthBinary+='1'
                else:
                    bestLengthBinary+='0'

            bestDistance = int(twelvebitnumber, 2)
            bestLength = int(bestLengthBinary, 2)
            for i in range(bestLength):
                outputBuffer.append(outputBuffer[k+bestDistance+i])
            k += bestLength

    file_from_bin(outputFile, outputBuffer[n-Ls:])
    print("File decompressed succesfully")
    return outputBuffer[n-Ls:]

In [77]:
filename: str = 'BioCompress.py'
data = compress(filename)

out_filename: str = 'uncompressed_file'
decompressed = decompress('compressed_file', out_filename)

print(data == decompressed)
print(data)
print(decompressed)

File compressed succesfully
File decompressed succesfully
True
b'import numpy as np\r\nfrom SuffixTree import *\r\n\r\n\r\n"""Recursive function for calculating the representation\r\nof the number decnum in base radix."""\r\ndef d2r(radix, decnum, current):\r\n    res = decnum % radix\r\n    decnum = decnum // radix\r\n    if decnum == 0:\r\n        return str(res) + current\r\n    else:\r\n        return d2r(radix, decnum, str(res) + current)\r\n\r\n"""Wrapper function for d2r."""\r\ndef dec2radix(radix, decnum):\r\n    return d2r(radix, decnum, \'\')\r\n\r\n"""Looks for the longest string that starts at the \r\nsearch buffer and that is a prefix of the lookahead\r\nbuffer, using a SUFFIX TREE for the search. This method\r\ntraverse the SuffixTree exploting the propertie of\r\nMcCreights\' SuffixTree construction: only the first symbol\r\nof the edge label must be checked in every node."""\r\ndef ST_based_reproducible_extension(STD, s, lim=None, limsup=None):\r\n    v = STD.root\r\n  