In [3]:
import pandas as pd
from Bio import SeqIO
from Bio.Seq import Seq
import re

def parse_mutation_notation(mutation_string):
    """
    Parse HGVS mutation notation like 'c.2674T>A'
    Returns: (position, ref_base, alt_base)
    """
    match = re.match(r'c\.(\d+)([ACGT])>([ACGT])', mutation_string)
    if match:
        position = int(match.group(1))
        ref_base = match.group(2)
        alt_base = match.group(3)
        return position, ref_base, alt_base
    else:
        raise ValueError(f"Invalid mutation format: {mutation_string}")

def create_mutant_sequence(reference_seq, mutations_list):
    """
    Create mutant sequence by applying multiple mutations
    """
    # Convert to mutable list for easier manipulation
    seq_list = list(str(reference_seq))
    
    # Track mutations for validation
    applied_mutations = []
    failed_mutations = []
    
    for mutation in mutations_list:
        try:
            pos, ref_base, alt_base = parse_mutation_notation(mutation)
            
            # Convert to 0-based indexing for Python
            py_pos = pos - 1
            
            # Check if position is within sequence bounds
            if py_pos >= len(seq_list) or py_pos < 0:
                failed_mutations.append(f"{mutation} (position out of bounds)")
                continue
            
            # Validate reference base matches
            if seq_list[py_pos].upper() != ref_base.upper():
                failed_mutations.append(f"{mutation} (ref base mismatch: expected {ref_base}, found {seq_list[py_pos]})")
                continue
            
            # Apply mutation
            seq_list[py_pos] = alt_base
            applied_mutations.append(mutation)
            
        except ValueError as e:
            failed_mutations.append(f"{mutation} (parse error: {str(e)})")
            continue
    
    # Convert back to string
    mutant_seq = ''.join(seq_list)
    return mutant_seq, applied_mutations, failed_mutations

def add_mutant_sequences_to_dataframe(fasta_file, dataframe, mutation_column='mutation', 
                                    inplace=False, verbose=True):
    """
    Main function to add mutant sequences as new columns in the DataFrame
    
    Parameters:
    - fasta_file: Path to reference FASTA file
    - dataframe: Input DataFrame containing mutations
    - mutation_column: Name of column with mutation notations
    - inplace: Whether to modify the original DataFrame
    - verbose: Whether to print progress information
    
    Returns:
    - DataFrame with new columns added
    """
    
    # Read reference sequence
    reference_record = next(SeqIO.parse(fasta_file, "fasta"))
    reference_seq = reference_record.seq
    reference_id = reference_record.id
    
    if verbose:
        print(f"Reference sequence: {reference_id}")
        print(f"Reference length: {len(reference_seq)}")
    
    # Create a copy if not inplace
    if not inplace:
        df = dataframe.copy()
    else:
        df = dataframe
    
    # Initialize new columns
    df['mutant_sequence'] = None
    df['applied_mutations'] = None
    df['failed_mutations'] = None
    df['all_mutations_valid'] = False
    
    # Process each row in the dataframe
    for idx in df.index:
        mutation = df.loc[idx, mutation_column]
        
        # Handle case where mutation might be a list or single string
        if pd.isna(mutation):
            if verbose:
                print(f"Row {idx}: No mutation found")
            continue
            
        if isinstance(mutation, list):
            mutations_to_apply = mutation
        elif isinstance(mutation, str):
            mutations_to_apply = [mutation]
        else:
            if verbose:
                print(f"Row {idx}: Invalid mutation type: {type(mutation)}")
            continue
        
        try:
            mutant_seq, applied_muts, failed_muts = create_mutant_sequence(
                reference_seq, mutations_to_apply
            )
            
            # Update dataframe
            df.loc[idx, 'mutant_sequence'] = mutant_seq
            df.loc[idx, 'applied_mutations'] = applied_muts
            df.loc[idx, 'failed_mutations'] = failed_muts
            df.loc[idx, 'all_mutations_valid'] = (len(failed_muts) == 0)
            
            if verbose:
                print(f"Row {idx}: Processed {len(mutations_to_apply)} mutations")
                if applied_muts:
                    print(f"  Applied: {applied_muts}")
                if failed_muts:
                    print(f"  Failed: {failed_muts}")
                    
        except Exception as e:
            if verbose:
                print(f"Error processing row {idx}: {e}")
            continue
    
    # Add reference information as DataFrame attributes
    df.attrs['reference_id'] = reference_id
    df.attrs['reference_length'] = len(reference_seq)
    
    if verbose:
        valid_count = df['all_mutations_valid'].sum()
        total_count = len(df)
        print(f"\nSummary: {valid_count}/{total_count} sequences have all mutations applied successfully")
    
    return df

# Example usage with sample data
def create_example_dataframe():
    """Create an example DataFrame for testing"""
    data = {
        'sample_id': ['sample_001', 'sample_002', 'sample_003', 'sample_004', 'sample_005'],
        'mutation': [
            'c.2674T>A', 
            'c.123G>C', 
            ['c.2674T>A', 'c.123G>C'],  # Multiple mutations
            'c.500A>G',
            'c.10000A>T'  # This might fail if sequence is too short
        ],
        'clinical_significance': ['Pathogenic', 'Benign', 'VUS', 'Pathogenic', 'Unknown']
    }
    return pd.DataFrame(data)

# Complete workflow example
if __name__ == "__main__":
    # Create example dataframe
    df = create_example_dataframe()
    
    print("Original DataFrame:")
    print(df)
    print("\n" + "="*50 + "\n")
    
    # Generate mutant sequences and add to dataframe
    try:
        # Replace 'your_gene.fasta' with your actual FASTA file path
        result_df = add_mutant_sequences_to_dataframe(
            fasta_file='reference_gene.fasta',
            dataframe=df,
            mutation_column='mutation',
            inplace=False,
            verbose=True
        )
        
    except FileNotFoundError:
        print("Reference FASTA file not found. Creating example...")
        
        # Create example reference sequence (3000 bp)
        example_reference = "A" * 100 + "T" + "G" * 500 + "C" * 500 + "A" * 1898  # Total: 100+1+500+500+1898=2999
        example_reference += "T"  # Make it 3000 bp total
        
        with open("example_reference.fasta", "w") as f:
            f.write(">example_gene\n")
            # Add some variety to the sequence
            import random
            bases = ['A', 'T', 'G', 'C']
            varied_seq = ''.join(random.choice(bases) for _ in range(1000)) + example_reference[1000:]
            f.write(varied_seq + "\n")
        
        # Now run with the created file
        result_df = add_mutant_sequences_to_dataframe(
            fasta_file='example_reference.fasta',
            dataframe=df,
            mutation_column='mutation',
            inplace=False,
            verbose=True
        )
    
    print("\n" + "="*50)
    print("Final DataFrame with Mutant Sequences:")
    print(result_df[['sample_id', 'mutation', 'all_mutations_valid', 'applied_mutations']])
    
    # Display full results for a sample
    print("\n" + "="*50)
    print("Detailed results for first sample:")
    first_sample = result_df.iloc[0]
    print(f"Sample ID: {first_sample['sample_id']}")
    print(f"Mutation: {first_sample['mutation']}")
    print(f"All valid: {first_sample['all_mutations_valid']}")
    print(f"Applied mutations: {first_sample['applied_mutations']}")
    print(f"Failed mutations: {first_sample['failed_mutations']}")
    if first_sample['mutant_sequence']:
        print(f"Mutant sequence length: {len(first_sample['mutant_sequence'])}")
        print(f"First 100 bases: {first_sample['mutant_sequence'][:100]}...")

Original DataFrame:
    sample_id               mutation clinical_significance
0  sample_001              c.2674T>A            Pathogenic
1  sample_002               c.123G>C                Benign
2  sample_003  [c.2674T>A, c.123G>C]                   VUS
3  sample_004               c.500A>G            Pathogenic
4  sample_005             c.10000A>T               Unknown


Reference FASTA file not found. Creating example...
Reference sequence: example_gene
Reference length: 3000
Error processing row 0: Must have equal len keys and value when setting with an iterable
Error processing row 1: Must have equal len keys and value when setting with an iterable


ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

In [2]:
import sys
print(sys.path)

['/usr/lib/python312.zip', '/usr/lib/python3.12', '/usr/lib/python3.12/lib-dynload', '', '/usr/local/lib/python3.12/dist-packages', '/usr/lib/python3/dist-packages']


In [4]:
import pandas as pd
from Bio import SeqIO
import re

def simple_mutation_processor(fasta_file, df, mutation_column):
    """
    Ultra-simple version that just adds mutant sequences
    """
    # Read reference
    ref_record = next(SeqIO.parse(fasta_file, "fasta"))
    ref_seq = str(ref_record.seq)
    
    # Create copy of dataframe
    result_df = df.copy()
    mutant_seqs = []
    
    for i in range(len(result_df)):
        mutation = result_df.iloc[i][mutation_column]
        
        # Skip if mutation is missing
        if pd.isna(mutation):
            mutant_seqs.append(None)
            continue
        
        # Convert to list of mutations
        if isinstance(mutation, list):
            mut_list = mutation
        else:
            mut_list = [mutation]
        
        # Start with reference sequence
        current_seq = list(ref_seq)
        success = True
        
        for mut in mut_list:
            try:
                # Parse mutation
                match = re.match(r'c\.(\d+)([ACGT])>([ACGT])', str(mut))
                if match:
                    pos = int(match.group(1)) - 1
                    ref_base = match.group(2)
                    alt_base = match.group(3)
                    
                    # Check bounds and reference
                    if pos < len(current_seq) and current_seq[pos] == ref_base:
                        current_seq[pos] = alt_base
                    else:
                        success = False
                        break
                else:
                    success = False
                    break
            except:
                success = False
                break
        
        if success:
            mutant_seqs.append(''.join(current_seq))
        else:
            mutant_seqs.append(None)
    
    result_df['mutant_sequence'] = mutant_seqs
    return result_df

# Usage:
# result_df = simple_mutation_processor('your_gene.fasta', your_df, 'mutation')

In [5]:
your_df = pd.read_csv('fbn_snp.csv')
result_df = simple_mutation_processor('NM_000138.5.fasta', your_df, 'mutation_1')

In [9]:
result_df['mutant_sequence'].nunique()

1110

In [11]:
result_df = result_df.dropna(subset = 'mutant_sequence')

In [13]:
len(result_df)

1110

In [14]:
result_df.to_csv('library.csv')