# utils

In [None]:
#| default_exp utils

In [None]:
#| export
import gzip
import itertools
import os
import csv
from dgrec import pairwise2
from dgrec.pairwise2 import format_alignment
from Bio.Align import PairwiseAligner
from Bio.Seq import Seq



In [None]:
#| export
def align2mut(align):
    """Converts a sequence alignment result from Bio.pairwise2.Align.globalms into a list of mutations.
    Positions are those of the alignment."""
    res=[]
    for i in range(align.end):
        if align.seqA[i]!=align.seqB[i]:
            mut=(align.seqA[i],i,align.seqB[i])
            res.append(mut)
    return res

In [None]:
seqA = "ATCCCGGCAGC"
seqB = "ATCCACGGTCAGC"
align=pairwise2.align.globalms(seqA,seqB, 2, -1, -1, -.5, one_alignment_only=True)[0]
align2mut(align)

[('-', 4, 'A'), ('-', 8, 'T')]

In [None]:
#| export
def mut_rix(mutations):
    """Reindexes the positions of the mutations to go from 
    their position in the sequence alignment to their position in the original sequence."""
    ph=0
    res_rix=[]
    for mut in mutations:
        rix=mut[1]+ph
        res_rix.append((mut[0],rix,mut[2]))
        if mut[0]=='-':
            ph-=1
            
    return res_rix

In [None]:
seqA = "ATCCCGGCAGC"
seqB = "ATCCACGGTCAGC"
align=pairwise2.align.globalms(seqA,seqB, 2, -1, -1, -.5, one_alignment_only=True)[0]
print(format_alignment(*align))

mutations=align2mut(align) 
print("Output of align2mut:")
print(mutations)

print("Output of mut_rix:")
print(mut_rix(mutations))

ATCC-CGG-CAGC
|||| ||| ||||
ATCCACGGTCAGC
  Score=20

Output of align2mut:
[('-', 4, 'A'), ('-', 8, 'T')]
Output of mut_rix:
[('-', 4, 'A'), ('-', 7, 'T')]


In [None]:
#| export
def get_mutations(seqA,seqB):
    """Aligns two sequences and returns a genotype string.
    The string is a comma separated list of mutations.
    """
    align=pairwise2.align.globalms(seqA,seqB, 2, -1, -1, -.5, one_alignment_only=True)[0]
    mutations=align2mut(align) 
    mutations=mut_rix(mutations)
    return mutations

In [None]:
seqA = "ATCCGGCAGCAGGTCGTGAGC"
seqB = "ATCCACGGTCAGCACGTCGTGGC"
align=pairwise2.align.globalms(seqA,seqB, 2, -1, -1, -.5, one_alignment_only=True)[0]
print(format_alignment(*align))

get_mutations(seqA,seqB)

ATC--CGG-CAGCAGGTCGTGAGC
|||  ||| |||||.|||||| ||
ATCCACGGTCAGCACGTCGTG-GC
  Score=33.5



[('-', 3, 'C'), ('-', 3, 'A'), ('-', 6, 'T'), ('G', 11, 'C'), ('A', 18, '-')]

In [None]:
#| hide
def align_to_mut(alignment):
    muts=[]
    p=0
    for i in range(alignment.indices.shape[1]):
        if alignment.indices[0][i]==-1:
            muts.append(["-",p,alignment.query[alignment.indices[1][i]]])
        elif alignment.indices[1][i]==-1:
            p=alignment.indices[0][i]
            muts.append([alignment.target[alignment.indices[0][i]],p,"-"])
        elif alignment.target[alignment.indices[0][i]]!=alignment.query[alignment.indices[1][i]]:
            p=alignment.indices[0][i]
            muts.append([alignment.target[alignment.indices[0][i]],p,alignment.query[alignment.indices[1][i]]])
        else:
            p=alignment.indices[0][i]
    return muts

In [None]:
#| hide
aligner=PairwiseAligner()
aligner.mode = 'global'
aligner.match_score = 2
aligner.mismatch_score = -1
aligner.open_gap_score = -1
aligner.extend_gap_score = -0.5

alignments = aligner.align(seqA, seqB)
alignment = alignments[0]

print(alignment)
align_to_mut(alignment)

target            0 ATC--CGG-CAGCAGGTCGTGAGC 21
                  0 |||--|||-|||||.||||||-|| 24
query             0 ATCCACGGTCAGCACGTCGTG-GC 23



[['-', 2, 'C'], ['-', 2, 'A'], ['-', 5, 'T'], ['G', 11, 'C'], ['A', 18, '-']]

In [None]:
#| hide

aligner=PairwiseAligner()
aligner.mode = 'global'
aligner.match_score = 2
aligner.mismatch_score = -1
aligner.open_gap_score = -1
aligner.extend_gap_score = -0.5

def get_mutations_new(seqA,seqB):
    """Aligns two sequences and returns a genotype string.
    The string is a comma separated list of mutations.
    This implementation is much slower than the pairwise2 implementation.
    """
    alignments = aligner.align(seqA, seqB)
    align = alignments[0]
    mutations=align_to_mut(align) 
    return mutations

In [None]:
#| hide
seqA = "ATCCGGCAGCAGGTCGTGAGC"
seqB = "ATCCACGGTCAGCACGTCGTGGC"
get_mutations(seqA,seqB)

[('-', 3, 'C'), ('-', 3, 'A'), ('-', 6, 'T'), ('G', 11, 'C'), ('A', 18, '-')]

In [None]:
#| export
def mut_to_str(mutations: list):
    """Converts list of mutations to a comma separated string"""
    mut_str_list=[''.join(map(str,mut)) for mut in mutations]
    mut_str=','.join(mut_str_list)
    return mut_str

In [None]:
mut_to_str([('-', 3, 'C'), ('-', 3, 'A'), ('-', 6, 'T'), ('G', 11, 'C'), ('A', 18, '-')])

'-3C,-3A,-6T,G11C,A18-'

In [None]:
#| export
def str_to_mut(gen: str):
    """Converts genotype string to a list of mutations"""
    
    mutations=[]
    if gen=="":
        return mutations
    else:
        g=gen.split(',')
        for mut in g:
            mut_from=mut[0]
            ix=int(mut[1:-1])
            mut_to=mut[-1]
            mutations.append([mut_from,ix,mut_to])

        return mutations

In [None]:
assert(str_to_mut('')==[])
str_to_mut('-4A,-7T,G12C')


[['-', 4, 'A'], ['-', 7, 'T'], ['G', 12, 'C']]

In [None]:
#| export
def genstr_to_seq(genstr,refseq):
    j=0
    seq=''
    for mut in str_to_mut(genstr):
        tb, i, qb = mut
        seq+=refseq[j:i]
        if tb=="-":
            seq+=qb
            j=i
        elif qb=="-":
            j=i+1
            pass
        else:
            seq+=qb
            j=i+1

    seq+=refseq[j:]

    return seq

In [None]:
genstr='-3C,-3A,-6T,G11C,A18-'
genstr_to_seq(genstr,seqA)

'ATCCACGGTCAGCACGTCGTGGC'

In [None]:
#| hide
seqA = "ATCCGGCAGCAGGTCGTGAGC"
seqB = "ATCCACGGTCAGCACGTCGTGGC"
genstr=mut_to_str(get_mutations(seqA,seqB))
assert(genstr_to_seq(genstr,seqA)==seqB)

seqA = "AGCGCTATGCTGCGCGCGTACTGCCGCTAGCTATGCTCAGGCCGATATATGCGAGC"
seqB = "AGCGCTAGCATGGTGCGCGCGTACTGCAGCTAGCTATGCAGGCCGATATATGCAAGC"
genstr=mut_to_str(get_mutations(seqA,seqB))
assert(genstr_to_seq(genstr,seqA)==seqB)

In [None]:
#| export
def get_prot_mut(genstr,refseq):
    mut_seq=genstr_to_seq(genstr,refseq)
    mut_prot=Seq(mut_seq).translate()
    ref_prot=Seq(refseq).translate()
    return mut_to_str(get_mutations(ref_prot,mut_prot))


In [None]:
seqA = "AGCGCTATGCTGCGCGCGTACTGCCGCTAGCTATGCTCAGGCCGATATATGCGAGCA"
seqB = "AGCGCTATGCTGCGCGCGCACTGCCGCAAGCTATGCGCAGGCAGATATATGCGAGCA"
mut_to_str(get_mutations(seqA,seqB))

'T18C,T27A,T36G,C42A'

In [None]:
get_prot_mut('T18C,T27A,T36G,C42A',seqA)

'Y6H,*9K,S12A'

In [None]:
#| export
def parse_genotypes(genotypes_file):
    gen_list=[]
    with open(genotypes_file,"r") as handle: 
        reader = csv.reader(handle, delimiter='\t')
        for row in reader:
            gen_list.append((row[0],int(row[1])))
    return gen_list

In [None]:
from dgrec.example_data import get_example_data_dir

In [None]:
data_path=get_example_data_dir()
gen_list=parse_genotypes(os.path.join(data_path,"sacB_genotypes.csv"))
for g,n in itertools.islice(gen_list,30,40):
    print(n,"\t",g)

20 	 A72G,A79G
19 	 A72G,A79T,A91G
17 	 T67G,A91G
17 	 A76G,A79T
17 	 A68C,A72G
17 	 A111G
16 	 A68G,A91G
16 	 A86G,A91T
15 	 A72G,A91T
15 	 A79G,A86G


In [None]:
#| export

def downsample_fastq_gz(input_file, output_file, num_reads=10000):
    """Downsamples a compressed FASTQ file to the specified number of reads.

    Args:
        input_file (str): Path to the input FASTQ.gz file.
        output_file (str): Path to the output FASTQ.gz file.
        num_reads (int, optional): Number of reads to keep. Defaults to 10000.
    """

    with gzip.open(input_file, 'rb') as infile, gzip.open(output_file, 'wb') as outfile:
        lines = itertools.islice(infile, num_reads * 4)  # Read 4 lines (1 read) at a time
        for line in lines:
            outfile.write(line)

In [None]:

input_file=os.path.join(data_path,"sacB_example.fastq.gz")
output_file="sacB_example_downsampled.fastq.gz"
downsample_fastq_gz(input_file, output_file, num_reads=100)



In [None]:
#| export
def get_basename_without_extension(file_path):
    """
    Extracts the basename of a file without the extension.

    Args:
        file_path (str): The path to the file.

    Returns:
        str: The basename of the file without the extension.
    """

    basename = os.path.basename(file_path)
    if '.' in basename:
        # Split at the last dot to remove the extension
        return basename.rsplit('.', 1)[0]
    else:
        # No extension, return the whole filename
        return basename

In [None]:
# Example usage
file_path = "C:/Users/John/Documents/my_file.txt"
basename_without_extension = get_basename_without_extension(file_path)
print(basename_without_extension)  # Output: my_file

my_file


In [None]:
#| hide
# Remove test files

# List all files in the directory
files = os.listdir()

# Iterate over the files
for file in files:
    if file.endswith(".gz"):
        
        try:
            # Delete the file
            os.remove(file)
        except PermissionError:
            print(f"Permission denied to delete file '{file}'.")
        except FileNotFoundError:
            print(f"File '{file}' not found.")

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()

In [None]:
#| export
def pickle_save(data_in,file_name_out):
    pickle_out = open(file_name_out,"wb")
    pickle.dump(data_in, pickle_out)
    pickle_out.close()
    


In [None]:
#| export
def pickle_load(file_name_in):
    pickle_in = open(file_name_in,"rb")
    data_out = pickle.load(pickle_in)
    return data_out

In [None]:
# | export
def reverse_complement(sequence):
    complement_dict = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'}
    reverse_sequence = sequence[::-1]
    reverse_complement_sequence = ''.join(complement_dict[base] for base in reverse_sequence)
    return reverse_complement_sequence


In [None]:
# | export
def make_dgr_oligos(target:str #TR DNA
                    ,split_number:int #Number of desired splits
                    ):
    "Split the TR target into the input number and then generates the oligos to order"
    bad_overhangs=['AATT', 'ATAT', 'TATA', 'TTAA', 'ACGT', 'CATG', 'CTAG', 'GATC', 'GTAC', 'TCGA', 'TGCA', 'CCGG', 'CGCG', 'GCGC', 'GGCC']
    target=target.upper()
    split=len(target)//split_number
    overhang_list=[]
    split_k_list=[]
    forward_list=[]
    reverse_list=[]
    full_list=[]


    
    for k in range(1,split_number):
        split_k=k*split
        overhang = target[split_k-2:split_k+2]
        
        while overhang in bad_overhangs+['ATAA','TCAG']:
            if split_k%2 == 0:
                split_k += 1
                print('+1')
            elif split_k%2 == 1:
                split_k += -1
                print('-1')
            overhang = target[split_k-2:split_k+2]
        overhang_list.append(overhang)
        split_k_list.append(split_k)

    forward_list.append('ATAA'+target[:split_k_list[0]-2])
    reverse_list.append(reverse_complement(target[:split_k_list[0]+2]))

    for j in range (len(split_k_list)-1):
        forward_list.append(target[split_k_list[j]-2:split_k_list[j+1]-2])
        reverse_list.append(reverse_complement(target[split_k_list[j]+2:split_k_list[j+1]+2]))


    forward_list.append(target[split_k_list[-1]-2:])
    reverse_list.append('CAGA'+reverse_complement(target[split_k_list[-1]+2:]))

    for i in range(split_number):
        full_list.append(forward_list[i])
        full_list.append(reverse_list[i])
            
    return(full_list)

In [None]:
target='CCTCAGATACAAGCCGGCATAAATAATAACATATTCTATGACCATGATAATAGTGTAGGTGCAAACGCCAACGCTAAAAACACTGGAACCATGAACGGTAATACTGCAGGGACGAATATAGCCAAAACTTCT'
make_dgr_oligos(target,4)

['ATAACCTCAGATACAAGCCGGCATAAATAATAACA',
 'AATATGTTATTATTTATGCCGGCTTGTATCTGAGG',
 'TATTCTATGACCATGATAATAGTGTAGGTGCAA',
 'GCGTTTGCACCTACACTATTATCATGGTCATAG',
 'ACGCCAACGCTAAAAACACTGGAACCATGAACG',
 'TTACCGTTCATGGTTCCAGTGTTTTTAGCGTTG',
 'GTAATACTGCAGGGACGAATATAGCCAAAACTTCT',
 'CAGAAGAAGTTTTGGCTATATTCGTCCCTGCAGTA']