# 需要下载的文件： 

- dnabert fine-tuned model

    https://drive.google.com/file/d/1BJjqb5Dl2lNMg2warsFQ0-Xvn1xxfFXC/view

    ```
    unzip 6-new-12w-0.zip
    ```

# 使用的镜像

    pytorch_dnabert

# 如果导入 transformers 时报错，请阅读 DNABERT 的 README 并安装相关依赖包

In [None]:
import os
import argparse
import numpy as np
import pandas as pd
from os.path import join
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertModel, DNATokenizer

# pip install transformers -U

# hyperParams functions

In [None]:
class hyperParameter:
    """Bundle of paths and settings shared by the DNABERT embedding cells.

    All values are plain instance attributes. The dataset and output paths
    are derived from ``identity_level`` and ``shuffle_times`` when the object
    is constructed.
    """

    def __init__(self):
        # Split identifiers that appear in the pipeline directory names.
        self.identity_level, self.shuffle_times = 0.8, 1
        split_fields = (self.identity_level, self.shuffle_times)

        self.root_dir = '/lizutan/code/MCANet/dataset'
        # Folder whose tab-separated tables are read by get_dna_fasta_info().
        self.dataset_path = "{}/pipeline_10/rec_{}_shuffle_{}".format(self.root_dir, *split_fields)
        # Folder that receives the computed embeddings.
        self.dnabert_embed_out_dir = "/lizutan/code/MCANet/preprocess/dnabert_embedding/pipeline_10/rec_{}_shuffle_{}".format(*split_fields)

        # k-mer size: selects the tokenizer name and the k-mer window length.
        self.kmer = 6
        # Directory of the fine-tuned DNABERT model.
        self.model_path = "/lizutan/code/MCANet/preprocess/DNAbert_file/6-new-12w-0"


# Shared configuration instance used by the cells below.
DefaultArgs = hyperParameter()

# Make sure the embedding output directory exists. An existing directory is
# left untouched: calling os.rmdir() on it would raise OSError when it still
# holds results, and would delete it without recreating it when it is empty.
if not os.path.isdir(DefaultArgs.dnabert_embed_out_dir):
    os.makedirs(DefaultArgs.dnabert_embed_out_dir, exist_ok=True)
    print('success create dir test')


# helper function

In [5]:
def get_kmer_sentence(original_string, kmer=1, stride=1):
    """Split a sequence into space-separated k-mers.

    Example: ``get_kmer_sentence("ATGCA", kmer=3)`` returns ``"ATG TGC GCA"``.

    Args:
        original_string: Sequence text; newline characters are removed
            before splitting.
        kmer: Window length. ``-1`` disables splitting and returns
            ``original_string`` unchanged (newlines included).
        stride: Step between consecutive window starts; must be >= 1.

    Returns:
        The k-mers joined by single spaces, with leading/trailing
        double-quote characters stripped. An empty string when the sequence
        is shorter than ``kmer``.

    Raises:
        ValueError: If ``stride`` is smaller than 1, because the window
            would never advance.
    """
    if kmer == -1:
        return original_string
    if stride < 1:
        raise ValueError("stride must be a positive integer, got {!r}".format(stride))

    sequence = original_string.replace("\n", "")
    last_start = len(sequence) - kmer
    # Joining once avoids the quadratic cost of repeated string concatenation.
    windows = (sequence[start:start + kmer] for start in range(0, last_start + 1, stride))
    return " ".join(windows).strip("\"")
#
def get_dna_fasta_info(input_hyperparam: hyperParameter):
    """Collect the attB/attP sequences from every table in the dataset folder.

    Each regular file in ``input_hyperparam.dataset_path`` is read as a
    tab-separated table with a header row. The tables must provide the
    columns ``attB``, ``attP``, ``attB_str`` and ``attP_str``.

    Args:
        input_hyperparam: Settings object; only ``dataset_path`` is read.

    Returns:
        ``{"attB": Series, "attP": Series}`` where each Series maps a site
        identifier to its sequence string. When an identifier occurs more
        than once, the last occurrence wins.

    Raises:
        ValueError: If the dataset folder contains no files.
    """
    dataset_dir = input_hyperparam.dataset_path
    # os.listdir() returns entries in arbitrary order; sorting makes the row
    # order (and therefore which duplicate wins) reproducible. Sub-directories
    # are skipped because read_csv cannot open them.
    filenames = sorted(
        name for name in os.listdir(dataset_dir)
        if os.path.isfile(join(dataset_dir, name))
    )
    if not filenames:
        raise ValueError("no dataset files found in {}".format(dataset_dir))

    tables = [pd.read_csv(join(dataset_dir, name), sep="\t") for name in filenames]
    dataset_table = pd.concat(tables, axis=0, ignore_index=True)

    dna_fasta_dict = {}
    for site in ("attB", "attP"):
        # Build identifier -> sequence in one pass over the two columns
        # instead of a per-cell .loc lookup for every row.
        id_to_sequence = dict(zip(dataset_table[site], dataset_table["{}_str".format(site)]))
        dna_fasta_dict[site] = pd.Series(id_to_sequence)
    return dna_fasta_dict


#  Dataset class

In [6]:
class CustomeDataset:
    """Dataset that turns fixed-length DNA sequences into token ids.

    Each item is a ``(label, input_ids)`` pair: ``label`` is the entry's
    index label in ``data`` and ``input_ids`` is the first row of the
    tokenizer's ``input_ids`` output for the entry's k-mer sentence.
    """

    def __init__(self, data, args: hyperParameter):
        self.data = data
        self.args_param = args
        # Position -> index-label lookup, so integer positions can be mapped
        # back to the labels of ``data``.
        self.data_index = np.array(data.index)
        tokenizer_name = 'dna' + str(args.kmer)
        self.tokenizer = DNATokenizer.from_pretrained(tokenizer_name, do_lower_case=False)

    def __len__(self):
        return len(self.data)

    def pad_sequence(self, sequence):
        """Cut ``sequence`` to 50 characters, right-filling shorter input with "N"."""
        return sequence[:50].ljust(50, "N")

    def __getitem__(self, index):
        label = self.data_index[index]
        fixed_length = self.pad_sequence(self.data[label])
        kmer_sentence = get_kmer_sentence(fixed_length, self.args_param.kmer)
        encoded = self.tokenizer.encode_plus(
            kmer_sentence,
            sentence_b=None,
            return_tensors='pt',
            add_special_tokens=True,
        )
        # return_tensors='pt' yields a batch dimension; keep the single row.
        return label, encoded['input_ids'][0]

# 根据数据计算DNAbert的嵌入特征

In [7]:
import torch

dna_fasta_dict = get_dna_fasta_info(DefaultArgs)

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the model once: the same checkpoint serves every site, so reloading it
# inside the loop only repeats the disk read and the host-to-device copy.
model = BertModel.from_pretrained(DefaultArgs.model_path, output_hidden_states=True)
model = model.to(device)
model.eval()  # inference only

for site, site_data in dna_fasta_dict.items():
    site_dataset = CustomeDataset(site_data, DefaultArgs)
    site_dataloader = DataLoader(site_dataset,
                                 shuffle=False,
                                 drop_last=False,
                                 num_workers=8,
                                 batch_size=16)
    # One sub-folder per site, one .npz file per sequence identifier.
    site_dir = join(DefaultArgs.dnabert_embed_out_dir, site)
    os.makedirs(site_dir, exist_ok=True)
    processed = 0
    for index_list, input_array in site_dataloader:
        input_array = input_array.to(device)
        # No gradients are needed, so skip building the autograd graph.
        with torch.no_grad():
            # Last element of the model output, then its last entry —
            # presumably the final layer's hidden states, since
            # output_hidden_states=True was requested (TODO confirm).
            hidden_states = model(input_array)[-1][-1]
        hidden_states = hidden_states.cpu().numpy()
        for array_id, index_name in enumerate(index_list):
            output_file = "{}/{}.npz".format(site_dir, index_name)
            result = dict(
                index=index_name,
                representations=hidden_states[array_id]
            )
            np.savez(output_file, **result)
        processed += len(index_list)
        print("\r{} / {} done!".format(processed, len(site_dataset)), end='')
    print("\n")
    

<class 'transformers.tokenization_dna.DNATokenizer'>
6469 / 6469 done!

<class 'transformers.tokenization_dna.DNATokenizer'>
6469 / 6469 done!



# 重新开始=========================>>>

### 修改后的版本：可以编码任意 DNA 序列

In [None]:
import pandas as pd
import numpy as np

def make_dataframe(data):
    """Wrap a collection of items in a Series indexed by position.

    Args:
        data: Iterable of items (e.g. a list of sequence strings).

    Returns:
        ``pd.Series`` whose index is ``0 .. len(data) - 1`` and whose i-th
        value is the i-th item of ``data``.
    """
    # Each position maps to its own element. Storing data[1] at every index
    # would repeat one sequence and fail on single-element input.
    return pd.Series(dict(enumerate(data)))


def load_data_bicoding(Path):
    """Read one sequence per line from a text file and upper-case it.

    Args:
        Path: Location of the text file, parsed with ``np.loadtxt``.

    Returns:
        List of upper-cased ``str`` sequences in file order.
    """
    # ndmin=1 keeps a one-line file as a length-1 array; without it loadtxt
    # returns a 0-d array that cannot be iterated.
    records = np.loadtxt(Path, dtype=str, ndmin=1)
    return [str(record.upper().strip('\n')) for record in records]

# Folder holding the ac4c train/test sequence files.
path = '/lizutan/code/MetAc4C/ac4c_data/ac4c_data/ac4c_train_test'

# Load the four splits in a fixed order: positive train/test, then negative
# train/test.
pos_train, pos_test, neg_train, neg_test = (
    load_data_bicoding(path + suffix)
    for suffix in (
        '/ac4c_positive_train.fa',
        '/ac4c_positive_test.fa',
        '/ac4c_negative_train.fa',
        '/ac4c_negative_test.fa',
    )
)

# Output folder for the saved embeddings; created on first use.
site_path = '/lizutan/code/MetAc4C/ac4c_data/ac4c_data_bert/'
output_dir_missing = not os.path.exists(site_path)
if output_dir_missing:
    os.makedirs(site_path)

#=============================================================>>
# site_dir = site_path + 'pos_train.csv'
# data_pd = make_dataframe(pos_train)

# site_dir = site_path + 'pos_test.csv'
# data_pd = make_dataframe(pos_test)

# site_dir = site_path + 'neg_train.csv'
# data_pd = make_dataframe(neg_train)

import torch

# Despite its name, site_dir is the path of the output CSV file.
site_dir = site_path + 'neg_test.csv'
data_pd = make_dataframe(neg_test)

site_dataset = CustomeDataset(data_pd, DefaultArgs)
site_dataloader = DataLoader(site_dataset, shuffle=False, drop_last=False, num_workers=8, batch_size=16)

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = BertModel.from_pretrained(DefaultArgs.model_path, output_hidden_states=True)
model = model.to(device)
model.eval()  # inference only


processed = 0
# Collect per-batch frames and concatenate once at the end; concatenating
# inside the loop re-copies all earlier rows on every iteration.
batch_frames = []
for index_list, input_array in site_dataloader:
    input_array = input_array.to(device)
    # No gradients are needed, so skip building the autograd graph.
    with torch.no_grad():
        hidden_states = model(input_array)[-1][-1]
    hidden_states = hidden_states.cpu().numpy()
    # Optional flattening: one row per sample, e.g. [3, 47, 768] -> [3, 36096].
    hidden_states = hidden_states.reshape(hidden_states.shape[0], -1)
    batch_frames.append(pd.DataFrame(hidden_states))

    processed += len(index_list)
    print("\r{} / {} done!".format(processed, len(site_dataset)), end='')

# NOTE(review): row labels restart at 0 for every batch because the per-batch
# frames are concatenated as-is; kept so the CSV layout stays unchanged.
csv_pd = pd.concat(batch_frames, axis=0) if batch_frames else pd.DataFrame()
csv_pd.to_csv(site_dir, index=True, header=True, sep=',')

# print("\n")
