In [1]:
import torch

import torch.nn as nn
import pandas as pd
import numpy as np
import shutil
import sys   

from Bio import SeqIO
from transformers import BertTokenizer, BertForSequenceClassification, BertModel
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.metrics import average_precision_score
from skmultilearn.model_selection import IterativeStratification

2023-08-14 14:23:10.350533: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
device = torch.device('cuda:2') if torch.cuda.is_available() else torch.device('cpu')

In [3]:
# hyperparameters
MAX_LEN = 256
BATCH_SIZE = 8
EPOCHS = 100
LEARNING_RATE = 1e-05
NUMS_LABELS = 6
OUTPUT_SIZE = 6
LABEL_LENGTH = 3

## Data preprocessing

In [4]:
def create_dataframe(string_list):
    # Create an empty DataFrame
    df = pd.DataFrame()

    # Iterate over each string in the list
    for string in string_list:
        # Create a dictionary to hold the letters of the string
        string_dict = {}

        # Iterate over each letter in the string
        for i, letter in enumerate(string):
            # Create column name (e.g., 'Letter 1', 'Letter 2', etc.)
            col_name = f'label{i+1}'

            # Add the letter to the dictionary
            string_dict[col_name] = letter

        # Append the dictionary as a row to the DataFrame
        df = df.append(string_dict, ignore_index=True)
    return df

In [5]:
def truncate_sequence(sequence, max_length):
    if len(sequence) <= max_length:
        return sequence
    else:
        return sequence[-max_length:]

def process_dataset(dataset, max_length=1800):
    processed_dataset = []
    for sequence in dataset:
        processed_sequence = truncate_sequence(sequence, max_length)
        processed_dataset.append(processed_sequence)
    return np.array(processed_dataset)

In [6]:
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)

In [7]:
def loss_fn(outputs, targets):
    return torch.nn.BCEWithLogitsLoss()(outputs, targets)

In [8]:
val_targets=[]
val_outputs=[]

In [9]:
class CustomDataset(torch.utils.data.Dataset):

    def __init__(self, df_x, df_y, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.df_x = df_x
        self.df_y = df_y
        self.max_len = max_len

    def __len__(self):
        return len(self.df_x)

    def __getitem__(self, index):
        df_x = str(self.df_x[index])
        df_x = " ".join(df_x.split())

        inputs = self.tokenizer.encode_plus(
            df_x,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=True,
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
            'features': inputs['input_ids'].flatten(),
            'attention_mask': inputs['attention_mask'].flatten(),
            'token_type_ids': inputs["token_type_ids"].flatten(),
            'labels': torch.FloatTensor(self.df_y[index])
        }

In [10]:
class BERTClass(torch.nn.Module):
    def __init__(self):
        super(BERTClass, self).__init__()
        self.bert_model = BertModel.from_pretrained('bert-base-uncased', return_dict=True)
        self.dropout = torch.nn.Dropout(0.3)
        self.linear = torch.nn.Linear(768, LABEL_LENGTH)
    
    def forward(self, input_ids, attn_mask, token_type_ids):
        if input_ids.shape[1] > 256:
            num_sub_sequences = (input_ids.shape[1] - 1) // 256 + 1
            outputs = []
            for i in range(num_sub_sequences):
                start_idx = i * 256
                end_idx = min((i + 1) * 256, input_ids.shape[1])
                sub_input_ids = input_ids[:, start_idx:end_idx]
                sub_attn_mask = attn_mask[:, start_idx:end_idx]
                sub_token_type_ids = token_type_ids[:, start_idx:end_idx]

                output = self.bert_model(
                    input_ids=sub_input_ids, 
                    attention_mask=sub_attn_mask, 
                    token_type_ids=sub_token_type_ids
                )
                output_dropout = self.dropout(output.pooler_output)
                outputs.append(output_dropout)
            print(outputs)
            output_final = torch.cat(outputs, dim=1)
            output = self.linear(output_final)
        
        else:
            output = self.bert_model(
                input_ids=input_ids, 
                attention_mask=attn_mask, 
                token_type_ids=token_type_ids
            )
            output_dropout = self.dropout(output.pooler_output)
            output = self.linear(output_dropout)

        return output

In [20]:
# Load the model for evaluation on another test set
model = BERTClass()
model.load_state_dict(torch.load("bert_model.pt"))
model.to(device)
model.eval()

# Load the new test set
filename_test = "mus_lncRNA_multi6_seq.fasta"
sequences_test = SeqIO.parse(filename_test, "fasta")
X_test_new, y_test_new = [], []
for record in sequences_test:
    output = ' '.join(record.seq)
    X_test_new.append(output)
    y_test_new.append(record.id[:LABEL_LENGTH])

df = create_dataframe(y_test_new)

col_name = [f'label{i+1}' for i in range(LABEL_LENGTH)]
for col_n in col_name:
    df[col_n] = df[col_n].astype(str).astype(int)

df.insert(loc=0, column='sequence', value=X_test_new)
pd.set_option('display.max_rows', df.shape[0]+1)

LABEL_COLUMNS = ['label1', 'label2', 'label3']
df[LABEL_COLUMNS].sum()

X_test_new, y_test_new = df['sequence'], df[LABEL_COLUMNS]
X_test_new, y_test_new = X_test_new.to_numpy(), y_test_new.to_numpy()

X_test_new = process_dataset(X_test_new, max_length=1206)

# Convert data to tensors and create DataLoader
test_dataset_new = CustomDataset(X_test_new, y_test_new, tokenizer, MAX_LEN)
test_loader_new = torch.utils.data.DataLoader(
    test_dataset_new,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0
)

# Evaluation on the new test set
test_loss = 0
test_correct = 0
test_targets = []
test_outputs = []

with torch.no_grad():
    for batch_idx, data in enumerate(test_loader_new):
        features = data['features'].to(device, dtype=torch.long)
        mask = data['attention_mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        labels = data['labels'].to(device, dtype=torch.float)

        outputs = model(features, mask, token_type_ids)
        loss = loss_fn(outputs, labels)
        test_loss = test_loss + ((1 / (batch_idx + 1)) * loss.item())

        # Calculate test accuracy
        predicted_labels = torch.round(torch.sigmoid(outputs))
        test_correct += torch.sum(predicted_labels == labels).item()

        test_targets.extend(labels.cpu().detach().numpy().tolist())
        test_outputs.extend(torch.sigmoid(outputs).cpu().detach().numpy().tolist())

# Calculate average loss and accuracy on the new test set
test_loss = test_loss / len(test_loader_new)
test_accuracy = test_correct / (len(test_loader_new.dataset) * labels.shape[1])

# Calculate ROC scores on the new test set
test_targets = np.array(test_targets)
test_outputs = np.array(test_outputs)
test_roc_macro = roc_auc_score(test_targets, test_outputs, average='macro')
test_roc_micro = roc_auc_score(test_targets, test_outputs, average='micro')
test_ap = average_precision_score(test_targets, test_outputs)

# Print evaluation metrics on the new test set
print("Evaluation on the new test set:")
print("Test Loss: {:.4f} Test Accuracy: {:.4f} ROC Macro: {:.4f} ROC Micro: {:.4f} ap: {:.4f}"
      .format(test_loss, test_accuracy, test_roc_macro, test_roc_micro, test_ap))

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


75
Evaluation on the new test set:
Test Loss: 0.5595 Test Accuracy: 0.4933 ROC Macro: 0.5430 ROC Micro: 0.5091 ap: 0.4601
