In [None]:
import subprocess
import os
import random
import re
import itertools
import pandas as pd

In [None]:
# Directory that contains the LigandMPNN checkout. nanobody_sample interpolates it as
# f'/{path_to_mpnn_repository}/LigandMPNN/run.py', i.e. a leading "/" is always prepended,
# so set this before running the design loop (the empty default yields "//LigandMPNN/run.py").
path_to_mpnn_repository = ""

In [None]:

# Function to diversify Specified Contigs
def expand_list(lists_of_lists, range_expand, sample_size, use_expanded=True, prefix_limit=120, sample=False, seed=None):
    """Widen residue identifiers to their neighbours and join everything into one string.

    Each identifier that starts with an uppercase letter followed by digits
    (e.g. ``"B99"``) is expanded by ``range_expand`` positions up and down.
    Identifiers that do not match are kept unchanged and never expanded.

    Args:
        lists_of_lists: Iterable of lists of identifier strings.
        range_expand: Number of neighbouring positions added on each side.
            ``0`` (or a negative value) adds nothing.
        sample_size: Number of identifiers drawn per sublist when ``sample`` is
            true. Values ``<= 0`` keep the whole sublist.
        use_expanded: Only consulted when ``sample`` is true: draw from the
            expanded sublists (True) or from the original ones (False).
            With ``sample=False`` the expanded sublists are always returned.
        prefix_limit: Highest residue number that expansion may generate.
            Identifiers already present in the input are kept even above it.
        sample: When true, randomly subsample each sublist.
        seed: Optional seed for reproducible sampling. ``None`` uses the
            module-level ``random`` state.

    Returns:
        A single space-separated string of identifiers. Sublists are
        concatenated as-is, so an identifier present in several sublists can
        appear more than once.
    """
    residue_pattern = re.compile(r"([A-Z])(\d+)")

    def sort_key(item):
        # Parsed identifiers sort by (prefix, number); anything else goes last, alphabetically.
        match = residue_pattern.match(item)
        if match:
            return (0, match.group(1), int(match.group(2)), item)
        return (1, item, 0, item)

    def expand_sequence(sequence, range_expand):
        expanded_sequence = set(sequence)  # Use a set to avoid duplicates
        for item in sequence:
            match = residue_pattern.match(item)
            if not match:
                continue
            prefix, num = match.group(1), int(match.group(2))
            for i in range(1, range_expand + 1):
                # Expand upwards, never past prefix_limit
                if num + i <= prefix_limit:
                    expanded_sequence.add(f"{prefix}{num + i}")
                # Expand downwards, residue numbers start at 1
                if num - i > 0:
                    expanded_sequence.add(f"{prefix}{num - i}")
        # Set iteration order depends on string hashing; sort so the output is reproducible.
        return sorted(expanded_sequence, key=sort_key)

    expanded_lists = [expand_sequence(sub_list, range_expand) for sub_list in lists_of_lists]

    if not sample:
        chosen_lists = expanded_lists
    else:
        rng = random if seed is None else random.Random(seed)
        source_lists = expanded_lists if use_expanded else lists_of_lists
        chosen_lists = []
        for sub_list in source_lists:
            if sample_size > 0:
                chosen_lists.append(rng.sample(sub_list, min(sample_size, len(sub_list))))
            else:
                chosen_lists.append(sub_list)

    return ' '.join(item for sublist in chosen_lists for item in sublist)

# Smoke test: expand every residue by +/-5 positions, then draw 3 residues per sublist.
# The third sublist repeats the second one, so its residues may show up twice in the output.
lists_of_lists = [
    ["B99", "B100", "B101"],
    ["D105", "D106", "D107"],
    ["D105", "D106", "D107"],
]
result = expand_list(
    lists_of_lists,
    range_expand=5,
    sample_size=3,
    use_expanded=True,
    sample=True,
)
print(result)

In [None]:
# With sample=False every expanded sublist is joined in full, so sample_size is not used here.
expand_list(lists_of_lists, range_expand=7, sample_size=3, use_expanded=True, sample=False)  # test again

In [None]:
def nanobody_sample(t, sc_context, pdb_path,chain_parse, fixed = None, model=None, extra=None):
    """Run one LigandMPNN job for ``pdb_path`` unless its output folder already exists.

    The output folder is ``mpnn_generated/<pdb stem>_test/<name>_round_4`` where
    ``<name>`` is assembled from ``extra``, ``model``, the side-chain-context
    flag, whether ``fixed`` was given, and the temperature.

    Args:
        t: Sampling temperature, passed to ``--temperature``.
        sc_context: When truthy, adds ``--ligand_mpnn_use_side_chain_context 1``
            and defaults the model type to ``ligand_mpnn``.
        pdb_path: Input structure, passed to ``--pdb_path``.
        chain_parse: Value for ``--parse_these_chains_only``.
        fixed: Space-separated residue string. Despite the name it is passed to
            ``--redesigned_residues``, i.e. these are the residues to redesign.
        model: Optional value for ``--model_type``; takes precedence over the
            side-chain-context default.
        extra: Optional prefix for the output folder name.

    Returns:
        None. The command, stdout and stderr are printed.
    """
    base_name = os.path.splitext(os.path.basename(pdb_path))[0]

    # Output and save path modifications included design params and unique identifiers for running in a loop
    output = f"fixed_cdr_t_{t}" if fixed else f"t_{t}"

    command = [
        'python', f'/{path_to_mpnn_repository}/LigandMPNN/run.py',
        '--pdb_path', f'{pdb_path}',
        '--batch_size', str(1),#str(1024), # increase number of redesigns
        '--number_of_batches', str(1), #str(200),
        '--temperature', str(t),
        '--parse_these_chains_only', chain_parse]
    if fixed:
        command = command + ['--redesigned_residues', fixed]  # redesign the selected residues

    if sc_context:
        command = command + ['--ligand_mpnn_use_side_chain_context', str(1)]
        output = "sc_ligand_mpnn_" + output

    # Pass --model_type exactly once: an explicit model wins over the side-chain-context default.
    model_type = model or ("ligand_mpnn" if sc_context else None)
    if model_type:
        command = command + ['--model_type', model_type]
    if model:
        output = f"{model}_" + output
    if extra:
        output = extra + output

    save_path = os.path.join(base_name+"_test", output+"_round_4")
    out_folder = f"mpnn_generated/{save_path}"
    command = command + ['--out_folder', out_folder]
    print(command)

    # If interrupted, don't regenerate. The check must use the folder the job actually
    # writes to (out_folder); save_path alone lacks the "mpnn_generated/" prefix.
    if os.path.exists(out_folder):
        return

    result = subprocess.run(command, capture_output=True, text=True)
    print("Output:", result.stdout)
    print("Error:", result.stderr)
    if result.returncode != 0:
        print(f"LigandMPNN exited with status {result.returncode} for {out_folder}")
# Variable regions: three sublists of chain-B residue identifiers.
# NOTE(review): B33 is absent from the first sublist and from fw_4krl below - confirm this is intended.
fixed_res_4krl = [
    [f"B{n}" for n in (27, 28, 29, 30, 31, 32, 34, 35)],
    [f"B{n}" for n in range(50, 58)],
    [f"B{n}" for n in range(99, 114)],
]

# Potentially Fixed regions: B1-B26, B36-B49, B58-B98 and B114-B122.
fw_4krl = [
    [f"B{n}" for n in range(first, last + 1)]
    for first, last in ((1, 26), (36, 49), (58, 98), (114, 122))
]



def generate_combinations(lists_3, lists_4):
    """Enumerate residue selections that always contain ``lists_3[0]`` and ``lists_3[2]``.

    The middle sublist of ``lists_3`` and the four sublists of ``lists_4`` are
    optional; every subset of those five (including the empty one) is combined
    with the two mandatory sublists, giving 2**5 = 32 results.

    Args:
        lists_3: Exactly three sublists (CDRs).
        lists_4: Exactly four sublists (frameworks).

    Returns:
        List of ``(identifier, combination)`` tuples. ``combination`` is a list
        of sublists. ``identifier`` joins the labels of the included sublists
        with ``"_"``: ``L3_<i>`` for ``lists_3[i]`` and ``L4_<i>`` for
        ``lists_4[i]``, always starting with ``L3_0_L3_2``.

    Raises:
        AssertionError: If either argument has the wrong number of sublists.
    """
    # CDRs for lists_3 and Frameworks for lists_4
    assert len(lists_3) == 3, "should have 3 sublists"
    assert len(lists_4) == 4, "should have 4 sublists"

    # Fixed sublists from lists_3 (positions 0 and 2)
    fixed_sublists = [lists_3[0], lists_3[2]]

    # Pair every optional sublist with its label up front. Labels come from the
    # position, not from equality lookups, so sublists that compare equal still
    # get distinct identifiers.
    optional = [("L3_1", lists_3[1])]
    optional.extend((f"L4_{index}", sublist) for index, sublist in enumerate(lists_4))

    all_combinations = []
    # Subsets are emitted by increasing size, starting with the empty subset.
    for size in range(len(optional) + 1):
        for subset in itertools.combinations(optional, size):
            labels = ["L3_0", "L3_2"] + [label for label, _ in subset]
            combination = fixed_sublists + [sublist for _, sublist in subset]
            all_combinations.append(("_".join(labels), combination))

    return all_combinations


# Input structure for the design runs.
pdb_path_4krl = "4krl.pdb"

# Every (identifier, residue-lists) selection built from the CDR and framework lists.
combinations = generate_combinations(
    fixed_res_4krl,
    fw_4krl,
)


In [None]:
# Iterate through all design parameters
# Every (expand, sample) setting launches its own job. The job call has to sit inside the
# two innermost loops; otherwise only the last setting (expand=3, sample=5) would ever run.
# Total: 2 temperatures x 33 selections x 2 models x 4 settings = 528 restricted jobs,
# plus 2 x 2 = 4 unrestricted jobs.
for pdb_path, fixed_res in zip([pdb_path_4krl], [fixed_res_4krl]):
    for temp in [2,3]:
        # ('', False) is a sentinel meaning "use the full fixed_res lists, no combo tag".
        for unique, combo in combinations + [('', False)]:
            use_default_lists = combo is False
            residue_lists = fixed_res if use_default_lists else combo
            combo_tag = "" if use_default_lists else f"combo_{unique}_"
            for fixed in [True, False]:
                for sc in [False]:
                    for model in ["soluble_mpnn", "ligand_mpnn"]:
                        if not fixed:
                            # The unrestricted run does not depend on the combo, so launch it
                            # only once per (temp, model), on the sentinel entry.
                            if use_default_lists:
                                nanobody_sample(temp, sc, pdb_path, "B", model=model)
                            continue

                        for expand in [0, 3]:
                            for sample in [0, 5]:
                                fixed_res2 = expand_list(
                                    residue_lists,
                                    range_expand=expand,
                                    sample_size=sample,
                                    use_expanded=expand != 0,
                                    prefix_limit=120,
                                    sample=sample != 0,
                                )
                                nanobody_sample(
                                    temp, sc, pdb_path, "B",
                                    fixed=fixed_res2,
                                    model=model,
                                    extra=f"expand_{expand}_sample_{sample}_{combo_tag}",
                                )

                            

In [None]:

# Function to parse the results into a list of records (one dict per designed sequence)
def parse_fasta(file_path):
    """Parse a generated FASTA file into a list of per-sequence dictionaries.

    The first two lines of the file are skipped (presumably the input/native
    record - TODO confirm). The remaining lines are read as header/sequence
    pairs. Each header is split on ``,`` and read positionally: field 0 is the
    name, fields 1-6 are ``key=value`` entries stored as ``id``, ``T``,
    ``seed``, ``overall_confidence``, ``ligand_confidence`` and ``seq_rec``.

    Args:
        file_path: Path of the FASTA file to read.

    Returns:
        A list of dicts with keys ``name``, ``id``, ``T``, ``seed``,
        ``overall_confidence``, ``ligand_confidence``, ``seq_rec`` and
        ``sequence``; ``T`` and the three scores are floats. Returns ``None``
        (after printing the error) if the file cannot be read or a header does
        not have the expected layout.
    """
    try:
        with open(file_path, 'r') as file:
            lines = file.readlines()

        # Skip the first two lines
        lines = [line.strip() for line in lines[2:]]

        # Trailing blank lines would otherwise be read as a header without a sequence
        # and make the whole file fail.
        while lines and not lines[-1]:
            lines.pop()

        if len(lines) % 2:
            print(f"Warning: {file_path} ends with a header that has no sequence; ignoring it")
            lines = lines[:-1]

        data = []
        for header, sequence in zip(lines[0::2], lines[1::2]):  # Process every pair of lines
            header_parts = header.split(',')

            def field(position):
                # Value part of the "key=value" entry at this header position.
                return header_parts[position].split('=')[1].strip()

            data.append({
                'name': header_parts[0].replace('>', '').strip(),
                'id': field(1),
                'T': float(field(2)),
                'seed': field(3),
                'overall_confidence': float(field(4)),
                'ligand_confidence': float(field(5)),
                'seq_rec': float(field(6)),
                'sequence': sequence,
            })

        return data

    except (OSError, IndexError, ValueError) as e:
        # OSError: unreadable file; IndexError/ValueError: unexpected header layout or number.
        print(f"Error processing {file_path}: {e}")
        return None

In [None]:
# Accumulator for one DataFrame per parsed .fa file. Re-run this cell before re-running the
# directory walk below, otherwise the walk appends the same files a second time.
dataframes = []

In [None]:
# Collect every generated .fa file below the results directory into `dataframes`.
# NOTE(review): nanobody_sample writes to "mpnn_generated/<pdb stem>_test/..." - confirm that
# "mpnn_generated/bioml" is the directory that should be scanned here.
if not os.path.isdir("mpnn_generated/bioml"):
    # os.walk yields nothing for a missing directory, so say so explicitly.
    print("Warning: mpnn_generated/bioml does not exist; nothing to process")

for root, dirs, files in os.walk("mpnn_generated/bioml"):
    for file in files:
        if not file.endswith('.fa'):
            continue
        file_path = os.path.join(root, file)
        if '.ipynb' in file_path:  # skip notebook checkpoint copies
            continue
        print(f"Processing {file_path}")
        fasta_data = parse_fasta(file_path)

        # parse_fasta returns None on failure; an empty result has no columns to filter on later.
        if not fasta_data:
            print(f"Skipping {file_path}: no records parsed")
            continue

        # Convert to DataFrame and append to list
        df = pd.DataFrame.from_records(fasta_data)
        df["file_path"] = file_path
        dataframes.append(df)


#all_data = pd.concat(dataframes, ignore_index=True)
# Save to CSV
#all_data.to_csv(f'{base_name}_generated.csv', index=False)


In [None]:
# Per file: keep designs with sequence recovery >= 0.60, rank them by confidence and
# recovery (best first), and keep the best-ranked row of each distinct sequence.
results = []
for frame in dataframes:
    ranked = (
        frame[frame["seq_rec"]>=0.60]
        .sort_values(by=["overall_confidence", "seq_rec"], ascending=False)
        .drop_duplicates(subset="sequence")
    )
    if ranked.empty:
        continue
    results.append(ranked)


In [None]:
# Stack the per-file frames, re-rank globally, drop sequences repeated across files and
# renumber the rows from 0.
results = (
    pd.concat(results)
    .sort_values(by=["overall_confidence", "seq_rec"], ascending=False)
    .drop_duplicates(subset="sequence")
    .reset_index(drop=True)
)

In [None]:
# Write the filtered sequences to a temporary FASTA file, using the row index as record id.
# `results` is the frame that is later merged with the renumbering output on "sequence";
# the previously referenced `results_df2` is not defined anywhere in this notebook.
with open("temp.fasta", "w") as f:  # write temporary fasta
    for i, sequence in results["sequence"].items():
        f.write(f">{i}\n{sequence}\n")


In [None]:
from sadie.renumbering import Renumbering
# We wrap these in a function so we can use multiprocessing
def sadie_run(fasta="temp.fasta") -> pd.DataFrame:
    """Renumber the sequences in a FASTA file with SADIE.

    Args:
        fasta: Path of the FASTA file to renumber. Defaults to ``"temp.fasta"``,
            the file written earlier in this notebook. The original body read
            an undefined global ``fasta``.

    Returns:
        The table returned by ``Renumbering.run_file`` for ``fasta``.
    """
    # setup API object
    renumbering_api = Renumbering(scheme="chothia", region_assign="imgt", run_multiproc=True)

    # run the renumbering on a file
    numbering_table = renumbering_api.run_file(fasta)

    return numbering_table


In [None]:
# Run the SADIE renumbering defined above and keep its numbering table for the merge below.
sadie_results = sadie_run()

In [None]:
# Attach the renumbering output to each design (left join on "sequence", so designs without
# a match are kept) and restore the confidence/recovery ranking.
results = (
    results
    .merge(sadie_results, on ="sequence", how="left")
    .sort_values(by=["overall_confidence", "seq_rec"], ascending=False)
)

In [None]:
# Filter on CDR1 and CDR3 as they are the parent contacts to the antigen:
# keep the first (best-ranked) row per distinct CDR3, then per distinct CDR1.
results = (
    results
    .drop_duplicates(subset="cdr3_aa_no_gaps")
    .drop_duplicates(subset="cdr1_aa_no_gaps")
)

In [None]:
# Clean the columns and names
# "method" is derived from the file path: the first of "soluble", "ligand", "protein" found
# in the path decides the label; a path containing none of them is kept unchanged.
results = results.assign(
    method=[
        "soluble_mpnn" if "soluble" in path
        else "ligand_mpnn" if "ligand" in path
        else "protein_mpnn" if "protein" in path
        else path
        for path in results["file_path"]
    ]
).rename(columns={"name": "parent"})

# "parameters" is the third path component from the end, with any ".fa" removed.
results["parameters"] = [
    path.split("/")[-3].replace(".fa", "") for path in results["file_path"]
]

In [None]:
# Number of remaining designs per method label.
results["method"].value_counts()

In [None]:
# Save the filtered, annotated designs; index=False leaves the DataFrame index out of the file.
results.to_csv("mpnn_strict_filtered.csv", index=False)