### 数据读取

In [None]:
import pandas as pd
import numpy as np
import os
from datasets import Dataset, DatasetDict 
from collections import Counter


# Tab-separated table without a header row; columns are named by position below.
file="/home/u20111010010/Project/DNA-Pretraining/Level1/001.Genomics_dataset/Dataset_HERV/VCF_hprc-1000G/Train_Test/data_all_model_HERV-Classification_Need.fa"
column_names = {0: "dset", 1: "multi", 2: "binary", 3: "seq", 4: "Type", 5: "detail"}
df1 = pd.read_csv(file, sep="\t", header=None).rename(columns=column_names)

# Keep only the rows of the test split, restricted to the three columns used below.
is_test = df1['dset'] == 'test'
df = df1.loc[is_test, ['dset', 'multi', 'seq']]

print("+++++++++++++++++++++++++++++Test sets")
print(df.shape)


### Map the numeric multi-class ids to their label names
id2label = {
    "0": "Non-HERV_Coding",
    "1": "HERV_Coding",
    "2": "Non-HERV_Non-Coding",
    "3": "HERV_Non-Coding",
}
labels_raw = list(df['multi'])
labels = [id2label[str(raw_id)] for raw_id in labels_raw]

# Class distribution of the test split.
print(Counter(labels))

sequences = list(df['seq'])

In [None]:
# Run identifier; the .npy files saved at the end of this notebook use the same string as prefix.
dataset_name="BERT_HERV_Multi_RUN0"
# Directory the extracted attention arrays are written to.
output_dir="/home/u20111010010/Project/DNA-Pretraining/Level1/003.Sequence_Visualization/Dataset_HERV"

### 模型学习到的Attention矩阵

In [None]:
def process_scores(attention_scores, kmer):
    """Turn row-0 attention into one head-summed score per sequence position.

    For every sample, the attention that token 0 (the first token,
    presumably [CLS]) pays to tokens ``1 .. seq_len - kmer + 1`` is summed
    over all heads. Each token score is then spread over the ``kmer``
    consecutive positions starting at its own zero-based slot, and every
    position is averaged over the tokens that cover it.

    Args:
        attention_scores: NumPy array of shape
            ``(batch_size, num_heads, seq_len, seq_len)``.
        kmer: Number of positions covered by one token (k-mer length).
            Must satisfy ``2 <= kmer <= seq_len``.

    Returns:
        Tuple ``(scores, unnorm)`` of float arrays, both of shape
        ``(batch_size, seq_len)``. ``unnorm`` holds the averaged scores and
        ``scores`` the same rows scaled to unit L2 norm. A row whose scores
        are all zero is returned as zeros rather than NaN.

    Raises:
        ValueError: If ``kmer`` is outside ``2 .. seq_len``.
    """
    attention_scores = np.asarray(attention_scores)
    num_samples = attention_scores.shape[0]
    seq_len = attention_scores.shape[-1]
    if not 2 <= kmer <= seq_len:
        raise ValueError(
            f"kmer must be between 2 and seq_len ({seq_len}), got {kmer}"
        )

    scores = np.zeros([num_samples, seq_len])
    unnorm = np.zeros([num_samples, seq_len])

    # Number of token scores read per sample (tokens 1 .. num_tokens).
    num_tokens = seq_len - kmer + 1
    window = np.ones(kmer)
    # How many tokens cover each position; identical for every sample.
    counts = np.convolve(np.ones(num_tokens), window)

    for index, attention_score in enumerate(attention_scores):
        # Attention from token 0 to each k-mer token, summed over heads.
        token_scores = (
            attention_score[:, 0, 1:num_tokens + 1].sum(axis=0).astype(float)
        )

        # Zero the first score that is directly followed by a zero score
        # (presumably the last real token before zero-attention padding).
        followed_by_zero = np.flatnonzero(token_scores[1:] == 0)
        if followed_by_zero.size:
            token_scores[followed_by_zero[0]] = 0.0

        # Full convolution with a ones-window adds each token score to the
        # kmer positions it covers; dividing by counts gives the average.
        real_scores = np.convolve(token_scores, window) / counts
        unnorm[index] = real_scores

        # L2-normalise; skip all-zero rows to avoid a 0/0 division.
        norm = np.linalg.norm(real_scores)
        if norm != 0:
            real_scores = real_scores / norm
        scores[index] = real_scores

    return scores, unnorm

In [None]:
def process_multi_score(attention_scores, kmer):
    """Turn row-0 attention into per-head, per-position normalised scores.

    For every sample and every head, the attention that token 0 (the first
    token, presumably [CLS]) pays to tokens ``1 .. seq_len - kmer + 1`` is
    spread over the ``kmer`` consecutive positions starting at each token's
    zero-based slot, averaged over the tokens covering each position, and
    scaled to unit L2 norm.

    Args:
        attention_scores: NumPy array of shape
            ``(batch_size, num_heads, seq_len, seq_len)``.
        kmer: Number of positions covered by one token (k-mer length).
            Must satisfy ``2 <= kmer <= seq_len``.

    Returns:
        Float array of shape ``(batch_size, num_heads, seq_len)``. A head
        whose scores are all zero is returned as zeros rather than NaN.

    Raises:
        ValueError: If ``kmer`` is outside ``2 .. seq_len``.
    """
    attention_scores = np.asarray(attention_scores)
    num_samples = attention_scores.shape[0]
    num_heads = attention_scores.shape[1]
    seq_len = attention_scores.shape[-1]
    if not 2 <= kmer <= seq_len:
        raise ValueError(
            f"kmer must be between 2 and seq_len ({seq_len}), got {kmer}"
        )

    scores = np.zeros([num_samples, num_heads, seq_len])

    # Number of token scores read per head (tokens 1 .. num_tokens).
    num_tokens = seq_len - kmer + 1
    window = np.ones(kmer)
    # How many tokens cover each position; identical for every head.
    counts = np.convolve(np.ones(num_tokens), window)

    for index, attention_score in enumerate(attention_scores):
        for head in range(num_heads):
            # Attention from token 0 to each k-mer token for this head
            # (astype copies, so the input array is never modified).
            token_scores = attention_score[head, 0, 1:num_tokens + 1].astype(float)

            # Zero the first score that is directly followed by a zero score
            # (presumably the last real token before zero-attention padding).
            followed_by_zero = np.flatnonzero(token_scores[1:] == 0)
            if followed_by_zero.size:
                token_scores[followed_by_zero[0]] = 0.0

            # Spread each token score over its kmer positions and average.
            real_scores = np.convolve(token_scores, window) / counts

            # L2-normalise; skip all-zero heads to avoid a 0/0 division.
            norm = np.linalg.norm(real_scores)
            if norm != 0:
                real_scores = real_scores / norm

            scores[index, head] = real_scores

    return scores

In [None]:
from transformers import AutoModel, AutoTokenizer
import torch


# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device_type = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_type)

# Load the tokenizer and the model from the same checkpoint directory,
# then move the model to the selected device.
model_name = "/home/u20111010010/Project/DNA-Pretraining/Level1/002.Model_Classification/Dataset_HERV/Model/BERT_HERV_Multi_RUN0"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
model = model.to(device)

# 6-mers编码
def kmers(s, k=6):
    """Return every overlapping length-``k`` substring of ``s``, in order.

    Consecutive windows are shifted by one character. The result is an
    empty list when ``s`` is shorter than ``k``.
    """
    num_windows = len(s) - k + 1
    return [s[start:start + k] for start in range(num_windows)]

def transform_seq(seq):
    """Build the space-separated k-mer string that is passed to the tokenizer.

    The result is the first 300 k-mers of ``seq`` followed by its last 212
    k-mers, joined by single spaces.

    NOTE(review): when ``seq`` yields fewer than 512 k-mers the two slices
    overlap, so some k-mers appear twice. Confirm this matches how the
    model's training inputs were built before changing it.
    """
    tokens = kmers(seq)
    head = tokens[:300]
    tail = tokens[-212:]
    return " ".join(head + tail)

def pad_to_length(arr, target_length, axis=-1):
    """Return a copy of ``arr`` extended with trailing zeros along ``axis``.

    The padded axis ends up with ``target_length`` entries; all other axes
    and the dtype are unchanged. A ``ValueError`` is raised when ``arr`` is
    already longer than ``target_length`` along ``axis``.
    """
    missing = target_length - arr.shape[axis]
    # No padding anywhere except after the existing data on the chosen axis.
    pad_width = [(0, 0)] * arr.ndim
    pad_width[axis] = (0, missing)
    return np.pad(arr, pad_width, mode="constant", constant_values=0)

# 特征提取
def extract_attentions(model, tokenizer, sequences, batch_size=32):
    """
    Run the model over the sequences in batches and collect last-layer attention scores.

    Each sequence is converted with ``transform_seq``, tokenized, and passed
    through ``model`` with ``output_attentions=True``. The last layer's
    attention tensor is reduced with ``process_scores`` and
    ``process_multi_score`` (k-mer length 6) and zero-padded to 512 columns.

    :param model: model to run; must expose ``config.num_attention_heads`` and
        return an output with an ``attentions`` sequence
    :param tokenizer: tokenizer matching ``model``
    :param sequences: list of raw sequences
    :param batch_size: number of sequences per forward pass
    :return: tuple ``(single_attentions, unnorm_attentions, multi_attentions)``
        of NumPy arrays with shapes ``(N, 512)``, ``(N, 512)`` and
        ``(N, num_heads, 512)``, where ``N = len(sequences)``
    """
    score_len = 512
    kmer = 6
    num_sequences = len(sequences)

    # Take the head count from the model instead of assuming 12 heads.
    num_heads = model.config.num_attention_heads
    # Send inputs to wherever the model's weights live rather than relying on
    # a module-level device variable.
    model_device = next(model.parameters()).device

    single_attentions = np.zeros((num_sequences, score_len))
    unnorm_attentions = np.zeros((num_sequences, score_len))
    multi_attentions = np.zeros((num_sequences, num_heads, score_len))

    for start in range(0, num_sequences, batch_size):
        # The final batch may hold fewer than batch_size sequences.
        batch_sequences = sequences[start:start + batch_size]
        stop = start + len(batch_sequences)

        # Encode the whole batch at once; padding aligns it to its longest member.
        transformed_reads = [transform_seq(seq) for seq in batch_sequences]
        inputs = tokenizer(
            transformed_reads, return_tensors="pt", truncation=True, padding=True
        ).to(model_device)

        # Inference only: no gradients are needed for the forward pass.
        with torch.no_grad():
            outputs = model(**inputs, output_attentions=True)

        # Attention matrix of the last layer.
        out_attns = outputs.attentions[-1].cpu().numpy()

        single_attn, unnormed_attn = process_scores(out_attns, kmer=kmer)
        multi_attn = process_multi_score(out_attns, kmer=kmer)

        # Batches differ in padded length, so bring every result to score_len
        # columns before storing it.
        single_attentions[start:stop, :] = pad_to_length(single_attn, score_len)
        unnorm_attentions[start:stop, :] = pad_to_length(unnormed_attn, score_len)
        multi_attentions[start:stop, :, :] = pad_to_length(multi_attn, score_len, axis=-1)

    return single_attentions, unnorm_attentions, multi_attentions


### Each result has one row per input sequence; single/unnorm are (N, 512) and
### multi is (N, num_heads, 512), zero-padded along the last axis.
single,unnorm,multi = extract_attentions(model, tokenizer, sequences)

# Create the output directory up front so the save cannot fail on a missing
# path after the extraction has already run.
os.makedirs(output_dir, exist_ok=True)

# File names are derived from dataset_name so they stay in sync with the run id.
for suffix, array in (("single", single), ("unnorm", unnorm), ("multi", multi)):
    np.save(os.path.join(output_dir, f"{dataset_name}_extract_{suffix}_attentions.npy"), array)