In [14]:

# The holy grail

from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO
import pandas as pd
import os
from typing import List, Dict, Union
import requests
import json
import re
from pydna.design import primer_design
from pydna.dseqrecord import Dseqrecord



def open_gff3_files(path: str = "") -> List[List[str]]:
    """
    Opens and reads a GFF3 file and returns its contents as a list of lists.

    Parameters:
    -----------
    path: str
        The path to the GFF3 file.

    Returns:
    --------
    List[List[str]]
        A list of lists containing the contents of the GFF3 file.
    """
    with open(path, "r") as infile:
        LINES = []
        for line in infile:
            LINES.append(line[:].split("\t"))
        LINES = LINES[1:]

    return LINES

def read_gff_to_pd(path: str = "") -> pd.DataFrame:
    """
    Reads a GFF3 file and returns a pandas DataFrame with columns 'gene', 'start_pos', 'end_pos',
    and 'signal_peptide_likelyhood'.

    Parameters:
    -----------
    path : str
        The path to the GFF3 file.

    Returns:
    --------
    df : pandas.DataFrame
        A DataFrame with columns 'gene', 'start_pos', 'end_pos', and 'signal_peptide_likelyhood'.
    """

    gff = open_gff3_files(path)
    dict_of_signal_peptides = tidy_up_gff(gff)
    df = pd.DataFrame.from_records(dict_of_signal_peptides)

    return df

def read_gff3_and_fasta(gff3_path: str, fasta_path: str):
    """
    Reads the GFF3 and FASTA files and returns parsed data.
    If no GFF3 file is given, returns only the sequences.
    """
    sequences = list(SeqIO.parse(fasta_path, format='fasta'))
    
    if gff3_path:
        signal_pep = read_gff_to_pd(gff3_path)
        N_pos = signal_pep['end_pos'].to_list()
        genes_with_signal_peptides = signal_pep['gene'].to_list()
        return sequences, genes_with_signal_peptides, N_pos
    
    return sequences, None, None

def tidy_up_gff(lst_of_gff: list) -> list:
    """
    This function takes a list of GFF lines and returns a list of dictionaries,
    with each dictionary containing information on the signal peptides in the GFF file.

    Parameters:
    lst_of_gff (list): A list of GFF lines.

    Returns:
    list_of_peptides (list): A list of dictionaries, with each dictionary containing information on the signal peptides in the GFF file.
    """
    list_of_peptides = []

    for peptide in lst_of_gff:
        # Splitting the gene attribute to extract the first protein name
        gene_attribute_parts = peptide[0].split()
        first_protein_name = gene_attribute_parts[0]

        signal_peptides = {
            "gene": first_protein_name,
            "start_pos": int(peptide[3]), #was - 1,
            "end_pos": int(peptide[4]), #was + 1,
            "signal_peptide_likelyhood": peptide[5],
        }

        list_of_peptides.append(signal_peptides)

    return list_of_peptides

def process_sequences(sequences: List[SeqRecord], fusion: bool, genes_with_signal_peptides: List[str] = None, N_pos: List[int] = None) -> List[SeqRecord]:
    """
    Process sequences based on the fusion flag and signal peptide information.
    
    Args:
        sequences (List[SeqRecord]): List of input sequences.
        fusion (bool): Whether to process for fusion protein use.
        genes_with_signal_peptides (List[str], optional): List of gene IDs with signal peptides.
        N_pos (List[int], optional): List of positions indicating where the signal peptide ends.
    
    Returns:
        List[SeqRecord]: Processed sequences.
    """
    processed_seqs = []

    if fusion:
        if genes_with_signal_peptides and N_pos:
            for i, gene_id in enumerate(genes_with_signal_peptides):
                for seq in sequences:
                    if gene_id in seq.id:
                        processed_seq = seq.seq[(N_pos[i]+1) * 3:-3]
                        processed_seqs.append(SeqRecord(processed_seq, id=seq.id, description=seq.description))
                        break
        
        for seq in sequences:
            if not genes_with_signal_peptides or not any(gene_id in seq.id for gene_id in genes_with_signal_peptides):
                processed_seq = seq.seq[3:-3]
                processed_seqs.append(SeqRecord(processed_seq, id=seq.id, description=seq.description))
    else:
        processed_seqs = sequences

    return processed_seqs

def truncate_sequences(sequences, genes_with_signal_peptides=None, N_pos=None):
    """
    Truncates sequences based on signal peptide information or removes the first
    and last 3 nucleotides for sequences without signal peptides.

    Args:
        sequences (list): List of SeqRecord objects.
        genes_with_signal_peptides (list, optional): List of gene IDs with signal peptides.
        N_pos (list, optional): List of positions indicating where the signal peptide ends.

    Returns:
        list: List of truncated SeqRecord objects.
    """
    truncated_seqs = []

    # Ensure sequences is a list of SeqRecord objects
    print(f"Number of sequences received: {len(sequences)}")
    
    if not all(isinstance(seq, SeqRecord) for seq in sequences):
        raise TypeError("All items in 'sequences' should be instances of Bio.SeqRecord.")
    
    # Handle sequences with signal peptides
    if genes_with_signal_peptides and N_pos:
        for i, gene_id in enumerate(genes_with_signal_peptides):
            for seq in sequences:
                if gene_id in seq.id:
                    
                    truncated_seq = seq.seq[(N_pos[i]+1) * 3:-3]  # Seq object, not SeqRecord - This cleaving is done remove one additional codon to remove any potential unwanted protease cleavage. Additionally, the stop codon is also removed.
                    truncated_seqs.append(SeqRecord(truncated_seq, id=seq.id, description=seq.description))
                    break

    # Handle sequences that don't have a signal peptide
    for seq in sequences:
        if not genes_with_signal_peptides or not any(gene_id in seq.id for gene_id in genes_with_signal_peptides):
            truncated_seq = seq.seq[3:-3]  # Work on Seq object. This code is to remove start and stop codons if the sequence is to be used for e.g. a fusion protein.
            truncated_seqs.append(SeqRecord(truncated_seq, id=seq.id, description=seq.description))

    return truncated_seqs

def read_fasta(fasta_path):
    """
    Reads a FASTA file and returns a list of SeqRecord objects.
    """
    return list(SeqIO.parse(fasta_path, "fasta"))

def primer_ta_neb(primer1, primer2, conc=0.4, prodcode="phusion-0"):
    """
    Calls the NEB API to calculate the annealing temperature for a primer pair.
    """
    url = "https://tmapi.neb.com/tm/batch"
    seqpairs = [[primer1, primer2]]

    input_data = {"seqpairs": seqpairs, "conc": conc, "prodcode": prodcode}
    headers = {"content-type": "application/json"}
    res = requests.post(url, data=json.dumps(input_data), headers=headers)
    r = json.loads(res.content)

    if r["success"]:
        return r["data"][0]["ta"]
    else:
        print("Request failed:", r["error"][0])
        return None

def primer_tm_neb1(primer, conc=0.4, prodcode="phusion-0"):
    """
    Calls the NEB API to calculate the melting temperature for a single primer.
    """
    url = "https://tmapi.neb.com/tm/batch"
    seqpairs = [[primer]]

    input_data = {"seqpairs": seqpairs, "conc": conc, "prodcode": prodcode}
    headers = {"content-type": "application/json"}
    res = requests.post(url, data=json.dumps(input_data), headers=headers)
    r = json.loads(res.content)

    if r["success"]:
        return r["data"][0]["tm1"]
    else:
        print("Request failed:", r["error"][0])
        return None

def extract_cds_identifiers(cds_fasta):
    """
    Extracts the protein identifiers from CDS FASTA file headers.

    Args:
        cds_fasta (str): Path to the CDS FASTA file.

    Returns:
        dict: A dictionary with protein_id as the key and the full SeqRecord as the value.
    """
    cds_ids = {}
    pattern = r"\[protein_id=([A-Za-z0-9_]+\.\d+)\]"  # Match [protein_id=...]

    with open(cds_fasta) as handle:
        for record in SeqIO.parse(handle, "fasta"):
            match = re.search(pattern, record.description)
            if match:
                protein_id = match.group(1)
                cds_ids[protein_id] = record  # Store the full SeqRecord
            else:
                # If no protein ID, use the first term from the header
                first_term = record.id.split()[0]  # This should capture the core part of the ID
                cds_ids[first_term] = record

    return cds_ids

def generate_amplicons_with_names(truncated_sequences, target_tm=58, limit=10, cds_identifiers=None):
    """
    Generates primers with target melting temperature and names them using protein identifiers.
    
    Args:
        truncated_sequences: List of truncated sequences
        target_tm: Target melting temperature
        limit: Limit for primer length
        cds_identifiers: Dict of CDS identifiers from FASTA headers

    Returns:
        List of amplicons with appropriate primer names and list of gene names.
    """
    from pydna.design import primer_design
    from pydna.dseqrecord import Dseqrecord
    from pydna.dseq import Dseq

    amplicons = []
    gene_names = []

    for seq in truncated_sequences:
        # Convert Biopython SeqRecord to pydna Dseqrecord
        pydna_seq = Dseqrecord(Dseq(str(seq.seq)), linear=True)
        
        # Find the correct protein ID from cds_identifiers
        protein_id = None
        for key in cds_identifiers:
            if key in seq.id or key in seq.description:
                protein_id = key
                break
        
        if not protein_id:
            # Fallback to the first part of the sequence id
            protein_id = seq.id.split()[0]
        
        # Generate primers without overhangs
        amplicon = primer_design(
            pydna_seq,
            target_tm=target_tm,
            limit=limit,
            tm_function=primer_tm_neb1
        )

        # Assign names using the protein ID
        amplicon.forward_primer.id = f"{protein_id}_fw"
        amplicon.reverse_primer.id = f"{protein_id}_rv"
        
        # Append to amplicons and gene_names
        amplicons.append(amplicon)
        gene_names.append(protein_id)

    return amplicons, gene_names

def add_overhangs_and_calculate_tm(amplicons, forward_overhang, reverse_overhang, max_primer_length=60, truncate=False):
    """
    Adds overhangs to primers and calculates the melting temperatures (Tm) for primers. 
    Optionally truncates overhangs to fit the maximum primer length.
    """
    def truncate_overhang(overhang, primer, max_length):
        if len(overhang) + len(primer) > max_length:
            overhang = overhang[-(max_length - len(primer)):]  # Only truncate if necessary
        return overhang + primer

    forward_primer_with_overhang = [
        truncate_overhang(forward_overhang, str(f.forward_primer.seq), max_primer_length) if truncate else forward_overhang + str(f.forward_primer.seq)
        for f in amplicons
    ]
    reverse_primer_with_overhang = [
        truncate_overhang(reverse_overhang, str(r.reverse_primer.seq), max_primer_length) if truncate else reverse_overhang + str(r.reverse_primer.seq)
        for r in amplicons
    ]

    # Calculate annealing temperatures
    aneal_f = [primer_tm_neb1(primer) for primer in forward_primer_with_overhang]
    aneal_r = [primer_tm_neb1(primer) for primer in reverse_primer_with_overhang]
    ta = [primer_ta_neb(str(f.forward_primer.seq), str(f.reverse_primer.seq)) for f in amplicons]

    return forward_primer_with_overhang, reverse_primer_with_overhang, aneal_f, aneal_r, ta

def write_to_excel(forward_primer, reverse_primer, sequence_names, aneal_f, aneal_r, ta, output_file):
    """
    Writes the primers to an Excel file with relevant data.
    """
    df = pd.DataFrame({
        'template': sequence_names,
        'f_primer': forward_primer,
        'r_primer': reverse_primer,
        'f_tm': aneal_f,
        'r_tm': aneal_r,
        'ta': ta
    })

    df['len_fw'] = df['f_primer'].apply(lambda x: len(x))
    df['len_rv'] = df['r_primer'].apply(lambda x: len(x))

    # Generate gene names for forward and reverse primers
    gene_names_fw = [name + '_fw' for name in sequence_names]
    gene_names_rv = [name + '_rv' for name in sequence_names]

    forward_df = pd.DataFrame({
        'Name': gene_names_fw,
        'Sequence': forward_primer,
        'Concentration': '25nm',
        'Purification': 'STD'
    })

    reverse_df = pd.DataFrame({
        'Name': gene_names_rv,
        'Sequence': reverse_primer,
        'Concentration': '25nm',
        'Purification': 'STD'
    })

    idt_primers_result = pd.concat([forward_df, reverse_df], ignore_index=True)
    idt_primers_result.to_excel(output_file, index=False)

    print(f"Primers saved to {output_file}")

import argparse

def parse_fasta_and_extract_identifiers(fasta_path):
    """
    Parses a FASTA file and extracts both the sequences and CDS protein identifiers.
    
    Args:
        fasta_path (str): Path to the FASTA file.
    
    Returns:
        list: List of Biopython SeqRecords (sequences).
        dict: Dictionary of protein_id to full SeqRecord (cds_identifiers).
    """
    sequences = []
    cds_identifiers = {}
    pattern = r"\[protein_id=([A-Za-z0-9_]+\.\d+)\]"  # Match [protein_id=...]

    with open(fasta_path) as handle:
        for record in SeqIO.parse(handle, "fasta"):
            sequences.append(record)
            match = re.search(pattern, record.description)
            if match:
                protein_id = match.group(1)
                cds_identifiers[protein_id] = record  # Store the full SeqRecord
            else:
                # If no protein ID, use the first term from the header
                first_term = record.id.split()[0]  # This should capture the core part of the ID
                cds_identifiers[first_term] = record

    return sequences, cds_identifiers

def main(gff3_path=None, fasta_path=None, output_file=None, forward_overhang="TTTTTTTTTT", reverse_overhang="CCCCCCCC", 
         max_primer_length=60, cutoff=False, target_tm=60, limit=14, fusion=False):
    """
    Main function that reads GFF3 and FASTA files, generates primers, and writes the result to an Excel file.
    If no GFF3 file is provided, genes will not be truncated, and this will be indicated in the output.
    """

    # Parse FASTA and extract both sequences and CDS identifiers
    sequences, cds_identifiers = parse_fasta_and_extract_identifiers(fasta_path)

    # If GFF3 file is provided, read it
    if gff3_path:
        _, genes_with_signal_peptides, N_pos = read_gff3_and_fasta(gff3_path, fasta_path)
    else:
        print("No GFF3 file provided. Genes will not be truncated.")
        genes_with_signal_peptides = []
        N_pos = None

    # Process sequences based on fusion flag
    processed_sequences = process_sequences(sequences, fusion, genes_with_signal_peptides, N_pos)

    # Generate amplicons
    amplicons, gene_names = generate_amplicons_with_names(processed_sequences, target_tm=target_tm, limit=limit, cds_identifiers=cds_identifiers)

    # Add overhangs and calculate TM, with optional cutoff
    forward_primer, reverse_primer, aneal_f, aneal_r, ta = add_overhangs_and_calculate_tm(amplicons, forward_overhang, reverse_overhang, max_primer_length, truncate=cutoff)

    # Write results to Excel
    write_to_excel(forward_primer, reverse_primer, gene_names, aneal_f, aneal_r, ta, output_file)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate primers with optional truncations, overhangs and accommodation for fusion protein use.")

    parser.add_argument("-f", "--fasta", required=True, help="Path to the input FASTA file")
    parser.add_argument("-g", "--gff3", help="Path to the GFF3 file for truncating the sequences. If omitted, no truncation will be performed.", default=None)
    parser.add_argument("-o", "--output", help="Path to the output Excel file.", required=True)
    parser.add_argument("--forward_overhang", help="Forward primer overhang sequence.", default="TTTTTTTTTT")
    parser.add_argument("--reverse_overhang", help="Reverse primer overhang sequence.", default="CCCCCCCC")
    parser.add_argument("--max_length", type=int, help="Maximum primer length including overhangs.", default=60)
    parser.add_argument("--cutoff", action="store_true", help="Cutoff overhangs to fit within the maximum primer length.")
    parser.add_argument("-tm", "--target_tm", type=int, help="Target melting temperature for primers as per the NEB Calculator for Phusion Polymerases. Default value is 60.", default=60)
    parser.add_argument("-lm", "--limit", type=int, help="Limit for primer length. Default value is 14.", default=14)
    parser.add_argument("--fusion", action="store_true", help="Process sequences for fusion protein use by removing signal peptides and start/stop codons")

    args = parser.parse_args()

    main(fasta_path=args.fasta,
         gff3_path=args.gff3,
         output_file=args.output,
         forward_overhang=args.forward_overhang,
         reverse_overhang=args.reverse_overhang,
         max_primer_length=args.max_length,
         cutoff=args.cutoff,
         target_tm=args.target_tm,
         limit=args.limit,
         fusion=args.fusion)

usage: ipykernel_launcher.py [-h] [-g GFF3] -o OUTPUT
                             [--forward_overhang FORWARD_OVERHANG]
                             [--reverse_overhang REVERSE_OVERHANG]
                             [--max_length MAX_LENGTH] [--truncate]
                             fasta
ipykernel_launcher.py: error: the following arguments are required: -o/--output


SystemExit: 2