# Encoding a binary file into DNA using fountain codes and Reed-Solomon error correction

# Import
First, import the required Python modules.

In [1]:
#imports
import random
import math
import reedsolo 
import os
import numpy as np
import functools

Open the file in binary mode and return its contents.

In [2]:
def openfile(file_path) -> bytes:
    """
    Read a file from disk and return its raw contents.

    INPUT:
    file_path: path to the file
    OUTPUT:
    message: bytes object containing the data of the file

    """
    # Open the path that was passed in (not a module-level variable); the file is
    # opened in binary mode, so read() returns bytes rather than text.
    with open(file_path, 'rb') as file:
        message = file.read()
    return message

# Segmentation
Split the data into segments of 32 bytes (256 bits), padding the last segment so that it has the correct length.

In [3]:
def segment(message) -> list:
    """
    Split a bit string into segments of 256 bits (32 bytes), padding the tail if needed.

    INPUT:
    message: string of '0'/'1' characters holding the data of the file; its length is
             expected to be a multiple of 8 so the 8-bit length field fits in the padding
    OUTPUT:
    segments: list of bit strings, each 256 characters long

    When the message length is not a multiple of 256, padding is appended: first the
    number of padding bits written as an 8-bit binary number, then '0' characters up to
    the next segment boundary. A message that is already aligned is left unpadded.
    """
    segment_bits = 256
    # Number of bits missing up to the next multiple of 256; 0 when already aligned.
    # (Computing it as `256 - len % 256` yields 256 for aligned input, which does not
    # fit in the 8-bit length field and leaves a stray 1-bit segment at the end.)
    bits_to_add = -len(message) % segment_bits
    if bits_to_add != 0:
        message += format(bits_to_add, '08b')
        # the length field itself already uses 8 of the padding bits
        message += '0' * (bits_to_add - 8)
    # divide into segments
    return [message[index:index + segment_bits]
            for index in range(0, len(message), segment_bits)]

# Randomization
Randomize the segments in an attempt to reduce homopolymers in the final DNA segments

In [4]:
def generate_key(keygenseed) -> int:
    """
    Generate a 1 byte key, using a fixed seed so it is repeatable.

    INPUT:
    keygenseed: integer seed for the keygen
    OUTPUT:
    key: the key corresponding to the seed, an integer between 0 and 255 (inclusive)
    """
    # A private generator yields the same value as seeding the module-level one,
    # but leaves the global `random` state untouched for every other user of it.
    return random.Random(keygenseed).randint(0, 255)

In [5]:
def randomize(segment, keygenseed = 7) -> bytearray:
    """
    XOR every byte of the input with its own pseudo-random one-byte key.

    INPUT:
    segment: data to randomize, as a sequence of byte values (e.g. a bytearray)
    keygenseed: integer seed used for the first byte, default 7; each following
                byte uses the previous seed plus one
    OUTPUT:
    randomized_segment: bytearray of the same length as the input

    Because XOR is its own inverse, applying the function again with the same
    keygenseed restores the original bytes.
    """
    return bytearray(
        generate_key(keygenseed + offset) ^ value
        for offset, value in enumerate(segment)
    )

# Droplets & Reed_Solomon
Construct droplets containing a seed, segment and Reed-Solomon error correcting code

In [6]:
def ideal_soliton(K) -> list:
    """
    Build the ideal soliton distribution for degrees 0..K.

    INPUT:
    K: largest degree in the distribution
    OUTPUT:
    probabilities: list of K+1 probabilities, where index d holds p(d)

    p(0) = 0, p(1) = 1/K and p(d) = 1/(d*(d-1)) for d = 2..K.
    """
    # degrees 0 and 1 are the two special cases
    probabilities = [0, 1/K]
    # every remaining degree follows the closed form
    probabilities.extend(1/(degree*(degree-1)) for degree in range(2, K+1))
    return probabilities

In [7]:
def robust_soliton(K,c,delta) -> list:
    """
    Build the robust soliton distribution for degrees 0..K.

    INPUT:
    K: largest degree in the distribution
    c: value of c variable in distribution
    delta: value of delta variable in distribution
    OUTPUT:
    probabilities: numpy array of K+1 probabilities (index d holds p(d)), normalized to sum to 1

    The ideal soliton distribution is summed with an extra term that is R/(d*K) for
    degrees below the pivot floor(K/R), R*ln(R/delta)/K at the pivot and 0 above it,
    with R = c * ln(K/delta)**2 * sqrt(K).
    """
    # start from the ideal distribution
    base = ideal_soliton(K)
    R = c*(math.log(K/delta)**2)*math.sqrt(K)
    pivot = int(math.floor(K/R))
    # extra term: index 0, degrees below the pivot, the spike at the pivot, then zeros
    extra = [0]
    extra.extend(R/(i*K) for i in range(1, pivot))
    extra.append((R*math.log(R/delta))/K)
    extra.extend([0] * (K - pivot))
    # element-wise sum, then normalize in place
    combined = np.add(extra, base)
    combined /= sum(combined)
    return combined

In [8]:
def number_2_bytearray(num):
    """
    Convert a string of binary digits into a big-endian bytearray.

    INPUT:
    num: string of binary digits (eg. '0101011010110110'); a '0b' prefix is accepted
    OUTPUT
    bytearray: the value of the string stored in ceil(len(num) / 8) bytes, so leading
               zeros in the string are kept as leading zero bytes
    """
    value = int(num, 2)
    # round the character count up to whole bytes
    n_bytes = (len(num) + 7) // 8
    return bytearray(value.to_bytes(n_bytes, byteorder='big'))

In [9]:
def prepare_seed(seed):
    """
    Convert the seed into the 4-byte index that is packaged into the droplet.

    INPUT:
    seed: Integer seed for packing into a droplet; must fit in 32 bits.
    OUTPUT
    seed_index: seed converted to a big-endian bytearray of exactly 4 bytes, so it can
                be added to the droplet directly
    RAISES:
    ValueError: if the seed needs more than 4 bytes ("seed too big")

    For example seed = 7 = 00000000 00000000 00000000 00000111 becomes
    bytearray([0, 0, 0, 7]).
    """
    import operator  # local import: only this function needs it

    # operator.index accepts exactly the integer-like objects that bin() accepts
    value = operator.index(seed)
    # Check the range explicitly so every oversized seed is rejected, rather than
    # returning an index of more than 4 bytes for some of them.
    if value >= 1 << 32:
        raise ValueError("seed too big")
    return bytearray(value.to_bytes(4, byteorder='big'))

In [10]:
@functools.lru_cache(maxsize=8)
def _droplet_degree_weights(distribution_size):
    """Return the robust soliton weights (c=0.001, delta=0.025) for degrees 0..distribution_size."""
    return robust_soliton(distribution_size, 0.001, 0.025)


def droplet(randomized_segments, segment_seed, prng, distribution_size = 1000) -> bytearray:
    """
    Takes the data and a seed and creates a droplet.

    INPUT:
    randomized_segments: randomized and segmented data, a list of equally long byte sequences.
    segment_seed: integer seed for the creation of the droplet.
    prng: random.Random() random number generator; it is reseeded with segment_seed.
    distribution_size: largest degree of the robust soliton distribution (default 1000).
    OUTPUT:
    droplet_rs: droplet encoded using Reed Solomon
    RAISES:
    ValueError: from prng.sample when the drawn degree exceeds the number of segments.

    The droplet consists of the 4-byte seed index followed by the XOR of the chosen
    segments, with the Reed-Solomon code of RSCodec(2) appended by the encoder.
    """
    prng.seed(segment_seed)
    seed_index = prepare_seed(segment_seed)
    # The degree weights depend only on distribution_size, so they are computed once
    # and reused for every droplet instead of being rebuilt on each call.
    weights = _droplet_degree_weights(distribution_size)
    # draw how many segments are combined, then which ones
    amount = prng.choices(range(0, distribution_size + 1), weights, k=1)[0]
    segment_indices = prng.sample(range(len(randomized_segments)), k=amount)
    segments = [randomized_segments[i] for i in segment_indices]
    # XOR the chosen segments together byte by byte
    payload = functools.reduce(lambda i, j: bytes(a ^ b for (a, b) in zip(i, j)), segments)
    unprotected = seed_index + bytearray(payload)
    # prepare reedsolomon
    rsc = reedsolo.RSCodec(2)
    # create the encoded droplet (what will eventually be stored in DNA)
    droplet_rs = rsc.encode(unprotected)
    return droplet_rs

# LFSR

In [11]:

def lfsr(state, mask):
    """
    Endless generator of register states of a Galois LFSR.

    INPUT:
    state: integer the register starts from
    mask: integer feedback mask; its highest set bit marks the overflow position
    OUTPUT:
    yields the register value after every shift; the initial state itself is not yielded
    """
    overflow_bit = mask.bit_length() - 1
    register = state
    while True:
        # shift the register left once
        register <<= 1
        # anything at or above the overflow position means the register overflowed,
        # so fold it back by XOR-ing with the mask
        if register >> overflow_bit:
            register ^= mask
        yield register

def lfsr32p():
    """
    Return the hard coded feedback polynomial used as the LFSR mask.

    The value is 0b100000000000000000000000011000101. The original note describes it
    as the polynomial 1 + x^25 + x^26 + x^30 + x^32 and says it repeats only after
    "32^2-1" tries (presumably 2^32 - 1 was meant; not verified here).
    Don't change unless you know what you are doing.
    """
    polynomial = 0b100000000000000000000000011000101
    return polynomial

def lfsr32s():
    """
    Return the hard coded initial state of the LFSR register (0b101011100).

    This state is the initial position in the register. According to the original
    note it can be changed without a major implication.
    """
    initial_state = 0b101011100
    return initial_state




# DNA segments
Convert droplets to DNA

In [12]:
#code to transform bytearrays into a 8-bit binary string
### insert code ###
def bytearray_to_binary(Bytearray) -> str:
    """
    Render every byte as eight binary digits and concatenate the result.

    INPUT:
    Bytearray: bytearray to be converted
    OUTPUT:
    binary_string: string of '0'/'1' characters, 8 per byte, so leading and
                   trailing zeros are kept
    """
    return "".join(format(byte, '08b') for byte in Bytearray)

In [13]:
def Binary2DNA(binary_string):
    """
    Translate a bit string into DNA bases, two bits per base.

    INPUT:
    binary_string: string of '0'/'1' characters
    OUTPUT:
    dna_string: string of bases using 00 -> A, 01 -> C, 10 -> G, 11 -> T

    Chunks that match none of the four codes (such as a single leftover bit at
    the end) contribute nothing to the result.
    """
    codes = (("00", "A"), ("01", "C"), ("10", "G"), ("11", "T"))
    bases = []
    for start in range(0, len(binary_string), 2):
        chunk = binary_string[start:start + 2]
        bases.extend(base for code, base in codes if chunk == code)
    return "".join(bases)

Check for biochemical constraints

In [14]:
def CheckBiochemicalRequirements(s, max_length=3, check=False):
    """
    Check a DNA sequence for homopolymer runs and CG content.

    INPUT:
    s: DNA sequence as a string of the characters A, C, G and T
    max_length: longest run of one repeated base that is still allowed (default 3)
    check: value returned when the sequence does not meet the requirements (default False)
    OUTPUT:
    True when no base repeats more than max_length times in a row and the fraction
    of C and G bases lies strictly between 0.45 and 0.55; otherwise `check`.
    An empty sequence does not meet the requirements.
    """
    # an empty sequence has no CG fraction; reject it instead of dividing by zero
    if not s:
        return check
    # a run of max_length + 1 identical bases is one base too long
    too_long = max_length + 1
    if any(base * too_long in s for base in 'CGTA'):
        return check
    per_CG = (s.count('C') + s.count('G')) / len(s)
    if 0.45 < per_CG < 0.55:
        return True
    return check

Main script

In [15]:
import os
def encode(file_path, seed, extra = 1.05, max_homopolymer = 3) -> list:
    """
    Read and encode a file using the luby transform, returning a list of DNA sequences.

    INPUT:
    file_path: path to the file to be encoded
    seed:  seed for the randomization of data
    extra: amount of extra droplets compared to data (default 5%, so 1.05)
    max_homopolymer: maximum length of homopolymer sequences allowed (default 3)
    OUTPUT:
    droplets: list of DNA sequences

    Droplets are generated until int(extra * number of segments) of them have
    passed the biochemical check; droplets that fail the check are discarded.
    """
    # read the file, randomize its bytes, then cut the bit string into segments
    raw_bytes = bytearray(openfile(file_path))
    scrambled = randomize(raw_bytes, seed)
    bit_segments = segment(bytearray_to_binary(scrambled))
    total_segments = len(bit_segments)
    # pseudo random number generator that is reseeded for every droplet
    prng = random.Random()
    # lfsr with a fixed state and a polynomial for 32 bits; supplies the droplet seeds
    seed_lfsr = lfsr(lfsr32s(), lfsr32p())
    # the luby transform works on bytearrays, so convert every segment
    data_bytearray = [number_2_bytearray(bits) for bits in bit_segments]
    wanted = int(extra*total_segments)
    droplets = []
    while len(droplets) < wanted:
        droplet_seed = next(seed_lfsr)
        encoded_drop = droplet(data_bytearray, droplet_seed, prng)
        # convert drop to a bit string and then to DNA
        dna = Binary2DNA(bytearray_to_binary(encoded_drop))
        # keep the droplet only when it meets the biochemical requirements
        if CheckBiochemicalRequirements(dna, max_homopolymer):
            droplets.append(dna)
    return droplets
# Hard-coded absolute Windows path of the input PDF, converted to a path relative to the
# current working directory; change the string to encode a different file.
path = os.path.relpath("C:\\Users\\justi\\Documents\\DBLCompBioGroup12\\Data\\Research_Proposal_Group_12-1.pdf")
# Encode the file with randomization seed 7; `encoded` holds the list returned by encode().
encoded = encode(path, 7)
            

In [16]:
encoded

['CAGCTAATAAGCCCGCTGAGCTGGACGTAGCGTCCATTGCGAGATGATGAATCATGAGAAAGACTATGTGAGCGCCTAACAGCTTTGGAACAAGTAGACAGTGCTGTGCCCGGCAAACATGCCTGTACCTGTGTTTCAATGTACTCCTCAGC',
 'AGCTAATAAGCCGGACCTTGCATCGTTCGCCCTGCCTCACAAACACTAGCATGATGCCTGAGATGTTTGTCCTCTCGACCCTGGTTCGTCGTAACACGCCAGAGCTGATGGCCAAAGAGACAGCTCAATGCGTTCAGAGTATCGCGACGTCC',
 'TGACGACAGTCTATCATCTTACTCATTTCAGGCCTCGAGTGTAATGTCGACATAGGCCGATGACAATTCAACTGTGATATAAATCTACTCTAACACTAGATCCGGTTGGCTCGTTCACGCCAACACGCGCACTAAACAGCCTGGCCGCTTGT',
 'TATTGCGTTGGTCACGTCCATCACTTGTGCTGCGAGAAAGAACGCAACAACAGCTCGTCATGACCTTGTGACCGGAGACCTTAGGTCCCATGACTGTAGTGACAAGGCTAAGTTCGGGTCAGCACCGGAATCCGCAACACAGGTGTGGCTTT',
 'TTATCTTCCGAGCTTCCTACAAGTTAAGTTACTAAGTGGCGCGCGATAGTTCCATCACGCCTCTCCTCGGCCAGGTGGTCACAAGGTGATCACTACGTGGATATCAAGTGGTAAGGCGGTAGGACCTATCGTCAATCGTCAATGAGCAACCT',
 'AGATGTCCTTCGAATGCTAAGATACCTCTTCGTCGAAGTCTGACGCGAAAGACGATACGAGGTTGCGGGAGGGTGGTGAACCTACCGAAATTAGCATTCTAGGAGGATGTATCGTCCCTCCGTTCACCCTCCCGCTCGCCGCCCATCGAATC',
 'GATGTCCTTCGAATGACAGCACTACCCTATCTTGTGTCCAGTTATGAGGCACACAG

In [17]:
# Write every encoded DNA sequence to file.txt, one sequence per line.
# The with-statement closes the file even when a write fails part-way.
with open("file.txt", "w") as f:
    for i in encoded:
        f.write(i + "\n")