SETUP & IMPORTS

In [1]:
import sys
import os
sys.path.append('../src')

import pandas as pd
import numpy as np
from collections import Counter
from utils import setup_logging, clean_sequence
from config import Config

# Notebook banner: title line followed by a 40-character rule.
print("UNIT TESTING FOR DATA PROCESSING", "=" * 40, sep="\n")

UNIT TESTING FOR DATA PROCESSING


TEST DATA

In [2]:
def create_test_data():
    """Create simple test data"""
    
    base_seq = "ATGCCCGGATCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCCAAATTTGGGCCC"
    
    test_data = [
        {"accession_id": "TEST001", "sequence": base_seq, "sequence_length": len(base_seq)},
        {"accession_id": "TEST002", "sequence": base_seq.replace("ATG", "ACG"), "sequence_length": len(base_seq)},
        {"accession_id": "TEST003", "sequence": base_seq, "sequence_length": len(base_seq)},  # Duplicate
        {"accession_id": "TEST004", "sequence": "SHORT", "sequence_length": 5},  # Too short
        {"accession_id": "TEST005", "sequence": "N" * len(base_seq), "sequence_length": len(base_seq)},  # Too many Ns
    ]
    
    return pd.DataFrame(test_data)

# Build the shared fixture once; later cells read it as the global `test_df`.
test_df = create_test_data()
print("Created {} test sequences".format(len(test_df)))

Created 5 test sequences


COPY FUNCTIONS FROM NOTEBOOK 03

In [3]:
def calculate_gc_content(sequence):
    """Return the share of G and C characters in ``sequence`` as a percentage.

    Matching is case-insensitive. A falsy input (empty string or ``None``)
    yields ``0.0`` instead of dividing by zero.

    Args:
        sequence: Nucleotide string to inspect.

    Returns:
        float in the range 0-100.
    """
    if sequence:
        normalized = sequence.upper()
        gc_total = sum(normalized.count(base) for base in ('G', 'C'))
        return (gc_total / len(sequence)) * 100
    return 0.0

def simple_qc(df, max_ambiguous_pct=10):
    """Apply duplicate, length and ambiguity filters to a sequence table.

    Steps, in order:
      1. Drop rows whose ``sequence`` string repeats an earlier row.
      2. Keep rows whose ``sequence_length`` lies within
         ``Config.MIN_SEQUENCE_LENGTH`` .. ``Config.MAX_SEQUENCE_LENGTH``
         (both inclusive).
      3. Compute ``amb_pct``, the percentage of IUPAC ambiguity codes in
         each sequence, and keep rows at or below ``max_ambiguous_pct``.

    Args:
        df: DataFrame with ``sequence`` and ``sequence_length`` columns.
        max_ambiguous_pct: Highest tolerated percentage of ambiguous bases
            (default 10, the value that was previously hard-coded).

    Returns:
        ``(df_clean, stats)`` where ``df_clean`` is the filtered copy (with an
        added ``amb_pct`` column) and ``stats`` is
        ``{"initial": <rows in>, "final": <rows out>}``. An empty input is
        returned unchanged with both counts set to 0.
    """
    if df.empty:
        return df, {"initial": 0, "final": 0}

    initial = len(df)

    # Remove duplicates (the first occurrence of each sequence is kept)
    df_clean = df.drop_duplicates(subset=['sequence']).copy()

    # Length filter; uses the precomputed column, not len(sequence)
    df_clean = df_clean[
        (df_clean['sequence_length'] >= Config.MIN_SEQUENCE_LENGTH) &
        (df_clean['sequence_length'] <= Config.MAX_SEQUENCE_LENGTH)
    ].copy()

    # Remove high ambiguous content
    ambiguity_codes = frozenset('NRYSWKMBDHV')

    def ambiguous_percentage(seq):
        # A zero-length string can reach this point when 'sequence_length'
        # disagrees with the actual sequence; it has no usable bases, so it
        # is scored as fully ambiguous instead of raising ZeroDivisionError.
        if not seq:
            return 100.0
        # Upper-case first so lowercase codes (e.g. 'n') are counted too,
        # matching the case-insensitive handling in calculate_gc_content.
        hits = sum(1 for base in seq.upper() if base in ambiguity_codes)
        return (hits / len(seq)) * 100

    df_clean['amb_pct'] = df_clean['sequence'].apply(ambiguous_percentage)
    df_clean = df_clean[df_clean['amb_pct'] <= max_ambiguous_pct].copy()

    stats = {"initial": initial, "final": len(df_clean)}
    return df_clean, stats

def simple_haplotypes(df):
    """Label every distinct sequence string as a haplotype.

    Distinct sequences are ranked by how often they occur (most common
    first) and named ``Hap_001``, ``Hap_002``, ... in that order.

    Args:
        df: DataFrame with a ``sequence`` column.

    Returns:
        ``(df_hap, haplotypes)``: a copy of ``df`` with an added
        ``haplotype_id`` column, and a list of dicts with the keys
        ``haplotype_id``, ``count`` and ``frequency`` (count divided by the
        number of rows). An empty input is returned unchanged with an
        empty list.
    """
    if df.empty:
        return df, []

    observed = df['sequence'].tolist()
    total = len(observed)
    ranked = Counter(observed).most_common()

    # Rank (1-based) determines the zero-padded haplotype label.
    seq_to_hap = {
        seq: f"Hap_{rank:03d}"
        for rank, (seq, _) in enumerate(ranked, start=1)
    }
    haplotypes = [
        {
            'haplotype_id': seq_to_hap[seq],
            'count': occurrences,
            'frequency': occurrences / total,
        }
        for seq, occurrences in ranked
    ]

    df_hap = df.copy()
    df_hap['haplotype_id'] = df_hap['sequence'].map(seq_to_hap)

    return df_hap, haplotypes

SIMPLE TESTS

In [4]:
def test_gc_content():
    """Check calculate_gc_content against three hand-computed sequences.

    Raises:
        AssertionError: if a result differs from its expected percentage
            by 0.1 or more.
    """
    print("\nTesting GC content...")

    # sequence -> expected GC percentage
    expected_by_sequence = {
        "ATGC": 50.0,
        "AAAA": 0.0,
        "GGGG": 100.0,
    }

    for seq, expected in expected_by_sequence.items():
        observed = calculate_gc_content(seq)
        assert abs(observed - expected) < 0.1, f"GC test failed for {seq}"

    print("✓ GC content tests passed")

def test_quality_control():
    """Check simple_qc against the shared ``test_df`` fixture.

    Besides the basic count sanity checks, this verifies outcomes that hold
    for any Config length bounds:

    * the reported final count matches the number of returned rows;
    * no sequence string appears twice in the result;
    * TEST003 (same sequence as the earlier TEST001) is removed, because
      drop_duplicates keeps the first occurrence;
    * TEST005 (all "N") is removed by the ambiguity filter.

    Raises:
        AssertionError: if any check fails.
    """
    print("\nTesting quality control...")

    df_clean, stats = simple_qc(test_df)

    assert stats['initial'] == 5, "Initial count wrong"
    assert stats['final'] <= stats['initial'], "Final count too high"
    assert stats['final'] >= 0, "Final count negative"
    assert stats['final'] == len(df_clean), "Final count does not match returned rows"

    # These outcomes do not depend on the configured length bounds.
    assert df_clean['sequence'].is_unique, "Duplicate sequences survived QC"
    retained_ids = set(df_clean['accession_id'])
    assert "TEST003" not in retained_ids, "Duplicate record TEST003 survived QC"
    assert "TEST005" not in retained_ids, "All-N record TEST005 survived QC"

    if not df_clean.empty:
        # Check length requirements
        assert df_clean['sequence_length'].min() >= Config.MIN_SEQUENCE_LENGTH
        assert df_clean['sequence_length'].max() <= Config.MAX_SEQUENCE_LENGTH

    print(f"✓ QC passed: {stats['final']}/{stats['initial']} sequences retained")

def test_haplotype_identification():
    """Check simple_haplotypes on the sequences that survive simple_qc.

    If QC leaves no rows, a notice is printed and nothing is asserted.

    Raises:
        AssertionError: if no haplotypes are found, if there are more
            haplotypes than rows, if any row lacks a haplotype ID, or if
            the frequencies do not sum to 1 (within 0.01).
    """
    print("\nTesting haplotype identification...")

    qc_frame, _ = simple_qc(test_df)

    # Nothing to check when QC removed every row.
    if qc_frame.empty:
        print("! No sequences for haplotype test")
        return

    labelled, hap_records = simple_haplotypes(qc_frame)

    assert len(hap_records) > 0, "No haplotypes found"
    assert len(hap_records) <= len(qc_frame), "Too many haplotypes"
    assert not labelled['haplotype_id'].isna().any(), "Missing haplotype IDs"

    # Check frequencies sum to 1
    frequency_sum = sum(record['frequency'] for record in hap_records)
    assert abs(frequency_sum - 1.0) < 0.01, "Frequencies don't sum to 1"

    print(f"✓ Haplotypes passed: {len(hap_records)} haplotypes identified")

def test_empty_data():
    """Check that simple_qc and simple_haplotypes accept an empty DataFrame.

    Raises:
        AssertionError: if either function returns a non-empty result, or
            if QC reports a non-zero initial count.
    """
    print("\nTesting empty data handling...")

    blank = pd.DataFrame()

    # Quality control on an empty frame
    qc_frame, qc_stats = simple_qc(blank)
    assert qc_frame.empty, "Empty QC failed"
    assert qc_stats['initial'] == 0, "Empty initial count wrong"

    # Haplotype identification on an empty frame
    hap_frame, hap_records = simple_haplotypes(blank)
    assert hap_frame.empty, "Empty haplotype failed"
    assert len(hap_records) == 0, "Empty haplotype list failed"

    print("✓ Empty data handling passed")

RUN ALL TESTS

In [8]:
def run_all_tests():
    """Run each unit test, report failures, and print a pass/fail summary.

    A test counts as failed when it raises any ``Exception``; the error is
    printed and the remaining tests still run.

    Returns:
        bool: ``True`` when no test failed, otherwise ``False``.
    """
    rule = '=' * 40

    print(f"\n{rule}")
    print("RUNNING ALL TESTS")
    print(rule)

    suite = (
        test_gc_content,
        test_quality_control,
        test_haplotype_identification,
        test_empty_data,
    )

    # One boolean per test: True for a pass, False for a failure.
    outcomes = []
    for case in suite:
        try:
            case()
        except Exception as err:
            print(f"❌ {case.__name__} FAILED: {err}")
            outcomes.append(False)
        else:
            outcomes.append(True)

    passed = sum(outcomes)
    failed = len(outcomes) - passed

    print(f"\n{rule}")
    print("TEST RESULTS")
    print(rule)
    print(f"Passed: {passed}")
    print(f"Failed: {failed}")

    all_ok = failed == 0
    print("ALL TESTS PASSED!" if all_ok else "SOME TESTS FAILED")

    return all_ok

INTEGRATION TEST

In [6]:
def integration_test():
    """Run simple_qc then simple_haplotypes on ``test_df`` and print a summary.

    Returns:
        bool: ``True`` when at least one sequence passes QC and haplotypes
        were computed; ``False`` when QC leaves no rows.
    """
    rule = '=' * 40

    print(f"\n{rule}")
    print("INTEGRATION TEST")
    print(rule)

    # Stage 1: quality control
    qc_frame, qc_stats = simple_qc(test_df)

    if qc_frame.empty:
        print("Integration test failed: No sequences passed QC")
        return False

    # Stage 2: haplotype identification; only the haplotype list is reported
    _, hap_records = simple_haplotypes(qc_frame)

    print("✓ Pipeline complete:")
    print(f"  Input: {qc_stats['initial']} sequences")
    print(f"  QC passed: {qc_stats['final']} sequences")
    print(f"  Haplotypes: {len(hap_records)}")

    return True

RUN TESTS

In [7]:
# Unit tests first, then the end-to-end pipeline check.
success = run_all_tests()
integration_success = integration_test()

# Final summary banner
print("\n" + "=" * 40)
print("FINAL SUMMARY")
print("=" * 40)

# Both stages must succeed for the pipeline to be reported as ready.
if not (success and integration_success):
    print("❌ SOME TESTS FAILED - Fix issues before using pipeline")
else:
    print("✅ ALL TESTS PASSED - Pipeline ready for use!")

print("Unit testing complete!")


RUNNING ALL TESTS

Testing GC content...
✓ GC content tests passed

Testing quality control...
✓ QC passed: 2/5 sequences retained

Testing haplotype identification...
✓ Haplotypes passed: 2 haplotypes identified

Testing empty data handling...
✓ Empty data handling passed

TEST RESULTS
Passed: 4
Failed: 0
ALL TESTS PASSED!

INTEGRATION TEST
✓ Pipeline complete:
  Input: 5 sequences
  QC passed: 2 sequences
  Haplotypes: 2

FINAL SUMMARY
✅ ALL TESTS PASSED - Pipeline ready for use!
Unit testing complete!
