# Functional Validation of MutagenesisForge

This notebook demonstrates that `MutagenesisForge` runs successfully on example data. It serves as manual verification for the functionality of the software submitted to JOSS.

In [4]:
# Install MutagenesisForge
!pip install MutagenesisForge

# Import MutagenesisForge
import src.MutagenesisForge as mf


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [5]:
# Import example data
bed = 'example/example.bed'
vcf = 'example/real.vcf'
fasta = 'example/example.fa'

In [6]:
# Test MutagenesisForge functionality

# Test mf.exhaustive
try:
    exhaust_ratio = mf.exhaustive(
        fasta=fasta
    )
    print(f"Exhaustive ratio: {exhaust_ratio}")
except Exception as e:
    print(f"MutagenesisForge failed with error: {e}")

# Test mf.context
try:
    context_ratio = mf.context(
        fasta=fasta,
        vcf=vcf,
        context_model='blind'
    )
    print(f"Context ratio: {context_ratio}")
except Exception as e:
    print(f"MutagenesisForge failed with error: {e}")

2025-07-23 22:08:42,055 - INFO - Loaded 1 regions
2025-07-23 22:08:42,055 - INFO - Processing record: 21:47545973 C -> ('T',)
[W::vcf_parse] Contig '21' is not defined in the header. (Quick workaround: index the file with tabix.)


Exhaustive ratio: 3.84514502890531


2025-07-23 22:08:59,451 - INFO - Processing record: 21:37431093 G -> ('T',)
2025-07-23 22:09:16,877 - INFO - Processing record: 21:37787598 T -> ('A',)
2025-07-23 22:09:34,404 - INFO - Processing record: 21:46696962 G -> ('A',)
2025-07-23 22:09:51,919 - INFO - Processing record: 21:47611115 G -> ('A',)
2025-07-23 22:10:09,638 - INFO - Processing record: 21:31066230 C -> ('A',)


KeyboardInterrupt: 

In [7]:
# Debug the MutationModel to understand what parameters it needs

# Test 1: Try creating the model with default parameters
try:
    model1 = mf.MutationModel(model_type="random")
    print("Model created successfully with defaults")
    
    # Test mutation
    result = model1.mutate('C')
    print(f"Mutation result: C -> {result}")
    
except Exception as e:
    print(f"Default model failed: {e}")
    import traceback
    traceback.print_exc()

print("\n" + "="*50 + "\n")

# Test 2: Try with explicit base frequencies
try:
    model2 = mf.MutationModel(
        model_type="random",
        pi_a=0.25,
        pi_c=0.25,
        pi_g=0.25,
        pi_t=0.25
    )
    print("Model created successfully with base frequencies")
    
    # Test mutation
    result = model2.mutate('C')
    print(f"Mutation result: C -> {result}")
    
except Exception as e:
    print(f"Model with base frequencies failed: {e}")
    import traceback
    traceback.print_exc()

print("\n" + "="*50 + "\n")

# Test 3: Check what the MutationModel constructor expects
import inspect
sig = inspect.signature(mf.MutationModel.__init__)
print("MutationModel constructor signature:")
for param_name, param in sig.parameters.items():
    if param_name != 'self':
        print(f"  {param_name}: {param.annotation} = {param.default}")

Model created successfully with defaults
Mutation result: C -> T


Model created successfully with base frequencies
Mutation result: C -> T


MutationModel constructor signature:
  model_type: <class 'str'> = <class 'inspect._empty'>
  gamma: <class 'float'> = 1.0
  alpha: <class 'float'> = None
  beta: <class 'float'> = None
  pi_a: <class 'float'> = None
  pi_c: <class 'float'> = None
  pi_g: <class 'float'> = None
  pi_t: <class 'float'> = None


In [8]:
# Debug what's happening in the context function
import pysam

# Test with a single VCF record to see what's going wrong
try:
    with pysam.VariantFile(vcf) as vcf_file, pysam.FastaFile(fasta) as fasta_file:
        
        # Get the first record that's causing issues
        for record in vcf_file:
            if record.chrom == '21' and record.pos in [47545973, 37431093]:  # Known problematic positions
                print(f"Debugging record: {record.chrom}:{record.pos} {record.ref} -> {record.alts}")
                
                # Check trinucleotide context
                try:
                    
                    
                    # Create the mutation model (same as in context function)
                    mutation_model = mf.MutationModel(model_type="random")
                    
                    # Get trinucleotide context (same logic as in process_vcf_record)
                    chrom_length = fasta_file.get_reference_length(record.chrom)
                    print(f"Chromosome length: {chrom_length}")
                    print(f"Position bounds check: {record.pos} >= 2: {record.pos >= 2}, {record.pos} < {chrom_length}: {record.pos < chrom_length}")
                    
                    if record.pos < 2 or record.pos >= chrom_length:
                        print("Position out of bounds for trinucleotide context")
                        continue
                        
                    before = fasta_file.fetch(record.chrom, record.pos - 2, record.pos - 1).upper()
                    current = fasta_file.fetch(record.chrom, record.pos - 1, record.pos).upper()
                    after = fasta_file.fetch(record.chrom, record.pos, record.pos + 1).upper()
                    
                    print(f"Trinucleotide context: {before}-{current}-{after}")
                    print(f"Record ref: {record.ref}")
                    print(f"Context current matches record ref: {current == record.ref}")
                    
                    # Test the mutation directly
                    print(f"Testing mutation of '{current}' (type: {type(current)})")
                    
                    # This is where the error likely occurs
                    mutated = mutation_model.mutate(current)
                    print(f"Mutation successful: {current} -> {mutated}")
                    
                except Exception as e:
                    print(f"Error in debugging: {e}")
                    import traceback
                    traceback.print_exc()
                
                break  # Only test first problematic record
                
except Exception as e:
    print(f"Error opening files: {e}")
    import traceback
    traceback.print_exc()

Debugging record: 21:47545973 C -> ('T',)
Chromosome length: 48129895
Position bounds check: 47545973 >= 2: True, 47545973 < 48129895: True
Trinucleotide context: G-C-G
Record ref: C
Context current matches record ref: True
Testing mutation of 'C' (type: <class 'str'>)
Mutation successful: C -> G


[W::vcf_parse] Contig '21' is not defined in the header. (Quick workaround: index the file with tabix.)


In [9]:
# Test context with a smaller dataset and without relying on cache
try:
    # Process only a few records to isolate the issue
    with pysam.VariantFile(vcf) as vcf_file, pysam.FastaFile(fasta) as fasta_file:
        
        # Create a fresh mutation model
        mutation_model = mf.MutationModel(model_type="random")
        
        # Load regions
        regions = []
        for chrom in fasta_file.references:
            length = fasta_file.get_reference_length(chrom)
            regions.append(f"{chrom}\t0\t{length}")
        
        print(f"Loaded {len(regions)} regions")
        
        # Process just the first few records manually
        processed_count = 0
        for i, record in enumerate(vcf_file):
            if i >= 3:  # Only process first 3 records
                break
                
            print(f"Processing record {i+1}: {record.chrom}:{record.pos} {record.ref} -> {record.alts}")
            
            try:
                # Get trinucleotide context
                if record.pos < 2 or record.pos >= fasta_file.get_reference_length(record.chrom):
                    print(f"  Skipping - position too close to boundary")
                    continue
                    
                before = fasta_file.fetch(record.chrom, record.pos - 2, record.pos - 1).upper()
                current = fasta_file.fetch(record.chrom, record.pos - 1, record.pos).upper()
                after = fasta_file.fetch(record.chrom, record.pos, record.pos + 1).upper()
                
                print(f"  Trinucleotide: {before}-{current}-{after}")
                
                # Skip if invalid bases
                if not all(base in {'A', 'C', 'G', 'T'} for base in [before, current, after]):
                    print(f"  Skipping - invalid bases")
                    continue
                
                # Create a NEW mutation model for each record (to avoid any state issues)
                fresh_model = mf.MutationModel(model_type="random")
                
                # Test mutation
                print(f"  Testing mutation of {current}...")
                mutated_base = fresh_model.mutate(current)
                print(f"  Mutation: {current} -> {mutated_base}")
                
                processed_count += 1
                
            except Exception as e:
                print(f"  Error processing record: {e}")
                import traceback
                traceback.print_exc()
        
        print(f"Successfully processed {processed_count} records manually")
        
except Exception as e:
    print(f"Manual processing failed: {e}")
    import traceback
    traceback.print_exc()

Loaded 1 regions
Processing record 1: 21:47545973 C -> ('T',)
  Trinucleotide: G-C-G
  Testing mutation of C...
  Mutation: C -> T
Processing record 2: 21:37431093 G -> ('T',)
  Trinucleotide: G-G-T
  Testing mutation of G...
  Mutation: G -> G
Processing record 3: 21:37787598 T -> ('A',)
  Trinucleotide: A-T-A
  Testing mutation of T...
  Mutation: T -> A
Successfully processed 3 records manually


[W::vcf_parse] Contig '21' is not defined in the header. (Quick workaround: index the file with tabix.)


In [11]:
# Test with different context models to see if 'blind' is the issue
context_models_to_test = ['ra', 'ra_ba', 'ra_aa']

for context_model in context_models_to_test:
    try:
        print(f"\nTesting context model: {context_model}")
        context_ratio = mf.context(
            fasta=fasta,
            vcf=vcf,
            context_model=context_model,
            model='random'
        )
        print(f"Context ratio for {context_model}: {context_ratio}")
        break  # If one works, we found a workaround
        
    except Exception as e:
        print(f"Context model {context_model} failed: {e}")

# If all fail, try with explicit parameters
print("\nTrying with explicit mutation model parameters...")
try:
    context_ratio = mf.context(
        fasta=fasta,
        vcf=vcf,
        context_model='blind',
        model='random',
        pi_a=0.25,
        pi_c=0.25,
        pi_g=0.25,
        pi_t=0.25
    )
    print(f"Context ratio with explicit parameters: {context_ratio}")
except Exception as e:
    print(f"Still failed with explicit parameters: {e}")

2025-07-23 22:13:37,828 - INFO - Loaded 1 regions
[W::vcf_parse] Contig '21' is not defined in the header. (Quick workaround: index the file with tabix.)
2025-07-23 22:13:37,829 - INFO - Processing record: 21:47545973 C -> ('T',)



Testing context model: ra


2025-07-23 22:13:55,484 - INFO - Processing record: 21:37431093 G -> ('T',)
2025-07-23 22:14:13,557 - INFO - Processing record: 21:37787598 T -> ('A',)
2025-07-23 22:14:31,241 - INFO - Processing record: 21:46696962 G -> ('A',)
2025-07-23 22:14:49,474 - INFO - Processing record: 21:47611115 G -> ('A',)


KeyboardInterrupt: 

In [12]:
# Let's monkey-patch the process_vcf_record function to add debugging
import pysam

# Create a debugging wrapper for the mutation model
class DebuggingMutationModel:
    def __init__(self, original_model):
        self.original_model = original_model
        self.call_count = 0
    
    def mutate(self, base):
        self.call_count += 1
        print(f"[DEBUG {self.call_count}] Mutating base: '{base}' (type: {type(base)})")
        try:
            result = self.original_model.mutate(base)
            print(f"[DEBUG {self.call_count}] Mutation successful: {base} -> {result}")
            return result
        except Exception as e:
            print(f"[DEBUG {self.call_count}] Mutation failed: {e}")
            print(f"[DEBUG {self.call_count}] Base value: {repr(base)}")
            print(f"[DEBUG {self.call_count}] Model state: {vars(self.original_model)}")
            raise

# Test with debugging wrapper
try:
    with pysam.VariantFile(vcf) as vcf_file, pysam.FastaFile(fasta) as fasta_file:
        
        # Create original model
        original_model = mf.MutationModel(model_type="random")
        debug_model = DebuggingMutationModel(original_model)
        
        # Load regions
        regions = []
        for chrom in fasta_file.references:
            length = fasta_file.get_reference_length(chrom)
            regions.append(f"{chrom}\t0\t{length}")
        
        print(f"Loaded {len(regions)} regions")
        
        # Process first few records with detailed debugging
        for i, record in enumerate(vcf_file):
            if i >= 2:  # Only process first 2 records
                break
                
            print(f"\n{'='*60}")
            print(f"Processing record {i+1}: {record.chrom}:{record.pos} {record.ref} -> {record.alts}")
            
            try:
                # Simulate the exact same logic as process_vcf_record
                chrom = record.chrom
                pos = record.pos
                ref_base = record.ref
                alt_base = record.alts[0] if record.alts else None
                
                if alt_base is None:
                    print("No alternative allele, skipping")
                    continue
                
                # Get trinucleotide context (same as in process_vcf_record)
                if pos < 2 or pos >= fasta_file.get_reference_length(chrom):
                    print("Position out of bounds, skipping")
                    continue
                    
                before_base = fasta_file.fetch(chrom, pos - 2, pos - 1).upper()
                ref_base_context = fasta_file.fetch(chrom, pos - 1, pos).upper()
                after_base = fasta_file.fetch(chrom, pos, pos + 1).upper()
                
                print(f"Context: {before_base}-{ref_base_context}-{after_base}")
                
                # Skip if invalid bases
                if not all(base and base in {'A', 'C', 'G', 'T'} for base in [before_base, ref_base_context, after_base]):
                    print("Invalid bases in context, skipping")
                    continue
                
                # This is where get_matching_positions is called in the real function
                print("About to call get_matching_positions...")
                
                # Simulate get_matching_positions call (simplified)
                matching_positions = []
                context_model = 'blind'
                
                # Look for positions in first region only (to limit scope)
                region = regions[0]  # Just test with first region
                parts = region.split()
                chrom_name, start, end = parts[0], int(parts[1]), int(parts[2])
                
                # Limit search to first 10000 positions to avoid long processing
                end = min(start + 10000, end)
                print(f"Searching region {chrom_name}:{start}-{end}")
                
                for test_pos in range(max(2, start), min(end, fasta_file.get_reference_length(chrom_name) - 1)):
                    try:
                        # This calls get_base multiple times (potential issue location)
                        pos_before = fasta_file.fetch(chrom_name, test_pos - 2, test_pos - 1).upper()
                        pos_current = fasta_file.fetch(chrom_name, test_pos - 1, test_pos).upper()
                        pos_after = fasta_file.fetch(chrom_name, test_pos, test_pos + 1).upper()
                        
                        if all(base and base in {'A', 'C', 'G', 'T'} for base in [pos_before, pos_current, pos_after]):
                            matching_positions.append((chrom_name, test_pos))
                            if len(matching_positions) >= 10:  # Just get 10 matches
                                break
                                
                    except Exception as e:
                        print(f"Error checking position {test_pos}: {e}")
                        continue
                
                if not matching_positions:
                    print("No matching positions found")
                    continue
                
                print(f"Found {len(matching_positions)} matching positions")
                
                # Randomly select a position
                import numpy as np
                random_chr, random_pos = matching_positions[np.random.randint(len(matching_positions))]
                print(f"Selected random position: {random_chr}:{random_pos}")
                
                # Get codon context at random position
                random_before = fasta_file.fetch(random_chr, random_pos - 2, random_pos - 1).upper()
                random_current = fasta_file.fetch(random_chr, random_pos - 1, random_pos).upper()
                random_after = fasta_file.fetch(random_chr, random_pos, random_pos + 1).upper()
                
                print(f"Random position context: {random_before}-{random_current}-{random_after}")
                
                # This is where the mutation error occurs
                print("About to call mutate...")
                random_alt_base = debug_model.mutate(random_current)
                
                print(f"Mutation completed: {random_current} -> {random_alt_base}")
                
            except Exception as e:
                print(f"Error in detailed processing: {e}")
                import traceback
                traceback.print_exc()
                break  # Stop on first error
        
        print(f"\nTotal mutation calls: {debug_model.call_count}")
        
except Exception as e:
    print(f"Error in debugging: {e}")
    import traceback
    traceback.print_exc()

Loaded 1 regions

Processing record 1: 21:47545973 C -> ('T',)
Context: G-C-G
About to call get_matching_positions...
Searching region 21:0-10000
No matching positions found

Processing record 2: 21:37431093 G -> ('T',)
Context: G-G-T
About to call get_matching_positions...
Searching region 21:0-10000
No matching positions found

Total mutation calls: 0


[W::vcf_parse] Contig '21' is not defined in the header. (Quick workaround: index the file with tabix.)


In [15]:
# Find where the actual sequence starts on chromosome 21
import pysam

try:
    with pysam.FastaFile(fasta) as fasta_file:
        chrom = "21"
        chrom_length = fasta_file.get_reference_length(chrom)
        
        print(f"Searching for start of valid sequence on chromosome {chrom}")
        
        # Binary search approach to find the start of valid sequence
        # We know position 47545973 has valid sequence, so search before that
        
        # First, let's check some strategic positions
        test_positions = [
            1000000,   # 1M
            5000000,   # 5M  
            10000000,  # 10M
            15000000,  # 15M
            20000000,  # 20M
            25000000,  # 25M
            30000000,  # 30M
            35000000,  # 35M
            40000000,  # 40M
            45000000,  # 45M
            47000000,  # 47M
        ]
        
        valid_start = None
        
        for pos in test_positions:
            if pos >= chrom_length:
                continue
                
            try:
                # Check a small window around this position
                sequence = fasta_file.fetch(chrom, pos, pos + 100).upper()
                valid_count = sum(1 for base in sequence if base in {'A', 'C', 'G', 'T'})
                
                print(f"Position {pos:8d}: {valid_count:2d}/100 valid bases - {sequence[:20]}...")
                
                if valid_count > 50:  # More than 50% valid bases
                    valid_start = pos
                    print(f"Found valid sequence starting around position {pos}")
                    break
                    
            except Exception as e:
                print(f"Error checking position {pos}: {e}")
                
        if valid_start:
            # Create a BED file with the valid region
            bed_content = f"21\t{valid_start}\t{chrom_length}\n"
            temp_bed = "chr21_valid_region.bed"
            
            with open(temp_bed, 'w') as f:
                f.write(bed_content)
            
            print(f"\nCreated BED file with valid region: {valid_start}-{chrom_length}")
            print(f"BED file content: {bed_content.strip()}")
            
            # Test the context function with this BED file
            try:
                print("Testing context function with valid region BED file...")
                context_ratio = mf.context(
                    fasta=fasta,
                    vcf=vcf,
                    bed=temp_bed,
                    context_model='blind',
                    model='random'
                )
                print(f"SUCCESS! Context ratio: {context_ratio}")
                
            except Exception as e:
                print(f"Context function still failed: {e}")
                # Leave the BED file for manual inspection
                print(f"BED file saved as: {temp_bed}")
                
        else:
            print("Could not find start of valid sequence!")
            
            # As a last resort, create a BED file around our known good positions
            vcf_positions = [47545973, 37431093, 37787598]  # From your VCF
            
            print(f"Creating BED file around known VCF positions: {vcf_positions}")
            
            bed_regions = []
            for pos in vcf_positions:
                # Create 1MB region around each position
                start = max(0, pos - 500000)
                end = min(chrom_length, pos + 500000)
                bed_regions.append(f"21\t{start}\t{end}")
            
            bed_content = "\n".join(bed_regions) + "\n"
            temp_bed = "chr21_vcf_regions.bed"
            
            with open(temp_bed, 'w') as f:
                f.write(bed_content)
            
            print(f"Created BED file around VCF positions:")
            print(bed_content)
            
            # Test with this BED file
            try:
                print("Testing context function with VCF-region BED file...")
                context_ratio = mf.context(
                    fasta=fasta,
                    vcf=vcf,
                    bed=temp_bed,
                    context_model='blind',
                    model='random'
                )
                print(f"SUCCESS! Context ratio: {context_ratio}")
                
            except Exception as e:
                print(f"Still failed: {e}")
                print(f"BED file saved as: {temp_bed}")
        
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()

2025-07-23 22:19:23,741 - INFO - Loaded 1 regions
[W::vcf_parse] Contig '21' is not defined in the header. (Quick workaround: index the file with tabix.)
2025-07-23 22:19:23,742 - INFO - Processing record: 21:47545973 C -> ('T',)
2025-07-23 22:19:23,745 - INFO - Processing record: 21:37431093 G -> ('T',)
2025-07-23 22:19:23,747 - INFO - Processing record: 21:37787598 T -> ('A',)
2025-07-23 22:19:23,750 - INFO - Processing record: 21:46696962 G -> ('A',)
2025-07-23 22:19:23,753 - INFO - Processing record: 21:47611115 G -> ('A',)
2025-07-23 22:19:23,756 - INFO - Processing record: 21:31066230 C -> ('A',)
2025-07-23 22:19:23,759 - INFO - Processing record: 21:27097008 A -> ('G',)
2025-07-23 22:19:23,762 - INFO - Processing record: 21:44448948 G -> ('A',)
2025-07-23 22:19:23,765 - INFO - Processing record: 21:33312522 C -> ('A',)
2025-07-23 22:19:23,765 - INFO - Processing record: 21:43783495 C -> ('T',)
2025-07-23 22:19:23,768 - INFO - Processing record: 21:32595762 C -> ('T',)
2025-07-23

Searching for start of valid sequence on chromosome 21
Position  1000000:  0/100 valid bases - NNNNNNNNNNNNNNNNNNNN...
Position  5000000:  0/100 valid bases - NNNNNNNNNNNNNNNNNNNN...
Position 10000000: 100/100 valid bases - GAATGTAGGAGGGTAGGGAG...
Found valid sequence starting around position 10000000

Created BED file with valid region: 10000000-48129895
BED file content: 21	10000000	48129895
Testing context function with valid region BED file...
SUCCESS! Context ratio: inf
