# Language model: ProtBert


In [1]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
import torch 
from torch.utils.data import Dataset
import torch.nn as nn
from torch.nn import Linear, ReLU, CrossEntropyLoss, Sequential, Conv2d, MaxPool2d, Module, Softmax, BatchNorm2d, Dropout
from torch.optim import Adam

from scipy.stats import spearmanr
from scipy.stats import rankdata


from transformers import BertModel, BertTokenizer
from sklearn.metrics import mean_squared_error
from sklearn import metrics

from transformers import AutoTokenizer, AutoModel, AutoConfig
from helpers import *

Global Constants

In [2]:
# Checkpoint name passed to the `from_pretrained` calls (tokenizer, config, model).
MODEL = 'Rostlab/prot_bert'
# Directory the tokenizer files are saved to.
SAVE_PATH = 'ProtConfig/'

# Length every tokenized sequence is padded/truncated to.
MAX_LEN = 512 # protein sequence max length 
# Number of samples per DataLoader batch.
BATCH_SIZE = 6
# Version tag embedded in the saved model file name.
VER= 1

1. Load the dataset lean_train_data.csv (from Kaggle)

In [3]:

#### put in function ####
# `os` is imported explicitly here: nothing else in this notebook imports it,
# so the cell would otherwise depend on `from helpers import *` leaking it.
import os

# The data directory lives three levels above the current working directory.
path = os.getcwd()
for _ in range(3):
    path = os.path.dirname(path)

path += '/data/'

# Training set, test set and the sample submission file.
train = pd.read_csv(path + 'train_v1.csv')
test = pd.read_csv(path + 'test.csv')
submission = pd.read_csv(path + 'sample_submission.csv')


In [None]:
# Add spaces between amino-acid letters so that they can be tokenized.
def add_spaces(x):
    """Return ``x`` with a single space inserted between consecutive characters."""
    return " ".join(x)


# Space-separate the residues of every sequence in both data frames.
for frame in (train, test):
    frame.protein_sequence = frame.protein_sequence.map(add_spaces)



In [None]:
# Replace the index of both frames with a fresh default one
# (done before the frames are handed to the dataset).
train, test = (frame.reset_index(drop=True) for frame in (train, test))

In [None]:
# Load the tokenizer that belongs to the pretrained ProtBert checkpoint named by MODEL.
tokenizer = AutoTokenizer.from_pretrained(MODEL)
# Save the tokenizer files under SAVE_PATH.
tokenizer.save_pretrained(SAVE_PATH)

In [None]:
def prepare_input(tokenizer, text, max_length=None):
    """Tokenize ``text`` into fixed-length ``torch.long`` tensors.

    Special tokens are added and the encoding is padded or truncated to
    exactly ``max_length`` entries.

    Args:
        tokenizer: Tokenizer exposing ``encode_plus``.
        text: The (space-separated) sequence to tokenize.
        max_length: Target length of every returned tensor. When ``None``
            the module-level ``MAX_LEN`` is used.

    Returns:
        The mapping returned by ``tokenizer.encode_plus`` with every value
        converted to a ``torch.long`` tensor.
    """
    if max_length is None:
        max_length = MAX_LEN

    inputs = tokenizer.encode_plus(
        text,
        return_tensors=None,
        add_special_tokens=True,
        max_length=max_length,
        # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`.
        padding='max_length',
        truncation=True,
    )
    # Convert in place so the tokenizer's own mapping type is preserved.
    for k, v in inputs.items():
        inputs[k] = torch.tensor(v, dtype=torch.long)
    return inputs


class MutationDataset(Dataset):
    """Dataset of protein sequences with their ``tm`` label and ``pH`` value.

    The ``protein_sequence``, ``pH`` and ``tm`` columns of ``df`` are read
    once at construction; a sequence is tokenized with ``prepare_input``
    each time its sample is requested.
    """

    def __init__(self,tokenizer, df):
        self.tokenizer = tokenizer

        # Column values are kept as arrays and looked up by position.
        self.inputs1 = df['protein_sequence'].values
        self.pH = df['pH'].values
        self.labels = df['tm'].values

    def __len__(self):
        """Return the number of samples."""
        return len(self.inputs1)

    def __getitem__(self, item):
        """Return ``(tokenized inputs, label, pH)`` for the sample at ``item``."""
        encoded = prepare_input(self.tokenizer, self.inputs1[item])
        target = torch.tensor(self.labels[item], dtype=torch.float)
        ph_value = torch.tensor(self.pH[item], dtype=torch.float)
        return encoded, target, ph_value
    

In [None]:
# ====================================================
# Model
# ====================================================
class MeanPooling(nn.Module):
    """Average the hidden states along dimension 1, counting only unmasked positions."""

    def __init__(self):
        super().__init__()

    def forward(self, last_hidden_state, attention_mask):
        """Return the mask-weighted mean of ``last_hidden_state`` over dimension 1.

        ``attention_mask`` gets a trailing dimension and is expanded to the
        size of ``last_hidden_state``, so positions with mask 0 contribute
        nothing to the sum or to the count.
        """
        mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()

        masked_total = (last_hidden_state * mask).sum(dim=1)
        # Clamp the count so a fully masked row divides by 1e-9 instead of 0.
        position_count = mask.sum(dim=1).clamp(min=1e-9)

        return masked_total / position_count
    

class ProtBertStab(nn.Module):
    """Encoder model followed by masked mean pooling and a one-output linear layer.

    Args:
        config_path: When ``None`` the configuration is created from the
            ``MODEL`` checkpoint name; otherwise it is read from this path
            with ``torch.load``.
        pretrained: When true the encoder is loaded with
            ``AutoModel.from_pretrained``; otherwise it is built from the
            configuration with ``AutoModel.from_config``.
    """

    def __init__(self, config_path=None, pretrained=False):
        super().__init__()

        # Configuration: from the checkpoint name (training) or from a saved file (inference).
        self.config = (
            AutoConfig.from_pretrained(MODEL, output_hidden_states=True)
            if config_path is None
            else torch.load(config_path)
        )

        # Encoder: pretrained weights, or a model instantiated from the config alone.
        self.model = (
            AutoModel.from_pretrained(MODEL, config=self.config)
            if pretrained
            else AutoModel.from_config(self.config)
        )

        # Mean pooling over the encoder output, then a single-value linear head.
        self.pool = MeanPooling()
        self.lin = nn.Linear(self.config.hidden_size, 1)

    def feature(self, inputs,position):
        """Run the encoder on ``inputs`` and mean-pool its first output.

        ``position`` is forwarded to the pooling layer as its mask argument.
        """
        encoder_out = self.model(**inputs)
        return self.pool(encoder_out[0], position)

    def forward(self, inputs1):
        """Return one prediction per sample for the tokenized batch ``inputs1``."""
        # The batch's own attention mask selects which positions are pooled.
        pooled = self.feature(inputs1, inputs1['attention_mask'])
        return self.lin(pooled).squeeze(-1)

In [None]:
# Add to helper

class RMSELoss(nn.Module):
    """Element-wise ``sqrt(squared error + eps)`` followed by an optional reduction.

    The square root is taken per element *before* reducing, so with
    ``reduction='mean'`` the result is the mean of the per-element roots.

    Args:
        reduction: ``'mean'`` or ``'sum'`` reduce the per-element values;
            any other value returns them unreduced.
        eps: Constant added under the square root.
    """

    def __init__(self, reduction='mean', eps=1e-9):
        super().__init__()
        self.mse = nn.MSELoss(reduction='none')
        self.reduction = reduction
        self.eps = eps

    def forward(self, y_pred, y_true):
        per_element = (self.mse(y_pred, y_true) + self.eps).sqrt()
        if self.reduction == 'sum':
            return per_element.sum()
        if self.reduction == 'mean':
            return per_element.mean()
        return per_element

Training the model 

In [None]:

from sklearn.model_selection import KFold

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
def train_epoch(model, optimizer, criterion, train_loader, epoch):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Network called as ``model(inputs)`` with the tokenized batch.
        optimizer: Optimizer updating the model parameters.
        criterion: Loss called as ``criterion(output, target)``.
        train_loader: Loader yielding ``(inputs, target, pH)`` batches, where
            ``inputs`` is a mapping of tensors.
        epoch: Epoch number, used only in the progress messages.

    Returns:
        ``(train_loss, rho)``: the loss averaged over batches and the
        Spearman correlation averaged over the batches where it is defined.
    """
    model.train()
    train_loss = 0.0
    rho_sum = 0.0
    rho_batches = 0
    seen = 0
    total = len(train_loader.sampler)

    # The dataset yields three items per sample; pH is not consumed by the
    # model's forward pass, so it is unpacked but left unused.
    for batch_idx, (inputs1, target, _pH) in enumerate(train_loader):
        inputs1 = {k: v.to(device) for k, v in inputs1.items()}
        target = target.to(device)

        # Clear gradients from the previous step; otherwise they accumulate.
        optimizer.zero_grad()
        output = model(inputs1)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # Spearman's rank correlation is undefined for fewer than two samples.
        if target.numel() > 1:
            p, _ = spearmanr(target.detach().cpu().numpy(), output.detach().cpu().numpy())
        else:
            p = float('nan')
        # Skip NaN (e.g. constant predictions) so one batch cannot poison the average.
        if not np.isnan(p):
            rho_sum += p
            rho_batches += 1

        if batch_idx % 10 == 0:
            print(f"Train Epoch: {epoch} [{seen}/{total} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f} \t spearman: {p:.6f}")
        seen += target.size(0)

    train_loss /= max(len(train_loader), 1)
    rho = rho_sum / rho_batches if rho_batches else float('nan')

    print(   f"Train Epoch: {epoch} " f" loss={train_loss:0.2e} " f" rho={rho:0.2f} " )
    return train_loss , rho


def test_epoch(model, criterion, test_loader):
    """Evaluate ``model`` on ``test_loader`` without computing gradients.

    Args:
        model: Network called as ``model(inputs)`` with the tokenized batch.
        criterion: Loss called as ``criterion(output, target)``.
        test_loader: Loader yielding ``(inputs, target, pH)`` batches, where
            ``inputs`` is a mapping of tensors.

    Returns:
        ``(test_loss, rho)``: the loss averaged over batches and the
        Spearman correlation averaged over the batches where it is defined.
    """
    model.eval()
    test_loss = 0.0
    rho_sum = 0.0
    rho_batches = 0
    seen = 0
    total = len(test_loader.sampler)

    with torch.no_grad():
        # pH is part of every batch but is not consumed by the model's forward pass.
        for batch_idx, (inputs1, target, _pH) in enumerate(test_loader):
            # Move the tokenized inputs and the targets to the model's device.
            inputs1 = {k: v.to(device) for k, v in inputs1.items()}
            target = target.to(device)

            output = model(inputs1)
            batch_loss = criterion(output, target).item()
            test_loss += batch_loss

            # Spearman's rank correlation is undefined for fewer than two samples.
            if target.numel() > 1:
                p, _ = spearmanr(target.cpu().numpy(), output.cpu().numpy())
            else:
                p = float('nan')
            # Skip NaN so one degenerate batch cannot poison the average.
            if not np.isnan(p):
                rho_sum += p
                rho_batches += 1

            if batch_idx % 10 == 0:
                print(f"Test Epoch: [{seen}/{total} ({100. * batch_idx / len(test_loader):.0f}%)]\tLoss: {batch_loss:.6f} \t spearman: {p:.6f}")
            seen += target.size(0)

    test_loss /= max(len(test_loader), 1)
    rho = rho_sum / rho_batches if rho_batches else float('nan')
    print(   f"Test Epoch: " f" loss={test_loss:0.2e} " f" rho={rho:0.2f} " )

    return test_loss ,rho



In [None]:

# K-fold cross-validated fine-tuning. A fresh model is created per fold and
# the per-epoch losses / correlations are appended to the history lists.
k_folds = 5
learning_rate = 1e-4
num_epochs = 4
kfold = KFold(n_splits=k_folds, shuffle=True)
# MutationDataset takes the tokenizer first, then the data frame.
dataset = MutationDataset(tokenizer, train)
train_loss_history = []
test_loss_history = []
train_rho_history = []
test_rho_history = []
for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset)):

    # Print
    print(f'FOLD {fold}')
    print('--------------------------------')

    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)

    # Define data loaders for training and testing data in this fold
    train_dl = torch.utils.data.DataLoader(
                      dataset,
                      batch_size=BATCH_SIZE, sampler=train_subsampler)
    val_dl = torch.utils.data.DataLoader(
                      dataset,
                      batch_size=BATCH_SIZE, sampler=test_subsampler)

    model = ProtBertStab(pretrained=True)
    optimizer = Adam(model.parameters(), lr=learning_rate)
    # defining the loss function
    criterion = RMSELoss()

    model = model.to(device)
    criterion = criterion.to(device)

    for epoch in range(1, num_epochs + 1):
        train_loss , rho_train = train_epoch( model, optimizer, criterion, train_dl, epoch)
        test_loss , rho_test = test_epoch(model, criterion, val_dl)

        train_loss_history.append(train_loss)
        train_rho_history.append(rho_train)
        test_loss_history.append(test_loss)
        test_rho_history.append(rho_test)

    # Save inside the loop so every fold keeps its own file once the debug
    # break below is removed; the resulting path is the same as before.
    torch.save(model, f'{SAVE_PATH}fold-{fold}_{VER}.pt')

    break  # for debug purposes
    
    
    
 


# Plotting 

In [None]:

# Loss curves: one point per epoch appended by the training loop.
plt.plot(train_loss_history, label='train loss')
plt.plot(test_loss_history, label='test loss')
plt.xlabel('Epoch')
# The plotted values come from RMSELoss, not a plain MSE.
plt.ylabel('RMSELoss')
plt.title(' train and test RMSELoss')
plt.legend()


In [None]:
# Spearman correlation curves for the training and test passes.
for series, name in ((train_rho_history, 'train rho'), (test_rho_history, 'test rho')):
    plt.plot(series, label=name)
plt.xlabel('Epoch')
plt.ylabel('rho')
plt.title(' train and test rho')
plt.legend()