In [12]:
import os
import subprocess
from Bio import SeqIO, AlignIO
from Bio.SeqRecord import SeqRecord
from collections import defaultdict
from Bio.Align import AlignInfo

In [13]:
def write_temp_fasta(records, filepath):
    """
    Write a list of SeqRecord objects to a FASTA file.

    Parameters:
    records (list of SeqRecord): List of sequence records to write to the file.
    filepath (str): Path to the output FASTA file.
    """
    SeqIO.write(records, filepath, "fasta")

In [14]:
def process_sequences_by_class(input_fasta):
    """
    Parse sequences from a FASTA file and group them by class.

    Assumes that the class label is the last item in the description, separated by '|'.

    Parameters:
    input_fasta (str): Path to the input FASTA file containing sequences.

    Returns:
    dict: A dictionary mapping class labels to lists of SeqRecord objects.
    """
    sequences_by_class = defaultdict(list)
    for record in SeqIO.parse(input_fasta, "fasta"):
        sequence_class = record.description.split('|')[-1]
        sequences_by_class[sequence_class].append(record)
    return sequences_by_class

In [15]:
def align_sequences(mafft_path, input_fasta, output_fasta):
    """
    Perform multiple sequence alignment using MAFFT.

    Parameters:
    mafft_path (str): Absolute path to the MAFFT executable.
    input_fasta (str): Path to the input FASTA file containing unaligned sequences.
    output_fasta (str): Path to the output FASTA file to save aligned sequences.

    Notes:
    - Uses '--maxiterate' and '--thread' options for MAFFT to improve alignment.
    - Suppresses MAFFT output with '--quiet' for cleaner logs.
    """
    command = [
        mafft_path,
        '--maxiterate', '1000',
        '--thread', '20',
        '--quiet',
        input_fasta
    ]
    print(f"Executing command: {' '.join(command)}")

    with open(output_fasta, 'w') as outfile:
        try:
            subprocess.run(
                command,
                check=True,
                stdout=outfile,
                stderr=subprocess.PIPE,
                text=True
            )
            print(f"MAFFT successfully executed for {input_fasta}")
        except subprocess.CalledProcessError as e:
            print(f"Error executing MAFFT for {input_fasta}: {e.stderr}")

In [16]:
def read_alignment(aligned_fasta):
    """
    Read an aligned FASTA file and return a MultipleSeqAlignment object.

    Parameters:
    aligned_fasta (str): Path to the aligned FASTA file.

    Returns:
    MultipleSeqAlignment: An object containing aligned sequences.
    """
    return AlignIO.read(aligned_fasta, "fasta")

In [17]:
def generate_consensus(align):
    """
    Generate a consensus sequence from a multiple sequence alignment.

    Parameters:
    align (MultipleSeqAlignment): Alignment object containing aligned sequences.

    Returns:
    Seq: Consensus sequence as a Seq object.
    """
    summary_align = AlignInfo.SummaryInfo(align)
    consensus_seq = summary_align.gap_consensus(threshold=0.7, ambiguous='N')
    return consensus_seq

In [18]:
def generate_consensus_per_class(input_fasta, output_fasta, mafft_path):
    """
    Generate consensus sequences for each class in the input FASTA file.

    Parameters:
    input_fasta (str): Path to the input FASTA file containing sequences with class labels.
    output_fasta (str): Path to the output FASTA file to save consensus sequences.
    mafft_path (str): Absolute path to the MAFFT executable.

    Workflow:
    - Processes sequences grouped by class.
    - Performs multiple sequence alignment using MAFFT.
    - Generates a consensus sequence for each class.
    - Writes the consensus sequences to the output FASTA file.
    """
    sequences_by_class = process_sequences_by_class(input_fasta)
    consensus_records = []

    for cls, records in sequences_by_class.items():
        temp_fasta = f'temp_{cls}.fasta'
        aligned_fasta = f'{cls}_aligned.fasta'
        
        if not os.path.exists(aligned_fasta):
            write_temp_fasta(records, temp_fasta)
            align_sequences(mafft_path, temp_fasta, aligned_fasta)
            if os.path.exists(temp_fasta):
                os.remove(temp_fasta)

        if os.path.exists(aligned_fasta):
            align = read_alignment(aligned_fasta)
            os.remove(aligned_fasta)
        else:
            print(f"Failed to generate alignment for class {cls}")
            continue

        consensus_seq = generate_consensus(align)
        consensus_records.append(SeqRecord(consensus_seq, id=cls, description=""))

    SeqIO.write(consensus_records, output_fasta, "fasta")

In [19]:
# List of input and output file paths
files = [
    ('../data/Human_betaherpesvirus_5/US28/US28_nucleotide_sequences.fasta',
     '../data/Human_betaherpesvirus_5/US28/US28_consensus.fasta'),
    ('../data/Human_betaherpesvirus_5/UL55/UL55_nucleotide_sequences.fasta',
     '../data/Human_betaherpesvirus_5/UL55/UL55_consensus.fasta'),
    ('../data/Human_betaherpesvirus_5/UL73/UL73_nucleotide_sequences.fasta',
     '../data/Human_betaherpesvirus_5/UL73/UL73_consensus.fasta'),
    ('../data/Severe_acute_respiratory_syndrome_coronavirus_2/Severe_acute_respiratory_syndrome_coronavirus_2_nucleotide_sequences.fasta',
     '../data/Severe_acute_respiratory_syndrome_coronavirus_2/Severe_acute_respiratory_syndrome_coronavirus_2_consensus.fasta'),
    ('../data/Human_immunodeficiency_virus_1/Human_immunodeficiency_virus_1_nucleotide_sequences.fasta',
     '../data/Human_immunodeficiency_virus_1/Human_immunodeficiency_virus_1_consensus.fasta')
]

# Absolute path to the MAFFT executable
mafft_path = os.path.abspath("../lib/mafft/mafft.bat")

# Check if MAFFT executable exists before proceeding
if not os.path.exists(mafft_path):
    print(f"MAFFT executable not found at {mafft_path}")
else:
    # Process each pair of input and output files
    for input_fasta, output_fasta in files:
        print(f"Processing {output_fasta}")
        generate_consensus_per_class(input_fasta, output_fasta, mafft_path)

Processing ../data/Human_betaherpesvirus_5/US28/US28_consensus.fasta
Executing command: c:\Users\lebat\Documents\KSS\lib\mafft\mafft.bat --maxiterate 1000 --thread 20 --quiet temp_A1.fasta
MAFFT successfully executed for temp_A1.fasta
Executing command: c:\Users\lebat\Documents\KSS\lib\mafft\mafft.bat --maxiterate 1000 --thread 20 --quiet temp_A2.fasta
MAFFT successfully executed for temp_A2.fasta
Executing command: c:\Users\lebat\Documents\KSS\lib\mafft\mafft.bat --maxiterate 1000 --thread 20 --quiet temp_D.fasta
MAFFT successfully executed for temp_D.fasta
Executing command: c:\Users\lebat\Documents\KSS\lib\mafft\mafft.bat --maxiterate 1000 --thread 20 --quiet temp_B2.fasta
MAFFT successfully executed for temp_B2.fasta
Executing command: c:\Users\lebat\Documents\KSS\lib\mafft\mafft.bat --maxiterate 1000 --thread 20 --quiet temp_B1.fasta
MAFFT successfully executed for temp_B1.fasta
Processing ../data/Human_betaherpesvirus_5/UL55/UL55_consensus.fasta
Executing command: c:\Users\lebat\