# Scoring Function Testing

In [33]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pysam
import random
import pyBigWig
import enum
import os
import argparse
import sys

from align_and_score import (
    getSequences_synthetic,
    gotoh,
    vcf_to_alignment,
    get_mutation_data,
    calculate_total_score,
    seqProfile_modified
)

In [51]:
def format_alignment(seq1_list, seq2_list, chunk_size=120):
    '''
    Input:
        two lists of aligned sequences
    Used to format the sequence alignment outputs

    Output:
        Formatted strings
    '''
    if len(seq1_list) != len(seq2_list):
        raise ValueError("Input lists must be of the same length")
    
    seq1 = ''.join(seq1_list)
    seq2 = ''.join(seq2_list)
    
    # Generate the marker line
    marker_line = []
    for c1, c2 in zip(seq1_list, seq2_list):
        if c1 == c2 and c1 != '-' and c2 != '-':
            marker_line.append('|')
        else:
            marker_line.append(' ')
    marker_line = ''.join(marker_line)
    
    # Function to split into chunks
    def chunk_string(s, size):
        return [s[i:i+size] for i in range(0, len(s), size)]
    
    # Split each sequence and marker into chunks
    seq1_chunks = chunk_string(seq1, chunk_size)
    marker_chunks = chunk_string(marker_line, chunk_size)
    seq2_chunks = chunk_string(seq2, chunk_size)
    
    # Combine chunks, adding a blank line after each except the last
    output_lines = []
    chunks = list(zip(seq1_chunks, marker_chunks, seq2_chunks))
    for idx, (s1, m, s2) in enumerate(chunks):
        output_lines.extend([s1, m, s2])
        # Add a blank line after all chunks except the last
        if idx < len(chunks) - 1:
            output_lines.append('')
    
    # Print each line directly instead of returning a string
    for line in output_lines:
        print(line)


class Element:
    def __init__(self, sequence, profile):
        self.sequence = sequence
        self.profile = profile
    def __str__(self):
        return "{0:s} {1:f}".format(self.sequence, self.profile)

def extract_sequence_element(element):
    if isinstance(element, str):
        return str(element).upper()
    else: 
        return str(element.sequence).upper()


In [64]:
# Get mutation data from the alignment 

def get_mutation_data(alnA, alnB):
    '''
    Input: alnA and alnB (list of strings)
    
    Outputs the mutation information for each base in A and B. 
    Iterates over each base and checks if it has been substituted or deleted in the alternative sequence. 
    Info column contains information on base substitution. 

    Output: two tables with three columns (base index, mutation type, info)
    
    '''
    
    mutation_data_seqA = []
    mutation_data_seqB = []

    # Alignment outputs (contains gaps)
    # Convert all in caps to ensure mismatch is not due to case difference 

    #alnA_algo_seqs = [extract_sequence_element(element) for element in alnA]
    #alnB_algo_seqs = [extract_sequence_element(element) for element in alnB]
    alnA = [str(element).upper() for element in alnA]  
    alnB = [str(element).upper() for element in alnB]
    
    
    gap = '-'

    # Input sequences
    seqA =  [element for element in alnA if element != gap]
    seqB =  [element for element in alnB if element != gap]

    current_base_idx_A = 0

    # TABLE 1
    # Mutation data of the reference sequence A 
    for element_idx in range(len(alnA)): 
        # Iterate through each aligned pairs

        # If we have a base (rather than an indel)
        if alnA[element_idx] != gap:
    
            # Check for substitution/no mutations
            # When the aligned pair is a base
            if alnB[element_idx] != gap:
                
                # Check if the aligned element matches
                if alnA[element_idx] == alnB[element_idx]:
                    mutation_data_seqA.append([current_base_idx_A, 'No Mutation', 'N/A'])
                    
                # If there is a mismatch between the aligned pairs --> substitution
                else: 
                    # Store the mutation info in the 3rd column
                    mutation_data_seqA.append([current_base_idx_A, 'Substitution', alnB[element_idx]])



            # If aligned pair is a gap (in seqB), it's a deletion mutation:
            else: 
                mutation_data_seqA.append([current_base_idx_A, 'Deletion', 'N/A'])

            current_base_idx_A += 1

        # If we have a gap in seqA, then it's an insertion and we skip


    current_base_idx_B = 0 
    
    # TABLE 2
    # Mutation data of the alternative sequence B
    for element_idx in range(len(alnB)): 
        # Iterate through each aligned pairs

        # If we have a base (rather than an indel)
        if alnB[element_idx] != gap:
            
    
            # Check for substitution/no mutations
            # When the aligned pair is a base
            if alnA[element_idx] != gap:
                
                # Check if the aligned element matches
                if alnB[element_idx] == alnA[element_idx]:
                    mutation_data_seqB.append([current_base_idx_B, 'No Mutation', 'N/A'])
                    
                # If there is a mismatch between the aligned pairs --> substitution
                else: 
                    # Store the mutation info in the 3rd column
                    mutation_data_seqB.append([current_base_idx_B, 'Substitution', alnA[element_idx]])

    

            # If aligned pair is a gap (in seqA), it's a deletion mutation:
            if alnA[element_idx] == gap:
                mutation_data_seqB.append([current_base_idx_B, 'Insertion', 'N/A'])

            current_base_idx_B += 1
    

    return mutation_data_seqA, mutation_data_seqB




In [36]:
def score_mutation_tables(algo_table, true_table, is_table2=False):
    """
    Compare mutation tables (Table 1 or Table 2) and calculate the score.
    
    Args:
        algo_table: Algorithm-generated mutation data
        true_table: Ground-truth mutation data
        is_table2: If True, only compare insertions (ignores substitutions/no mutations)
    
    Returns:
        Score (int)
    """
    # Convert tables to dictionaries: {base_idx: (mutation_type, info)}
    algo_dict = {entry[0]: (entry[1], entry[2]) for entry in algo_table}
    true_dict = {entry[0]: (entry[1], entry[2]) for entry in true_table}
    
    # All indices (union of algo and truth indices)
    all_indices = set(algo_dict.keys()).union(true_dict.keys())
    score = 0
    
    for idx in all_indices:
        algo_entry = algo_dict.get(idx, None)
        true_entry = true_dict.get(idx, None)
        
        # Skip if comparing Table 2 and mutation is not an insertion
        if is_table2:
            if algo_entry and algo_entry[0] != "Insertion":
                algo_entry = None  # Treat as non-existent for scoring
            if true_entry and true_entry[0] != "Insertion":
                true_entry = None
        
        # Case 1: Both entries exist
        if algo_entry and true_entry:
            if algo_entry == true_entry:
                score += 1  # Correct match
            else:
                score -= 1  # Mismatch
        # Case 2: Missing entry in one of the tables
        elif algo_entry != true_entry:
            score -= 1  # Penalize missing/mismatched entries
    
    return score

def calculate_total_score(algo_table1, algo_table2, true_table1, true_table2):
    """Compute total score by comparing Table 1 and Table 2."""
    # Score Table 1 (substitutions/deletions)
    table1_score = score_mutation_tables(algo_table1, true_table1)
    #print(table1_score)
    # Score Table 2 (insertions only)
    table2_score = score_mutation_tables(algo_table2, true_table2, is_table2=True)
    #print(table2_score)
    return table1_score + table2_score


In [61]:
# Test 1: Identical Alignments

# True Alignemnt
true_alnA = 'ACCGA--TC'
true_alnB = 'ATCCATT--'

# Profile Aligner output
algo_alnA = 'ACCGA--TC'
algo_alnB = 'ATCCATT--'

# Tabulated true mutation data 
true_mutation_data_A, true_mutation_data_B = get_mutation_data(true_alnA, true_alnB)

# Tabulated predicted mutation data 
algo_mutation_data_A, algo_mutation_data_B = get_mutation_data(algo_alnA, algo_alnB)

# Calculate total Score
expected_score = 9
total_score =  calculate_total_score(algo_mutation_data_A, algo_mutation_data_B, true_mutation_data_A, true_mutation_data_B)

if total_score == expected_score:
    print('✅ Scoring Function Passed the Test')
else: 
    print('❌ Scoring Function Failed the Test')
    print(f'Expected {expected_score}, computed {total_score}')

['A', 'C', 'C', 'G', 'A', '-', '-', 'T', 'C']
['A', 'C', 'C', 'G', 'A', '-', '-', 'T', 'C']
✅ Scoring Function Passed the Test


In [38]:
# Test 2: Alignment with a mistake 

# True Alignemnt
true_alnA = 'ACCGA--TCA-'
true_alnB = 'ATCCATT---C'

# Profile Aligner output
algo_alnA = 'ACCGA--TCA'
algo_alnB = 'ATCCATT--C'

# Tabulated true mutation data 
true_mutation_data_A, true_mutation_data_B = get_mutation_data(true_alnA, true_alnB)

# Tabulated predicted mutation data 
algo_mutation_data_A, algo_mutation_data_B = get_mutation_data(algo_alnA, algo_alnB)

# Calculate total score
expected_score = 7
total_score =  calculate_total_score(algo_mutation_data_A, algo_mutation_data_B, true_mutation_data_A, true_mutation_data_B)

if total_score == expected_score:
    print('✅ Scoring Function Passed the Test')
else: 
    print('❌ Scoring Function Failed the Test')
    print(f'Expected {expected_score}, computed {total_score}')

✅ Scoring Function Passed the Test


In [39]:
# Test 3: Completely Different Predicted Alignment 
# With cases where substitution mutations happen but with different alt bases 

# True Alignemnt
true_alnA = 'ACCGA--TCA-'
true_alnB = 'ATCCATT---C'

# Profile Aligner output
algo_alnA = 'ACCGA--TCA-'
algo_alnB = '-ATCCATT--C'

# Tabulated true mutation data 
true_mutation_data_A, true_mutation_data_B = get_mutation_data(true_alnA, true_alnB)

# Tabulated predicted mutation data 
algo_mutation_data_A, algo_mutation_data_B = get_mutation_data(algo_alnA, algo_alnB)

# Calculate total score
expected_score = -2
total_score =  calculate_total_score(algo_mutation_data_A, algo_mutation_data_B, true_mutation_data_A, true_mutation_data_B)

if total_score == expected_score:
    print('✅ Scoring Function Passed the Test')
else: 
    print('❌ Scoring Function Failed the Test')
    print(f'Expected {expected_score}, computed {total_score}')

✅ Scoring Function Passed the Test


# VCF to Alignment Testing

In [40]:
def vcf_to_alignment(vcf_file, seq_path):
    
    raw_seq = ""
    with open(seq_path, 'r') as file:
        for line in file:
            # Skip the header line (starts with >)
            if not line.startswith('>'):
                # Add sequence lines to string, removing whitespace
                raw_seq += line.strip()

    vcf_data = []
    with open(vcf_file, 'r') as f:
        for line in f:
            # Skip headers
            if line.startswith('#'):
                continue

            fields = line.strip().split('\t')
            vcf_data.append({
                'idx': int(fields[1]) - 1,  # Mutation position
                'ref': fields[3],  # Reference sequence
                'alt': fields[4],  # Mutated sequence
                'info': fields[7]  # Indel mutation info
            })

  
    
    alnA = []
    alnB = []

    raw_seq_pos = 0 # index along the raw sequence
    for entry in vcf_data:

        vcf_pos = entry['idx']

        # Copy everything to alnA and alnB until reaching a mutation
        while raw_seq_pos < len(raw_seq) and raw_seq_pos < vcf_pos:
            alnA.append(raw_seq[raw_seq_pos])
            alnB.append(raw_seq[raw_seq_pos])
            raw_seq_pos += 1

        # Insertion mutations
        if 'SVTYPE=INS' in entry['info']:
            # add the first base in both
            alnA.append(raw_seq[raw_seq_pos])
            
            # Add gaps in reference (alnA) and inserted bases in alternative (alnB)
            ins_len = len(entry['alt']) - len(entry['ref'])
            alnA.extend(['-']*ins_len)
            alnB.extend(entry['alt'])

            raw_seq_pos += 1
        
        elif 'SVTYPE=DEL' in entry['info']:
            # add the first base in both
            alnB.append(raw_seq[raw_seq_pos])
            
            del_len = len(entry['ref']) - 1
            alnA.extend(entry['ref'])
            alnB.extend(['-']*del_len)

            raw_seq_pos += len(entry['ref']) 
        # Substitution mutations
        else:
            alnA.append(entry['ref'])
            alnB.append(entry['alt'])
            raw_seq_pos += 1

    # Handle the remaining bases after the last vcf entry
    while raw_seq_pos < len(raw_seq):  
        alnA.append(raw_seq[raw_seq_pos])  
        alnB.append(raw_seq[raw_seq_pos])  
        raw_seq_pos += 1  

    # Convert the list of characters into all caps and then merged string
    alnA = [str(element).upper() for element in alnA] 
    alnB = [str(element).upper() for element in alnB] 
    alnA_str = ''.join(alnA)
    alnB_str = ''.join(alnB)


    #print(alnA_str)
    #print(alnB_str)
    
    return alnA, alnB # keeping it as a list for later use

In [41]:
'''
------Test Case Description--------
Raw Seq: ACCGATCGACTAA (13 bp)

Mutations (1-indexed)
1: Insertion A -> AGGG (to test how insertions at the edges)
2: Substitution C -> T (to test the substitutions)
5: Insertion A -> ATT 
6: Deletion TC -> T (to test deletion mutations, and adjacent mutations)
11: Substitution T -> A
12: Deletion AA -> A (to test deletion at the edges)

Expected Alignment
A---CCGA--TCGACTAA
AGGGTCGATTT-GACAA-
'''
vcf_file_path = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/pipeline_testing/test_1_general.vcf"
reference_seq_path = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/pipeline_testing/test_1_general.fasta"



print('Expected Alignment')
expected_alnA = 'A---CCGA--TCGACTAA'
expected_alnB = 'AGGGTCGATTT-GACAA-'
print(expected_alnA)
print(expected_alnB)
print()
print('VCF to Alignment')
alnA_true, alnB_true  = vcf_to_alignment(vcf_file_path, reference_seq_path)


Expected Alignment
A---CCGA--TCGACTAA
AGGGTCGATTT-GACAA-

VCF to Alignment


In [42]:
'''
------- Test Case 2 Description ------
Goal: Check if the sequence is being truncated 

Ensure that all bases after the last mutation is copied
Same as test case 1 except I removed the last four mutations
The final mutation is at base 2

'''
vcf_file_path = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/pipeline_testing/test_2_truncated.vcf"
print('Expected Alignment')
expected_alnA = 'A---CCGATCGACTAA'
expected_alnB = 'AGGGTCGATCGACTAA'
print(expected_alnA)
print(expected_alnB)
print()
print('VCF to Alignment')
alnA_true, alnB_true  = vcf_to_alignment(vcf_file_path, reference_seq_path)

Expected Alignment
A---CCGATCGACTAA
AGGGTCGATCGACTAA

VCF to Alignment


# Examining a Case

In [65]:

vcf_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/test_2/input_data/subs_0.25_indel_0.05/run_7/variants_7.vcf"

bigwig_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/BW/reference_bw_to_use.bw"
fasta_reference = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/FASTA/dm6_reference.fasta"

alt_bigwig_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/test_2/input_data/subs_0.25_indel_0.05/run_7/tf_profile_7.bw"
fasta_mutated =  "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/test_2/input_data/subs_0.25_indel_0.05/run_7/mutated_7.fasta"


genomes = ["Drosophila_reference", "Drosophila_synthetic"] 

genomeFnames = {"Drosophila_reference": fasta_reference,
               "Drosophila_synthetic" : fasta_mutated}



contributionsFnames = {"Drosophila_reference" : bigwig_file,  
                    "Drosophila_synthetic" : alt_bigwig_file}



In [44]:
'''
# Get the true alignment 
alnA_true, alnB_true  = vcf_to_alignment(vcf_file, fasta_reference)
alnA_algo = alnA_true.copy()
#alnA_algo[12] = 'C'
alnB_algo = alnB_true.copy()
'''

"\n# Get the true alignment \nalnA_true, alnB_true  = vcf_to_alignment(vcf_file, fasta_reference)\nalnA_algo = alnA_true.copy()\n#alnA_algo[12] = 'C'\nalnB_algo = alnB_true.copy()\n"

In [45]:
"""
# NO indels
vcf_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/no_indels/variants_no_indel.vcf"

bigwig_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/BW/reference_all_neutral.bw"
fasta_reference = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/FASTA/dm6_reference.fasta"

alt_bigwig_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/no_indels/tf_profile_no_indel.bw"
fasta_mutated =  "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/no_indels/dm6_reference_ms.fasta"


genomes = ["Drosophila_reference", "Drosophila_synthetic"] 

genomeFnames = {"Drosophila_reference": fasta_reference,
               "Drosophila_synthetic" : fasta_mutated}



contributionsFnames = {"Drosophila_reference" : bigwig_file,  
                    "Drosophila_synthetic" : alt_bigwig_file}

"""


'\n# NO indels\nvcf_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/no_indels/variants_no_indel.vcf"\n\nbigwig_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/BW/reference_all_neutral.bw"\nfasta_reference = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/FASTA/dm6_reference.fasta"\n\nalt_bigwig_file = "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/no_indels/tf_profile_no_indel.bw"\nfasta_mutated =  "/mnt/c/Users/ninoz_5pamwj8/OneDrive/Desktop/capstone_all/synthetic_data/no_indels/dm6_reference_ms.fasta"\n\n\ngenomes = ["Drosophila_reference", "Drosophila_synthetic"] \n\ngenomeFnames = {"Drosophila_reference": fasta_reference,\n               "Drosophila_synthetic" : fasta_mutated}\n\n\n\ncontributionsFnames = {"Drosophila_reference" : bigwig_file,  \n                    "Drosophila_synthetic" : alt_bigwig_file}\n\n'

In [66]:
alpha_values = [0.8] 

seqprof, _ = getSequences_synthetic(
    normalize=lambda x: (x/np.sum(x) * x.shape[0]),
    genomes=genomes,
    genomeFnames=genomeFnames,
    contributionsFnames=contributionsFnames
)

#alnA_algo = alnA_true.copy()
#alnB_algo = alnB_true.copy()

# Process all alpha values
for alpha in alpha_values:
    alnA_algo, alnB_algo, dist = gotoh(
        seqprof["Drosophila_reference"],
        seqprof["Drosophila_synthetic"],
        u=2,  # gap extension penalty
        v=5,  # gap open penalty
        delta=seqProfile_modified(alpha, relative=False, threshold=0)
    )


#alnA_algo_seqs = [extract_sequence_element(element) for element in alnA_algo]
#alnB_algo_seqs = [extract_sequence_element(element) for element in alnB_algo]
print(alnA_algo)
alnA_true, alnB_true  = vcf_to_alignment(vcf_file, fasta_reference)
table1_true, table2_true = get_mutation_data(alnA_true, alnB_true)
table1_algo, table2_algo = get_mutation_data(alnA_algo, alnB_algo)

score = calculate_total_score(table1_algo, table2_algo, table1_true, table2_true)
print(score)

[<align_and_score.Element object at 0x7fc6866f5c40>, <align_and_score.Element object at 0x7fc686797d70>, <align_and_score.Element object at 0x7fc686795760>, <align_and_score.Element object at 0x7fc686797170>, <align_and_score.Element object at 0x7fc68675a090>, <align_and_score.Element object at 0x7fc684becfb0>, <align_and_score.Element object at 0x7fc684bec260>, <align_and_score.Element object at 0x7fc684bec110>, <align_and_score.Element object at 0x7fc684bec0e0>, <align_and_score.Element object at 0x7fc684bec320>, <align_and_score.Element object at 0x7fc684bec200>, <align_and_score.Element object at 0x7fc684bec2c0>, <align_and_score.Element object at 0x7fc684bec290>, <align_and_score.Element object at 0x7fc684bec3b0>, <align_and_score.Element object at 0x7fc684bec3e0>, <align_and_score.Element object at 0x7fc684bec830>, <align_and_score.Element object at 0x7fc684bed430>, <align_and_score.Element object at 0x7fc684bec380>, <align_and_score.Element object at 0x7fc684bec560>, <align_and_

In [None]:
def score_mutation_tables_with_history(algo_table, true_table, is_table2=False):
    """
    Compare mutation tables (Table 1 or Table 2) and calculate the score.
    
    Args:
        algo_table: Algorithm-generated mutation data
        true_table: Ground-truth mutation data
        is_table2: If True, only compare insertions (ignores substitutions/no mutations)
    
    Returns:
        Score (int)
    """
    
    # Convert tables to dictionaries: {base_idx: (mutation_type, info)}
    algo_dict = {entry[0]: (entry[1], entry[2]) for entry in algo_table}
    true_dict = {entry[0]: (entry[1], entry[2]) for entry in true_table}
    
    # All indices (union of algo and truth indices)
    all_indices = set(algo_dict.keys()).union(true_dict.keys())
    score = 0
    score_history = [score]
    for idx in all_indices:
        algo_entry = algo_dict.get(idx, None)
        true_entry = true_dict.get(idx, None)
        
        # Skip if comparing Table 2 and mutation is not an insertion
        if is_table2:
            if algo_entry and algo_entry[0] != "Insertion":
                algo_entry = None  # Treat as non-existent for scoring
            if true_entry and true_entry[0] != "Insertion":
                true_entry = None
        
        # Case 1: Both entries exist
        if algo_entry and true_entry:
            if algo_entry == true_entry:
                score += 1  # Correct match
            else:
                score -= 1  # Mismatch
        # Case 2: Missing entry in one of the tables
        elif algo_entry != true_entry:
            score -= 1  # Penalize missing/mismatched entries
        score_history.append(score)
    
    return score_history

def calculate_total_score_with_history(algo_table1, algo_table2, true_table1, true_table2):
    """Compute total score by comparing Table 1 and Table 2."""
    # Score Table 1 (substitutions/deletions)
    score_history_t1 = score_mutation_tables_with_history(algo_table1, true_table1)
    #print(table1_score)
    # Score Table 2 (insertions only)
    score_history_t2 = score_mutation_tables_with_history(algo_table2, true_table2, is_table2=True)
    #print(table2_score)

    
    return score_history_t1, score_history_t2

In [None]:
score_history_t1, score_history_t2 = calculate_total_score_with_history(table1_algo, table2_algo, table1_true, table2_true)
x_axis_1 = np.arange(0, len(score_history_t1))
x_axis_2 = np.arange(0, len(score_history_t2))
plt.plot(x_axis_1, score_history_t1)
plt.plot(x_axis_2, score_history_t2)       

print(max(score_history_t2))