# Encodng Methods Analysis

Importing required packages

In [25]:
import Bio
from Bio import SeqIO
from Bio.SeqUtils import gc_fraction
from Bio import Align
import pysam

import numpy as np
import pandas as pd

In [26]:
def principal_period(s):
    """
    A function that detects if a string in its entirety consists of a repeating pattern.
    Returns a string of the repeating pattern.
    E.g. "ATGATG" returns "ATG", "ATGATGT" returns None
    """
    i = (s+s).find(s, 1, -1)
    return None if i == -1 else s[:i]

In [27]:
def has_polymer(sequ, length = 3, type = None):
    
    """
    A function that checks if a sequence contains a homopolymer of certain length.
    length - int, minimum length of hompolymer
    type - None, "AT" or "GC", type of homopolymers to look for
           None searches for all 
    """
    if type == None:
        if len(sequ) < length:
            print('Error sequence less than homopolymer length')
            return
        elif sequ.count("A"*length) != 0:
            return True
        elif sequ.count("T"*length) != 0:
            return True
        elif sequ.count("G"*length) != 0:
            return True
        elif sequ.count("C"*length) != 0:
            return True
        else:
            return False
    elif type == "AT":
        if len(sequ) < length:
            print('Error sequence less than homopolymer length')
            return
        elif sequ.count("A"*length) != 0:
            return True
        elif sequ.count("T"*length) != 0:
            return True
        else:
            return False
    elif type == "GC":
        if len(sequ) < length:
            print('Error sequence less than homopolymer length')
            return
        elif sequ.count("G"*length) != 0:
            return True
        elif sequ.count("C"*length) != 0:
            return True
        else:
            return False
    else:
        raise ValueError("Type must be None, 'AT' or 'GC'")

In [28]:
def homopolymer_finder(alignres, polymer_length = 3, bases = None):
    """
    A function that finds the indexes of all the
    homopolymers in a sequence.
    Returns a list of start and end indexes of each polymer.
    alginres - AlignResults object or a DNA sequence string.
    polymer_length - int, minumum homopolymer length.
    bases - None, "AT" or "GC", type of homopolymers to look for
            None searches for all 
    """

    seq = alignres

    polymer_list = []
    prev = False
    memory, i = 0, 0
    while (i+polymer_length-1) < len(seq):
        if has_polymer(seq[i:i+polymer_length], polymer_length, bases) == True:
            if prev == True:
                i+=1
                continue
            else:
                prev = True
                memory = i
        else:
            if prev == True:
                prev = False
                polymer_list.append([memory, i+polymer_length-2])
        i+=1
    #catch any homopolymers that are at the end of the sequence
    if prev == True:
        polymer_list.append([memory, i+polymer_length-2])
    return(polymer_list)

In [29]:
def repeat_finder(alignres, repeat_length = 3, repeat_count = 2):
    """
    A function that finds the indexes of all the
    repeats in a sequence.
    In this case the amount of repeats needing to occur
    is only 2, so "ATGATG" will be included.
    Returns a list of start and end indexes of each polymer.
    alginres - AlignResults object or a DNA sequence string.
    repeating_element - int, the size of the repeating element,
                        e.g., 2 = "ATAT" - "AT",
                              3 = "ATGATG" - "ATG".
                        Default = 3
    repeat_count - int, minimum times repeating element must
                   be encountered. E.g., if 3 then
                   "ATGATG" won't be taken into account but
                   "ATGATGATG" will. Default = 2
    """

    sequ = alignres

    repeat_idx = []
    #This loop returns the starting index of each repeat.
    #This does result in overlaps for repeat sizes greater than 2.
    #But this is dealt with in the next loop
    for i in range(repeat_length):
        while (i+(repeat_length*2)) < len(sequ):
            x = sequ[i:i+(repeat_length*2)]
            pp = principal_period(x)
            if pp is not None:
                if len(pp) == repeat_length:
                    repeat_idx.append(i)
            i += repeat_length
            
    #Removing duplicating repeats
    #For example "ATG ATG ATG"
    #also contains "A TGATGA TG" "TGA" repeat
    #Solve this by checking for overlaps and deleting them
    repeat_idx.sort()
    i = 0
    while i < (len(repeat_idx)-1):
        if i == 0:
            i+=1
            continue
        if (repeat_idx[i] - repeat_idx[i-1]) < repeat_length:
            del repeat_idx[i]
            continue 
        i+=1

    #This here converts the starting index of each repeat
    #into a list of ranges as well as accounting for
    #repeats where the repeating elements occurs more than twice.
    combined_idx = []
    prev = False #Are we currently in series or not
    count = 0
    for i in range(len(repeat_idx)):
        #catch out of bounds
        if i == len(repeat_idx)-1:
            if prev == True:
                prev = False
                combined_idx.append([memory, repeat_idx[i]])
            break
        #If two repeats are in series:
        if (repeat_idx[i+1] - repeat_idx[i]) == repeat_length:
            if prev == False:
                count = 1
                prev = True
                memory = repeat_idx[i]
            else:
                count+=1
                continue
        else:
            if prev == True:
                prev = False
                if count >= (repeat_count-2):
                    combined_idx.append([memory, repeat_idx[i]+(repeat_length*2)-1])
                count = 0
            else:
                if repeat_count == 2:
                    combined_idx.append([repeat_idx[i], repeat_idx[i]+(repeat_length*2)-1])
                    count = 0

    return(combined_idx)

-----------------------


### Sequence Analysis


In [100]:
datas = []
for seq_record in SeqIO.parse("Wuk_output/G_wukong.fasta", "fasta"):
        datas.append(seq_record)

In [31]:
def dict_add(x,y):

    for i in y:
        if i in x:
            x[i] += y[i]
        else:
            x[i] = y[i]
    return(x)

In [32]:
def polymer_counter(seq, min = 3):

    previous_base = "none"
    counter = 1
    final_count = {}
    for i in range(len(seq)):
        if i == 0:
            previous_base = seq[i]
        else:
            if seq[i] == previous_base:
                counter += 1
            else:
                if counter >= min:
                    if counter not in final_count:
                        final_count[counter] = 1
                    else:
                        final_count[counter] += 1
                    previous_base = seq[i]
                    counter = 1
                else:
                    previous_base = seq[i]
                    counter = 1
    return(final_count)

In [33]:
def repeat_counter(seq):
    """
    Will count number of repeats of sizes 2 and 3,
    with a minimum of 2 repeating elements in a row
    """
    seq = seq.seq
    #Dimer Repeats
    counter = 1
    previous_dimer = "none"
    dimer_count = {}
    i = 0
    while (i+2) < len(seq):
        if i == 0:
            previous_dimer = seq[0:2]
        else:
            if seq[i:i+2] == previous_dimer:
                counter += 1
            else:
                if counter >= 2:
                    if counter not in dimer_count:
                        dimer_count[counter] = 1
                    else:
                        dimer_count[counter] += 1
                    previous_dimer = seq[i:i+2]
                    counter = 1
                else:
                    previous_dimer = seq[i:i+2]
                    counter = 1
        i+=2
    
    #Triimer Repeats
    counter = 1
    previous_trimer = "none"
    trimer_count = {}
    i = 0
    while (i+3) < len(seq):
        if i == 0:
            previous_triimer = seq[0:3]
        else:
            if seq[i:i+3] == previous_triimer:
                counter += 1
            else:
                if counter >= 2:
                    if counter not in trimer_count:
                        trimer_count[counter] = 1
                    else:
                        trimer_count[counter] += 1
                    previous_trimer = seq[i:i+3]
                    counter = 1
                else:
                    previous_trimer = seq[i:i+3]
                    counter = 1
        i+=3  

    return(dimer_count,trimer_count)

In [34]:
def sequence_analysis(seqs, min = 3):

    total_length = 0 
    gc_distribution = {}
    pol_distribution = {}
    dimer,trimer = {},{}
    
    for i in seqs:
        total_length += len(i)
        gc = round(gc_fraction(i),2)
        if gc not in gc_distribution:
            gc_distribution[gc] = 1
        else:
            gc_distribution[gc] += 1
        pol_distribution = dict_add(polymer_counter(i,min), pol_distribution)
        x,y = repeat_counter(i)
        dimer = dict_add(x,dimer)
        trimer = dict_add(y, trimer)

    print('The number of sequences is: ', len(seqs))
    print('The length of each sequence is: ', len(seqs[1]))
    print('The total base count is: ', total_length)
    
    return(gc_distribution, pol_distribution, dimer, trimer)

In [101]:
res = sequence_analysis(datas)

The number of sequences is:  561
The length of each sequence is:  200
The total base count is:  112000


In [105]:
res

({0: 1,
  0.52: 43,
  0.56: 124,
  0.53: 56,
  0.55: 82,
  0.59: 24,
  0.49: 8,
  0.58: 36,
  0.57: 67,
  0.51: 25,
  0.54: 67,
  0.46: 3,
  0.48: 6,
  0.5: 14,
  0.47: 3,
  0.6: 2},
 {3: 4597, 4: 1254},
 {2: 2493, 3: 118, 4: 4},
 {2: 633, 3: 7})

In [37]:
res[0]

{0: 1,
 0.47: 21,
 0.46: 13,
 0.56: 55,
 0.51: 30,
 0.57: 25,
 0.49: 31,
 0.55: 37,
 0.48: 20,
 0.54: 41,
 0.58: 15,
 0.52: 41,
 0.5: 21,
 0.53: 30,
 0.45: 9,
 0.59: 9,
 0.44: 3,
 0.6: 1,
 0.43: 1}

In [38]:
def single_sequence(seq_list, name, type = "fasta"):

    final = ""
    for i in seq_list:
        final += str(i.seq)
    x = Bio.SeqRecord.SeqRecord(Bio.Seq.Seq(final), id = name)
    SeqIO.write(x, name, type)

In [40]:
def gc_analysis_splitter(seq, size = 200):

    x = len(seq)
    i = 0
    gc_distribution = {}
    while (i+size) < len(seq):
        gc = round(gc_fraction(seq[i:i+size]),2)
        if gc not in gc_distribution:
            gc_distribution[gc] = 1
        else:
            gc_distribution[gc] += 1
        i += size

    return(gc_distribution)

---------------------------------
### Error simulation

In [42]:
import random 

In [43]:
def transition(base):

    if base == "A":
        return("G")
    elif base == "G":
        return("A")
    elif base == "T":
        return("C")
    elif base == "C":
        return("T")
    else:
        raise ValueError("Invalid base")

In [44]:
def transversion(base):
    if base == "A":
        return(random.choice(["C", "T"]))
    elif base == "G":
        return(random.choice(["C", "T"]))
    elif base == "T":
        return(random.choice(["A", "G"]))
    elif base == "C":
        return(random.choice(["A", "G"]))
    else:
        raise ValueError("Invalid base")

In [45]:
def base_error(base, weights):

    x = random.choices(["same", "transition", "transversion", "insertion", "deletion"],
                        weights, k = 1)[0]
    if x == "same":
        return(base, np.array([0,0,0,0]))
    elif x == "transition":
        return(transition(base), np.array([1,0,0,0]))
    elif x == "transversion":
        return(transversion(base), np.array([0,1,0,0]))
    elif x == "deletion":
        return("", np.array([0,0,0,1]))
    elif x == "insertion":
        base += random.choice(["A","T","G","C"])
        return(base, np.array([0,0,1,0]))

In [46]:
def weight_adder(weight):

    s = 1 - round(sum(weight),10)
    return([s] + weight)

In [47]:
def error_simulator(seq, normal_weights, poly_weights, dimer_weights, trimer_weights, gc_weights = None):

    if type(seq) == Bio.SeqRecord.SeqRecord:
        seq = seq.seq

    poly_indexes = homopolymer_finder(seq)
    dimer_indexes = repeat_finder(seq, 2)
    trimer_indexes = repeat_finder(seq, 3)
    error_counter = np.array([0,0,0,0])

    if gc_weights != None:
        gc = round(gc_fraction(seq),2)
        if gc in gc_weights:
            gc_weights = gc_weights[gc]
        else:
            gc_weights = normal_weights
        gc_weights = weight_adder(gc_weights)

    normal_weights = weight_adder(normal_weights)
    poly_weights = weight_adder(poly_weights)
    dimer_weights = weight_adder(dimer_weights)
    trimer_weights = weight_adder(trimer_weights)
    #print(normal_weights)
    
    new_seq = ""
    for i in range(len(seq)):
        if i in poly_indexes:
            x = base_error(seq[i], poly_weights)
            new_seq += x[0]
            error_counter += x[1]
        elif i in dimer_indexes:
            x = base_error(seq[i], dimer_weights)
            new_seq += x[0]
            error_counter += x[1]
        elif i in trimer_indexes:
            x = base_error(seq[i], trimer_weights)
            new_seq += x[0]
            error_counter += x[1]
        else:
            if gc_weights == None:
                x = base_error(seq[i], normal_weights)
                new_seq += x[0]
                error_counter += x[1]
            else:
                x = base_error(seq[i], gc_weights)
                new_seq += x[0]
                error_counter += x[1]

    return(new_seq, error_counter)

In [48]:
def error_simulator_multiple(seq_list, normal_weights, poly_weights, dimer_weights, trimer_weights, gc_weights = None):

    error_counter = np.array([0,0,0,0])
    final_list = []
    for sequence in seq_list:
        results = error_simulator(sequence, normal_weights, poly_weights, dimer_weights, trimer_weights, gc_weights)
        new_seq = Bio.SeqRecord.SeqRecord(Bio.Seq.Seq(results[0]), id= sequence.id, name = sequence.name, description= sequence.description)
        final_list.append(new_seq)
        error_counter += results[1]

    return(final_list, error_counter)

In [49]:
MinION_w = [[0.0144,0.0097,0.0111,0.0181],
            [0.0122,0.0083,0.0137,0.0485],
            [0.0076,0.0056,0.0107,0.0322],
            [0.0035,0.0024,0.0058,0.0149]]
NovaSeq_w = [[0.0013,0.0054,0.00004,0.00001],
             [0.0012,0.0048,0.00017,0.0002],
             [0.0003,0.0020,0,0.00007],
             [0.00006,0.0003,0,0.00003]]
IonTorrent_w = [[0.0012,0.0012,0.0024,0.0011],
                [0.0012,0.0016,0.0061,0.0102],
                [0.0002,0.0002,0.0012,0.0002],
                [0.00006,0.00005,0.0002,0.0003]]
PacBio_w = [[0.0026,0.0016,0.0015,0.0128],
            [0.0057,0.0030,0.0138,0.0473],
            [0.0002,0.0001,0.0012,0.0016],
            [0.0001,0.0004,0.0119,0.0013]]

In [108]:
sub1 = [[0.0025,0.0025,0,0],
        [0.0025,0.0025,0,0],
        [0.0025,0.0025,0,0],
        [0.0025,0.0025,0,0]]
sub2 = [[0.005,0.005,0,0],
        [0.005,0.005,0,0],
        [0.005,0.005,0,0],
        [0.005,0.005,0,0]]
sub3 = [[0.075,0.075,0,0],
        [0.075,0.075,0,0],
        [0.075,0.075,0,0],
        [0.075,0.075,0,0]]

In [51]:
ins1 = [[0,0,0.015,0],
        [0,0,0.015,0],
        [0,0,0.015,0],
        [0,0,0.015,0]]

In [52]:
del1 = [[0,0,0,0.015],
        [0,0,0,0.015],
        [0,0,0,0.015],
        [0,0,0,0.015]]

In [89]:
test = [[0,0,0,0.0],
        [0,0,0,0.0],
        [0,0,0,0.0],
        [0,0,0,0.0]]

In [30]:
random.seed(2468)
error_counter = np.array([0,0,0,0])
for i in range(1,41):
    name = str(i) + ".fasta"
    if i <= 10:
        x = error_simulator_multiple(datas, MinION_w[0],MinION_w[1],MinION_w[2],MinION_w[3])
        error_counter += x[1]
        if i == 10:
            print((error_counter/10))
            error_counter = np.array([0,0,0,0])
    elif i <= 20:
        x = error_simulator_multiple(datas, NovaSeq_w[0],NovaSeq_w[1],NovaSeq_w[2],NovaSeq_w[3])
        error_counter += x[1]
        if i == 20:
            print((error_counter/10))
            error_counter = np.array([0,0,0,0])
    elif i <= 30:
        x = error_simulator_multiple(datas, IonTorrent_w[0],IonTorrent_w[1],IonTorrent_w[2],IonTorrent_w[3])
        error_counter += x[1]
        if i == 30:
            print((error_counter/10))
            error_counter = np.array([0,0,0,0])
    else:
        x = error_simulator_multiple(datas, PacBio_w[0],PacBio_w[1],PacBio_w[2],PacBio_w[3])
        error_counter += x[1]
        if i == 40:
            print((error_counter/10))
            error_counter = np.array([0,0,0,0])
    SeqIO.write(x[0], name, "fasta")

[1479.7  992.3 1132.7 1849.8]
[137.3 565.5   4.1   1.3]
[126.  129.3 244.4 113.6]
[ 262.3  163.7  162.3 1311.7]
