# *prepDyn*: Preprocessing of missing data for dynamic homology

In dynamic homology, data are typically preprocessed to distinguish differences in sequence length resulting from missing data or insertion-deletion events. However, previous empirical studies using POY/PhyG manually preprocessed missing data with varying approaches. Here we demonstrate that coding missing data with dashes (–) or IUPAC Ns increase tree costs and are biologically inappropriate. Although inserting pound signs (#) around blocks of missing data has been a common solution, it reduces the severity of homology tests and precludes the discovery of the optimal tree. Therefore, missing data should be coded with question marks (?) to minimize tree costs, whereas pound signs should be inserted only on highly conserved regions. To balance time complexity and severity of homology test, we propose a heuristic to successively partition data. All procedures are implemented in a collection of Python scripts to facilitate the preprocessing of input sequences to PhyG. 

## Pre-amble

Mafft should be installed locally in the system PATH (e.g. in Ubuntu, `$ sudo apt install mafft`). In addition, the following Python modules should be installed. 

In [133]:
# Load packages
from Bio import AlignIO, Entrez, SeqIO
from Bio.Align import MultipleSeqAlignment
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
import csv
from io import StringIO
import matplotlib.pyplot as plt
import numpy as np
import os
import pathlib
#import pytrimal
import re
#import rich.console
#import rich.panel
#from rich_msa import RichAlignment
import subprocess
import tempfile
from termcolor import colored
import time

Additional custom functions:

In [113]:
# FUNCTION 1: Define functions to visualize nucleotides in different colors
# Define colors for each nucleotide (case insensitive)
def color_nucleotide(nucleotide):
    color_map = {
        "A": "red",
        "T": "blue",
        "G": "green",
        "C": "yellow",
        "-": "white",
        "#": "black",
        "?": "magenta"
    }
    return colored(nucleotide, color_map.get(nucleotide.upper(), "white"))
# Function to print the alignment
def print_colored_alignment(alignment):
    """
    Prints a DNA alignment with each nucleotide in a different color.

    Parameters:
        alignment (dict): A dictionary with sequence names as keys and DNA sequences as values.
    """
    for name, sequence in alignment.items():
        colored_seq = ''.join(color_nucleotide(n) for n in sequence)
        print(f"{name}: {colored_seq}")


# FUNCTION 2: Print alignments using trimAl
def show_alignment(alignment):
    console = rich.console.Console(width=len(alignment.sequences[0])+40)
    widget = RichAlignment(names=[n.decode() for n in alignment.names], sequences=alignment.sequences, max_name_width=30)
    panel = rich.panel.Panel(widget, title_align="left", title="({} residues, {} sequences)".format(len(alignment.sequences[0]), len(alignment.sequences)))
    console.print(panel)

# FUNCTION 3: Convert dictionaries to MultipleSeqAlignment
def dict_to_multiple_seq_alignment(seq_dict):
    """
    Converts a dictionary of sequences into a MultipleSeqAlignment object.
    
    Args:
        seq_dict (dict): A dictionary where keys are sequence identifiers and values are sequences (strings).
        
    Returns:
        MultipleSeqAlignment: A Biopython MultipleSeqAlignment object.
    """
    # Create a list of SeqRecord objects from the dictionary
    seq_records = []
    
    for seq_id, seq_str in seq_dict.items():
        # Create a SeqRecord for each sequence
        seq = Seq(seq_str)
        seq_record = SeqRecord(seq, id=seq_id, description="")
        seq_records.append(seq_record)
    
    # Create and return the MultipleSeqAlignment object
    alignment = MultipleSeqAlignment(seq_records)
    return alignment

# FUNCTION 4: List lengths of blocks of contiguous gaps in internal and terminal positions
def list_gap_blocks_by_type(alignment, plot_distribution=False):
    """
    Identify all blocks of contiguous gaps in the DNA alignment.
    Classify gap blocks into terminal and internal blocks.
    Optionally, plot the distributions of gap block lengths for terminal and internal blocks.
    
    Args:
        alignment (dict): A dictionary with sequence IDs as keys and sequences as values.
        plot_distribution (bool): If True, plot the distributions of terminal and internal gap block lengths.
        
    Returns:
        tuple: Two lists:
            - terminal_blocks: A list of gap block lengths at the start or end of sequences.
            - internal_blocks: A list of gap block lengths in the middle of sequences.
    """
    terminal_blocks = []
    internal_blocks = []
    
    # Iterate over each sequence in the alignment
    for seq_id, sequence in alignment.items():
        sequence_length = len(sequence)
        gap_count = 0
        in_gap = False

        # Identify gap blocks and separate terminal vs internal
        for i, nucleotide in enumerate(sequence):
            if nucleotide == '-':  # We are in a gap
                if not in_gap:
                    in_gap = True
                    gap_count = 1  # Start a new gap block
                else:
                    gap_count += 1  # Continue counting the current gap block
            else:  # We encountered a non-gap nucleotide
                if in_gap:
                    # End of a gap block
                    # Check if this gap block is terminal (at start or end of the sequence)
                    if i == sequence_length or (i - gap_count == 0):
                        terminal_blocks.append(gap_count)
                    else:
                        internal_blocks.append(gap_count)
                    in_gap = False
                    gap_count = 0  # Reset gap count for the next block

        # If the sequence ends with a gap block, add it to the appropriate list
        if in_gap:
            if sequence[-1] == '-':  # Last character is a gap
                terminal_blocks.append(gap_count)
            else:
                internal_blocks.append(gap_count)

    # Optionally plot the distributions of terminal and internal blocks
    if plot_distribution:
        plt.figure(figsize=(8, 6))
        # Plotting terminal blocks
        plt.hist(terminal_blocks, bins=20, alpha=0.5, label='Terminal Blocks', color='blue')
        # Plotting internal blocks
        plt.hist(internal_blocks, bins=20, alpha=0.5, label='Internal Blocks', color='red')
        
        plt.xlabel('Gap Block Length')
        plt.ylabel('Frequency')
        plt.title('Distribution of Gap Block Lengths')
        plt.legend(loc='upper right')
        plt.show()

    return terminal_blocks, internal_blocks

# FUNCTION 5: Remove underscores in the beggining of file names
def remove_leading_underscores(file_path):
    """
    Remove leading contiguous underscores from the beginning of the file name.
    If a directory is provided, it renames all files in that directory.
    
    Args:
        file_path (str): The full path of the file or directory.
    
    Returns:
        str or None: The new file path with leading underscores removed if a file, 
                     or None if a directory (modifies in place).
    """
    if os.path.isdir(file_path):
        # If it's a directory, rename all files inside it
        for root, dirs, files in os.walk(file_path):
            for file in files:
                old_file_path = os.path.join(root, file)
                new_file_name = file.lstrip('_')
                new_file_path = os.path.join(root, new_file_name)
                
                if old_file_path != new_file_path:
                    os.rename(old_file_path, new_file_path)
        return None  # No return for directories, as the renaming is in place
    else:
        # If it's a single file, rename it
        dir_name, file_name = os.path.split(file_path)
        new_file_name = file_name.lstrip('_')
        new_file_path = os.path.join(dir_name, new_file_name)
        if file_path != new_file_path:
            os.rename(file_path, new_file_path)
        return new_file_path

In [None]:
## Main functions

We define below the main functions of *prepDyn*. To test each of the functions individually, we created a synthetic example. In such example, the step of data collection from GenBank was skipped.

In [114]:
# Input data
sequences = {
    'sp1': "TGCACCGTCGCCAACAGTAGTCCTCCACCGTCGCCACCGTCGCCAACAGTA",
    'sp2': "TCCACCGTCGN?????GTAGTCTCCACCGTCGC???????GCCAACAGTA",
    'sp3': "TCCACCGTCGCCAACAGTAGTCCC",
    'sp4': "GTCCTCCACCGTCGCCACCTCGCCAAAGTA",
    'sp5': "TCCACCGTCGCCAACAGTAGTCCTCCACCGTCGCCACCGTCGCCACAGA",
    'sp6': "CACCGTCCCAACAGTAGTCCTCCACCGTCGCACCGTCGCAACAGTA",
} 

# Create a temporary file to save the sequences in FASTA format
with tempfile.NamedTemporaryFile(delete=False, suffix='.fasta', mode='w') as fasta_file:
    # Write the sequences to the file
    for name, seq in sequences.items():
        fasta_file.write(f">{name}\n{seq}\n")
    fasta_file_path = fasta_file.name

# Define the output file for the alignment
aligned_fasta_file_path = fasta_file_path.replace('.fasta', '_aligned.fasta')

In [115]:
# Run MAFFT
mafft_command = ['mafft', '--auto', fasta_file_path]
with open(aligned_fasta_file_path, 'w') as aligned_file:
    subprocess.run(mafft_command, stdout=aligned_file)

outputhat23=16
treein = 0
compacttree = 0
stacksize: 8192 kb
generating a scoring matrix for nucleotide (dist=200) ... done
All-to-all alignment.
    0 / 6    1 / 6    2 / 6    3 / 6    4 / 6tbfast-pair (nuc) Version 7.520
alg=L, model=DNA200 (2), 2.00 (6.00), -0.10 (-0.30), noshift, amax=0.0
0 thread(s)

outputhat23=16
Loading 'hat3.seed' ... 
done.
Writing hat3 for iterative refinement
generating a scoring matrix for nucleotide (dist=200) ... done
Gap Penalty = -1.53, +0.00, +0.00
tbutree = 1, compacttree = 0
Constructing a UPGMA tree ... 
    0 / 6
done.

Progressive alignment ... 
STEP     1 /5 STEP     2 /5 STEP     3 /5 STEP     4 /5 STEP     5 /5 
done.
tbfast (nuc) Version 7.520
alg=A, model=DNA200 (2), 1.53 (4.59), -0.00 (-0.00), noshift, amax=0.0
1 thread(s)

minimumweight = 0.000010
autosubalignment = 0.000000
nthread = 0
randomseed = 0
blosum 62 / kimura 200
poffset = 0
niter = 16
sueff_global = 0.100000
nadd = 16
Loading 'hat3' ... done.
generating a scoring mat

In [122]:
# Input data
alignment = AlignIO.read(aligned_fasta_file_path, "fasta")
type(alignment) # Check the class of alignment (MultipleSeqAlignment
# Convert MultipleSeqAlignment to dictionary
alignment = {record.id: str(record.seq) for record in alignment}
type(alignment)
# Print MAFFT alignment
print("Step 0. Align sequences statically")
print_colored_alignment(alignment)

###########################
# Step 1: Data collection #
###########################

def GB2MSA_1(input_file, output_prefix, delimiter=',', write_names=True):
    """
    Downloads GenBank sequences based on accession numbers in a CSV/TSV file and aligns them by gene using MAFFT.

    Parameters:
    -----------
    input_file : str
        Path to the CSV or TSV input file. The first column should contain sequence names (sample identifiers).
        The first row should contain gene names starting from the second column.
        Cells contain GenBank accession numbers (one or more separated by slashes). "NA", empty cells, or dashes are ignored.

    output_prefix : str
        Prefix used for naming intermediate FASTA files and final aligned output files.

    delimiter : str, optional (default=',')
        Delimiter used in the input file (e.g., ',' for CSV or '\t' for TSV).

    write_names : bool, optional (default=True)
        If True, writes a TXT file listing all sequence names (from the first column).

    Returns:
    --------
    aligned_files : list of str
        List of file paths to the MAFFT-aligned FASTA files for each gene.
    """
    # Open the input CSV/TSV file
    with open(input_file, newline='') as file:
        reader = csv.reader(file, delimiter=delimiter)
        rows = list(reader)

    # Replace spaces in sequence names with underscores
    sequence_names = [row[0].replace(" ", "_") for row in rows[1:]]
    
    # Extract gene names from the header row (excluding first column)
    gene_names = rows[0][1:]
    
    # Extract gene accession data for each sequence (excluding first column)
    gene_columns = [row[1:] for row in rows[1:]]

    # If requested, write the sequence names to a text file
    if write_names:
        names_file = f"{output_prefix}_sequence_names.txt"
        with open(names_file, 'w') as nf:
            for name in sequence_names:
                nf.write(f"{name}\n")

    aligned_files = []  # List to store paths of aligned output files

    # Iterate over each gene (i.e., each column after the first)
    for gene_idx, gene_name in enumerate(gene_names):
        fasta_file = f"{output_prefix}_{gene_name}.fasta"  # Name of temporary FASTA file
        
        with open(fasta_file, 'w') as fasta_out:
            # Iterate through each row (sample/sequence)
            for i, seq_name in enumerate(sequence_names):
                cell = gene_columns[i][gene_idx].strip()
                
                # Skip cells with missing data ("NA", empty, or dash)
                if cell.upper() == "NA" or not cell or cell == "-":
                    continue

                # Split accession numbers by '/' and filter out invalid entries
                accessions = [acc for acc in cell.split('/') if acc.upper() != "NA" and acc != "" and acc != "-"]
                sequences = []  # To hold the sequences retrieved from GenBank

                # Fetch each sequence from GenBank
                for acc in accessions:
                    try:
                        handle = Entrez.efetch(db="nucleotide", id=acc, rettype="fasta", retmode="text")
                        seq_record = SeqIO.read(handle, "fasta")
                        handle.close()
                        sequences.append(str(seq_record.seq))  # Store the sequence string
                    except Exception as e:
                        print(f"Error fetching {acc}: {e}")

                # Combine multiple sequences with 'W' delimiters to mark junctions (to be handled later)
                combined_seq = "WWWWWWWWWWWWWWW".join(sequences)
                
                # Write the sequence to the FASTA file with its name as header
                fasta_out.write(f">{seq_name}\n{combined_seq}\n")

        # Define the name for the output alignment file
        aligned_file = f"{output_prefix}_{gene_name}_aligned.fasta"

        # Run MAFFT on the generated FASTA file and save the alignment
        with open(aligned_file, 'w') as aligned_out:
            subprocess.run(["mafft", "--auto", fasta_file], stdout=aligned_out)

        # Append the aligned file path to the result list
        aligned_files.append(aligned_file)

    return aligned_files  # Return list of aligned output file paths

def GB2MSA_2(alignment_file):
    """
    For each sequence in the alignment:
    - Replaces internal blocks of 15 'w' or spaced 'w' (e.g., w-w-w) with question marks.
    - Removes columns with only '?' or '-' in all rows.
    - Replaces dash blocks flanked by a nucleotide and a question mark with question marks.

    Parameters:
    -----------
    alignment_file : str
        Path to the MAFFT-aligned FASTA file to be processed.
    
    Returns:
    --------
    str
        Path to the cleaned alignment file.
    """
    alignment = list(SeqIO.parse(alignment_file, "fasta"))
    updated_records = []

    # Step 1: Replace exact block of 15 'w' or 'W' with 15 '?'
    for record in alignment:
        seq = str(record.seq)
        seq_cleaned = seq.replace("w" * 15, "?" * 15).replace("W" * 15, "?" * 15)
        record.seq = Seq(seq_cleaned)
        updated_records.append(record)

    # Step 2: Replace non-contiguous 'w' blocks (e.g., w-w-w) with '?'
    for record in updated_records:
        chars = list(str(record.seq))
        i = 0
        while i < len(chars):
            if chars[i].lower() == 'w':
                count = 1
                indices = [i]
                j = i + 1
                while j < len(chars) and count < 15:
                    if chars[j] == '-':
                        indices.append(j)
                    elif chars[j].lower() == 'w':
                        indices.append(j)
                        count += 1
                    else:
                        break
                    j += 1
                if count == 15:
                    for idx in indices:
                        chars[idx] = '?'
                i = j
            else:
                i += 1
        record.seq = Seq(''.join(chars))

    # Step 3: Remove columns with only '?' or '-' in all rows
    sequences = [list(str(record.seq)) for record in updated_records]
    if len(set(len(seq) for seq in sequences)) > 1:
        raise ValueError("Sequences are not of the same length!")

    valid_columns = []
    for i in range(len(sequences[0])):
        column = [seq[i] for seq in sequences]
        if any(base not in ['?', '-'] for base in column):
            valid_columns.append(i)

    # Rebuild records with cleaned sequences
    cleaned_records = []
    for record in updated_records:
        cleaned_seq = ''.join(str(record.seq)[i] for i in valid_columns)
        record.seq = Seq(cleaned_seq)
        cleaned_records.append(record)

    # Step 4: Replace dash blocks flanked by nucleotide and '?' with '?'
    for record in cleaned_records:
        chars = list(str(record.seq))
        i = 0
        while i < len(chars):
            if chars[i] == '-':
                start = i
                while i < len(chars) and chars[i] == '-':
                    i += 1
                end = i - 1

                # Check flanking characters safely
                left = chars[start - 1] if start > 0 else ''
                right = chars[end + 1] if end + 1 < len(chars) else ''

                if ((left in 'ACGTacgt' and right in '?Nn') or 
                    (left in '?Nn' and right in 'ACGTacgt')):
                    for j in range(start, end + 1):
                        chars[j] = '?'
            else:
                i += 1
        record.seq = Seq(''.join(chars))

    # Step 5: Write to cleaned output
    cleaned_file = alignment_file.replace(".fasta", "_GB2MSA.fasta")
    with open(cleaned_file, "w") as out_handle:
        SeqIO.write(cleaned_records, out_handle, "fasta")

    return cleaned_file



def GB2MSA(input_file, output_prefix, delimiter=',', write_names=True, log=False):
    """
    Complete GenBank-to-MSA pipeline:
    1. Downloads sequences from GenBank and aligns them by gene using MAFFT.
    2. Cleans the alignments by replacing internal missing data and removing empty columns.
    3. Deletes intermediate files ending with '_aligned.fasta'.
    4. Optionally logs wall clock and CPU time to a log file named '<output_prefix>_log.txt'.

    Parameters:
    -----------
    input_file : str
        Path to the input CSV/TSV file with sample names and GenBank accessions.

    output_prefix : str
        Prefix for naming all output files.

    delimiter : str, optional (default=',')
        Delimiter used in the input file.

    write_names : bool, optional (default=True)
        Whether to write a file listing sequence names.

    log : bool, optional (default=False)
        If True, logs wall clock time and CPU time to a file named '<output_prefix>_log.txt'.

    Returns:
    --------
    cleaned_files : list of str
        List of paths to the cleaned aligned FASTA files for each gene.
    """
    start_wall = time.time()
    start_cpu = time.process_time()

    # Step 1: Generate aligned FASTA files
    aligned_files = GB2MSA_1(input_file, output_prefix, delimiter=delimiter, write_names=write_names)

    # Step 2: Clean each aligned FASTA file
    cleaned_files = []
    for aligned_file in aligned_files:
        cleaned_file = GB2MSA_2(aligned_file)
        cleaned_files.append(cleaned_file)

    # Step 3: Delete intermediate *_aligned.fasta files
    for aligned_file in aligned_files:
        if aligned_file.endswith("_aligned.fasta") and os.path.exists(aligned_file):
            os.remove(aligned_file)

    # Step 4: Log timing if requested
    if log:
        end_wall = time.time()
        end_cpu = time.process_time()
        wall_time = end_wall - start_wall
        cpu_time = end_cpu - start_cpu

        log_file = f"{output_prefix}_log.txt"
        with open(log_file, "a") as lf:
            lf.write(f"--- GB2MSA run for '{output_prefix}' ---\n")
            lf.write(f"Wall clock time: {wall_time:.2f} seconds\n")
            lf.write(f"CPU time: {cpu_time:.2f} seconds\n\n")

    return cleaned_files

#############################################
# Step 1.1: Delete gap artifacts from Mafft #
#############################################

def remove_all_gap_columns(alignment):
    """
    Remove columns from the alignment where all terminal sequences have a gap ('-').
    
    Parameters:
        alignment (dict): A dictionary where keys are sequence names and values are sequence strings.
        
    Returns:
        dict: Updated alignment with columns removed where all terminal sequences have a gap.
    """
    # Convert the alignment to a list of sequences
    sequences = list(alignment.values())
    num_sequences = len(sequences)
    seq_length = len(sequences[0])  # Assuming all sequences are the same length

    # Identify columns that need to be removed (where all terminal sequences have a gap)
    columns_to_remove = []
    for col in range(seq_length):
        # Check if all terminal sequences (sp1, sp2, ...) have a gap ('-') in this column
        if all(seq[col] == '-' for seq in sequences):
            columns_to_remove.append(col)
    
    # Remove the identified columns from each sequence
    for seq_name, seq in alignment.items():
        # Create a new sequence with the columns removed
        new_seq = ''.join(seq[col] for col in range(seq_length) if col not in columns_to_remove)
        alignment[seq_name] = new_seq
    
    return alignment

# Print
remove_all_gap_columns(alignment)
print("\n") # Jump a line
print("Step 1.1 Delete columns presenting gaps in all taxa (artifacts from MAFFT)")
print_colored_alignment(alignment)

###################################################
# Step 1.2: Delete orphan nucleotides iteratively #
###################################################

def calculate_orphan_threshold_from_percentile(alignment, percentile=25, log=False, terminal_only=True):
    """
    Calculate orphan threshold based on a specific percentile of gap lengths in the alignment.
    
    Args:
        alignment (dict): A dictionary with sequence IDs as keys and sequences as values.
        percentile (float): The percentile to use for setting the orphan threshold (e.g., 75 for the 75th percentile).
        log (bool): If True, print the list of gap lengths and the computed orphan threshold.
        terminal_only (bool): If True, only consider gap blocks at the terminal positions (start and end of sequences).
        
    Returns:
        int: Calculated orphan threshold based on the specified percentile.
    """
    gap_lengths = []

    # Loop through each sequence to calculate gap lengths
    for sequence in alignment.values():
        sequence_length = len(sequence)
        gap_count = 0
        in_gap = False

        # If terminal_only is True, we'll check only the first and last blocks of gaps
        if terminal_only:
            # Check for gaps starting at the beginning of the sequence
            if sequence[0] == '-':
                gap_count = 1
                in_gap = True
                for nucleotide in sequence[1:]:
                    if nucleotide == '-':
                        gap_count += 1
                    else:
                        break  # End of the first terminal gap block
                gap_lengths.append(gap_count)  # Add the terminal gap block at the start
                
            # Check for gaps starting at the end of the sequence
            gap_count = 0
            in_gap = False
            if sequence[-1] == '-':
                gap_count = 1
                in_gap = True
                for nucleotide in reversed(sequence[:-1]):
                    if nucleotide == '-':
                        gap_count += 1
                    else:
                        break  # End of the last terminal gap block
                gap_lengths.append(gap_count)  # Add the terminal gap block at the end
        else:
            # Loop through the sequence to find contiguous blocks of gaps (not restricted to terminals)
            for nucleotide in sequence:
                if nucleotide == '-':
                    if not in_gap:
                        in_gap = True  # Start of a new gap block
                        gap_count = 1  # Start counting the length of the new gap block
                    else:
                        gap_count += 1  # Continue counting the gap block length
                else:
                    if in_gap:
                        gap_lengths.append(gap_count)  # End of a gap block, save its length
                        in_gap = False
                        gap_count = 0  # Reset the gap count for the next block

            # If the sequence ends with a gap block, make sure to add the last block's length
            if in_gap:
                gap_lengths.append(gap_count)
        
    # Calculate the specified percentile of gap lengths
    orphan_threshold = int(np.percentile(gap_lengths, percentile))  # Use the given percentile (e.g., 75th percentile)
    
    # Print the list of gap lengths
    if log:
        print("List of gap lengths:", gap_lengths)
        print("Orphan threshold:", orphan_threshold)
        
    return orphan_threshold
    
def delete_orphan_nucleotides2(alignment, orphan_threshold, log_changes=False):
    """
    Iteratively eliminates orphan nucleotides from the first and last block of contiguous nucleotides 
    for each sequence in the alignment until no changes occur. Optionally generates a log to track changes.
    
    Args:
        alignment (dict): A dictionary with sequence IDs as keys and sequences as values.
        orphan_threshold (int): The threshold length for blocks of nucleotides and the maximum number of gaps allowed between blocks.
        log_changes (bool): Flag to determine whether to generate a log of the changes made.

    Returns:
        dict: A dictionary with the same keys, but orphan nucleotides replaced with dashes in the first and last blocks.
        str (optional): A log string capturing all the changes made during the process (if `log_changes=True`).
    """
    alignment_changed = True  # Flag to track if any changes were made
    change_log = []  # List to store log messages

    while alignment_changed:
        alignment_changed = False  # Reset flag to False before starting a new iteration

        # Loop through the alignment and check each sequence
        for seq_id, sequence in alignment.items():
            new_sequence = list(sequence)  # Convert sequence to list for easier modification
            sequence_length = len(sequence)

            # Identify all blocks of contiguous nucleotides (blocks of non-gap characters)
            blocks = []
            i = 0
            while i < sequence_length:
                if new_sequence[i] != '-':
                    block_start = i
                    while i < sequence_length and new_sequence[i] != '-':
                        i += 1
                    block_end = i
                    blocks.append((block_start, block_end))
                else:
                    i += 1

            # If there's only one block of contiguous nucleotides, skip the sequence
            if len(blocks) <= 1:
                continue

            # Check the first block (if orphaned)
            first_block_start, first_block_end = blocks[0]
            if first_block_end - first_block_start < orphan_threshold:  # Check if block length is smaller than orphan_threshold
                if len(blocks) > 1:  # Check if there's a next block
                    # Check the number of gaps between the first block and the second block
                    gap_count = 0
                    for i in range(first_block_end, blocks[1][0]):
                        if new_sequence[i] == '-':
                            gap_count += 1
                    if gap_count > orphan_threshold:  # If more than orphan_threshold gaps, replace the first block with dashes
                        new_sequence[first_block_start:first_block_end] = ['-'] * (first_block_end - first_block_start)
                        alignment_changed = True  # Mark that the alignment has been changed
                        if log_changes:
                            change_log.append(f"Modified {seq_id}: First block from position {first_block_start} to {first_block_end} replaced with dashes")

            # Check the last block (if orphaned)
            last_block_start, last_block_end = blocks[-1]
            if last_block_end - last_block_start < orphan_threshold:  # Check if block length is smaller than orphan_threshold
                if len(blocks) > 1:  # Check if there's a previous block
                    # Check the number of gaps between the last block and the previous block
                    gap_count = 0
                    for i in range(blocks[-2][1], last_block_start):
                        if new_sequence[i] == '-':
                            gap_count += 1
                    if gap_count > orphan_threshold:  # If more than orphan_threshold gaps, replace the last block with dashes
                        new_sequence[last_block_start:last_block_end] = ['-'] * (last_block_end - last_block_start)
                        alignment_changed = True  # Mark that the alignment has been changed
                        if log_changes:
                            change_log.append(f"Modified {seq_id}: Last block from position {last_block_start} to {last_block_end} replaced with dashes")

            # Convert list back to string and update the alignment if it changed
            alignment[seq_id] = ''.join(new_sequence)

    # If log_changes is enabled, return the change log along with the alignment
    if log_changes:
        change_log_text = "\n".join(change_log)
        return alignment, change_log_text
    else:
        return alignment

print('\n')
print("Step 1.2. Delete non-terminal orphan nucleotides (given a threshold)")
print_colored_alignment(delete_orphan_nucleotides2(alignment, orphan_threshold=5))

#########################################################
# Step 2.1: Classify terminal (?) and internal gaps (-) #
#########################################################

def replace_terminal_gaps_dict(alignment):
    """
    Replaces terminal gaps ('-') with '?' in a sequence alignment stored as a dictionary,
    while keeping internal gaps as '-'.

    Parameters:
        alignment (dict): Dictionary with sequence names as keys and sequences as values.

    Returns:
        dict: Modified alignment with terminal gaps replaced by '?'.
    """
    # Rename 'modified_alignment' to 'alignment'
    for name, sequence in alignment.items():
        # Find the first and last non-gap characters
        first_non_gap = next((i for i, char in enumerate(sequence) if char != "-"), None)
        last_non_gap = next((i for i, char in enumerate(reversed(sequence), 1) if char != "-"), None)
        
        if first_non_gap is not None and last_non_gap is not None:
            last_non_gap = len(sequence) - last_non_gap  # Adjust reversed index
            
            # Replace terminal gaps with '?' and keep internal gaps as '-'
            new_sequence = (
                "?" * first_non_gap +
                sequence[first_non_gap:last_non_gap + 1] +
                "?" * (len(sequence) - last_non_gap - 1)
            )
            alignment[name] = new_sequence
        else:
            # Handle sequences with only gaps
            alignment[name] = "?" * len(sequence)
    
    return alignment

replace_terminal_gaps_dict(alignment)
print("\n") # Jump a line
print("Step 2.1 Classify terminal (?) and internal gaps (-)")
print_colored_alignment(alignment)

##################################################
# Step 1.3: Trim terminal invariants iteratively #
##################################################
def is_parsimony_non_informative(column):
    """
    Check if a column is invariant.
    A column is non-informative if:
    - All characters are the same.
    - The characters include '?' and only one other character (e.g., 'A' and '?').

    Parameters:
        column (list): A list of characters in a column (e.g., ['A', 'A', 'A', '?']).
    
    Returns:
        bool: True if the column is non-informative, False if informative.
    """
    unique_characters = set(column)
        
    # Check if the column has only one unique character (parsimony non-informative)
    if len(unique_characters) == 1:
        return True
    
    # Check if the column contains '?' and exactly one other unique character
    if '?' in unique_characters and len(unique_characters) == 2:
        return True
    
    # Otherwise, the column is informative (multiple unique characters without '?')
    return False


def remove_non_informative_positions(alignment):
    """
    Remove parsimony non-informative positions from both the start and the end of the alignment.
    
    Parameters:
        alignment (dict): Dictionary where keys are sequence names and values are sequences (strings).
        
    Returns:
        dict: Updated alignment with non-informative positions removed.
    """
    # Convert the alignment to a list of sequences
    sequences = list(alignment.values())
    seq_length = len(sequences[0])
   
    # Remove non-informative positions from the start
    first_position = 0
    while first_position < seq_length and is_parsimony_non_informative([seq[first_position] for seq in sequences]):
        first_position += 1
    
    # Remove non-informative positions from the end
    last_position = seq_length - 1
    while last_position >= first_position and is_parsimony_non_informative([seq[last_position] for seq in sequences]):
        last_position -= 1
    
    # Build the updated alignment and rename the variable to 'alignment'
    for seq_name, seq in alignment.items():
        alignment[seq_name] = seq[first_position:last_position + 1]
    
    return alignment

# Print
remove_non_informative_positions(alignment)
print("\n") # Jump a line
print("Step 1.3. Trim parsimony non-informative characters in terminal position")
print_colored_alignment(alignment)

##################################
# Step 2.2 Internal missing data #
##################################

def replace_dashes_with_question_marks(alignment, internal_column_ranges=None, internal_leaves="all", internal_method="manual", internal_threshold=None):
    """
    Replace dashes with question marks in the specified column ranges for each sequence in the alignment.
    
    Args:
        alignment (dict): A dictionary with sequence IDs as keys and sequences as values.
        internal_column_ranges (list of tuples, optional): A list of tuples where each tuple defines a range of columns 
                                                          (inclusive) to check for dashes. E.g., [(5, 10), (50, 60)].
        internal_leaves (str or list, optional): If "all", replace dashes in all sequences. If a list of sequence IDs 
                                                   is provided, replace dashes in those sequences only.
        internal_method (str, optional): Defines how to replace dashes.
            - "manual": Specify column ranges and terminal sequences to replace dashes.
            - "semi": Replace internal blocks of contiguous dashes larger than the threshold with question marks.
        internal_threshold (int, optional): The threshold for "semi" method. Only internal blocks of contiguous gaps 
                                             larger than this threshold are replaced with question marks.
        
    Returns:
        dict: Updated alignment with dashes replaced by question marks in the specified columns.
    """
    
    # If internal_leaves is a list, only consider those sequences
    if internal_leaves != "all":
        sequences_to_process = set(internal_leaves)
    else:
        sequences_to_process = set(alignment.keys())
    
    # Convert the alignment into a list of sequences for easier indexing
    alignment = {seq_id: list(seq) for seq_id, seq in alignment.items()}  # Convert sequences to lists for mutability

    if internal_method == "manual":
        # Replace dashes with question marks in the specified column ranges
        for seq_id, seq in alignment.items():
            if seq_id not in sequences_to_process:
                continue  # Skip sequences not in the internal_leavesinternal_terminals list
            
            for start, end in internal_column_ranges:
                # Ensure the range is within the bounds of the sequence length
                start = max(0, start)
                end = min(len(seq), end)

                # Replace dashes with question marks within the specified range
                for i in range(start, end + 1):  # +1 because the end is inclusive
                    if seq[i] == '-':
                        seq[i] = '?'
        
    elif internal_method == "semi" and internal_threshold is not None:
        # Replace internal blocks of contiguous dashes larger than the internal_threshold with question marks
        for seq_id, seq in alignment.items():
            if seq_id not in sequences_to_process:
                continue  # Skip sequences not in the internal_leavesinternal_terminals list

            # Identify contiguous blocks of gaps (internal and terminal)
            gap_block_start = None
            for i in range(len(seq)):
                if seq[i] == '-':
                    if gap_block_start is None:
                        gap_block_start = i  # Start of a new gap block
                else:
                    if gap_block_start is not None:
                        # End of a gap block
                        gap_length = i - gap_block_start
                        if gap_length > internal_threshold and gap_block_start != 0 and gap_block_start != len(seq) - gap_length:
                            # It's an internal block larger than threshold, replace with '?'
                            for j in range(gap_block_start, i):
                                seq[j] = '?'
                        gap_block_start = None  # Reset for the next block
            # Check for a gap block at the end of the sequence
            if gap_block_start is not None:
                gap_length = len(seq) - gap_block_start
                if gap_length > internal_threshold and gap_block_start != 0:
                    for j in range(gap_block_start, len(seq)):
                        seq[j] = '?'
        
    # Convert the list back to a string
    alignment = {seq_id: ''.join(seq) for seq_id, seq in alignment.items()}

    return alignment

# Print
#alignment = replace_dashes_with_question_marks(alignment, internal_column_ranges=[(1,30),(33,40)], internal_leaves=["sp2"], internal_method="manual")
alignment = replace_dashes_with_question_marks(alignment, internal_leaves=["sp2"], internal_method="semi", internal_threshold=2)
print("\n") # Jump a line
print("Step 1.3. Treat internal missing data")
print_colored_alignment(alignment)

#########################
# Step 3.1 Partitioning #
#########################

def classify_and_insert_hashtags(alignment, 
                                 partitioning_round=1, 
                                 log_csv_output=False, 
                                 csv_file_path="contiguous_invariant_blocks.csv"):
    # Step 1: Classify columns as invariant or variant
    num_sequences = len(alignment)
    num_columns = len(next(iter(alignment.values())))  # Get the number of columns from one sequence

    column_types = []  # To store the type of each column (invariant/variant)
    contiguous_invariant_blocks = []  # To store lengths and positions of invariant blocks

    # Classify each column
    for col_idx in range(num_columns):
        column = [seq[col_idx] for seq in alignment.values()]  # Get the column from all sequences
        unique_values = set(column)
        
        # A column is invariant if all values are the same (or have only '?' and one nucleotide)
        if len(unique_values - {'?'} ) == 1:  # Exclude '?' when checking for unique values
            column_types.append('invariant')
        else:
            column_types.append('variant')

    # Step 2: Identify contiguous invariant columns and their lengths
    current_invariant_block = None
    for col_idx in range(num_columns):
        if column_types[col_idx] == 'invariant':
            if current_invariant_block is None:
                current_invariant_block = {'start': col_idx, 'length': 1}
            else:
                current_invariant_block['length'] += 1
        else:
            if current_invariant_block:
                contiguous_invariant_blocks.append(current_invariant_block)
                current_invariant_block = None
    
    if current_invariant_block:
        contiguous_invariant_blocks.append(current_invariant_block)  # Append the last block if any
    
    # Step 3: Sort blocks by length (descending order)
    contiguous_invariant_blocks.sort(key=lambda x: x['length'], reverse=True)
    
    # Group the blocks by length
    block_lengths = {}
    for block in contiguous_invariant_blocks:
        block_lengths.setdefault(block['length'], []).append(block)
    
    # Step 4: Track the positions for inserting hashtags
    hashtag_positions = []

    # Get the largest block sizes we need to insert hashtags into
    block_lengths_sorted = sorted(block_lengths.keys(), reverse=True)
    for block_length in block_lengths_sorted[:partitioning_round]:
        blocks = block_lengths[block_length]
        for block in blocks:
            start_idx = block['start']
            end_idx = start_idx + block['length'] - 1
            
            # Calculate the middle index (for blocks of length 4, the middle is between positions 1 and 2)
            middle_idx = (start_idx + end_idx) // 2
            
            # Store the position to insert the hashtag in the block
            hashtag_positions.append(middle_idx)

    # Step 5: Insert hashtag at the collected positions in all sequences
    for seq_id, seq in alignment.items():
        # Sort positions to insert hashtags from left to right
        sorted_positions = sorted(hashtag_positions)
        # Keep track of shifts caused by hashtag insertions
        shift = 0
        for middle_idx in sorted_positions:  
            # Adjust the middle index due to shifts from previous hashtag insertions
            adjusted_idx = middle_idx + shift
            # Insert the hashtag at the middle of the block
            seq = seq[:adjusted_idx + 1] + '#' + seq[adjusted_idx + 1:]
            # Update the shift value (increased by 1 for each insertion)
            shift += 1
        alignment[seq_id] = seq

    # Step 6: Optionally, write the contiguous invariant blocks to a CSV file
    if log_csv_output:
        # Ensure the file path is valid, adjust it if necessary (to save locally)
        file_dir = os.path.dirname(csv_file_path)
        if not os.path.exists(file_dir) and file_dir:
            os.makedirs(file_dir)  # Create the directory if it doesn't exist

        # Write the contiguous invariant blocks log to CSV
        with open(csv_file_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Start', 'End', 'Length'])  # Write the header row
            for block in contiguous_invariant_blocks:
                start_idx = block['start']
                end_idx = start_idx + block['length'] - 1  # Calculate the end index
                writer.writerow([start_idx, end_idx, block['length']])

        print(f"Log of contiguous invariant blocks written to {csv_file_path}")

    # Return the modified alignment and information about invariant blocks
    return alignment, contiguous_invariant_blocks


# Print
classify_and_insert_hashtags(alignment,partitioning_round=1, log_csv_output=False)
print("\n") # Jump a line
print("Step 3.1. Insert pound sign in the n-largest block of contiguous invariants")
print_colored_alignment(alignment)

########################
# Step 3.2: Refinement #
########################

def refinement_question2hyphen(alignment):
    """
    Replaces contiguous '?' characters flanked by '#' with '-' in the sequence alignment.
    This includes blocks surrounded by '#' as well as the first and last blocks that are flanked only on one side.

    Parameters:
        alignment (dict): Dictionary with sequence names as keys and sequences as values.

    Returns:
        dict: Modified alignment with '?' replaced by '-' where flanked by '#'.
    """
    # Iterate through each sequence in the alignment
    for name, sequence in alignment.items():
        # Replace '?' surrounded by '#' on both sides (internal blocks)
        modified_sequence = re.sub(r'(?<=#)\?+(?=#)', lambda m: '-' * len(m.group(0)), sequence)
        
        # Replace '?' in the first block (flanked by # on the right)
        modified_sequence = re.sub(r'^(\?+)(?=#)', lambda m: '-' * len(m.group(0)), modified_sequence)
        
        # Replace '?' in the last block (flanked by # on the left)
        modified_sequence = re.sub(r'(?<=#)(\?+)$', lambda m: '-' * len(m.group(0)), modified_sequence)

        # Update the alignment dictionary with the modified sequence
        alignment[name] = modified_sequence

    return alignment

# Print
refinement_question2hyphen(alignment) # Replaces blocks of missing data (consecutive '?' characters flanked by '#') with '-'.
print("\n") # Jump a line
print("Step 3.2 Replace ? flanked by # with -")
print_colored_alignment(alignment)  

Step 0. Align sequences statically
sp1: [34mt[0m[32mg[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[34mt[0m[33mc[0m[32mg[0m[33mc[0m[33mc[0m[31ma[0m[31ma[0m[33mc[0m[31ma[0m[32mg[0m[34mt[0m[31ma[0m[32mg[0m[34mt[0m[33mc[0m[33mc[0m[34mt[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[34mt[0m[33mc[0m[32mg[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[34mt[0m[33mc[0m[32mg[0m[33mc[0m[33mc[0m[31ma[0m[31ma[0m[33mc[0m[31ma[0m[32mg[0m[34mt[0m[31ma[0m
sp2: [34mt[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[34mt[0m[33mc[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[32mg[0m[97mn[0m[32mg[0m[34mt[0m[31ma[0m[32mg[0m[34mt[0m[33mc[0m[34mt[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[34mt[0m[33mc[0m[32mg[0m[33mc[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[32mg[0m[33mc[0m[33mc[0m[31

## prepDyn: Integrated function

Integrating all functions into a straight-forward workflow.

In [132]:
def prepDyn(input_file=None, 
             GB_input=None,
             input_format="fasta",
             orphan_method=None,
             orphan_threshold=5,
             percentile=25,
             del_inv=True,
             internal_method=None,
             internal_column_ranges=None,
             internal_leaves="all",
             internal_threshold=None,
             partitioning_round=0,
             output_file=None,
             output_format="fasta",
             log=False):
    """
    Preprocess missing data for dynamic homology in PhyG. First, columns containing 
    only gaps, orphan nucleotides, and invariant columns can be trimmed. Second, 
    missing data is coded with question marks. Third, partitions are delimited in 
    highly conserved regions.

    Args:
        input_file (str): Path to the input alignment file or directory. Ignored if GB_input is provided.
        GB_input (str): Path to a CSV/TSV file containing GenBank accession numbers. If provided,
                        sequences will be downloaded from GenBank and aligned before preprocessing.
        input_format (str): Format of the input alignment. Options: 'fasta' (default), 
                            'clustal', 'phylip', or any format accepted by Biopython. 
        orphan_method (str): The trimming method. By default, trimming orphan nucleotides
                             is not performed. Options:
                            - 'auto': trim using the 25th percentile;
                            - 'semi': trim with a manual threshold.
        orphan_threshold (int): Threshold used to trim orphan nucleotides if orphan_method = 'semi'.
        percentile (float): Used with orphan_method = 'auto' to define trimming threshold.
        del_inv (bool): Whether to trim invariant terminal columns. Default is True.
        internal_method (str): Defines how to identify internal missing data. Automatic identificaton
                               of missing data is made if GB_input is provided. Otherwise, naive 
                               options to identify internal missing data are:
                               - "manual": Use column ranges;
                               - "semi": Use a threshold for gaps.
        internal_column_ranges (list): Column ranges (inclusive) if internal_method = 'manual'.
        internal_leaves (str or list): Sequences to apply internal missing data replacement 
                                       if internal_method is not "None".
        internal_threshold (int): Used with internal_method = 'semi' to define gap threshold.
                                  Contiguous '-' larger than the threshold are replaced with '?'.
        partitioning_round (int): Number of partitioning round. Invariant regions are sorted by length
                                  in descendant order and the n-largest blocks partitioned using '#'.
        output_file (str): Custom prefix for output files. If None, base_name from input_file is used.
        output_format (str): Output format. Default is 'fasta'.
        log (bool): Whether to write a log with wall-clock time. Default is False.

    Returns:
        dict: The preprocessed unaligned sequences.
    """

    # Start timers if logging is enabled
    if log:
        start_wall_time = time.time()
        start_cpu_time = time.process_time()

    # Step 1: Run GB2MSA if GenBank input is provided
    if GB_input is not None:
        print("Running GB2MSA on GenBank input...")
        gb_output_prefix = output_file if output_file else "output"
        cleaned_files = GB2MSA(GB_input, output_prefix=gb_output_prefix, log=False)

        for file in cleaned_files:
            # Extract gene name from filename (e.g., "output_geneX.fasta" -> "geneX")
            gene_name = os.path.splitext(os.path.basename(file))[0].replace("output_", "")
            # Construct gene-specific output prefix
            if output_file:
                specific_output_prefix = f"{output_file}_{gene_name}"
            else:
                specific_output_prefix = f"{gene_name}"
            prepDyn(input_file=file,
                     input_format="fasta",
                     orphan_method=orphan_method,
                     orphan_threshold=orphan_threshold,
                     percentile=percentile,
                     del_inv=del_inv,
                     internal_method=internal_method,
                     internal_column_ranges=internal_column_ranges,
                     internal_leaves=internal_leaves,
                     internal_threshold=internal_threshold,
                     partitioning_round=partitioning_round,
                     output_format=output_format,
                     log=log,
                     output_file=specific_output_prefix)
        return

    # Step 2: If a folder is provided, process each alignment inside
    if os.path.isdir(input_file):
        for file_name in os.listdir(input_file):
            if file_name.endswith(f".{input_format}"):
                file_path = os.path.join(input_file, file_name)
                lumen(file_path,
                      input_format=input_format,
                      orphan_method=orphan_method,
                      orphan_threshold=orphan_threshold,
                      percentile=percentile,
                      del_inv=del_inv,
                      internal_method=internal_method,
                      internal_column_ranges=internal_column_ranges,
                      internal_leaves=internal_leaves,
                      internal_threshold=internal_threshold,
                      output_format=output_format)
        return

    # Step 3: Read and process alignment
    alignment = AlignIO.read(input_file, input_format)
    alignment = {record.id: str(record.seq) for record in alignment}

    remove_all_gap_columns(alignment)

    if orphan_method == "auto":
        orphan_threshold = calculate_orphan_threshold_from_percentile(alignment, percentile, terminal_only=True)
        alignment = delete_orphan_nucleotides2(alignment, orphan_threshold)
    elif orphan_method == "semi":
        alignment = delete_orphan_nucleotides2(alignment, orphan_threshold)

    alignment = replace_terminal_gaps_dict(alignment)

    if del_inv:
        alignment = remove_non_informative_positions(alignment)
    alignment = replace_terminal_gaps_dict(alignment)

    if internal_method == "manual":
        alignment = replace_dashes_with_question_marks(alignment=alignment, 
                                                       internal_column_ranges=internal_column_ranges, 
                                                       internal_leaves=internal_leaves, 
                                                       internal_method="manual")
    elif internal_method == "semi":
        alignment = replace_dashes_with_question_marks(alignment=alignment, 
                                                       internal_leaves=internal_leaves, 
                                                       internal_method="semi", 
                                                       internal_threshold=internal_threshold)

    classify_and_insert_hashtags(alignment, partitioning_round=partitioning_round)
    refinement_question2hyphen(alignment)

    # Step 4: Write output file
    records = [SeqRecord(Seq(seq), id=key, description="") for key, seq in alignment.items()]
    base_name = os.path.splitext(os.path.basename(input_file))[0]

    if output_file:
        output_prefix = f"{output_file}"
    else:
        output_prefix = base_name

    output_path = f"{output_prefix}_preprocessed.{output_format}"
    with open(output_path, "w") as output_handle:
        SeqIO.write(records, output_handle, output_format)

    # Step 5: Write log
    if log:
        end_wall_time = time.time()
        end_cpu_time = time.process_time()
        wall_time = end_wall_time - start_wall_time
        cpu_time = end_cpu_time - start_cpu_time
        log_path = f"{output_prefix}_log.txt"
        with open(log_path, "w") as log_file:
            log_file.write(f"Wall-clock time: {wall_time:.8f} seconds\n")
            log_file.write(f"CPU time: {cpu_time:.8f} seconds\n")

    return alignment
    
###########
# Example #
###########
prepDyn(GB_input = "Data/prepDyn_Test1_GB2MSA/teste.csv", output_file="Data/prepDyn_Test1_GB2MSA/output",
         orphan_method="semi", orphan_threshold=6,
         partitioning_round=1,
         log=True)

Running GB2MSA on GenBank input...


            Email address is not specified.

            To make use of NCBI's E-utilities, NCBI requires you to specify your
            email address with each request.  As an example, if your email address
            is A.N.Other@example.com, you can specify it as follows:
               from Bio import Entrez
               Entrez.email = 'A.N.Other@example.com'
            In case of excessive usage of the E-utilities, NCBI will attempt to contact
            a user at the email address provided before blocking access to the
            E-utilities.
outputhat23=16
treein = 0
compacttree = 0
stacksize: 8192 kb
generating a scoring matrix for nucleotide (dist=200) ... done
All-to-all alignment.
tbfast-pair (nuc) Version 7.520
alg=L, model=DNA200 (2), 2.00 (6.00), -0.10 (-0.30), noshift, amax=0.0
0 thread(s)

outputhat23=16
Loading 'hat3.seed' ... 
done.
Writing hat3 for iterative refinement
generating a scoring matrix for nucleotide (dist=200) ... done
Gap Penalty = -1.53, +0.00, +

## Time complexity

To understand time complexity, the following combination of parameters were used to simulate XX MSAs: 

- Number of leaves = [50, 100, 150, 200, 250, 300, 350, 450, 500]
- Number of characters = [500, 1000, 1500, 2000, 2500, 5000, 10000, 50000]
- Percentage of terminal missing data = [0.05, 0.1, 0.15, 0.2].

## Empirical example 1: Sanger sequences

## Empirical example 2: Phylogenomic sequences

Empirical example using 352 loci sequeced with AHE from XX species of the Old World treefrogs Rhacophoridae. Note that only files with the extension ".fasta" in the specified directory will be considered as input data.

In [497]:
# Load an alignment (FASTA, Clustal, Phylip, etc.)
lumen(input_file = "/Users/labanfibios/Desktop/Doutorado/Project/B3_PreprocessingPHYG/Example2_Rhacophoridae/", 
      input_format="fasta",
      orphan_method="semi",
      orphan_threshold=10, 
      internal_method="semi",
      internal_threshold=5)

## Empirical example 3: Ancient DNA sequences

## TRASH

Other functions used in previous versions of this code (now discarded).

In [None]:
#######################################################################
# Step 1.2: Delete orphan nucleotides in the first and last positions #
#######################################################################

def delete_orphan_nucleotides(alignment, orphan_threshold=10):
    """
    Orphan nucleotides are artifacts from static alignment, in which one or a few nucleotides
    are accidentally not aligned in the beginning or end of the sequences. Manual deletion or
    local alignment should be performed, but here I provide a heuristic, optional function to
    tentatively identify and delete orphan nucleotides in terminal positions.
    
    For each sequence in the alignment, replaces orphan nucleotides at the 
    beginning or end of the sequence with a hyphen if they are followed or preceded 
    by a number of gaps equal to or greater than the orphan_threshold, but only if 
    the number of orphan nucleotides are fewer than or equal to the orphan_threshold in length.
    
    Parameters:
        alignment (dict): Dictionary with sequence names as keys and sequences as values.
        orphan_threshold (int): The number of consecutive gaps after the orphan nucleotide
                                 required for it to be considered an orphan.
        
    Returns:
        dict: Modified alignment with orphan nucleotides replaced by '-'.
    """
    
    for seq_name, seq in alignment.items():
        # Check the first position for orphan nucleotides (beginning of sequence)
        if seq[0] != "-":
            i = 0
            # Find contiguous orphan nucleotides at the beginning of the sequence
            while i < len(seq) and seq[i] != "-" and seq[i] != "#":
                i += 1
            
            # If the contiguous orphan nucleotides are followed by enough gaps
            if i > 0 and seq[i:i+orphan_threshold] == "-" * orphan_threshold:
                # Only replace if the length of contiguous orphan nucleotides is <= orphan_threshold
                if i <= orphan_threshold:
                    seq = "-" * i + seq[i:]
        
        # Check the last position for orphan nucleotides (end of sequence)
        if seq[-1] != "-":
            i = len(seq) - 1
            # Find contiguous orphan nucleotides at the end of the sequence
            while i >= 0 and seq[i] != "-" and seq[i] != "#":
                i -= 1
            
            # If the contiguous orphan nucleotides are preceded by enough gaps
            if i < len(seq) - 1 and seq[i - orphan_threshold:i] == "-" * orphan_threshold:
                # Only replace if the length of contiguous orphan nucleotides is <= orphan_threshold
                if len(seq) - i - 1 <= orphan_threshold:
                    seq = seq[:i+1] + "-" * (len(seq) - i - 1)
        
        # Update the sequence in the alignment dictionary
        alignment[seq_name] = seq
    
    return alignment

print('\n')
print("Step 1.2. Delete terminal orphan nucleotides (given a threshold)")
print_colored_alignment(delete_orphan_nucleotides(alignment, orphan_threshold=5))

###########################################################
# Step 2.2: Insert breaks in terminal gap opening/closure #
###########################################################
def add_breaks_terminal(alignment):
    """
    Add # in all instances of terminal gap opening/closure.
       
    Parameters:
        alignment (dict): Dictionary where keys are sequence names and values are sequences.
    
    Returns:
        dict: Updated alignment with '#' before terminal gap opening and after terminal gap closure.
    """
    # Determine the length of the sequences
    seq_length = len(next(iter(alignment.values())))

    # Initialize a list to keep track of positions that need '#' in all sequences
    hash_positions = [False] * seq_length

    # Iterate through each sequence to find gap regions
    for seq in alignment.values():
        i = 0
        while i < seq_length:
            if seq[i] == '?':
                # Found the start of a gap region
                start = i
                while i < seq_length and seq[i] == '?':
                    i += 1
                end = i
                # Mark the positions for this gap region (avoid marking the first column)
                if start > 0:
                    hash_positions[start] = True
                if end < seq_length:
                    hash_positions[end] = True
            else:
                i += 1

    # Update each sequence with '#' at the identified positions
    for key in alignment:
        new_seq = []
        for i in range(seq_length):
            if hash_positions[i]:
                new_seq.append('#')
            new_seq.append(alignment[key][i])
        # Handle the case where the last position is a gap
        if hash_positions[-1]:
            new_seq.append('#')
        alignment[key] = ''.join(new_seq)
        
# Print
add_breaks_terminal(alignment)
print("\n") # Jump a line
print("Step 2.2 Insert pound signs in all cases of TERMINAL gap opening/closure")
print_colored_alignment(alignment)
        
###############################################################################
# Step 2.3: Insert breaks in internal gap opening/closure if block length > X #
###############################################################################

def add_breaks_internal(alignment, X=1, Y=None):
    """
    Add '#' flanking internal gap blocks if the gap block size exceeds X.
    
    Parameters:
        alignment (dict): Dictionary where keys are sequence names and values are sequences.
        X (int): Threshold for the minimum gap size to trigger '#' insertion (default: X = 1).
        Y (list of ranges): Optional list of ranges (start, end) defining blocks of gaps that must be flanked by '#'. Indexing of columns in the matrix starts with zero (not one). In internal gap blocks, Y should be specified if the minimum size of known missing data blocks (e.g. incomplete sequences due to primers) is not larger than the maximum size of gap blocks.

    Returns:
        dict: Updated alignment with '#' flanking internal gap blocks.
    """
    # Determine the length of the sequences
    seq_length = len(next(iter(alignment.values())))

    # Initialize a list to keep track of positions that need '#' in all sequences
    hash_positions = [False] * seq_length

    # Iterate through each sequence to find internal gap block
    for seq in alignment.values():
        i = 0  # Start from position 0
        while i < seq_length:
            if seq[i] == '-':
                # Find the start of a gap block
                start = i
                while i < seq_length and seq[i] == '-':
                    i += 1
                end = i
                # Mark the position for this gap opening/closure ONLY if the gap block is larger than X
                if (end - start) > X:
                    hash_positions[start] = True  # Mark the start of gap block
                    if end < seq_length:
                        hash_positions[end] = True  # Mark the end of gap block
            else:
                i += 1

    # Add '#' around the ranges specified in Y
    if Y:
        for range_start, range_end in Y:
            if range_start >= 0 and range_end < seq_length:
                hash_positions[range_start] = True
                if range_end + 1 < seq_length:
                    hash_positions[range_end + 1] = True
                    
    # Add '#' at the identified positions in each sequence
    for key in alignment:
        new_seq = []
        for i in range(seq_length):
            if i > 0 and hash_positions[i]:  # Avoid position 1 and single-gap blocks
                new_seq.append('#')
            new_seq.append(alignment[key][i])
        if hash_positions[-1]:
            new_seq.append('#')  # Ensure we append '#' at the end if necessary
        
        # After modifying the sequence, remove consecutive '#' characters
        modified_seq = ''.join(new_seq)
        modified_seq = re.sub(r'#+', '#', modified_seq)  # Replace multiple '#' with a single '#'
        
        alignment[key] = modified_seq
    
    return alignment  # Return the modified alignment


# Print
add_breaks_internal(alignment, X=2) # insert '#' flanking internal gap blocks larger than 2
#add_breaks_internal(alignment, X=2, Y=[(37, 42)]) # insert '#' in columns 37 and 42
print("\n") # Jump a line
print("Step 2.3 Insert pound signs if INTERNAL block length > X (e.g. X = 1)")
print_colored_alignment(alignment)

Some functions to use the trimal method. However, even when we use the "terminal_only" option, internal columns are deleted.

In [335]:


# Input data
alignment = {
    'sp1': "tt-------------agtagt-ctccaccg-cgccaccgtcgc-------a",
    'sp2': "-ccaccgtc------gngtag-ctccaccg-cgc-------gccaacagta",
    'sp3': "-ccaccgtcgccaacagtagt------ccg--------tt-----------",
    'sp4': "------ct-----------gt-ctccaccg-cgccacc-tcgcca-aagta",
    'sp5': "--caccgtcgccaacagtagt-ctccaccg-cgccaccgtcgcca-cag-a",
    'sp6': "--caccgtc-ccaacagtagt-ctccaccgtcgcaccg-tcgcaa-c----"
} 

print("Step 0. Align sequences statically")
print_colored_alignment(alignment)

#print('\n')
#print("Step 1.1 Delete columns presenting gaps in all taxa")
#print_colored_alignment(remove_all_gap_columns(alignment))

print('\n')
print("Step 1.2. Trim problematic positions using trimai")

# Convert dictionary to Biopython objects
alignment = dict_to_multiple_seq_alignment(alignment)
# Convert Biopython to pytrimal objects
alignment = Alignment.from_biopython(alignment)
# Trim using trimal
trimmer = AutomaticTrimmer(method="gappyout") 
#trimmer = AutomaticTrimmer(method="automated1")
alignment = trimmer.trim(alignment)
alignment = pytrimal.TrimmedAlignment.terminal_only(alignment)


# Convert the trimal to Biopython
alignment = Alignment.to_biopython(alignment)
# Convert MultipleSeqAlignment to dictionary
alignment = {record.id: str(record.seq) for record in alignment}
print_colored_alignment(alignment)

# Print
#show_alignment(alignment)

# Write in fasta format
#Alignment.dump(alignment, file="Example_trimai.fasta", format="fasta")

def calculate_statistics(values):
    mean = sum(values) / len(values)  # Mean = Sum of values / Number of values
    min_value = min(values)           # Minimum value
    max_value = max(values)           # Maximum value
    
    return mean, min_value, max_value

Step 0. Align sequences statically
sp1: [34mt[0m[34mt[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[31ma[0m[32mg[0m[34mt[0m[31ma[0m[32mg[0m[34mt[0m[97m-[0m[33mc[0m[34mt[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[97m-[0m[33mc[0m[32mg[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[34mt[0m[33mc[0m[32mg[0m[33mc[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[31ma[0m
sp2: [97m-[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[34mt[0m[33mc[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[32mg[0m[97mn[0m[32mg[0m[34mt[0m[31ma[0m[32mg[0m[97m-[0m[33mc[0m[34mt[0m[33mc[0m[33mc[0m[31ma[0m[33mc[0m[33mc[0m[32mg[0m[97m-[0m[33mc[0m[32mg[0m[33mc[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[97m-[0m[32mg[0m[33mc[0m[33mc[0m[31