In [1]:
import primer3
from Bio import SeqIO
from Bio.Seq import Seq
import re
from matplotlib import pyplot as plt
from typing import Dict, List
root_dir="/home/lshas17/"
import pandas as pd
from itertools import product
from sys import path
path.insert(1, f'{root_dir}/HandyAmpliconTool/scripts/')
from data_classes import BlastResult, PrimerPair, Primer

primer3 is a package in primer3-py - a python wrapper for primer3 engine

https://libnano.github.io/primer3-py

https://github.com/primer3-org/primer3

Load reference data and existing primers

In [2]:
#Load reference fasta file
ref_seq=""
for seq in SeqIO.parse(f'{root_dir}/HandyAmpliconTool/test_data/inputs/GCA_000195995.1_for_VCFs.fasta', "fasta"):
    if seq.id=="AL513382_143N_pHCM1_120N_pHCM2":
        ref_seq=seq

#Load existing primers from fasta file
existing_primers={}
for record in SeqIO.parse(f'{root_dir}/HandyAmpliconTool/test_data/inputs/existing_primers.fasta', "fasta"):
    existing_primers[record.id]=str(record.seq)


Load the VCF containing the genotype and species defining SNPS

In [3]:

vcf_file_name=f'{root_dir}/HandyAmpliconTool/test_data/outputs/2_3_1/snps.vcf'
header_rows=0
for line in open(vcf_file_name):
    if line[0]=="#":
        header_rows+=1
vcf_data=pd.read_csv(vcf_file_name, sep="\t", header=0, skiprows=header_rows-1) # -1 because header row also starts with #CHROM



Load functions that create and process inputs from the primer3 functions

In [4]:
def add_fixed_primer(for_seq: str, rev_seq:str, template: str):
    global_args={
        'PRIMER_OPT_SIZE': 20,
        'PRIMER_OPT_TM': 60.0,
        'PRIMER_MIN_TM': 50.0,
        'PRIMER_MAX_TM': 70.0,
        'PRIMER_PRODUCT_SIZE_RANGE': f'100-{len(template)}'}
    if for_seq!=None:
        global_args['SEQUENCE_PRIMER']=for_seq
    if rev_seq!=None:
        global_args['SEQUENCE_PRIMER_REVCOMP']=rev_seq
    return global_args

def has_homodimers(forward_seq, reverse_seq) -> bool:
    thermoresult_forward=primer3.bindings.calc_homodimer(forward_seq)
    thermoresult_reverse=primer3.bindings.calc_homodimer(reverse_seq)
    if (thermoresult_forward.structure_found and thermoresult_forward.tm>10) or \
            (thermoresult_reverse.structure_found and thermoresult_reverse.tm>10):
        return True
    else:
        return False 
def check_heterodimers(forward_seq, reverse_seq):
    global existing_primers
    total_structures=0
    for primer in existing_primers:
        if primer.find("Forward")>-1:
            forward_result=primer3.bindings.calc_heterodimer(reverse_seq, existing_primers[primer])
            total_structures+=1 if forward_result.structure_found and forward_result.tm>10 else 0
        else:
            reverse_result=primer3.bindings.calc_heterodimer(forward_seq, existing_primers[primer])
            total_structures+=1 if reverse_result.structure_found and reverse_result.tm>10 else 0
    return total_structures

def process_p3_output(output: Dict, id_prefix: str) -> List[PrimerPair]:
    primer_pairs=len(output["PRIMER_PAIR"])
    result: List[PrimerPair]=[]
    for i in range(0,primer_pairs):
        forward=Primer(seq=output[f'PRIMER_LEFT_{str(i)}_SEQUENCE'],
                       g_c=output[f'PRIMER_LEFT_{str(i)}_GC_PERCENT'],
                       t_m=output[f'PRIMER_LEFT_{str(i)}_TM'])
        reverse=Primer(seq=output[f'PRIMER_RIGHT_{str(i)}_SEQUENCE'],
                       g_c=output[f'PRIMER_RIGHT_{str(i)}_GC_PERCENT'],
                       t_m=output[f'PRIMER_RIGHT_{str(i)}_TM'])
        new_pair=PrimerPair(name=id_prefix, forward=forward, reverse=reverse)
        new_pair.penalty=output[f'PRIMER_PAIR_{str(i)}_PENALTY']
        if not has_homodimers(forward.seq, reverse.seq) and check_heterodimers(forward.seq, reverse.seq)==0:
            result.append(new_pair)
            #output[f'PRIMER_PAIR_{str(i)}_PRODUCT_SIZE']
            print(f'LEFT: {output["PRIMER_LEFT_EXPLAIN"]}')
            print(f'RIGHT: {output["PRIMER_RIGHT_EXPLAIN"]}')
            print(f'PAIR: {output["PRIMER_PAIR_EXPLAIN"]}')
            print("\n")
    return result

In [6]:
unique_new_primer_pairs: List[PrimerPair]=[]
existing_sequences=set()
for pair in new_primer_pairs:
    if pair.forward.seq not in existing_sequences and pair.reverse.seq not in existing_sequences:
        unique_new_primer_pairs.append(pair)
        existing_sequences.add(pair.forward.seq)
        existing_sequences.add(pair.reverse.seq)

In [13]:
min_penalty_pairs[key].name


'2.3.1_4105815_both_fixed'

In [14]:
with open(f'{root_dir}/HandyAmpliconTool/test_data/outputs/temp.tsv', "w") as output:
    output.write("Name\tStart\tEnd\tLength\tForward\tTm\tReverse\tTm\n")
    for i, key in enumerate(min_penalty_pairs):
        pair=min_penalty_pairs[key]
        output.write( "\t".join( [ str(f) for f in [min_penalty_pairs[key].name,
                       pair.forward.ref_start,
                       pair.reverse.ref_end,
                       pair.length,
                       pair.forward.seq,
                       pair.forward.t_m,
                       pair.reverse.seq,
                       pair.reverse.t_m,
                       key
                       ]  ] ) +"\n")
                     


In [7]:
#check primers vs reference with existing primers
pairs_to_take=10

with open(f'{root_dir}/HandyAmpliconTool/test_data/inputs/temp.fasta', "w") as temp_fasta:
    for key in existing_primers:
        temp_fasta.write(f'>{key}'+"\n")
        temp_fasta.write(f'{existing_primers[key]}'+"\n")
    #prioritise the pairs with both ends on Serovar SNPs
    serovar_both_sides=sorted( [f for f in unique_new_primer_pairs if f.name.find("both_fixed")>-1], key=lambda x: x.penalty)

    min_penalty_pairs=sorted( unique_new_primer_pairs, key=lambda x: x.penalty)[0: min(len(unique_new_primer_pairs), pairs_to_take)] + \
                        serovar_both_sides[0: min(len(unique_new_primer_pairs), pairs_to_take)]
    min_penalty_pairs=dict([ (pair.uuid, pair) for pair in min_penalty_pairs] )

    for pair_uuid, pair in min_penalty_pairs.items():
        temp_fasta.write(f'>{pair.uuid}_Forward'+"\n")
        temp_fasta.write(pair.forward.seq +"\n")
        temp_fasta.write(f'>{pair.uuid}_Reverse'+"\n")
        temp_fasta.write(pair.reverse.seq+"\n")

   
sensitive_search=!/home/lshas17/utilities/quickblast.sh /home/lshas17/HandyAmpliconTool/test_data/inputs/temp.fasta ~/HandyAmpliconTool/test_data/inputs/GCA_000195995.1_for_VCFs.fasta 5
blast_results: List[BlastResult]=[]
for i, item in enumerate(sensitive_search):
    new_result=BlastResult.from_blast_line(item)
    blast_results.append(new_result)
print(blast_results)
with open(f'{root_dir}/HandyAmpliconTool/test_data/inputs/blast_results.tsv', "w") as output:
    ##Check the blat results to idenify primers from outside pair that allign within the pair product
    for i, result_outer in enumerate(blast_results):
        outer_seq={"uuid": result_outer.qseqid.replace("_Forward","").replace("_Reverse",""), 
                    "orientation": "forward" if result_outer.qseqid.find("Forward")>-1 else "reverse"}
        # output.write("\t".join( [str(f) for f in [result_outer.sseqid, result_outer.sstart, result_outer.send, result_outer.qseqid]]  )+"\n" )
        # continue

        for j, result_inner in enumerate(blast_results[i+1:]):
            inner_seq={"uuid": result_inner.qseqid.replace("_Forward","").replace("_Reverse",""), 
                    "orientation": "forward" if result_inner.qseqid.find("Forward")>-1 else "reverse"}
            #conditions that need to be met:
            #1. length between matches < maximum interval between primers
            #2. the aligned primers are not the same sequence
            if outer_seq["uuid"] not in min_penalty_pairs:
                continue
            else:
                if outer_seq["orientation"]=="forward":
                    length=min_penalty_pairs[outer_seq["uuid"]].forward.length
                    if result_outer.send-result_outer.sstart==length-1:
                        min_penalty_pairs[outer_seq["uuid"]].forward.ref_start=result_outer.sstart+1
                else:
                    length=min_penalty_pairs[outer_seq["uuid"]].reverse.length
                    if result_outer.sstart-result_outer.send==length-1:
                        min_penalty_pairs[outer_seq["uuid"]].reverse.ref_start=result_outer.sstart+1



                #     if (result_outer.send-result_outer.sstart)>min_penalty_pairs[outer_seq["uuid"]].forward.ref_end-min_penalty_pairs[outer_seq["uuid"]].forward.ref_start:
                #         min_penalty_pairs[outer_seq["uuid"]].forward.ref_start=result_outer.ref_start
                #         min_penalty_pairs[outer_seq["uuid"]].forward.ref_end=result_outer.ref_end
                # else:
                #     if (result_outer.send-result_outer.sstart)>min_penalty_pairs[outer_seq["uuid"]].reverse.ref_end-min_penalty_pairs[outer_seq["uuid"]].reverse.ref_start:
                #         min_penalty_pairs[outer_seq["uuid"]].reverse.ref_start=result_outer.ref_start
                #         min_penalty_pairs[outer_seq["uuid"]].reverse.ref_start=result_outer.ref_end

                # if outer_seq["uuid"]=="11215168-7bc8-4f7f-8a26-eecfe75e441c" or inner_seq["uuid"]=="11215168-7bc8-4f7f-8a26-eecfe75e441c":
                #     a=1            
                # if outer_seq["orientation"]==inner_seq["orientation"]:
                #     #both sequences amplify in same direction, so they can't create a amplification product
                #     continue
                # elif (outer_seq["orientation"]=="forward" and result_outer.is_flipped) or \
                #             (outer_seq["orientation"]=="reverse" and not result_outer.is_flipped) or \
                #             (inner_seq["orientation"]=="forward" and result_inner.is_flipped) or \
                #             (inner_seq["orientation"]=="reverse" and not result_inner.is_flipped) :
                #     #at least one of the two sequences don't align in the correct orientation, so they can't create a amplification product
                #     continue
                # elif abs(result_outer.sstart-result_inner.sstart)<interval_len:
                #     #sequences align to correct strands and are close enough to interact
                #     output.write("\t".join(  ["NC_003198.1", str(result_outer.sstart), str(result_outer.send), result_outer.qseqid] ) +"\n"  )
                #     output.write("\t".join(  ["NC_003198.1", str(result_inner.sstart), str(result_inner.send), result_inner.qseqid] ) +"\n"  )
                
# with open(f'{root_dir}/HandyAmpliconTool/test_data/inputs/blast_results.tsv', "w") as output:
#     for id, pair in min_penalty_pairs.items():
#         output.write(pair.value+"\n")
    

[<data_classes.BlastResult object at 0x7f4fb00e8fd0>, <data_classes.BlastResult object at 0x7f4f5bdb93f0>, <data_classes.BlastResult object at 0x7f4f5bdb9210>, <data_classes.BlastResult object at 0x7f4f5bdb9240>, <data_classes.BlastResult object at 0x7f4f5bdb9300>, <data_classes.BlastResult object at 0x7f4f5bdb92d0>, <data_classes.BlastResult object at 0x7f4f5bdb92a0>, <data_classes.BlastResult object at 0x7f4f5bdbb3d0>, <data_classes.BlastResult object at 0x7f4f5bdb9360>, <data_classes.BlastResult object at 0x7f4f5bdb8c70>, <data_classes.BlastResult object at 0x7f4f5bdb93c0>, <data_classes.BlastResult object at 0x7f4f5bdb9390>, <data_classes.BlastResult object at 0x7f4f5bdb8c10>, <data_classes.BlastResult object at 0x7f4f5bdbb3a0>, <data_classes.BlastResult object at 0x7f4f5bdbb370>, <data_classes.BlastResult object at 0x7f4f5bdbb340>, <data_classes.BlastResult object at 0x7f4f5bdbb310>, <data_classes.BlastResult object at 0x7f4f5bdbb2e0>, <data_classes.BlastResult object at 0x7f4f5bd

In [5]:
target_lineage="2.3.1"
interval_len=1500
min_interval_len=200
target_lineage_index=vcf_data.index[ [f for f in vcf_data.index if vcf_data.loc[f, "ID"].find(target_lineage)>-1]  ]

new_primer_pairs: List[PrimerPair] = []

for index in target_lineage_index:
    snp_position=vcf_data.loc[index, "POS"]
    valid_range=range(snp_position-interval_len, snp_position+interval_len)
    interval_indices=[ f for f in vcf_data.index if vcf_data.loc[f, "POS"] in valid_range ]

    interval_left_serovar_snps=[f for f in interval_indices if vcf_data.loc[f,"ID"].find("Serovar")>-1 and f<index ]
    interval_right_serovar_snps=[f for f in interval_indices if vcf_data.loc[f,"ID"].find("Serovar")>-1 and f>index]
    if len(interval_left_serovar_snps)>0 and len(interval_right_serovar_snps)>0:
        print(f'SNP at {snp_position} has serovar SNPs left and right of genotype SNP')
    elif len(interval_right_serovar_snps)>0:
        print(f'SNP at {snp_position} has serovar SNPs right of genotype SNP')
    elif len(interval_left_serovar_snps):
        print(f'SNP at {snp_position} has serovar SNPs left of genotype SNP')
    else:
        print(f'SNP at {snp_position} has no serovar SNPs within {str(interval_len)} region')


#four three cases: 
# 1 - left and right serovar SNPs anchored primers
# 2 - left serovar anchored primer
# 3 - right serovar anchored primer
# 4 - neither side has serovar primer, ignore this kind of SNP

    for left_snp_index, right_snp_index in product(interval_left_serovar_snps, interval_right_serovar_snps):
        print("fixed")
        #case 1
        right_snp_pos=vcf_data.loc[right_snp_index,"POS"]
        left_snp_pos=vcf_data.loc[left_snp_index,"POS"]
        if right_snp_pos-left_snp_pos>interval_len or right_snp_pos-left_snp_pos<min_interval_len: #distance between SNPs is too long:
            print(f'SNPs too far apart: {left_snp_pos} to {right_snp_pos} vs max len {str(interval_len)}')
            continue
        forward=ref_seq[left_snp_pos-1:left_snp_pos+20]
        reverse=ref_seq[right_snp_pos-20:right_snp_pos]
        seq_args={"SEQUENCE_ID":f'{target_lineage}_{str(snp_position)}', 
                "SEQUENCE_TEMPLATE": str(ref_seq[left_snp_pos-1:right_snp_pos].seq),
                "SEQUENCE_INCLUDED_REGION": [0, right_snp_pos-left_snp_pos+1 ]}
        #add "fixed" to name to show that this pair has both sides fixed
        global_args=add_fixed_primer(for_seq=str(forward.seq),rev_seq=str(reverse.seq.reverse_complement()), template=seq_args["SEQUENCE_TEMPLATE"])
        primers=primer3.bindings.design_primers(seq_args=seq_args, global_args=global_args)
        new_primer_pairs += process_p3_output(primers, id_prefix=f'{target_lineage}_{snp_position}_both_fixed')

    for right_snp_index in interval_right_serovar_snps:
        print("right")
        right_snp_pos=vcf_data.loc[right_snp_index,"POS"]
        reverse=ref_seq[right_snp_pos-20:right_snp_pos]
        seq_args={"SEQUENCE_ID":f'{target_lineage}_{str(snp_position)}', 
                "SEQUENCE_TEMPLATE": str(ref_seq[right_snp_pos-interval_len:right_snp_pos].seq),
                "SEQUENCE_INCLUDED_REGION": [0, interval_len ]}
        global_args=add_fixed_primer(for_seq=None,rev_seq=str(reverse.seq.reverse_complement()),  template=seq_args["SEQUENCE_TEMPLATE"])
        primers=primer3.bindings.design_primers(seq_args=seq_args, global_args=global_args)
        new_primer_pairs += process_p3_output(primers, id_prefix=f'{target_lineage}_{snp_position}_right_fixed')

    for left_snp_index in interval_left_serovar_snps:
        print("left")
        left_snp_pos=vcf_data.loc[left_snp_index,"POS"]
        forward=ref_seq[left_snp_pos-1:left_snp_pos+20] #-1 due to VCF being 1 indexed
        seq_args={"SEQUENCE_ID":f'{target_lineage}_{str(snp_position)}', 
                "SEQUENCE_TEMPLATE": str(ref_seq[left_snp_pos-20:left_snp_pos+interval_len].seq),
                "SEQUENCE_INCLUDED_REGION": [0, interval_len ]}
        global_args=add_fixed_primer(for_seq=str(forward.seq), rev_seq=None, template=seq_args["SEQUENCE_TEMPLATE"])
        primers=primer3.bindings.design_primers(seq_args=seq_args, global_args=global_args)
        new_primer_pairs += process_p3_output(primers, id_prefix=f'{target_lineage}_{snp_position}_both_fixed')

#sometimes the same pair of F/R sequences is generated multiple times because there are multiple species SNPs around same genotype SNP
unique_new_primer_pairs: List[PrimerPair]=[]
existing_sequences=set()
for pair in new_primer_pairs:
    sequences=pair.forward.seq+"_"+pair.reverse.seq
    if sequences not in existing_sequences:
        unique_new_primer_pairs.append(pair)
        existing_sequences.add(sequences)


SNP at 27750 has serovar SNPs left and right of genotype SNP
fixed
right
left
SNP at 34652 has serovar SNPs left and right of genotype SNP
fixed
fixed
fixed
fixed
fixed
fixed
fixed
fixed
right
right
LEFT: considered 12207, GC content failed 1499, low tm 1913, high tm 116, high hairpin stability 2, long poly-x seq 262, ok 8415
RIGHT: considered 1, ok 1
PAIR: considered 5, ok 5


right
LEFT: considered 12207, GC content failed 1499, low tm 1913, high tm 116, high hairpin stability 2, long poly-x seq 262, ok 8415
RIGHT: considered 1, ok 1
PAIR: considered 5, ok 5


right
LEFT: considered 12479, GC content failed 1651, low tm 2090, high tm 142, high hairpin stability 1, long poly-x seq 223, ok 8372
RIGHT: considered 1, ok 1
PAIR: considered 5, ok 5


left
left
SNP at 36076 has serovar SNPs left of genotype SNP
left
LEFT: considered 1, ok 1
RIGHT: considered 13570, GC content failed 1126, low tm 1794, high tm 417, high hairpin stability 1, long poly-x seq 64, ok 10168
PAIR: considered 5, ok

In [7]:
len(new_primer_pairs)

15850

In [9]:
interval_len=1000
new_primer_pairs: List[PrimerPair] = []
rev_straight="TGAGTCCGGTAAAACGAGCTC"
rev_seq=Seq(rev_straight[-20:]).reverse_complement()
#rev_straight=rev_straight[-20:]
for_straight="TGCAAGCTGCTTAGTGATCGA"
#for_straight=for_straight[0:20]
rev_pos=str(ref_seq.seq).find(str(rev_seq))+len(rev_straight)
for_pos=str(ref_seq.seq).find(for_straight)

seq_args={"SEQUENCE_ID":f'parC', 
        "SEQUENCE_TEMPLATE": str(ref_seq[for_pos-1:rev_pos+200].seq),
        "SEQUENCE_INCLUDED_REGION": [0, rev_pos-for_pos+200 ]}

global_args=add_fixed_primer(for_seq=for_straight[0:20],rev_seq=str(rev_seq.reverse_complement()),  template=seq_args["SEQUENCE_TEMPLATE"])
primers=primer3.bindings.design_primers(seq_args=seq_args, global_args=global_args)
new_primer_pairs += process_p3_output(primers, id_prefix=f'parC_left_fixed')
print(primers["PRIMER_PAIR_0_PRODUCT_SIZE"])

LEFT: considered 1, ok 1
RIGHT: considered 1, ok 1
PAIR: considered 1, ok 1


841


In [21]:
with open(f'{root_dir}HandyAmpliconTool/test_data/outputs/new_primer.tsv', "w") as output:
    for pair in min_penalty_pairs:
        output.write(pair.value+"\n")
        print(pair.penalty)


0.5512316707744276
0.5534699146223829
0.5569654140309126
0.6253644260896749
0.6257255372350414
