### **Before using this notebook, you need to run the `STEP1_Relaxation.ipynb` first.**

In [1]:
import os
import csv

from pyrosetta import init, pose_from_pdb
from pyrosetta.rosetta.core.scoring import get_score_function
from pyrosetta.rosetta.core.pack.task import TaskFactory, operation
from pyrosetta.rosetta.core.select.residue_selector import ResidueIndexSelector, NeighborhoodResidueSelector, NotResidueSelector
from pyrosetta.rosetta.protocols.minimization_packing import PackRotamersMover

In [None]:
def parse_mutation_file(file_path):
    """
    Parses a mutation file where each line contains: line_number mutation[ mutation ...]
    Returns:
        List of (line_number, mutation_dict) where mutation_dict maps PDB position to target AA.
    """
    mutations = []
    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split()
            if len(parts) < 2:
                print(f"⚠️ Skipping malformed line: {line}")
                continue
            line_number = parts[0]
            mutation_str = parts[1]
            mut_dict = {}
            for mut in mutation_str.split(' '):
                wt = mut[0]
                aa = mut[-1]
                pos = int(mut[1:-1])
                mut_dict[pos] = aa
            mutations.append((line_number, mut_dict))
    return mutations


In [3]:
def pack(pose, posi, amino, scorefxn):
    mut_posi = ResidueIndexSelector()
    mut_posi.set_index(posi)

    nbr_selector = NeighborhoodResidueSelector()
    nbr_selector.set_focus_selector(mut_posi)
    nbr_selector.set_include_focus_in_subset(True)

    not_design = NotResidueSelector(mut_posi)

    tf = TaskFactory()
    tf.push_back(operation.InitializeFromCommandline())
    tf.push_back(operation.IncludeCurrent())
    tf.push_back(operation.NoRepackDisulfides())

    tf.push_back(operation.OperateOnResidueSubset(operation.PreventRepackingRLT(), nbr_selector, True))
    tf.push_back(operation.OperateOnResidueSubset(operation.RestrictToRepackingRLT(), not_design))

    aa_to_design = operation.RestrictAbsentCanonicalAASRLT()
    aa_to_design.aas_to_keep(amino)
    tf.push_back(operation.OperateOnResidueSubset(aa_to_design, mut_posi))

    packer = PackRotamersMover(scorefxn)
    packer.task_factory(tf)
    packer.apply(pose)

In [None]:
def run_mutations_from_file(pdb_path, mutation_file, output_dir, chain_id="A"):
    """
    Reads mutation instructions from a file and applies them to a PDB structure.

    Args:
        pdb_path (str): Path to the input PDB file.
        mutation_file (str): Path to the mutation file.
        output_dir (str): Directory to save the mutated PDB files.
        chain_id (str): Chain identifier used in the PDB file.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    pose = pose_from_pdb(pdb_path)
    scorefxn = get_score_function()
    results = []

    # Parse mutation file with line number and mutations
    mutations = []
    with open(mutation_file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            parts = line.split()
            if len(parts) < 2:
                print(f"⚠️ Skipping malformed line: {line}")
                continue
            line_number = parts[0]
            mutation_str = parts[1]
            mut_dict = {}
            for mut in mutation_str.split(','):
                wt = mut[0]
                aa = mut[-1]
                try:
                    pos = int(mut[1:-1])
                    mut_dict[pos] = aa
                except ValueError:
                    print(f"⚠️ Invalid mutation format: {mut} in line {line}")
                    mut_dict = {}
                    break
            if mut_dict:
                mutations.append((line_number, mut_dict))

    for line_number, mut_dict in mutations:
        test_pose = pose.clone()
        success = True
        for pdb_res, target_aa in mut_dict.items():
            pose_res = test_pose.pdb_info().pdb2pose(chain_id, pdb_res)
            if pose_res <= 0:
                # Check the ⚠️ in the output. If your PDB file has missing residues, they will be skipped. Not a big deal, but it's better to double check.
                # We can do nothing for those missing residues. Because they are not absent from raw PDB files.
                # For me, there are several residues missing on cpmApple.
                print(f"⚠️ Skipping {line_number}: Cannot find residue {chain_id}:{pdb_res} in pose.") 
                success = False
                break
            pack(test_pose, pose_res, target_aa, scorefxn)

        if success:
            energy = scorefxn(test_pose)
            out_pdb = os.path.join(output_dir, f"{line_number}.pdb")
            test_pose.dump_pdb(out_pdb)
            results.append((line_number, energy))
            print(f"✅ Line {line_number}: Energy = {energy:.2f}")

    # Save all energy results to CSV
    csv_file = os.path.join(output_dir, "mutation_energy_results.csv")
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["LineNumber", "Energy"])
        writer.writerows(results)


In [None]:

def main():
    # Initialize PyRosetta
    init()

    # Path to your relaxed PDB file
    pdb_path = "/home/shuyu/Development/PyRosetta/prepared_structures/4I2Y_relaxed_chain_A.pdb" 
    # Path to your mutation list file, it should contain lines like: 1 A123A A124G， it should be a .txt file
    # This notebook suppports multiple mutations, like: 1 A123A A124G. Use a space to separate them.
    # The first column is the index number, the final output file will be named by this number.
    mutation_file = "/home/shuyu/Development/PyRosetta/variants_list.txt" 
    # Output directory for mutated structures
    output_dir = "/home/shuyu/Development/PyRosetta/mutated_structures"
    # Chain ID in the PDB file, it dependes on you, if you have a multi-chain PDB file, you can specify the chain ID here
    run_mutations_from_file(pdb_path, mutation_file, output_dir, chain_id="A")


if __name__ == "__main__":
    main()

┌──────────────────────────────────────────────────────────────────────────────┐
│                                 PyRosetta-4                                  │
│              Created in JHU by Sergey Lyskov and PyRosetta Team              │
│              (C) Copyright Rosetta Commons Member Institutions               │
│                                                                              │
│ NOTE: USE OF PyRosetta FOR COMMERCIAL PURPOSES REQUIRE PURCHASE OF A LICENSE │
│         See LICENSE.PyRosetta.md or email license@uw.edu for details         │
└──────────────────────────────────────────────────────────────────────────────┘
PyRosetta-4 2025 [Rosetta PyRosetta4.conda.ubuntu.cxx11thread.serialization.Ubuntu.python313.Release 2025.06+release.029c6a159b896477003a14f78f472d4cd2cead46 2025-02-04T15:14:13] retrieved from: http://www.pyrosetta.org
core.init: Checking for fconfig files in pwd and ./rosetta/flags
core.init: Rosetta version: PyRosetta4.conda.ubuntu.cxx11thread.ser

core.chemical.GlobalResidueTypeSet: Finished initializing fa_standard residue type set.  Created 985 residue types
core.chemical.GlobalResidueTypeSet: Total time to initialize 1.06285 seconds.
core.import_pose.import_pose: File '/home/shuyu/Development/PyRosetta/prepared_structures/4I2Y_relaxed_chain_A.pdb' automatically determined to be of type PDB
core.scoring.ScoreFunctionFactory: SCOREFUNCTION: ref2015
core.scoring.etable: Starting energy table calculation
core.scoring.etable: smooth_etable: changing atr/rep split to bottom of energy well
core.scoring.etable: smooth_etable: spline smoothing lj etables (maxdis = 6)
core.scoring.etable: smooth_etable: spline smoothing solvation etables (max_dis = 6)
core.scoring.etable: Finished calculating energy tables.
basic.io.database: Database file opened: scoring/score_functions/hbonds/ref2015_params/HBPoly1D.csv
basic.io.database: Database file opened: scoring/score_functions/hbonds/ref2015_params/HBFadeIntervals.csv
basic.io.database: Databa