# 使用训练的模型生成序列

In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = "3"
import torch
import pandas as pd
from torch.utils.data import Dataset
from peft import PeftModel
from transformers import AutoTokenizer, AutoModelForCausalLM
from tokenizers import Tokenizer
import gc

# Generate on the GPU when CUDA is usable, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Tokenizer file and the IDs of the three special tokens looked up from it.
tokenizer = Tokenizer.from_file("your path/tokenizer.json")
BOS_TOKEN_ID, EOS_TOKEN_ID, PAD_TOKEN_ID = (
    tokenizer.token_to_id(special) for special in ("<|bos|>", "<|eos|>", "<|pad|>")
)

# Prompt text that is encoded and passed to the model
# (the original note calls it the antimicrobial-peptide control token).
CONTEXT = "<|bos|>"

# Keyword arguments forwarded to `model.generate`.
GENERATION_PARAMS = dict(
    max_length=50,              # maximum generation length
    do_sample=True,             # sample instead of decoding greedily
    top_p=0.9,                  # nucleus-sampling parameter
    temperature=1.2,            # sampling temperature
    pad_token_id=PAD_TOKEN_ID,  # padding token ID
    eos_token_id=EOS_TOKEN_ID,  # end-of-sequence token ID
)
def cleaned_sequence(seqs):
    """Return the sequences upper-cased and reduced to amino-acid letters.

    Each input string is upper-cased, then every character that is not one of
    the 20 letters in ``ACDEFGHIKLMNPQRSTVWY`` is dropped. The output list has
    one entry per input, in the same order; an entry may be the empty string.
    """
    allowed = frozenset("ACDEFGHIKLMNPQRSTVWY")
    return ["".join(filter(allowed.__contains__, seq.upper())) for seq in seqs]
def _strip_special_tokens(tokens, eos_id, pad_id):
    """Return the token IDs after the first position and before the first EOS, without PAD."""
    try:
        end = tokens.index(eos_id)
    except ValueError:
        # No EOS was generated: keep everything up to the end.
        end = len(tokens)
    # Index 0 is skipped because it holds the BOS prompt token.
    return [token for token in tokens[1:end] if token != pad_id]


def main():
    """Generate sequences with the adapted model and save them to a CSV file.

    Loads the ``hugohrban/progen2-large`` base model, applies the PEFT adapter
    stored under ``your path/best_model``, samples sequences in batches from
    the ``CONTEXT`` prompt, strips special tokens, keeps only amino-acid
    letters via ``cleaned_sequence`` and writes the result to
    ``your path/sequence.csv`` in a single ``Sequence`` column.
    """
    # Load the base model.
    base_model = AutoModelForCausalLM.from_pretrained("hugohrban/progen2-large", trust_remote_code=True).to(DEVICE)

    # Load the adapter on top of the base model.
    model = PeftModel.from_pretrained(
        base_model,
        "your path/best_model"
    ).to(DEVICE)

    # Encode the prompt as a batch containing one row.
    input_ids = torch.tensor(
        tokenizer.encode(CONTEXT).ids,
        device=DEVICE
    ).unsqueeze(0)

    clean_sequences = []
    batch_size = 100  # sequences generated per call
    total_sequences = 5000  # total number of sequences requested
    # Floor division: a remainder smaller than one batch is not generated.
    num_batches = total_sequences // batch_size

    for batch_index in range(num_batches):
        with torch.no_grad():  # no gradients are needed for sampling
            outputs = model.generate(
                input_ids=input_ids,
                num_return_sequences=batch_size,  # size of the current batch
                repetition_penalty=1.2,  # penalise repeated tokens
                no_repeat_ngram_size=2,  # forbid repeated n-grams
                **GENERATION_PARAMS
            )

        for output in outputs:
            # PAD is removed by token ID before decoding. Replacing
            # str(PAD_TOKEN_ID) in the decoded text would delete the digits
            # of the ID from the string instead of the PAD tokens.
            valid_tokens = _strip_special_tokens(
                output.cpu().tolist(), EOS_TOKEN_ID, PAD_TOKEN_ID
            )
            if not valid_tokens:  # skip sequences with nothing left
                continue
            clean_sequences.append(tokenizer.decode(valid_tokens))

        # Release the batch before the next generation call.
        del outputs
        gc.collect()
        torch.cuda.empty_cache()

        print(f"Batch {batch_index + 1}/{num_batches} completed.")

    rs = cleaned_sequence(clean_sequences)
    # Save the generated sequences to a CSV file.
    df = pd.DataFrame({'Sequence': rs})
    df.to_csv('your path/sequence.csv', index=False)


if __name__ == "__main__":
    main()

# 评估生成的序列的AMP、MIC、Toxic、Perplexity

In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = "2"
from transformers import AutoModelForCausalLM, AutoTokenizer
import sys
import torch
from tqdm import tqdm
import pandas as pd
from glob import glob
from amp.utils import basic_model_serializer
import amp.data_utils.sequence as du_sequence
# `__file__` is not defined when this code runs as a notebook cell; fall back to
# the current working directory so the relative lookup below does not raise NameError.
_base_dir = os.path.dirname(__file__) if "__file__" in globals() else os.getcwd()
sys.path.append(os.path.abspath(os.path.join(_base_dir, '..', '..', '..', '..')))
from toxinpred3.toxic import ToxinPred3
from peft import PeftModel
import torch.nn as nn
from torch.nn.parallel import DataParallel

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Base model, used here to compute perplexity.
model = AutoModelForCausalLM.from_pretrained("hugohrban/progen2-large", trust_remote_code=True).to(device)

tokenizer = AutoTokenizer.from_pretrained("hugohrban/progen2-large", trust_remote_code=True)

# Reuse EOS as the padding token when the tokenizer does not define one.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Scoring models.
# Toxicity predictor.
toxic_predictor = ToxinPred3(threshold=0.5, model=1)

# AMP and MIC classifiers.
bms = basic_model_serializer.BasicModelSerializer()
amp_classifier = bms.load_model('/geniusland/home/wanglijuan/sci_proj/models/amp_classifier')
amp_classifier_model = amp_classifier()
mic_classifier = bms.load_model('/geniusland/home/wanglijuan/sci_proj/models/mic_classifier/')
mic_classifier_model = mic_classifier()


df = pd.read_csv('your path/sequence.csv')
# An empty sequence is read back from CSV as NaN, which cannot be encoded or
# tokenized; drop such rows so that every remaining entry is a string.
seqs = df['Sequence'].dropna().astype(str).tolist()
pad_seq = du_sequence.pad(du_sequence.to_one_hot(seqs))
pred_amp = amp_classifier_model.predict(pad_seq)
pred_mic = mic_classifier_model.predict(pad_seq)

# Perplexity of each sequence under the language model.
perplexity_list = []
for sequence in tqdm(seqs):
    input_ids = tokenizer.encode(sequence, return_tensors="pt", padding=True, truncation=True, max_length=64).to(device)

    with torch.no_grad():
        outputs = model(input_ids, labels=input_ids)

    # NOTE(review): the stored value is exp(loss) / 100, not the raw perplexity;
    # the scaling is kept as-is. Confirm that it is intentional.
    perplexity_list.append(torch.exp(outputs.loss).item() / 100)

# Toxicity score of each sequence.
toxic_list = [toxic_predictor.predict_sequence(s)['ML Score'] for s in tqdm(seqs)]

# First element of each prediction row, one per sequence.
amp_list = [row[0] for row in pred_amp]
mic_list = [row[0] for row in pred_mic]

data = {
    'Sequence': seqs,
    'AMP': amp_list,
    'MIC': mic_list,
    'Toxic': toxic_list,
    'Perplexity': perplexity_list
}
df = pd.DataFrame(data)
# The scored table is written back to the same file that was read above.
df.to_csv('your path/sequence.csv', index=False)

# 成功率计算

In [None]:
import os
import pandas as pd

# Folder to scan and the output file for the results.
folder_path = "your path"  # replace with your folder path
output_file = "your path/success_rate_results.csv"  # output file name

# Names of all entries in the folder that end with ".csv".
csv_files = list(filter(lambda name: name.endswith('.csv'), os.listdir(folder_path)))

# Fraction of rows that pass all three score thresholds.
def calculate_success_rate(data, amp_threshold=0.8, mic_threshold=0.8, toxic_threshold=0.38):
    """Return the fraction of rows in *data* that count as successful.

    A row is successful when ``AMP > amp_threshold``, ``MIC > mic_threshold``
    and ``Toxic < toxic_threshold`` all hold. Rows with a missing value in
    any of these columns fail the comparison and count as unsuccessful.

    Args:
        data: DataFrame with ``AMP``, ``MIC`` and ``Toxic`` columns.
        amp_threshold: Exclusive lower bound for ``AMP``.
        mic_threshold: Exclusive lower bound for ``MIC``.
        toxic_threshold: Exclusive upper bound for ``Toxic``.

    Returns:
        The success rate as a float, or ``0`` when *data* has no rows.
    """
    total = len(data)
    if total == 0:
        return 0

    passes = (
        (data['AMP'] > amp_threshold)
        & (data['MIC'] > mic_threshold)
        & (data['Toxic'] < toxic_threshold)
    )
    return int(passes.sum()) / total

# One success rate per CSV file, keyed by the file name without its extension.
results = {}

# `output_file` may already include a directory; prefix `folder_path` only when
# it is a bare file name, otherwise the directory would appear twice in the path.
if os.path.dirname(output_file):
    output_path = output_file
else:
    output_path = os.path.join(folder_path, output_file)

required_columns = ['AMP', 'MIC', 'Toxic']

# Compute the success rate of every CSV file in the folder.
for csv_file in csv_files:
    file_path = os.path.join(folder_path, csv_file)

    # Do not treat the results file written by a previous run as input.
    if os.path.abspath(file_path) == os.path.abspath(output_path):
        continue

    data = pd.read_csv(file_path)

    # A file is scored only when all three score columns are present.
    if all(col in data.columns for col in required_columns):
        success_rate = calculate_success_rate(data)
    else:
        print(f"文件 {csv_file} 缺少 AMP, MIC 或 Toxic 列，跳过。")
        success_rate = None  # recorded as None when a column is missing

    # Column name is the file name without its extension.
    column_name = os.path.splitext(csv_file)[0]
    results[column_name] = [success_rate]

# Write the single-row results table.
results_df = pd.DataFrame(results)
results_df.to_csv(output_path, index=False)

# Diversity 计算

In [None]:
import os
import pandas as pd
from Levenshtein import distance as levenshtein_distance

# Folder to scan and the file the distances are written to.
folder_path = "your path"  # replace with your folder path
output_file = "your path/edit_distance.csv"  # output file name

# `output_file` may already include a directory; prefix `folder_path` only when
# it is a bare file name, otherwise the directory would appear twice in the path.
if os.path.dirname(output_file):
    output_path = output_file
else:
    output_path = os.path.join(folder_path, output_file)

# Names of all CSV files in the folder.
csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]

# The reference data set is the first CSV whose file name contains "src"
# (case-insensitive).
original_file = [f for f in csv_files if 'src' in f.lower()]
if not original_file:
    raise FileNotFoundError("未找到原始数据集文件（文件名中包含 'src' 的文件）")
original_file = original_file[0]

# Reference sequences are taken from the first column of the reference file.
original_data = pd.read_csv(os.path.join(folder_path, original_file))
original_sequences = original_data.iloc[:, 0].astype(str).tolist()

# Columns are collected first and assembled into a frame at the end. Assigning
# Series to a growing DataFrame aligns each one to the first column's index,
# which would silently truncate any file longer than the first one processed.
distance_columns = {}

for csv_file in csv_files:
    if csv_file == original_file:
        continue  # the reference file is not compared with itself

    file_path = os.path.join(folder_path, csv_file)

    # Do not treat the distance file written by a previous run as input.
    if os.path.abspath(file_path) == os.path.abspath(output_path):
        continue

    current_data = pd.read_csv(file_path)
    current_sequences = current_data.iloc[:, 0].astype(str).tolist()  # sequences are read from the first column

    # For each sequence, the smallest edit distance to any reference sequence.
    min_distances = [
        min(levenshtein_distance(seq, orig_seq) for orig_seq in original_sequences)
        for seq in current_sequences
    ]

    # Column name is the file name without directory and extension.
    column_name = os.path.splitext(os.path.basename(csv_file))[0]
    distance_columns[column_name] = pd.Series(min_distances)

# Building from a dict of Series keeps every row; shorter columns are padded with NaN.
results_df = pd.DataFrame(distance_columns)
results_df.to_csv(output_path, index=False)
